videocall_cli/producers/
microphone.rs

1use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
2use opus::Channels;
3use protobuf::{Message, MessageField};
4use std::sync::atomic::AtomicBool;
5use std::sync::Arc;
6use std::thread::JoinHandle;
7use std::time::Duration;
8use tokio::sync::mpsc::Sender;
9use tracing::{error, info};
10use videocall_types::protos::media_packet::media_packet::MediaType;
11use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
12use videocall_types::protos::packet_wrapper::packet_wrapper::PacketType;
13use videocall_types::protos::packet_wrapper::PacketWrapper;
14
15pub struct MicrophoneDaemon {
16    stop: Arc<AtomicBool>,
17    handles: Vec<JoinHandle<anyhow::Result<()>>>,
18}
19
20impl Default for MicrophoneDaemon {
21    fn default() -> Self {
22        Self::new()
23    }
24}
25
26impl MicrophoneDaemon {
27    pub fn new() -> Self {
28        Self {
29            stop: Arc::new(AtomicBool::new(false)),
30            handles: vec![],
31        }
32    }
33
34    pub fn start(
35        &mut self,
36        quic_tx: Sender<Vec<u8>>,
37        device: String,
38        email: String,
39    ) -> anyhow::Result<()> {
40        self.handles.push(start_microphone(
41            device.clone(),
42            quic_tx.clone(),
43            email,
44            self.stop.clone(),
45        )?);
46        Ok(())
47    }
48
49    pub fn stop(&mut self) {
50        self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
51        for handle in self.handles.drain(..) {
52            if let Err(e) = handle.join() {
53                error!("Failed to join microphone thread: {:?}", e);
54            }
55        }
56    }
57}
58
59fn start_microphone(
60    device: String,
61    quic_tx: Sender<Vec<u8>>,
62    email: String,
63    stop: Arc<AtomicBool>,
64) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
65    let host = cpal::default_host();
66
67    // Set up the input device and stream with the default input config.
68    let device = if device == "default" {
69        host.default_input_device()
70    } else {
71        host.input_devices()?
72            .find(|x| x.name().map(|y| y == device).unwrap_or(false))
73    }
74    .expect("failed to find input device");
75
76    info!("Input device: {}", device.name()?);
77    let range = cpal::SupportedBufferSize::Range { min: 960, max: 960 };
78    let config = cpal::SupportedStreamConfig::new(
79        1,
80        cpal::SampleRate(48000),
81        range,
82        cpal::SampleFormat::I16,
83    );
84
85    let mut encoder = opus::Encoder::new(48000, Channels::Mono, opus::Application::Voip)?;
86    info!("Opus encoder created {:?}", encoder);
87
88    let err_fn = move |err| {
89        error!("an error occurred on stream: {}", err);
90    };
91
92    Ok(std::thread::spawn(move || {
93        let stream = match config.sample_format() {
94            cpal::SampleFormat::I16 => device.build_input_stream(
95                &config.into(),
96                move |data, _: &_| {
97                    for chunk in data.chunks_exact(960) {
98                        match encode_and_send_i16(chunk, &mut encoder, &quic_tx, email.clone()) {
99                            Ok(_) => {}
100                            Err(e) => {
101                                error!("Failed to encode and send audio: {}", e);
102                            }
103                        }
104                    }
105                },
106                err_fn,
107                None,
108            )?,
109            sample_format => {
110                return Err(anyhow::Error::msg(format!(
111                    "Unsupported sample format '{sample_format}'"
112                )))
113            }
114        };
115        info!("Begin streaming audio...");
116        stream.play().expect("failed to play stream");
117
118        loop {
119            if stop.load(std::sync::atomic::Ordering::Relaxed) {
120                break;
121            }
122            std::thread::sleep(Duration::from_secs(1));
123        }
124        Ok(())
125    }))
126}
127
128fn encode_and_send_i16(
129    input: &[i16],
130    encoder: &mut opus::Encoder,
131    quic_tx: &Sender<Vec<u8>>,
132    email: String,
133) -> anyhow::Result<()> {
134    let output = encoder.encode_vec(input, 960)?;
135    let output = transform_audio_chunk(output, email, 0);
136    let output = output?.write_to_bytes()?;
137    quic_tx.try_send(output)?;
138    Ok(())
139}
140
141fn transform_audio_chunk(
142    data: Vec<u8>,
143    email: String,
144    sequence: u64,
145) -> anyhow::Result<PacketWrapper> {
146    Ok(PacketWrapper {
147        packet_type: PacketType::MEDIA.into(),
148        email: email.clone(),
149        data: MediaPacket {
150            media_type: MediaType::AUDIO.into(),
151            data,
152            email,
153            frame_type: String::from("key"),
154            timestamp: get_micros_now(),
155            // TODO: Duration of the audio in microseconds.
156            duration: 0.0,
157            video_metadata: MessageField(Some(Box::new(VideoMetadata {
158                sequence,
159                ..Default::default()
160            }))),
161            ..Default::default()
162        }
163        .write_to_bytes()?,
164        ..Default::default()
165    })
166}
167
/// Returns the current system time as microseconds since the Unix epoch.
///
/// If the system clock is set before the epoch, the result is the negative distance to the
/// epoch rather than a panic, so a misconfigured clock cannot take down the caller.
fn get_micros_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_micros() as f64,
        // `duration()` is how far before the epoch the clock is.
        Err(before_epoch) => -(before_epoch.duration().as_micros() as f64),
    }
}