use chrono::Utc;
use futures_util::StreamExt;
use rs2_stream::media::streaming::StreamingServiceFactory;
use rs2_stream::media::types::{MediaChunk, MediaStream, MediaType, QualityLevel};
use rs2_stream::rs2::*;
use rs2_stream::stream_performance_metrics::StreamMetrics;
use std::collections::HashMap;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let streaming_service = StreamingServiceFactory::create_low_latency_service();
let stream_config = MediaStream {
id: "example-live-stream".to_string(),
user_id: 1,
content_type: MediaType::Mixed,
quality: QualityLevel::High,
chunk_size: 32 * 1024, created_at: Utc::now(),
metadata: HashMap::new(),
};
println!("Starting live stream with ID: {}", stream_config.id);
let chunk_stream = streaming_service.start_live_stream(stream_config).await;
let metrics_stream = streaming_service.get_metrics_stream();
tokio::spawn(monitor_metrics(metrics_stream));
let mut chunk_count = 0;
let mut chunk_stream = std::pin::pin!(chunk_stream);
let start_time = std::time::Instant::now();
let duration = std::time::Duration::from_secs(10);
println!("Processing live stream for 10 seconds...");
while let Some(chunk) = chunk_stream.next().await {
chunk_count += 1;
process_chunk(&chunk);
if chunk_count % 10 == 0 {
println!(
"Processed {} chunks, latest: type={:?}, size={} bytes",
chunk_count,
chunk.chunk_type,
chunk.data.len()
);
}
if start_time.elapsed() >= duration {
println!("Time limit reached, stopping stream");
break;
}
}
let metrics = streaming_service.get_metrics().await;
println!("\nFinal Stream Metrics:");
println!(" Name: {}", metrics.name.as_deref().unwrap_or("unknown"));
println!(" Bytes processed: {}", metrics.bytes_processed);
println!(" Items processed: {}", metrics.items_processed);
println!(" Errors: {}", metrics.errors);
println!(
" Average item size: {:.2} bytes",
metrics.average_item_size
);
println!(" Processing time: {:?}", metrics.processing_time);
streaming_service.shutdown().await;
println!("Streaming service shut down");
sleep(Duration::from_millis(100)).await;
Ok(())
}
fn process_chunk(_chunk: &MediaChunk) {
std::thread::sleep(std::time::Duration::from_millis(5));
}
async fn monitor_metrics(metrics_stream: RS2Stream<StreamMetrics>) {
let mut metrics_stream = std::pin::pin!(metrics_stream);
println!("Starting metrics monitor...");
while let Some(metrics) = metrics_stream.next().await {
println!(
"[Metrics] Items: {}, Errors: {}, Avg size: {:.1} bytes",
metrics.items_processed, metrics.errors, metrics.average_item_size
);
sleep(Duration::from_secs(1)).await;
}
println!("Metrics monitor stopped");
}