Skip to main content

videocall_cli/producers/
microphone.rs

/*
 * Copyright 2025 Security Union LLC
 *
 * Licensed under either of
 *
 * * Apache License, Version 2.0
 *   (http://www.apache.org/licenses/LICENSE-2.0)
 * * MIT license
 *   (http://opensource.org/licenses/MIT)
 *
 * at your option.
 *
 * Unless you explicitly state otherwise, any contribution intentionally
 * submitted for inclusion in the work by you, as defined in the Apache-2.0
 * license, shall be dual licensed as above, without any additional terms or
 * conditions.
 */
18
19use cpal::traits::{DeviceTrait, HostTrait, StreamTrait};
20use opus::Channels;
21use protobuf::{Message, MessageField};
22use std::sync::atomic::AtomicBool;
23use std::sync::Arc;
24use std::thread::JoinHandle;
25use std::time::Duration;
26use tokio::sync::mpsc::Sender;
27use tracing::{error, info};
28use videocall_types::protos::media_packet::media_packet::MediaType;
29use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
30use videocall_types::protos::packet_wrapper::packet_wrapper::PacketType;
31use videocall_types::protos::packet_wrapper::PacketWrapper;
32
33pub struct MicrophoneDaemon {
34    stop: Arc<AtomicBool>,
35    handles: Vec<JoinHandle<anyhow::Result<()>>>,
36}
37
38impl Default for MicrophoneDaemon {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl MicrophoneDaemon {
45    pub fn new() -> Self {
46        Self {
47            stop: Arc::new(AtomicBool::new(false)),
48            handles: vec![],
49        }
50    }
51
52    pub fn start(
53        &mut self,
54        wt_tx: Sender<Vec<u8>>,
55        device: String,
56        email: String,
57    ) -> anyhow::Result<()> {
58        self.handles.push(start_microphone(
59            device.clone(),
60            wt_tx.clone(),
61            email,
62            self.stop.clone(),
63        )?);
64        Ok(())
65    }
66
67    pub fn stop(&mut self) {
68        self.stop.store(true, std::sync::atomic::Ordering::Relaxed);
69        for handle in self.handles.drain(..) {
70            if let Err(e) = handle.join() {
71                error!("Failed to join microphone thread: {:?}", e);
72            }
73        }
74    }
75}
76
77fn start_microphone(
78    device: String,
79    wt_tx: Sender<Vec<u8>>,
80    email: String,
81    stop: Arc<AtomicBool>,
82) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
83    let host = cpal::default_host();
84
85    // Set up the input device and stream with the default input config.
86    let device = if device == "default" {
87        host.default_input_device()
88    } else {
89        host.input_devices()?
90            .find(|x| x.name().map(|y| y == device).unwrap_or(false))
91    }
92    .expect("failed to find input device");
93
94    info!("Input device: {}", device.name()?);
95    let range = cpal::SupportedBufferSize::Range { min: 960, max: 960 };
96    let config = cpal::SupportedStreamConfig::new(
97        1,
98        cpal::SampleRate(48000),
99        range,
100        cpal::SampleFormat::I16,
101    );
102
103    let mut encoder = opus::Encoder::new(48000, Channels::Mono, opus::Application::Voip)?;
104    info!("Opus encoder created {:?}", encoder);
105
106    let err_fn = move |err| {
107        error!("an error occurred on stream: {}", err);
108    };
109
110    Ok(std::thread::spawn(move || {
111        let stream = match config.sample_format() {
112            cpal::SampleFormat::I16 => device.build_input_stream(
113                &config.into(),
114                move |data, _: &_| {
115                    for chunk in data.chunks_exact(960) {
116                        match encode_and_send_i16(chunk, &mut encoder, &wt_tx, email.clone()) {
117                            Ok(_) => {}
118                            Err(e) => {
119                                error!("Failed to encode and send audio: {}", e);
120                            }
121                        }
122                    }
123                },
124                err_fn,
125                None,
126            )?,
127            sample_format => {
128                return Err(anyhow::Error::msg(format!(
129                    "Unsupported sample format '{sample_format}'"
130                )))
131            }
132        };
133        info!("Begin streaming audio...");
134        stream.play().expect("failed to play stream");
135
136        loop {
137            if stop.load(std::sync::atomic::Ordering::Relaxed) {
138                break;
139            }
140            std::thread::sleep(Duration::from_secs(1));
141        }
142        Ok(())
143    }))
144}
145
146fn encode_and_send_i16(
147    input: &[i16],
148    encoder: &mut opus::Encoder,
149    wt_tx: &Sender<Vec<u8>>,
150    email: String,
151) -> anyhow::Result<()> {
152    let output = encoder.encode_vec(input, 960)?;
153    let output = transform_audio_chunk(output, email, 0);
154    let output_bytes = output?.write_to_bytes()?;
155    tracing::info!("Queueing AUDIO packet: {} bytes", output_bytes.len());
156    wt_tx.try_send(output_bytes)?;
157    tracing::debug!("Audio packet queued successfully");
158    Ok(())
159}
160
161fn transform_audio_chunk(
162    data: Vec<u8>,
163    email: String,
164    sequence: u64,
165) -> anyhow::Result<PacketWrapper> {
166    Ok(PacketWrapper {
167        packet_type: PacketType::MEDIA.into(),
168        email: email.clone(),
169        data: MediaPacket {
170            media_type: MediaType::AUDIO.into(),
171            data,
172            email,
173            frame_type: String::from("key"),
174            timestamp: get_micros_now(),
175            // TODO: Duration of the audio in microseconds.
176            duration: 0.0,
177            video_metadata: MessageField(Some(Box::new(VideoMetadata {
178                sequence,
179                ..Default::default()
180            }))),
181            ..Default::default()
182        }
183        .write_to_bytes()?,
184        ..Default::default()
185    })
186}
187
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock is set before the epoch, the result is the negative
/// distance to the epoch instead of a panic, so a misconfigured clock cannot
/// crash the caller.
fn get_micros_now() -> f64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as f64,
        // `SystemTimeError::duration` is how far the clock is *before* the epoch.
        Err(before_epoch) => -(before_epoch.duration().as_micros() as f64),
    }
}