rs2_stream/media/
streaming.rs

1//! Main streaming implementation
2
3use super::priority_queue::MediaPriorityQueue;
4use super::types::*;
5use crate::stream_performance_metrics::StreamMetrics;
6use crate::{auto_backpressure_block, tick, unfold};
7use crate::{auto_backpressure_drop_newest, from_iter, throttle, RS2Stream, RS2StreamExt};
8use futures_core::Stream;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::fs::File;
12use tokio::io::AsyncReadExt;
13
14pub struct MediaStreamingService {
15    chunk_queue: Arc<MediaPriorityQueue>,
16    metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
17}
18
19impl MediaStreamingService {
20    pub fn new(buffer_capacity: usize) -> Self {
21        Self {
22            chunk_queue: Arc::new(MediaPriorityQueue::new(buffer_capacity, 64)),
23            metrics: Arc::new(tokio::sync::Mutex::new(
24                StreamMetrics::new().with_name("media-stream".to_string()),
25            )),
26        }
27    }
28
29    /// Start streaming from a file
30    pub async fn start_file_stream(
31        &self,
32        file_path: PathBuf,
33        stream_config: MediaStream,
34    ) -> RS2Stream<MediaChunk> {
35        let file = self.acquire_file_resource(file_path).await;
36        let chunk_queue = Arc::clone(&self.chunk_queue);
37        let metrics = Arc::clone(&self.metrics);
38
39        self.create_chunk_stream(file, stream_config, chunk_queue, metrics)
40    }
41
42    /// Start streaming from live input (camera, microphone, etc.)
43    pub async fn start_live_stream(&self, stream_config: MediaStream) -> RS2Stream<MediaChunk> {
44        let chunk_queue = Arc::clone(&self.chunk_queue);
45        let metrics = Arc::clone(&self.metrics);
46
47        // Create live stream using from_iter with throttling
48        auto_backpressure_drop_newest(
49            throttle(
50                from_iter(0u64..)
51                    .take_rs2(
52                        stream_config
53                            .metadata
54                            .get("max_chunks")
55                            .and_then(|s| s.parse().ok())
56                            .unwrap_or(u64::MAX as usize),
57                    )
58                    .par_eval_map_rs2(4, move |sequence| {
59                        let queue = Arc::clone(&chunk_queue);
60                        let metrics = Arc::clone(&metrics);
61                        let config = stream_config.clone();
62
63                        async move {
64                            // Simulate live capture - create chunk directly here
65                            tokio::time::sleep(std::time::Duration::from_micros(100)).await;
66
67                            let chunk = MediaChunk {
68                                stream_id: config.id.clone(),
69                                sequence_number: sequence,
70                                data: vec![0u8; config.chunk_size],
71                                chunk_type: if sequence % 30 == 0 {
72                                    ChunkType::VideoIFrame
73                                } else if sequence % 3 == 0 {
74                                    ChunkType::VideoBFrame
75                                } else {
76                                    ChunkType::VideoPFrame
77                                },
78                                priority: if sequence % 30 == 0 {
79                                    MediaPriority::High
80                                } else if sequence % 3 == 0 {
81                                    MediaPriority::Low
82                                } else {
83                                    MediaPriority::Normal
84                                },
85                                timestamp: std::time::Duration::from_millis(sequence * 33),
86                                is_final: false,
87                                checksum: None,
88                            };
89
90                            // Update metrics
91                            {
92                                let mut m = metrics.lock().await;
93                                m.items_processed += 1;
94                                m.bytes_processed += chunk.data.len() as u64;
95                                m.average_item_size =
96                                    m.bytes_processed as f64 / m.items_processed as f64;
97                                m.last_activity = Some(std::time::Instant::now());
98                            }
99
100                            // Try to enqueue (don't block for live streaming)
101                            if let Err(_) = queue.try_enqueue(chunk.clone()).await {
102                                let mut m = metrics.lock().await;
103                                m.errors += 1;
104                            }
105
106                            chunk
107                        }
108                    }),
109                std::time::Duration::from_millis(33), // ~30fps
110            ),
111            512,
112        )
113    }
114
115    async fn acquire_file_resource(&self, path: PathBuf) -> File {
116        File::open(&path)
117            .await
118            .unwrap_or_else(|e| panic!("Failed to open media file {:?}: {}", path, e))
119    }
120
121    fn create_chunk_stream(
122        &self,
123        file: File,
124        config: MediaStream,
125        queue: Arc<MediaPriorityQueue>,
126        metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
127    ) -> RS2Stream<MediaChunk> {
128        // Use unfold to read file sequentially
129        auto_backpressure_block(
130            unfold((file, 0u64), move |state| {
131                let queue = Arc::clone(&queue);
132                let metrics = Arc::clone(&metrics);
133                let config = config.clone();
134
135                async move {
136                    let (mut file, sequence) = state;
137
138                    // Read chunk from file
139                    let mut buffer = vec![0u8; config.chunk_size];
140                    match file.read(&mut buffer).await {
141                        Ok(0) => {
142                            // EOF reached
143                            None
144                        }
145                        Ok(bytes_read) => {
146                            // Truncate buffer to actual bytes read
147                            buffer.truncate(bytes_read);
148
149                            // Determine chunk type inline
150                            let chunk_type = if sequence % 30 == 0 {
151                                ChunkType::VideoIFrame
152                            } else if sequence % 3 == 0 {
153                                ChunkType::VideoBFrame
154                            } else {
155                                ChunkType::VideoPFrame
156                            };
157
158                            // Determine priority inline
159                            let priority = if sequence % 30 == 0 {
160                                MediaPriority::High
161                            } else if sequence % 3 == 0 {
162                                MediaPriority::Low
163                            } else {
164                                MediaPriority::Normal
165                            };
166
167                            let chunk = MediaChunk {
168                                stream_id: config.id.clone(),
169                                sequence_number: sequence,
170                                data: buffer,
171                                chunk_type,
172                                priority,
173                                timestamp: std::time::Duration::from_millis(sequence * 33),
174                                is_final: bytes_read < config.chunk_size,
175                                checksum: None,
176                            };
177
178                            // Update metrics inline
179                            {
180                                let mut m = metrics.lock().await;
181                                m.items_processed += 1;
182                                m.bytes_processed += chunk.data.len() as u64;
183                                m.average_item_size =
184                                    m.bytes_processed as f64 / m.items_processed as f64;
185                                m.last_activity = Some(std::time::Instant::now());
186                            }
187
188                            // Enqueue with priority
189                            if let Err(_) = queue.enqueue(chunk.clone()).await {
190                                let mut m = metrics.lock().await;
191                                m.errors += 1;
192                            }
193
194                            Some((chunk, (file, sequence + 1)))
195                        }
196                        Err(e) => {
197                            log::error!("Error reading file: {}", e);
198                            None
199                        }
200                    }
201                }
202            }),
203            256,
204        )
205    }
206
207    /// Create a chunk for live streaming
208    async fn create_live_chunk(&self, config: &MediaStream, sequence: u64) -> MediaChunk {
209        // Simulate capturing from live source
210        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
211
212        MediaChunk {
213            stream_id: config.id.clone(),
214            sequence_number: sequence,
215            data: vec![0u8; config.chunk_size], // Mock data - replace with actual capture
216            chunk_type: self.determine_chunk_type(sequence),
217            priority: self.determine_priority(sequence),
218            timestamp: std::time::Duration::from_millis(sequence * 33),
219            is_final: false, // Live streams don't end
220            checksum: None,
221        }
222    }
223
224    /// Determine chunk type based on sequence
225    pub fn determine_chunk_type(&self, sequence: u64) -> ChunkType {
226        if sequence % 30 == 0 {
227            ChunkType::VideoIFrame // Keyframe every 30 frames
228        } else if sequence % 3 == 0 {
229            ChunkType::VideoBFrame // B-frame every 3rd frame
230        } else {
231            ChunkType::VideoPFrame // P-frame otherwise
232        }
233    }
234
235    /// Determine priority based on sequence and chunk type
236    pub fn determine_priority(&self, sequence: u64) -> MediaPriority {
237        if sequence % 30 == 0 {
238            MediaPriority::High // I-frames are high priority
239        } else if sequence % 3 == 0 {
240            MediaPriority::Low // B-frames are low priority
241        } else {
242            MediaPriority::Normal // P-frames are normal priority
243        }
244    }
245
246    /// Update metrics efficiently
247    async fn update_metrics(
248        &self,
249        metrics: &Arc<tokio::sync::Mutex<StreamMetrics>>,
250        chunk: &MediaChunk,
251    ) {
252        let mut m = metrics.lock().await;
253        m.items_processed += 1;
254        m.bytes_processed += chunk.data.len() as u64;
255        m.average_item_size = m.bytes_processed as f64 / m.items_processed as f64;
256        m.last_activity = Some(std::time::Instant::now());
257    }
258
259    /// Get stream from priority queue
260    pub fn get_chunk_stream(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
261        self.chunk_queue.dequeue()
262    }
263
264    /// Get current metrics with updated buffer utilization
265    pub async fn get_metrics(&self) -> StreamMetrics {
266        let metrics = self.metrics.lock().await;
267        metrics.clone()
268    }
269
270    /// Create a metrics monitoring stream
271    pub fn get_metrics_stream(&self) -> RS2Stream<StreamMetrics> {
272        let metrics = Arc::clone(&self.metrics);
273
274        tick(std::time::Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
275            let metrics = Arc::clone(&metrics);
276
277            async move {
278                let m = metrics.lock().await;
279                m.clone()
280            }
281        })
282    }
283
284    /// Gracefully shutdown the streaming service
285    pub async fn shutdown(&self) {
286        log::info!("Shutting down media streaming service");
287        self.chunk_queue.close().await;
288    }
289}
290
291/// Factory for creating different types of streaming services
292pub struct StreamingServiceFactory;
293
294impl StreamingServiceFactory {
295    /// Create service optimized for live streaming
296    pub fn create_live_streaming_service() -> MediaStreamingService {
297        MediaStreamingService::new(2048) // Larger buffer for live
298    }
299
300    /// Create service optimized for file streaming
301    pub fn create_file_streaming_service() -> MediaStreamingService {
302        MediaStreamingService::new(512) // Smaller buffer for files
303    }
304
305    /// Create service optimized for low-latency streaming
306    pub fn create_low_latency_service() -> MediaStreamingService {
307        MediaStreamingService::new(128) // Very small buffer for low latency
308    }
309}