rs2_stream/media/
streaming.rs1use super::priority_queue::MediaPriorityQueue;
4use super::types::*;
5use crate::stream_performance_metrics::StreamMetrics;
6use crate::{auto_backpressure_block, tick, unfold};
7use crate::{auto_backpressure_drop_newest, from_iter, throttle, RS2Stream, RS2StreamExt};
8use futures_core::Stream;
9use std::path::PathBuf;
10use std::sync::Arc;
11use tokio::fs::File;
12use tokio::io::AsyncReadExt;
13
14pub struct MediaStreamingService {
15 chunk_queue: Arc<MediaPriorityQueue>,
16 metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
17}
18
19impl MediaStreamingService {
20 pub fn new(buffer_capacity: usize) -> Self {
21 Self {
22 chunk_queue: Arc::new(MediaPriorityQueue::new(buffer_capacity, 64)),
23 metrics: Arc::new(tokio::sync::Mutex::new(
24 StreamMetrics::new().with_name("media-stream".to_string()),
25 )),
26 }
27 }
28
29 pub async fn start_file_stream(
31 &self,
32 file_path: PathBuf,
33 stream_config: MediaStream,
34 ) -> RS2Stream<MediaChunk> {
35 let file = self.acquire_file_resource(file_path).await;
36 let chunk_queue = Arc::clone(&self.chunk_queue);
37 let metrics = Arc::clone(&self.metrics);
38
39 self.create_chunk_stream(file, stream_config, chunk_queue, metrics)
40 }
41
42 pub async fn start_live_stream(&self, stream_config: MediaStream) -> RS2Stream<MediaChunk> {
44 let chunk_queue = Arc::clone(&self.chunk_queue);
45 let metrics = Arc::clone(&self.metrics);
46
47 auto_backpressure_drop_newest(
49 throttle(
50 from_iter(0u64..)
51 .take_rs2(
52 stream_config
53 .metadata
54 .get("max_chunks")
55 .and_then(|s| s.parse().ok())
56 .unwrap_or(u64::MAX as usize),
57 )
58 .par_eval_map_rs2(4, move |sequence| {
59 let queue = Arc::clone(&chunk_queue);
60 let metrics = Arc::clone(&metrics);
61 let config = stream_config.clone();
62
63 async move {
64 tokio::time::sleep(std::time::Duration::from_micros(100)).await;
66
67 let chunk = MediaChunk {
68 stream_id: config.id.clone(),
69 sequence_number: sequence,
70 data: vec![0u8; config.chunk_size],
71 chunk_type: if sequence % 30 == 0 {
72 ChunkType::VideoIFrame
73 } else if sequence % 3 == 0 {
74 ChunkType::VideoBFrame
75 } else {
76 ChunkType::VideoPFrame
77 },
78 priority: if sequence % 30 == 0 {
79 MediaPriority::High
80 } else if sequence % 3 == 0 {
81 MediaPriority::Low
82 } else {
83 MediaPriority::Normal
84 },
85 timestamp: std::time::Duration::from_millis(sequence * 33),
86 is_final: false,
87 checksum: None,
88 };
89
90 {
92 let mut m = metrics.lock().await;
93 m.items_processed += 1;
94 m.bytes_processed += chunk.data.len() as u64;
95 m.average_item_size =
96 m.bytes_processed as f64 / m.items_processed as f64;
97 m.last_activity = Some(std::time::Instant::now());
98 }
99
100 if let Err(_) = queue.try_enqueue(chunk.clone()).await {
102 let mut m = metrics.lock().await;
103 m.errors += 1;
104 }
105
106 chunk
107 }
108 }),
109 std::time::Duration::from_millis(33), ),
111 512,
112 )
113 }
114
115 async fn acquire_file_resource(&self, path: PathBuf) -> File {
116 File::open(&path)
117 .await
118 .unwrap_or_else(|e| panic!("Failed to open media file {:?}: {}", path, e))
119 }
120
121 fn create_chunk_stream(
122 &self,
123 file: File,
124 config: MediaStream,
125 queue: Arc<MediaPriorityQueue>,
126 metrics: Arc<tokio::sync::Mutex<StreamMetrics>>,
127 ) -> RS2Stream<MediaChunk> {
128 auto_backpressure_block(
130 unfold((file, 0u64), move |state| {
131 let queue = Arc::clone(&queue);
132 let metrics = Arc::clone(&metrics);
133 let config = config.clone();
134
135 async move {
136 let (mut file, sequence) = state;
137
138 let mut buffer = vec![0u8; config.chunk_size];
140 match file.read(&mut buffer).await {
141 Ok(0) => {
142 None
144 }
145 Ok(bytes_read) => {
146 buffer.truncate(bytes_read);
148
149 let chunk_type = if sequence % 30 == 0 {
151 ChunkType::VideoIFrame
152 } else if sequence % 3 == 0 {
153 ChunkType::VideoBFrame
154 } else {
155 ChunkType::VideoPFrame
156 };
157
158 let priority = if sequence % 30 == 0 {
160 MediaPriority::High
161 } else if sequence % 3 == 0 {
162 MediaPriority::Low
163 } else {
164 MediaPriority::Normal
165 };
166
167 let chunk = MediaChunk {
168 stream_id: config.id.clone(),
169 sequence_number: sequence,
170 data: buffer,
171 chunk_type,
172 priority,
173 timestamp: std::time::Duration::from_millis(sequence * 33),
174 is_final: bytes_read < config.chunk_size,
175 checksum: None,
176 };
177
178 {
180 let mut m = metrics.lock().await;
181 m.items_processed += 1;
182 m.bytes_processed += chunk.data.len() as u64;
183 m.average_item_size =
184 m.bytes_processed as f64 / m.items_processed as f64;
185 m.last_activity = Some(std::time::Instant::now());
186 }
187
188 if let Err(_) = queue.enqueue(chunk.clone()).await {
190 let mut m = metrics.lock().await;
191 m.errors += 1;
192 }
193
194 Some((chunk, (file, sequence + 1)))
195 }
196 Err(e) => {
197 log::error!("Error reading file: {}", e);
198 None
199 }
200 }
201 }
202 }),
203 256,
204 )
205 }
206
207 async fn create_live_chunk(&self, config: &MediaStream, sequence: u64) -> MediaChunk {
209 tokio::time::sleep(std::time::Duration::from_micros(100)).await;
211
212 MediaChunk {
213 stream_id: config.id.clone(),
214 sequence_number: sequence,
215 data: vec![0u8; config.chunk_size], chunk_type: self.determine_chunk_type(sequence),
217 priority: self.determine_priority(sequence),
218 timestamp: std::time::Duration::from_millis(sequence * 33),
219 is_final: false, checksum: None,
221 }
222 }
223
224 pub fn determine_chunk_type(&self, sequence: u64) -> ChunkType {
226 if sequence % 30 == 0 {
227 ChunkType::VideoIFrame } else if sequence % 3 == 0 {
229 ChunkType::VideoBFrame } else {
231 ChunkType::VideoPFrame }
233 }
234
235 pub fn determine_priority(&self, sequence: u64) -> MediaPriority {
237 if sequence % 30 == 0 {
238 MediaPriority::High } else if sequence % 3 == 0 {
240 MediaPriority::Low } else {
242 MediaPriority::Normal }
244 }
245
246 async fn update_metrics(
248 &self,
249 metrics: &Arc<tokio::sync::Mutex<StreamMetrics>>,
250 chunk: &MediaChunk,
251 ) {
252 let mut m = metrics.lock().await;
253 m.items_processed += 1;
254 m.bytes_processed += chunk.data.len() as u64;
255 m.average_item_size = m.bytes_processed as f64 / m.items_processed as f64;
256 m.last_activity = Some(std::time::Instant::now());
257 }
258
259 pub fn get_chunk_stream(&self) -> impl Stream<Item = MediaChunk> + Send + 'static {
261 self.chunk_queue.dequeue()
262 }
263
264 pub async fn get_metrics(&self) -> StreamMetrics {
266 let metrics = self.metrics.lock().await;
267 metrics.clone()
268 }
269
270 pub fn get_metrics_stream(&self) -> RS2Stream<StreamMetrics> {
272 let metrics = Arc::clone(&self.metrics);
273
274 tick(std::time::Duration::from_secs(1), ()).par_eval_map_rs2(1, move |_| {
275 let metrics = Arc::clone(&metrics);
276
277 async move {
278 let m = metrics.lock().await;
279 m.clone()
280 }
281 })
282 }
283
284 pub async fn shutdown(&self) {
286 log::info!("Shutting down media streaming service");
287 self.chunk_queue.close().await;
288 }
289}
290
291pub struct StreamingServiceFactory;
293
294impl StreamingServiceFactory {
295 pub fn create_live_streaming_service() -> MediaStreamingService {
297 MediaStreamingService::new(2048) }
299
300 pub fn create_file_streaming_service() -> MediaStreamingService {
302 MediaStreamingService::new(512) }
304
305 pub fn create_low_latency_service() -> MediaStreamingService {
307 MediaStreamingService::new(128) }
309}