rs2_stream/media/
streaming.rs

1//! Main streaming implementation
2
3use super::priority_queue::MediaPriorityQueue;
4use super::types::*;
5use crate::queue::Queue;
6use crate::*;
7use futures_core::Stream;
8use std::path::PathBuf;
9use std::sync::Arc;
10use tokio::fs::File;
11use tokio::io::{AsyncReadExt, BufReader};
12
13pub struct MediaStreamingService {
14    chunk_queue: Arc<MediaPriorityQueue>,
15    metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
16}
17
18impl MediaStreamingService {
19    pub fn new(buffer_capacity: usize) -> Self {
20        Self {
21            chunk_queue: Arc::new(MediaPriorityQueue::new(buffer_capacity, 64)),
22            metrics: Arc::new(tokio::sync::Mutex::new(StreamMetrics {
23                stream_id: String::new(),
24                bytes_processed: 0,
25                chunks_processed: 0,
26                dropped_chunks: 0,
27                average_chunk_size: 0.0,
28                buffer_utilization: 0.0,
29                last_updated: chrono::Utc::now(),
30            })),
31        }
32    }
33
34    /// Start streaming from a file
35    pub async fn start_file_stream(
36        &self,
37        file_path: PathBuf,
38        stream_config: MediaStream,
39    ) -> RS2Stream<MediaChunk> {
40        let file = self.acquire_file_resource(file_path).await;
41        let chunk_queue = Arc::clone(&self.chunk_queue);
42        let metrics = Arc::clone(&self.metrics);
43
44        self.create_chunk_stream(file, stream_config, chunk_queue, metrics)
45    }
46
47    /// Start streaming from live input (camera, microphone, etc.)
48    pub async fn start_live_stream(
49        &self,
50        stream_config: MediaStream,
51    ) -> RS2Stream<MediaChunk> {
52        let chunk_queue = Arc::clone(&self.chunk_queue);
53        let metrics = Arc::clone(&self.metrics);
54
55        // Create live stream using from_iter with throttling
56        auto_backpressure_drop_newest(
57            throttle(
58                from_iter(0u64..)
59                    .take_rs2(stream_config.metadata.get("max_chunks")
60                        .and_then(|s| s.parse().ok())
61                        .unwrap_or(u64::MAX as usize))
62                    .par_eval_map_rs2(4, move |sequence| {
63                        let queue = Arc::clone(&chunk_queue);
64                        let metrics = Arc::clone(&metrics);
65                        let config = stream_config.clone();
66
67                        async move {
68                            // Simulate live capture - create chunk directly here
69                            tokio::time::sleep(std::time::Duration::from_micros(100)).await;
70
71                            let chunk = MediaChunk {
72                                stream_id: config.id.clone(),
73                                sequence_number: sequence,
74                                data: vec![0u8; config.chunk_size],
75                                chunk_type: if sequence % 30 == 0 {
76                                    ChunkType::VideoIFrame
77                                } else if sequence % 3 == 0 {
78                                    ChunkType::VideoBFrame
79                                } else {
80                                    ChunkType::VideoPFrame
81                                },
82                                priority: if sequence % 30 == 0 {
83                                    MediaPriority::High
84                                } else if sequence % 3 == 0 {
85                                    MediaPriority::Low
86                                } else {
87                                    MediaPriority::Normal
88                                },
89                                timestamp: std::time::Duration::from_millis(sequence * 33),
90                                is_final: false,
91                                checksum: None,
92                            };
93
94                            // Update metrics
95                            {
96                                let mut m = metrics.lock().await;
97                                m.chunks_processed += 1;
98                                m.bytes_processed += chunk.data.len() as u64;
99                                m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
100                                m.last_updated = chrono::Utc::now();
101                            }
102
103                            // Try to enqueue (don't block for live streaming)
104                            if let Err(_) = queue.try_enqueue(chunk.clone()).await {
105                                let mut m = metrics.lock().await;
106                                m.dropped_chunks += 1;
107                            }
108
109                            chunk
110                        }
111                    }),
112                std::time::Duration::from_millis(33) // ~30fps
113            ),
114            512
115        )
116    }
117
118    async fn acquire_file_resource(&self, path: PathBuf) -> File {
119        File::open(&path).await
120            .unwrap_or_else(|e| panic!("Failed to open media file {:?}: {}", path, e))
121    }
122
123    fn create_chunk_stream(
124        &self,
125        mut file: File,
126        config: MediaStream,
127        queue: Arc<MediaPriorityQueue>,
128        metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
129    ) -> RS2Stream<MediaChunk> {
130        // Use unfold to read file sequentially
131        auto_backpressure_block(
132            unfold((file, 0u64), move |state| {
133                let queue = Arc::clone(&queue);
134                let metrics = Arc::clone(&metrics);
135                let config = config.clone();
136
137                async move {
138                    let (mut file, sequence) = state;
139
140                    // Read chunk from file
141                    let mut buffer = vec![0u8; config.chunk_size];
142                    match file.read(&mut buffer).await {
143                        Ok(0) => {
144                            // EOF reached
145                            None
146                        }
147                        Ok(bytes_read) => {
148                            // Truncate buffer to actual bytes read
149                            buffer.truncate(bytes_read);
150
151                            // Determine chunk type inline
152                            let chunk_type = if sequence % 30 == 0 {
153                                ChunkType::VideoIFrame
154                            } else if sequence % 3 == 0 {
155                                ChunkType::VideoBFrame
156                            } else {
157                                ChunkType::VideoPFrame
158                            };
159
160                            // Determine priority inline
161                            let priority = if sequence % 30 == 0 {
162                                MediaPriority::High
163                            } else if sequence % 3 == 0 {
164                                MediaPriority::Low
165                            } else {
166                                MediaPriority::Normal
167                            };
168
169                            let chunk = MediaChunk {
170                                stream_id: config.id.clone(),
171                                sequence_number: sequence,
172                                data: buffer,
173                                chunk_type,
174                                priority,
175                                timestamp: std::time::Duration::from_millis(sequence * 33),
176                                is_final: bytes_read < config.chunk_size,
177                                checksum: None,
178                            };
179
180                            // Update metrics inline
181                            {
182                                let mut m = metrics.lock().await;
183                                m.chunks_processed += 1;
184                                m.bytes_processed += chunk.data.len() as u64;
185                                m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
186                                m.last_updated = chrono::Utc::now();
187                            }
188
189                            // Enqueue with priority
190                            if let Err(_) = queue.enqueue(chunk.clone()).await {
191                                let mut m = metrics.lock().await;
192                                m.dropped_chunks += 1;
193                            }
194
195                            Some((chunk, (file, sequence + 1)))
196                        }
197                        Err(e) => {
198                            log::error!("Error reading file: {}", e);
199                            None
200                        }
201                    }
202                }
203            }),
204            256
205        )
206    }
207
208    /// Create a chunk for live streaming
209    async fn create_live_chunk(&self, config: &MediaStream, sequence: u64) -> MediaChunk {
210        // Simulate capturing from live source
211        tokio::time::sleep(std::time::Duration::from_micros(100)).await;
212
213        MediaChunk {
214            stream_id: config.id.clone(),
215            sequence_number: sequence,
216            data: vec![0u8; config.chunk_size], // Mock data - replace with actual capture
217            chunk_type: self.determine_chunk_type(sequence),
218            priority: self.determine_priority(sequence),
219            timestamp: std::time::Duration::from_millis(sequence * 33),
220            is_final: false, // Live streams don't end
221            checksum: None,
222        }
223    }
224
225    /// Determine chunk type based on sequence
226    pub fn determine_chunk_type(&self, sequence: u64) -> ChunkType {
227        if sequence % 30 == 0 {
228            ChunkType::VideoIFrame // Keyframe every 30 frames
229        } else if sequence % 3 == 0 {
230            ChunkType::VideoBFrame // B-frame every 3rd frame
231        } else {
232            ChunkType::VideoPFrame // P-frame otherwise
233        }
234    }
235
236    /// Determine priority based on sequence and chunk type
237    pub fn determine_priority(&self, sequence: u64) -> MediaPriority {
238        if sequence % 30 == 0 {
239            MediaPriority::High // I-frames are high priority
240        } else if sequence % 3 == 0 {
241            MediaPriority::Low  // B-frames are low priority
242        } else {
243            MediaPriority::Normal // P-frames are normal priority
244        }
245    }
246
247    /// Update metrics efficiently
248    async fn update_metrics(&self, metrics: &Arc<tokio::sync::Mutex<StreamMetrics>>, chunk: &MediaChunk) {
249        let mut m = metrics.lock().await;
250        m.chunks_processed += 1;
251        m.bytes_processed += chunk.data.len() as u64;
252        m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
253        m.last_updated = chrono::Utc::now();
254    }
255
256    /// Get stream from priority queue
257    pub fn get_chunk_stream(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
258        self.chunk_queue.dequeue()
259    }
260
261    /// Get current metrics with updated buffer utilization
262    pub async fn get_metrics(&self) -> StreamMetrics {
263        let mut metrics = self.metrics.lock().await;
264
265        // Update buffer utilization based on queue length
266        let queue_len = self.chunk_queue.len().await;
267        let queue_capacity = 1024; // You might want to store this in the service
268        metrics.buffer_utilization = queue_len as f64 / queue_capacity as f64;
269
270        metrics.clone()
271    }
272
273    /// Create a metrics monitoring stream
274    pub fn get_metrics_stream(&self) -> RS2Stream<StreamMetrics> {
275        let metrics = Arc::clone(&self.metrics);
276        let chunk_queue = Arc::clone(&self.chunk_queue);
277
278        tick(std::time::Duration::from_secs(1), ())
279            .par_eval_map_rs2(1, move |_| {
280                let metrics = Arc::clone(&metrics);
281                let chunk_queue = Arc::clone(&chunk_queue);
282
283                async move {
284                    let mut m = metrics.lock().await;
285
286                    // Update real-time buffer utilization
287                    let queue_len = chunk_queue.len().await;
288                    m.buffer_utilization = queue_len as f64 / 1024.0;
289
290                    m.clone()
291                }
292            })
293    }
294
295    /// Gracefully shutdown the streaming service
296    pub async fn shutdown(&self) {
297        log::info!("Shutting down media streaming service");
298        self.chunk_queue.close().await;
299    }
300}
301
302/// Factory for creating different types of streaming services
303pub struct StreamingServiceFactory;
304
305impl StreamingServiceFactory {
306    /// Create service optimized for live streaming
307    pub fn create_live_streaming_service() -> MediaStreamingService {
308        MediaStreamingService::new(2048) // Larger buffer for live
309    }
310
311    /// Create service optimized for file streaming  
312    pub fn create_file_streaming_service() -> MediaStreamingService {
313        MediaStreamingService::new(512) // Smaller buffer for files
314    }
315
316    /// Create service optimized for low-latency streaming
317    pub fn create_low_latency_service() -> MediaStreamingService {
318        MediaStreamingService::new(128) // Very small buffer for low latency
319    }
320}