// videocall_cli/producers/camera.rs

1use crate::cli_args::IndexKind;
2use crate::video_encoder::Frame;
3use anyhow::Result;
4use protobuf::Message;
5use std::sync::atomic::AtomicBool;
6use std::sync::Arc;
7use std::thread::JoinHandle;
8use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
9use tokio::sync::mpsc::{self, Sender};
10use tracing::{debug, error, info};
11use videocall_nokhwa::pixel_format::I420Format;
12use videocall_nokhwa::utils::RequestedFormat;
13use videocall_nokhwa::utils::RequestedFormatType;
14use videocall_nokhwa::{
15    utils::{ApiBackend, CameraFormat, CameraIndex, FrameFormat},
16    Camera,
17};
18
19use videocall_types::protos::media_packet::media_packet::MediaType;
20use videocall_types::protos::media_packet::{MediaPacket, VideoMetadata};
21use videocall_types::protos::packet_wrapper::{packet_wrapper::PacketType, PacketWrapper};
22
23use super::encoder_thread::encoder_thread;
24use super::producer::Producer;
25
26pub struct CameraPacket {
27    pub data: Vec<u8>,
28    pub _format: FrameFormat,
29    pub age: u128,
30}
31
32impl CameraPacket {
33    pub fn new(data: Vec<u8>, format: FrameFormat, age: u128) -> CameraPacket {
34        CameraPacket {
35            data,
36            _format: format,
37            age,
38        }
39    }
40}
41
42pub fn transform_video_chunk(frame: &Frame, email: &str) -> PacketWrapper {
43    let frame_type = if frame.key {
44        "key".to_string()
45    } else {
46        "delta".to_string()
47    };
48    let media_packet: MediaPacket = MediaPacket {
49        data: frame.data.to_vec(),
50        frame_type,
51        email: email.to_owned(),
52        media_type: MediaType::VIDEO.into(),
53        timestamp: since_the_epoch().as_micros() as f64,
54        video_metadata: Some(VideoMetadata {
55            sequence: frame.pts as u64,
56            ..Default::default()
57        })
58        .into(),
59        ..Default::default()
60    };
61    let data = media_packet.write_to_bytes().unwrap();
62    PacketWrapper {
63        data,
64        email: media_packet.email,
65        packet_type: PacketType::MEDIA.into(),
66        ..Default::default()
67    }
68}
69
/// Threshold expressed in milliseconds.
// NOTE(review): not referenced in this file; presumably the maximum frame age
// tolerated by a consumer of `CameraPacket::age` — confirm against the users.
pub static THRESHOLD_MILLIS: u128 = 1_000;
71
/// Returns the time elapsed between the Unix epoch and the current system time.
///
/// # Panics
/// Panics if the system clock reports a time earlier than the Unix epoch.
pub fn since_the_epoch() -> Duration {
    UNIX_EPOCH.elapsed().unwrap()
}
75
76#[derive(Clone, Debug)]
77pub struct CameraConfig {
78    pub width: u32,
79    pub height: u32,
80    pub framerate: u32,
81    pub video_device_index: IndexKind,
82    pub frame_format: FrameFormat,
83    pub bitrate_kbps: u32,
84    pub cpu_used: u8,
85}
86
87pub struct CameraDaemon {
88    config: CameraConfig,
89    user_id: String,
90    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
91    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
92    quic_tx: Arc<Sender<Vec<u8>>>,
93    quit: Arc<AtomicBool>,
94    handles: Vec<JoinHandle<()>>,
95}
96
97impl Producer for CameraDaemon {
98    fn start(&mut self) -> Result<()> {
99        self.handles.push(self.camera_thread()?);
100        let encoder = encoder_thread(
101            self.cam_rx.take().unwrap(),
102            self.quic_tx.clone(),
103            self.quit.clone(),
104            self.config.clone(),
105            self.user_id.clone(),
106        );
107        self.handles.push(encoder);
108        Ok(())
109    }
110
111    fn stop(&mut self) -> Result<()> {
112        self.quit.store(true, std::sync::atomic::Ordering::Relaxed);
113        for handle in self.handles.drain(..) {
114            handle.join().unwrap();
115        }
116        Ok(())
117    }
118}
119
120impl CameraDaemon {
121    pub fn from_config(
122        config: CameraConfig,
123        user_id: String,
124        quic_tx: Sender<Vec<u8>>,
125    ) -> CameraDaemon {
126        let (cam_tx, cam_rx) = mpsc::channel(100);
127        CameraDaemon {
128            config,
129            user_id,
130            cam_rx: Some(cam_rx),
131            cam_tx: Arc::new(cam_tx),
132            quit: Arc::new(AtomicBool::new(false)),
133            handles: vec![],
134            quic_tx: Arc::new(quic_tx),
135        }
136    }
137
138    fn camera_thread(&self) -> Result<JoinHandle<()>> {
139        let devices = videocall_nokhwa::query(ApiBackend::Auto)?;
140        for (i, camera_info) in devices.iter().enumerate() {
141            info!("AVAILABLE CAMERA DEVICE INDEX {}: {:?}", i, camera_info);
142        }
143        let cam_tx = self.cam_tx.clone();
144        let width = self.config.width;
145        let height = self.config.height;
146        let framerate = self.config.framerate;
147        let frame_format = self.config.frame_format;
148        let video_device_index = match &self.config.video_device_index {
149            IndexKind::String(s) => CameraIndex::String(s.clone()),
150            IndexKind::Index(i) => CameraIndex::Index(*i),
151        };
152        let quit = self.quit.clone();
153        Ok(std::thread::spawn(move || {
154            debug!("Camera opened... waiting for frames");
155            let mut camera = match Camera::new(
156                video_device_index,
157                RequestedFormat::new::<I420Format>(RequestedFormatType::Exact(
158                    CameraFormat::new_from(width, height, frame_format, framerate),
159                )),
160            ) {
161                Ok(camera) => camera,
162                Err(e) => {
163                    panic!("{}\n please run 'info --list-formats' to see the available resolutions, frame formats and fps", e)
164                }
165            };
166            let actual_resolution = camera.resolution();
167            camera.open_stream().unwrap();
168            println!("Requested format: {:?}", frame_format);
169            println!("Actual stream format: {:?}", camera.frame_format());
170
171            // Allocate buffer for raw data based on actual format
172            let mut i420_image_buffer = vec![
173                0u8;
174                buffer_size_i420(actual_resolution.width(), actual_resolution.height())
175                    as usize
176            ];
177
178            let frame_time = Duration::from_millis(1000u64 / framerate as u64);
179            let mut last_frame_time = Instant::now();
180            loop {
181                // use last_frame_time to calculate if we should skip this frame
182                let elapsed = last_frame_time.elapsed();
183                if elapsed < frame_time {
184                    continue;
185                }
186                last_frame_time = Instant::now();
187                let frame = camera.frame().unwrap();
188                frame
189                    .decode_image_to_buffer::<I420Format>(&mut i420_image_buffer)
190                    .unwrap();
191                // Check if we should quit
192                if quit.load(std::sync::atomic::Ordering::Relaxed) {
193                    info!("Quit signal received, exiting frame loop.");
194                    return;
195                }
196
197                // Try sending the frame over the channel
198                if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
199                    i420_image_buffer.clone(),
200                    frame_format,
201                    since_the_epoch().as_millis(),
202                ))) {
203                    error!("Error sending image: {}", e);
204                }
205            }
206        }))
207    }
208}
209
/// Returns the number of bytes needed for a `width` x `height` I420 buffer.
///
/// The result is the luma plane (`width * height`) plus half of that for the
/// two chroma planes, using integer division.
///
/// # Panics
/// Panics if the computation does not fit in a `u32`.
pub fn buffer_size_i420(width: u32, height: u32) -> u32 {
    const OVERFLOW: &str = "Buffer size calculation overflowed";

    let luma_bytes = width.checked_mul(height).expect(OVERFLOW);
    // Total size = Y + U + V
    luma_bytes.checked_add(luma_bytes / 2).expect(OVERFLOW)
}