Skip to main content

videocall_cli/producers/
camera.rs

/*
 * Copyright 2025 Security Union LLC
 *
 * Licensed under either of
 *
 * * Apache License, Version 2.0
 *   (http://www.apache.org/licenses/LICENSE-2.0)
 * * MIT license
 *   (http://opensource.org/licenses/MIT)
 *
 * at your option.
 *
 * Unless you explicitly state otherwise, any contribution intentionally
 * submitted for inclusion in the work by you, as defined in the Apache-2.0
 * license, shall be dual licensed as above, without any additional terms or
 * conditions.
 */
18
19use crate::cli_args::IndexKind;
20use crate::video_encoder::Frame;
21use anyhow::Result;
22use protobuf::Message;
23use std::sync::atomic::AtomicBool;
24use std::sync::Arc;
25use std::thread::JoinHandle;
26use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
27use tokio::sync::mpsc::{self, Sender};
28use tracing::{debug, error, info};
29use videocall_nokhwa::pixel_format::I420Format;
30use videocall_nokhwa::utils::RequestedFormat;
31use videocall_nokhwa::utils::RequestedFormatType;
32use videocall_nokhwa::{
33    utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
34    Camera,
35};
36
37use videocall_types::protos::media_packet::media_packet::MediaType;
38use videocall_types::protos::media_packet::{MediaPacket, VideoCodec, VideoMetadata};
39use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
40
41use super::encoder_thread::encoder_thread;
42use super::producer::Producer;
43
/// A single raw camera frame handed from the capture thread to the encoder
/// thread over the daemon's channel.
pub struct CameraPacket {
    /// Raw pixel data, decoded to I420 by the capture loop.
    pub data: Vec<u8>,
    /// Source frame format; not read downstream in this file (hence the
    /// leading underscore).
    pub _format: FrameFormat,
    /// Capture timestamp in milliseconds since the Unix epoch.
    pub age: u128,
}

