videocall_cli/producers/
test_pattern_sender.rs

1/*
2 * Copyright 2025 Security Union LLC
3 *
4 * Licensed under either of
5 *
6 * * Apache License, Version 2.0
7 *   (http://www.apache.org/licenses/LICENSE-2.0)
8 * * MIT license
9 *   (http://opensource.org/licenses/MIT)
10 *
11 * at your option.
12 *
13 * Unless you explicitly state otherwise, any contribution intentionally
14 * submitted for inclusion in the work by you, as defined in the Apache-2.0
15 * license, shall be dual licensed as above, without any additional terms or
16 * conditions.
17 */
18
19use crate::producers::camera::since_the_epoch;
20use crate::producers::producer::Producer;
21use anyhow::Result;
22use image::imageops::FilterType;
23use image::ImageBuffer;
24use image::ImageReader;
25use image::Rgb;
26use std::fs::read;
27use std::sync::atomic::AtomicBool;
28use std::sync::Arc;
29use std::thread::JoinHandle;
30use std::time::Duration;
31use tracing::{debug, error};
32use videocall_nokhwa::utils::FrameFormat;
33
34use tokio::sync::mpsc::{self, Sender};
35
36use super::camera::CameraConfig;
37use super::camera::CameraPacket;
38use super::encoder_thread::encoder_thread;
39
/// Producer that streams a short looping test pattern (a handful of JPEG
/// frames loaded from `assets/images/sample_video_save/`) instead of a
/// real camera device.
pub struct TestPatternSender {
    // Identifier of the sending user; forwarded to the encoder thread.
    user_id: String,
    // Receiving half of the frame channel; moved out (`Option::take`) by
    // `start` and handed to the encoder thread.
    cam_rx: Option<mpsc::Receiver<Option<CameraPacket>>>,
    // Sending half of the frame channel, shared with the camera thread.
    cam_tx: Arc<mpsc::Sender<Option<CameraPacket>>>,
    // Outbound channel for encoded payloads (presumably sent over QUIC by
    // the consumer — name only; verify against the caller).
    quic_tx: Arc<Sender<Vec<u8>>>,
    // Cooperative shutdown flag polled by the spawned threads.
    quit: Arc<AtomicBool>,
    // Join handles for the camera and encoder threads.
    handles: Vec<JoinHandle<()>>,
    // Capture/encode parameters (width, height, framerate, ...).
    config: CameraConfig,
}
49
50impl TestPatternSender {
51    pub fn from_config(config: CameraConfig, user_id: String, quic_tx: Sender<Vec<u8>>) -> Self {
52        let (cam_tx, cam_rx) = mpsc::channel(100);
53        Self {
54            config,
55            user_id,
56            cam_rx: Some(cam_rx),
57            cam_tx: Arc::new(cam_tx),
58            quit: Arc::new(AtomicBool::new(false)),
59            handles: vec![],
60            quic_tx: Arc::new(quic_tx),
61        }
62    }
63
64    fn camera_thread(&self) -> Result<JoinHandle<()>> {
65        let quit = self.quit.clone();
66        let cam_tx = self.cam_tx.clone();
67        let frame_format = FrameFormat::NV12;
68        let interval = Duration::from_millis(1000 / self.config.framerate as u64);
69        let mut frames = vec![];
70
71        for i in 120..125 {
72            let path = format!("assets/images/sample_video_save/output_{}.jpg", i);
73            let img = read(path).unwrap();
74            let img = ImageReader::new(std::io::Cursor::new(img))
75                .with_guessed_format()
76                .unwrap();
77            // Transform the image to NV12 format
78            let img = img.decode().unwrap();
79            // Resize the image to value in config
80            let img = img.resize_exact(self.config.width, self.config.height, FilterType::Nearest);
81            let img = rgb_to_nv12(&img.to_rgb8());
82            frames.push(img);
83        }
84        let mut iterator = frames.into_iter().cycle();
85        // rotate the image
86        Ok(std::thread::spawn(move || {
87            debug!("Camera opened... waiting for frames");
88            loop {
89                // Check if we should quit
90                if quit.load(std::sync::atomic::Ordering::Relaxed) {
91                    return;
92                }
93                // Generate a test pattern
94                // Send the frame to the encoder
95                // Try sending the frame over the channel
96                let next = iterator.next().unwrap();
97                if let Err(e) = cam_tx.try_send(Some(CameraPacket::new(
98                    next,
99                    frame_format,
100                    since_the_epoch().as_millis(),
101                ))) {
102                    error!("Error sending image: {}", e);
103                }
104                // Sleep for the interval
105                std::thread::sleep(interval);
106            }
107        }))
108    }
109}
110
111impl Producer for TestPatternSender {
112    fn start(&mut self) -> anyhow::Result<()> {
113        self.handles.push(self.camera_thread()?);
114        let encoder = encoder_thread(
115            self.cam_rx.take().unwrap(),
116            self.quic_tx.clone(),
117            self.quit.clone(),
118            self.config.clone(),
119            self.user_id.clone(),
120        );
121        self.handles.push(encoder);
122        Ok(())
123    }
124
125    fn stop(&mut self) -> anyhow::Result<()> {
126        todo!()
127    }
128}
129
// Converts an RGB image buffer into a planar YUV 4:2:0 buffer sized
// `width * height * 3 / 2` bytes. (The old comment said "grayscale" —
// the input is RGB, see the `Rgb<u8>` pixel type.)
//
// NOTE(review): despite the name, this delegates to `rgb_to_i420`, which
// writes separate U and V planes (I420 layout), not the interleaved UV
// plane of true NV12 — yet the caller tags frames `FrameFormat::NV12`.
// Verify the downstream encoder actually expects I420 here.
fn rgb_to_nv12(image: &ImageBuffer<Rgb<u8>, Vec<u8>>) -> Vec<u8> {
    let width = image.width() as usize;
    let height = image.height() as usize;
    // Y plane (w*h) + U plane (w*h/4) + V plane (w*h/4).
    let mut nv12_data = vec![0u8; width * height * 3 / 2];

    rgb_to_i420(image.as_raw(), width, height, &mut nv12_data);
    nv12_data
}
139
/// Converts packed 8-bit RGB pixels to planar I420 (YUV 4:2:0) using
/// BT.601 limited-range ("studio swing") coefficients.
///
/// Layout of `i420`: a full-resolution Y plane (`width * height` bytes)
/// followed by quarter-resolution U and V planes (`width * height / 4`
/// bytes each). Chroma is sampled from the top-left pixel of each 2x2
/// block (no averaging).
///
/// # Panics
/// Panics if `rgb` is shorter than `width * height * 3`, if `i420` is
/// shorter than `width * height * 3 / 2`, or if `width`/`height` are odd
/// (odd dimensions previously indexed past the end of the chroma planes
/// deep inside the loop; now they fail fast with a clear message).
pub fn rgb_to_i420(rgb: &[u8], width: usize, height: usize, i420: &mut [u8]) {
    assert!(
        i420.len() >= width * height * 3 / 2,
        "Insufficient I420 buffer size"
    );
    assert!(
        rgb.len() >= width * height * 3,
        "Insufficient RGB buffer size"
    );
    // 4:2:0 subsampling requires even dimensions; an odd width/height
    // produced an out-of-bounds chroma write before this guard.
    assert!(
        width % 2 == 0 && height % 2 == 0,
        "rgb_to_i420 requires even width and height"
    );

    let (y_plane, uv_planes) = i420.split_at_mut(width * height);
    let (u_plane, v_plane) = uv_planes.split_at_mut(width * height / 4);
    let chroma_width = width / 2;

    for y in 0..height {
        for x in 0..width {
            let rgb_index = (y * width + x) * 3;
            let r = rgb[rgb_index] as f32;
            let g = rgb[rgb_index + 1] as f32;
            let b = rgb[rgb_index + 2] as f32;

            // Luma: BT.601 limited range, Y in [16, 235] for valid RGB.
            let y_value = (0.257 * r + 0.504 * g + 0.098 * b + 16.0).round() as u8;
            y_plane[y * width + x] = y_value;

            // Chroma only for the top-left pixel of each 2x2 block; the
            // computation is hoisted into the branch so the other three
            // pixels skip it entirely (same results, less work).
            if y % 2 == 0 && x % 2 == 0 {
                let u_value = (-0.148 * r - 0.291 * g + 0.439 * b + 128.0).round() as u8;
                let v_value = (0.439 * r - 0.368 * g - 0.071 * b + 128.0).round() as u8;
                let uv_index = (y / 2) * chroma_width + (x / 2);
                u_plane[uv_index] = u_value;
                v_plane[uv_index] = v_value;
            }
        }
    }
}