videocall_cli/producers/
microphone.rs1use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
20use opus::Channels;
21use protobuf::{Message, MessageField};
22use std::sync::atomic::AtomicBool;
23use std::sync::Arc;
24use std::thread::JoinHandle;
25use std::time::Duration;
26use tokio::sync::mpsc::Sender;
27use tracing::{error, info};
28use videocall_types::protos::media_packet::media_packet::MediaType;
29use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
30use videocall_types::protos::packet_wrapper::packet_wrapper::PacketType;
31use videocall_types::protos::packet_wrapper::PacketWrapper;
32
33pub struct MicrophoneDaemon {
34 stop: Arc<AtomicBool>,
35 handles: Vec<JoinHandle<anyhow::Result<()>>>,
36}
37
38impl Default for MicrophoneDaemon {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl MicrophoneDaemon {
45 pub fn new() -> Self {
46 Self {
47 stop: Arc::new(AtomicBool::new(false)),
48 handles: vec![],
49 }
50 }
51
52 pub fn start(
53 &mut self,
54 wt_tx: Sender<Vec<u8>>,
55 device: String,
56 email: String,
57 ) -> anyhow::Result<()> {
58 self.handles.push(start_microphone(
59 device.clone(),
60 wt_tx.clone(),
61 email,
62 self.stop.clone(),
63 )?);
64 Ok(())
65 }
66
67 pub fn stop(&mut self) {
68 self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
69 for handle in self.handles.drain(..) {
70 if let Err(e) = handle.join() {
71 error!("Failed to join microphone thread: {:?}", e);
72 }
73 }
74 }
75}
76
77fn start_microphone(
78 device: String,
79 wt_tx: Sender<Vec<u8>>,
80 email: String,
81 stop: Arc<AtomicBool>,
82) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
83 let host = cpal::default_host();
84
85 let device = if device == "default" {
87 host.default_input_device()
88 } else {
89 host.input_devices()?
90 .find(|x| x.name().map(|y| y == device).unwrap_or(false))
91 }
92 .expect("failed to find input device");
93
94 info!("Input device: {}", device.name()?);
95 let range = cpal::SupportedBufferSize::Range { min: 960, max: 960 };
96 let config = cpal::SupportedStreamConfig::new(
97 1,
98 cpal::SampleRate(48000),
99 range,
100 cpal::SampleFormat::I16,
101 );
102
103 let mut encoder = opus::Encoder::new(48000, Channels::Mono, opus::Application::Voip)?;
104 info!("Opus encoder created {:?}", encoder);
105
106 let err_fn = move |err| {
107 error!("an error occurred on stream: {}", err);
108 };
109
110 Ok(std::thread::spawn(move || {
111 let stream = match config.sample_format() {
112 cpal::SampleFormat::I16 => device.build_input_stream(
113 &config.into(),
114 move |data, _: &_| {
115 for chunk in data.chunks_exact(960) {
116 match encode_and_send_i16(chunk, &mut encoder, &wt_tx, email.clone()) {
117 Ok(_) => {}
118 Err(e) => {
119 error!("Failed to encode and send audio: {}", e);
120 }
121 }
122 }
123 },
124 err_fn,
125 None,
126 )?,
127 sample_format => {
128 return Err(anyhow::Error::msg(format!(
129 "Unsupported sample format '{sample_format}'"
130 )))
131 }
132 };
133 info!("Begin streaming audio...");
134 stream.play().expect("failed to play stream");
135
136 loop {
137 if stop.load(std::sync::atomic::Ordering::Relaxed) {
138 break;
139 }
140 std::thread::sleep(Duration::from_secs(1));
141 }
142 Ok(())
143 }))
144}
145
146fn encode_and_send_i16(
147 input: &[i16],
148 encoder: &mut opus::Encoder,
149 wt_tx: &Sender<Vec<u8>>,
150 email: String,
151) -> anyhow::Result<()> {
152 let output = encoder.encode_vec(input, 960)?;
153 let output = transform_audio_chunk(output, email, 0);
154 let output_bytes = output?.write_to_bytes()?;
155 tracing::info!("Queueing AUDIO packet: {} bytes", output_bytes.len());
156 wt_tx.try_send(output_bytes)?;
157 tracing::debug!("Audio packet queued successfully");
158 Ok(())
159}
160
161fn transform_audio_chunk(
162 data: Vec<u8>,
163 email: String,
164 sequence: u64,
165) -> anyhow::Result<PacketWrapper> {
166 Ok(PacketWrapper {
167 packet_type: PacketType::MEDIA.into(),
168 email: email.clone(),
169 data: MediaPacket {
170 media_type: MediaType::AUDIO.into(),
171 data,
172 email,
173 frame_type: String::from("key"),
174 timestamp: get_micros_now(),
175 duration: 0.0,
177 video_metadata: MessageField(Some(Box::new(VideoMetadata {
178 sequence,
179 ..Default::default()
180 }))),
181 ..Default::default()
182 }
183 .write_to_bytes()?,
184 ..Default::default()
185 })
186}
187
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock is set before the epoch, the result is the negative
/// number of microseconds by which it precedes the epoch, rather than a panic.
fn get_micros_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as f64,
        // `duration()` reports how far the clock is behind the epoch.
        Err(before_epoch) => -(before_epoch.duration().as_micros() as f64),
    }
}
192}