impl CameraPacket {
    /// Bundles a captured frame with its format and capture-time age.
    pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
        CameraPacket {
            data,
            _format: format,
            age,
        }
    }
}
59
60pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
61    let frame_type = if frame.key {
62        "key".to_string()
63    } else {
64        "delta".to_string()
65    };
66    let media_packet: MediaPacket = MediaPacket {
67        data: frame.data.to_vec(),
68        frame_type,
69        email: email.to_owned(),
70        media_type: MediaType::VIDEO.into(),
71        timestamp: since_the_epoch().as_micros() as f64,
72        video_metadata: Some(VideoMetadata {
73            sequence: frame.pts as u64,
74            codec: VideoCodec::VP9_PROFILE0_LEVEL10_8BIT.into(),
75            ..Default::default()
76        })
77        .into(),
78        ..Default::default()
79    };
80    let data = media_packet.write_to_bytes().unwrap();
81    PacketWrapper {
82        data,
83        email: media_packet.email,
84        packet_type: PacketType::MEDIA.into(),
85        ..Default::default()
86    }
87}
88
/// Frame-age threshold in milliseconds.
/// NOTE(review): not referenced within this file chunk — presumably consumed
/// by other modules comparing against `CameraPacket::age`; confirm before
/// removing. Changed from `pub static` to `pub const`, the idiomatic form
/// for an immutable scalar (identical for all readers).
pub const THRESHOLD_MILLIS: u128 = 1000;
90
/// Returns the wall-clock time elapsed since the Unix epoch.
///
/// # Panics
/// Panics only if the system clock is set to a time before 1970-01-01.
pub fn since_the_epoch() -> Duration {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock is set before the Unix epoch")
}
94
/// Capture and encoding parameters for a [`CameraDaemon`].
#[derive(Clone, Debug)]
pub struct CameraConfig {
    /// Requested capture width in pixels.
    pub width: u32,
    /// Requested capture height in pixels.
    pub height: u32,
    /// Target frames per second; also paces the capture loop.
    pub framerate: u32,
    /// Which camera device to open (numeric index or platform string id).
    pub video_device_index: IndexKind,
    /// Pixel format requested from the camera.
    pub frame_format: FrameFormat,
    /// Target bitrate in kbit/s — passed to the encoder thread via the
    /// cloned config; presumably a libvpx rate target (confirm in encoder_thread).
    pub bitrate_kbps: u32,
    /// Encoder speed/quality trade-off — passed to the encoder thread;
    /// presumably libvpx `cpu_used` (confirm in encoder_thread).
    pub cpu_used: u8,
}
105
/// Captures camera frames on a dedicated thread and feeds them through a
/// bounded channel to an encoder thread, which writes serialized packets to
/// `wt_tx`.
pub struct CameraDaemon {
    /// Capture/encoding parameters (cloned into the encoder thread).
    config: CameraConfig,
    /// User identifier passed to the encoder thread (used as the packet
    /// email — see `transform_video_chunk`).
    user_id: String,
    /// Receiving end of the frame channel; moved into the encoder thread on
    /// `start()`, hence the `Option`.
    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
    /// Sending end of the frame channel, shared with the capture thread.
    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
    /// Outgoing channel for encoded packet bytes.
    wt_tx: Sender<Vec<u8>>,
    /// Cooperative shutdown flag checked by the worker threads.
    quit: Arc<AtomicBool>,
    /// Join handles for the spawned worker threads.
    handles: Vec<JoinHandle<()>>,
}
115
116impl Producer for CameraDaemon {
117    fn start(&mut self) -> Result<()> {
118        self.handles.push(self.camera_thread()?);
119        let encoder = encoder_thread(
120            self.cam_rx.take().unwrap(),
121            self.wt_tx.clone(),
122            self.quit.clone(),
123            self.config.clone(),
124            self.user_id.clone(),
125        );
126        self.handles.push(encoder);
127        Ok(())
128    }
129
130    fn stop(&mut self) -> Result<()> {
131        self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
132        for handle in self.handles.drain(..) {
133            handle.join().unwrap();
134        }
135        Ok(())
136    }
137}
138
139impl CameraDaemon {
140    pub fn from_config(
141        config: CameraConfig,
142        user_id: String,
143        wt_tx: Sender<Vec<u8>>,
144    ) -> CameraDaemon {
145        let (cam_tx, cam_rx) = mpsc::channel(100);
146        CameraDaemon {
147            config,
148            user_id,
149            cam_rx: Some(cam_rx),
150            cam_tx: Arc::new(cam_tx),
151            quit: Arc::new(AtomicBool::new(false)),
152            handles: vec![],
153            wt_tx,
154        }
155    }
156
157    fn camera_thread(&self) -> Result<JoinHandle<()>> {
158        let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
159        for (i, camera_info) in devices.iter().enumerate() {
160            info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
161        }
162        let cam_tx = self.cam_tx.clone();
163        let width = self.config.width;
164        let height = self.config.height;
165        let framerate = self.config.framerate;
166        let frame_format = self.config.frame_format;
167        let video_device_index = match &self.config.video_device_index {
168            IndexKind::String(s) => CameraIndex::String(s.clone()),
169            IndexKind::Index(i) => CameraIndex::Index(*i),
170        };
171        let quit = self.quit.clone();
172        Ok(std::thread::spawn(move || {
173            debug!("Camera opened... waiting for frames");
174            let mut camera = match Camera::new(
175                video_device_index,
176                RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
177                    CameraFormat::new_from(width, height, frame_format, framerate),
178                )),
179            ) {
180                Ok(camera) => camera,
181                Err(e) => {
182                    panic!("{e}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps")
183                }
184            };
185            let actual_resolution = camera.resolution();
186            camera.open_stream().unwrap();
187            println!("Requested format: {frame_format:?}");
188            println!("Actual stream format: {:?}", camera.frame_format());
189
190            // Allocate buffer for raw data based on actual format
191            let mut i420_image_buffer = vec![
192                0u8;
193                buffer_size_i420(actual_resolution.width(), actual_resolution.height())
194                    as usize
195            ];
196
197            let frame_time = Duration::from_millis(1000u64 / framerate as u64);
198            let mut last_frame_time = Instant::now();
199            loop {
200                // use last_frame_time to calculate if we should skip this frame
201                let elapsed = last_frame_time.elapsed();
202                if elapsed < frame_time {
203                    continue;
204                }
205                last_frame_time = Instant::now();
206                let frame = camera.frame().unwrap();
207                frame
208                    .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
209                    .unwrap();
210                // Check if we should quit
211                if quit.load(std::sync::atomic::Ordering::Relaxed) {
212                    info!("Quit signal received, exiting frame loop.");
213                    return;
214                }
215
216                // Try sending the frame over the channel
217                if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
218                    i420_image_buffer.clone(),
219                    frame_format,
220                    since_the_epoch().as_millis(),
221                ))) {
222                    error!("Error sending image: {}", e);
223                }
224            }
225        }))
226    }
227}
228
/// Computes the byte size of an I420 buffer for a `width` x `height` frame:
/// a full-resolution Y plane plus U and V planes at a quarter resolution
/// each, i.e. `width * height * 3 / 2`.
///
/// # Panics
/// Panics if the computation overflows `u32`.
pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
    let y_plane = width
        .checked_mul(height)
        .expect("Buffer size calculation overflowed");
    // Chroma (U + V) adds half of the Y-plane size.
    y_plane
        .checked_add(y_plane / 2)
        .expect("Buffer size calculation overflowed")
}