videocall_cli/producers/
microphone.rs1use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
2use opus::Channels;
3use protobuf::{Message, MessageField};
4use std::sync::atomic::AtomicBool;
5use std::sync::Arc;
6use std::thread::JoinHandle;
7use std::time::Duration;
8use tokio::sync::mpsc::Sender;
9use tracing::{error, info};
10use videocall_types::protos::media_packet::media_packet::MediaType;
11use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
12use videocall_types::protos::packet_wrapper::packet_wrapper::PacketType;
13use videocall_types::protos::packet_wrapper::PacketWrapper;
14
15pub struct MicrophoneDaemon {
16 stop: Arc<AtomicBool>,
17 handles: Vec<JoinHandle<anyhow::Result<()>>>,
18}
19
20impl Default for MicrophoneDaemon {
21 fn default() -> Self {
22 Self::new()
23 }
24}
25
26impl MicrophoneDaemon {
27 pub fn new() -> Self {
28 Self {
29 stop: Arc::new(AtomicBool::new(false)),
30 handles: vec![],
31 }
32 }
33
34 pub fn start(
35 &mut self,
36 quic_tx: Sender<Vec<u8>>,
37 device: String,
38 email: String,
39 ) -> anyhow::Result<()> {
40 self.handles.push(start_microphone(
41 device.clone(),
42 quic_tx.clone(),
43 email,
44 self.stop.clone(),
45 )?);
46 Ok(())
47 }
48
49 pub fn stop(&mut self) {
50 self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
51 for handle in self.handles.drain(..) {
52 if let Err(e) = handle.join() {
53 error!("Failed to join microphone thread: {:?}", e);
54 }
55 }
56 }
57}
58
59fn start_microphone(
60 device: String,
61 quic_tx: Sender<Vec<u8>>,
62 email: String,
63 stop: Arc<AtomicBool>,
64) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
65 let host = cpal::default_host();
66
67 let device = if device == "default" {
69 host.default_input_device()
70 } else {
71 host.input_devices()?
72 .find(|x| x.name().map(|y| y == device).unwrap_or(false))
73 }
74 .expect("failed to find input device");
75
76 info!("Input device: {}", device.name()?);
77 let range = cpal::SupportedBufferSize::Range { min: 960, max: 960 };
78 let config = cpal::SupportedStreamConfig::new(
79 1,
80 cpal::SampleRate(48000),
81 range,
82 cpal::SampleFormat::I16,
83 );
84
85 let mut encoder = opus::Encoder::new(48000, Channels::Mono, opus::Application::Voip)?;
86 info!("Opus encoder created {:?}", encoder);
87
88 let err_fn = move |err| {
89 error!("an error occurred on stream: {}", err);
90 };
91
92 Ok(std::thread::spawn(move || {
93 let stream = match config.sample_format() {
94 cpal::SampleFormat::I16 => device.build_input_stream(
95 &config.into(),
96 move |data, _: &_| {
97 for chunk in data.chunks_exact(960) {
98 match encode_and_send_i16(chunk, &mut encoder, &quic_tx, email.clone()) {
99 Ok(_) => {}
100 Err(e) => {
101 error!("Failed to encode and send audio: {}", e);
102 }
103 }
104 }
105 },
106 err_fn,
107 None,
108 )?,
109 sample_format => {
110 return Err(anyhow::Error::msg(format!(
111 "Unsupported sample format '{sample_format}'"
112 )))
113 }
114 };
115 info!("Begin streaming audio...");
116 stream.play().expect("failed to play stream");
117
118 loop {
119 if stop.load(std::sync::atomic::Ordering::Relaxed) {
120 break;
121 }
122 std::thread::sleep(Duration::from_secs(1));
123 }
124 Ok(())
125 }))
126}
127
128fn encode_and_send_i16(
129 input: &[i16],
130 encoder: &mut opus::Encoder,
131 quic_tx: &Sender<Vec<u8>>,
132 email: String,
133) -> anyhow::Result<()> {
134 let output = encoder.encode_vec(input, 960)?;
135 let output = transform_audio_chunk(output, email, 0);
136 let output = output?.write_to_bytes()?;
137 quic_tx.try_send(output)?;
138 Ok(())
139}
140
141fn transform_audio_chunk(
142 data: Vec<u8>,
143 email: String,
144 sequence: u64,
145) -> anyhow::Result<PacketWrapper> {
146 Ok(PacketWrapper {
147 packet_type: PacketType::MEDIA.into(),
148 email: email.clone(),
149 data: MediaPacket {
150 media_type: MediaType::AUDIO.into(),
151 data,
152 email,
153 frame_type: String::from("key"),
154 timestamp: get_micros_now(),
155 duration: 0.0,
157 video_metadata: MessageField(Some(Box::new(VideoMetadata {
158 sequence,
159 ..Default::default()
160 }))),
161 ..Default::default()
162 }
163 .write_to_bytes()?,
164 ..Default::default()
165 })
166}
167
/// Returns the current system time as microseconds relative to the Unix epoch.
///
/// If the system clock is set before the epoch the result is the negative
/// distance to the epoch rather than a panic, so a misconfigured clock cannot
/// take down the calling thread.
fn get_micros_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as f64,
        Err(before_epoch) => -(before_epoch.duration().as_micros() as f64),
    }
}