rs2_stream/media/
streaming.rs1use super::priority_queue::MediaPriorityQueue;
4use super::types::*;
5use crate::queue::Queue;
6use crate::*;
7use futures_core::Stream;
8use std::path::PathBuf;
9use std::sync::Arc;
10use tokio::fs::File;
11use tokio::io::{AsyncReadExt, BufReader};
12
13pub struct MediaStreamingService {
14 chunk_queue: Arc<MediaPriorityQueue>,
15 metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
16}
17
18impl MediaStreamingService {
19 pub fn new(buffer_capacity: usize) -> Self {
20 Self {
21 chunk_queue: Arc::new(MediaPriorityQueue::new(buffer_capacity, 64)),
22 metrics: Arc::new(tokio::sync::Mutex::new(StreamMetrics {
23 stream_id: String::new(),
24 bytes_processed: 0,
25 chunks_processed: 0,
26 dropped_chunks: 0,
27 average_chunk_size: 0.0,
28 buffer_utilization: 0.0,
29 last_updated: chrono::Utc::now(),
30 })),
31 }
32 }
33
34 pub async fn start_file_stream(
36 &self,
37 file_path: PathBuf,
38 stream_config: MediaStream,
39 ) -> RS2Stream<MediaChunk> {
40 let file = self.acquire_file_resource(file_path).await;
41 let chunk_queue = Arc::clone(&self.chunk_queue);
42 let metrics = Arc::clone(&self.metrics);
43
44 self.create_chunk_stream(file, stream_config, chunk_queue, metrics)
45 }
46
47 pub async fn start_live_stream(
49 &self,
50 stream_config: MediaStream,
51 ) -> RS2Stream<MediaChunk> {
52 let chunk_queue = Arc::clone(&self.chunk_queue);
53 let metrics = Arc::clone(&self.metrics);
54
55 auto_backpressure_drop_newest(
57 throttle(
58 from_iter(0u64..)
59 .take_rs2(stream_config.metadata.get("max_chunks")
60 .and_then(|s| s.parse().ok())
61 .unwrap_or(u64::MAX as usize))
62 .par_eval_map_rs2(4, move |sequence| {
63 let queue = Arc::clone(&chunk_queue);
64 let metrics = Arc::clone(&metrics);
65 let config = stream_config.clone();
66
67 async move {
68 tokio::time::sleep(std::time::Duration::from_micros(100)).await;
70
71 let chunk = MediaChunk {
72 stream_id: config.id.clone(),
73 sequence_number: sequence,
74 data: vec![0u8; config.chunk_size],
75 chunk_type: if sequence % 30 == 0 {
76 ChunkType::VideoIFrame
77 } else if sequence % 3 == 0 {
78 ChunkType::VideoBFrame
79 } else {
80 ChunkType::VideoPFrame
81 },
82 priority: if sequence % 30 == 0 {
83 MediaPriority::High
84 } else if sequence % 3 == 0 {
85 MediaPriority::Low
86 } else {
87 MediaPriority::Normal
88 },
89 timestamp: std::time::Duration::from_millis(sequence * 33),
90 is_final: false,
91 checksum: None,
92 };
93
94 {
96 let mut m = metrics.lock().await;
97 m.chunks_processed += 1;
98 m.bytes_processed += chunk.data.len() as u64;
99 m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
100 m.last_updated = chrono::Utc::now();
101 }
102
103 if let Err(_) = queue.try_enqueue(chunk.clone()).await {
105 let mut m = metrics.lock().await;
106 m.dropped_chunks += 1;
107 }
108
109 chunk
110 }
111 }),
112 std::time::Duration::from_millis(33) ),
114 512
115 )
116 }
117
118 async fn acquire_file_resource(&self, path: PathBuf) -> File {
119 File::open(&path).await
120 .unwrap_or_else(|e| panic!("Failed to open media file {:?}: {}", path, e))
121 }
122
123 fn create_chunk_stream(
124 &self,
125 mut file: File,
126 config: MediaStream,
127 queue: Arc<MediaPriorityQueue>,
128 metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
129 ) -> RS2Stream<MediaChunk> {
130 auto_backpressure_block(
132 unfold((file, 0u64), move |state| {
133 let queue = Arc::clone(&queue);
134 let metrics = Arc::clone(&metrics);
135 let config = config.clone();
136
137 async move {
138 let (mut file, sequence) = state;
139
140 let mut buffer = vec![0u8; config.chunk_size];
142 match file.read(&mut buffer).await {
143 Ok(0) => {
144 None
146 }
147 Ok(bytes_read) => {
148 buffer.truncate(bytes_read);
150
151 let chunk_type = if sequence % 30 == 0 {
153 ChunkType::VideoIFrame
154 } else if sequence % 3 == 0 {
155 ChunkType::VideoBFrame
156 } else {
157 ChunkType::VideoPFrame
158 };
159
160 let priority = if sequence % 30 == 0 {
162 MediaPriority::High
163 } else if sequence % 3 == 0 {
164 MediaPriority::Low
165 } else {
166 MediaPriority::Normal
167 };
168
169 let chunk = MediaChunk {
170 stream_id: config.id.clone(),
171 sequence_number: sequence,
172 data: buffer,
173 chunk_type,
174 priority,
175 timestamp: std::time::Duration::from_millis(sequence * 33),
176 is_final: bytes_read < config.chunk_size,
177 checksum: None,
178 };
179
180 {
182 let mut m = metrics.lock().await;
183 m.chunks_processed += 1;
184 m.bytes_processed += chunk.data.len() as u64;
185 m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
186 m.last_updated = chrono::Utc::now();
187 }
188
189 if let Err(_) = queue.enqueue(chunk.clone()).await {
191 let mut m = metrics.lock().await;
192 m.dropped_chunks += 1;
193 }
194
195 Some((chunk, (file, sequence + 1)))
196 }
197 Err(e) => {
198 log::error!("Error reading file: {}", e);
199 None
200 }
201 }
202 }
203 }),
204 256
205 )
206 }
207
208 async fn create_live_chunk(&self, config: &MediaStream, sequence: u64) -> MediaChunk {
210 tokio::time::sleep(std::time::Duration::from_micros(100)).await;
212
213 MediaChunk {
214 stream_id: config.id.clone(),
215 sequence_number: sequence,
216 data: vec![0u8; config.chunk_size], chunk_type: self.determine_chunk_type(sequence),
218 priority: self.determine_priority(sequence),
219 timestamp: std::time::Duration::from_millis(sequence * 33),
220 is_final: false, checksum: None,
222 }
223 }
224
225 pub fn determine_chunk_type(&self, sequence: u64) -> ChunkType {
227 if sequence % 30 == 0 {
228 ChunkType::VideoIFrame } else if sequence % 3 == 0 {
230 ChunkType::VideoBFrame } else {
232 ChunkType::VideoPFrame }
234 }
235
236 pub fn determine_priority(&self, sequence: u64) -> MediaPriority {
238 if sequence % 30 == 0 {
239 MediaPriority::High } else if sequence % 3 == 0 {
241 MediaPriority::Low } else {
243 MediaPriority::Normal }
245 }
246
247 async fn update_metrics(&self, metrics: &Arc<tokio::sync::Mutex<StreamMetrics>>, chunk: &MediaChunk) {
249 let mut m = metrics.lock().await;
250 m.chunks_processed += 1;
251 m.bytes_processed += chunk.data.len() as u64;
252 m.average_chunk_size = m.bytes_processed as f64 / m.chunks_processed as f64;
253 m.last_updated = chrono::Utc::now();
254 }
255
256 pub fn get_chunk_stream(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
258 self.chunk_queue.dequeue()
259 }
260
261 pub async fn get_metrics(&self) -> StreamMetrics {
263 let mut metrics = self.metrics.lock().await;
264
265 let queue_len = self.chunk_queue.len().await;
267 let queue_capacity = 1024; metrics.buffer_utilization = queue_len as f64 / queue_capacity as f64;
269
270 metrics.clone()
271 }
272
273 pub fn get_metrics_stream(&self) -> RS2Stream<StreamMetrics> {
275 let metrics = Arc::clone(&self.metrics);
276 let chunk_queue = Arc::clone(&self.chunk_queue);
277
278 tick(std::time::Duration::from_secs(1), ())
279 .par_eval_map_rs2(1, move |_| {
280 let metrics = Arc::clone(&metrics);
281 let chunk_queue = Arc::clone(&chunk_queue);
282
283 async move {
284 let mut m = metrics.lock().await;
285
286 let queue_len = chunk_queue.len().await;
288 m.buffer_utilization = queue_len as f64 / 1024.0;
289
290 m.clone()
291 }
292 })
293 }
294
295 pub async fn shutdown(&self) {
297 log::info!("Shutting down media streaming service");
298 self.chunk_queue.close().await;
299 }
300}
301
302pub struct StreamingServiceFactory;
304
305impl StreamingServiceFactory {
306 pub fn create_live_streaming_service() -> MediaStreamingService {
308 MediaStreamingService::new(2048) }
310
311 pub fn create_file_streaming_service() -> MediaStreamingService {
313 MediaStreamingService::new(512) }
315
316 pub fn create_low_latency_service() -> MediaStreamingService {
318 MediaStreamingService::new(128) }
320}