videocall_cli/producers/
camera.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use crate::cli_args::IndexKind;
20use crate::video_encoder::Frame;
21use anyhow::Result;
22use protobuf::Message;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::thread::JoinHandle;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tokio::sync::mpsc::{self, Sender};
28use tracing::{debug, error, info};
29use videocall_nokhwa::pixel_format::I420Format;
30use videocall_nokhwa::utils::RequestedFormat;
31use videocall_nokhwa::utils::RequestedFormatType;
32use videocall_nokhwa::{
33    utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
34    Camera,
35};
36
37use videocall_types::protos::media_packet::media_packet::MediaType;
38use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
39use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
40
41use super::encoder_thread::encoder_thread;
42use super::producer::Producer;
43
44pub struct CameraPacket {
45    pub data: Vec<u8>,
46    pub _format: FrameFormat,
47    pub age: u128,
48}
49
50impl CameraPacket {
51    pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
52        CameraPacket {
53            data,
54            _format: format,
55            age,
56        }
57    }
58}
59
60pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
61    let frame_type = if frame.key {
62        "key".to_string()
63    } else {
64        "delta".to_string()
65    };
66    let media_packet: MediaPacket = MediaPacket {
67        data: frame.data.to_vec(),
68        frame_type,
69        email: email.to_owned(),
70        media_type: MediaType::VIDEO.into(),
71        timestamp: since_the_epoch().as_micros() as f64,
72        video_metadata: Some(VideoMetadata {
73            sequence: frame.pts as u64,
74            ..Default::default()
75        })
76        .into(),
77        ..Default::default()
78    };
79    let data = media_packet.write_to_bytes().unwrap();
80    PacketWrapper {
81        data,
82        email: media_packet.email,
83        packet_type: PacketType::MEDIA.into(),
84        ..Default::default()
85    }
86}
87
88pub static THRESHOLD_MILLIS: u128 = 1000;
89
90pub fn since_the_epoch() -> Duration {
91    SystemTime::now().duration_since(UNIX_EPOCH).unwrap()
92}
93
94#[derive(Clone, Debug)]
95pub struct CameraConfig {
96    pub width: u32,
97    pub height: u32,
98    pub framerate: u32,
99    pub video_device_index: IndexKind,
100    pub frame_format: FrameFormat,
101    pub bitrate_kbps: u32,
102    pub cpu_used: u8,
103}
104
105pub struct CameraDaemon {
106    config: CameraConfig,
107    user_id: String,
108    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
109    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
110    quic_tx: Arc<Sender<Vec<u8>>>,
111    quit: Arc<AtomicBool>,
112    handles: Vec<JoinHandle<()>>,
113}
114
115impl Producer for CameraDaemon {
116    fn start(&mut self) -> Result<()> {
117        self.handles.push(self.camera_thread()?);
118        let encoder = encoder_thread(
119            self.cam_rx.take().unwrap(),
120            self.quic_tx.clone(),
121            self.quit.clone(),
122            self.config.clone(),
123            self.user_id.clone(),
124        );
125        self.handles.push(encoder);
126        Ok(())
127    }
128
129    fn stop(&mut self) -> Result<()> {
130        self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
131        for handle in self.handles.drain(..) {
132            handle.join().unwrap();
133        }
134        Ok(())
135    }
136}
137
138impl CameraDaemon {
139    pub fn from_config(
140        config: CameraConfig,
141        user_id: String,
142        quic_tx: Sender<Vec<u8>>,
143    ) -> CameraDaemon {
144        let (cam_tx, cam_rx) = mpsc::channel(100);
145        CameraDaemon {
146            config,
147            user_id,
148            cam_rx: Some(cam_rx),
149            cam_tx: Arc::new(cam_tx),
150            quit: Arc::new(AtomicBool::new(false)),
151            handles: vec![],
152            quic_tx: Arc::new(quic_tx),
153        }
154    }
155
156    fn camera_thread(&self) -> Result<JoinHandle<()>> {
157        let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
158        for (i, camera_info) in devices.iter().enumerate() {
159            info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
160        }
161        let cam_tx = self.cam_tx.clone();
162        let width = self.config.width;
163        let height = self.config.height;
164        let framerate = self.config.framerate;
165        let frame_format = self.config.frame_format;
166        let video_device_index = match &self.config.video_device_index {
167            IndexKind::String(s) => CameraIndex::String(s.clone()),
168            IndexKind::Index(i) => CameraIndex::Index(*i),
169        };
170        let quit = self.quit.clone();
171        Ok(std::thread::spawn(move || {
172            debug!("Camera opened... waiting for frames");
173            let mut camera = match Camera::new(
174                video_device_index,
175                RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
176                    CameraFormat::new_from(width, height, frame_format, framerate),
177                )),
178            ) {
179                Ok(camera) => camera,
180                Err(e) => {
181                    panic!("{}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps", e)
182                }
183            };
184            let actual_resolution = camera.resolution();
185            camera.open_stream().unwrap();
186            println!("Requested format: {:?}", frame_format);
187            println!("Actual stream format: {:?}", camera.frame_format());
188
189            // Allocate buffer for raw data based on actual format
190            let mut i420_image_buffer = vec![
191                0u8;
192                buffer_size_i420(actual_resolution.width(), actual_resolution.height())
193                    as usize
194            ];
195
196            let frame_time = Duration::from_millis(1000u64 / framerate as u64);
197            let mut last_frame_time = Instant::now();
198            loop {
199                // use last_frame_time to calculate if we should skip this frame
200                let elapsed = last_frame_time.elapsed();
201                if elapsed < frame_time {
202                    continue;
203                }
204                last_frame_time = Instant::now();
205                let frame = camera.frame().unwrap();
206                frame
207                    .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
208                    .unwrap();
209                // Check if we should quit
210                if quit.load(std::sync::atomic::Ordering::Relaxed) {
211                    info!("Quit signal received, exiting frame loop.");
212                    return;
213                }
214
215                // Try sending the frame over the channel
216                if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
217                    i420_image_buffer.clone(),
218                    frame_format,
219                    since_the_epoch().as_millis(),
220                ))) {
221                    error!("Error sending image: {}", e);
222                }
223            }
224        }))
225    }
226}
227
228pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
229    width
230        .checked_mul(height)
231        .and_then(|y_size| y_size.checked_add(y_size / 2)) // Total size = Y + U + V
232        .expect("Buffer size calculation overflowed")
233}