use chrono::Utc;
use futures_util::StreamExt;
use rs2_stream::media::events::MediaStreamEvent;
use rs2_stream::media::types::{QualityLevel, UserActivity};
use rs2_stream::rs2::*;
use tokio::time::{sleep, Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
println!("Media Stream Events Example");
let event_stream = create_sample_event_stream();
let mut event_handler = EventHandler::new();
println!("Processing events...\n");
let mut event_stream = std::pin::pin!(event_stream);
while let Some(event) = event_stream.next().await {
event_handler.handle_event(&event).await;
let activity: UserActivity = event.into();
println!(
"Activity: {} by user {} at {}",
activity.activity_type, activity.user_id, activity.timestamp
);
if !activity.metadata.is_empty() {
println!(" Metadata:");
for (key, value) in &activity.metadata {
println!(" {}: {}", key, value);
}
}
println!();
sleep(Duration::from_millis(500)).await;
}
let stats = event_handler.get_stats();
println!("\nEvent Statistics:");
println!(" Total events: {}", stats.total_events);
println!(" Stream started events: {}", stats.stream_started);
println!(" Stream stopped events: {}", stats.stream_stopped);
println!(" Quality changed events: {}", stats.quality_changed);
println!(" Buffer underrun events: {}", stats.buffer_underrun);
println!(" Chunk dropped events: {}", stats.chunk_dropped);
Ok(())
}
fn create_sample_event_stream() -> RS2Stream<MediaStreamEvent> {
let stream_id = "example-stream-123".to_string();
let user_id = 42;
let now = Utc::now();
let events = vec![
MediaStreamEvent::StreamStarted {
stream_id: stream_id.clone(),
user_id,
quality: QualityLevel::High,
timestamp: now,
},
MediaStreamEvent::QualityChanged {
stream_id: stream_id.clone(),
user_id,
old_quality: QualityLevel::High,
new_quality: QualityLevel::Medium,
timestamp: now + chrono::Duration::seconds(2),
},
MediaStreamEvent::BufferUnderrun {
stream_id: stream_id.clone(),
user_id,
buffer_level: 0.1,
timestamp: now + chrono::Duration::seconds(5),
},
MediaStreamEvent::ChunkDropped {
stream_id: stream_id.clone(),
user_id,
sequence_number: 42,
reason: "Network congestion".to_string(),
timestamp: now + chrono::Duration::seconds(6),
},
MediaStreamEvent::QualityChanged {
stream_id: stream_id.clone(),
user_id,
old_quality: QualityLevel::Medium,
new_quality: QualityLevel::High,
timestamp: now + chrono::Duration::seconds(8),
},
MediaStreamEvent::StreamStopped {
stream_id: stream_id.clone(),
user_id,
duration_seconds: 10,
bytes_transferred: 1_500_000,
timestamp: now + chrono::Duration::seconds(10),
},
];
from_iter(events)
}
struct EventHandler {
stats: EventStats,
}
#[derive(Default)]
struct EventStats {
total_events: usize,
stream_started: usize,
stream_stopped: usize,
quality_changed: usize,
buffer_underrun: usize,
chunk_dropped: usize,
}
impl EventHandler {
fn new() -> Self {
Self {
stats: EventStats::default(),
}
}
async fn handle_event(&mut self, event: &MediaStreamEvent) -> () {
match event {
MediaStreamEvent::StreamStarted {
stream_id,
user_id,
quality,
timestamp,
} => {
println!(
"Stream started: id={}, user={}, quality={:?}, time={}",
stream_id, user_id, quality, timestamp
);
self.stats.stream_started += 1;
}
MediaStreamEvent::StreamStopped {
stream_id,
user_id,
duration_seconds,
bytes_transferred,
timestamp,
} => {
println!(
"Stream stopped: id={}, user={}, duration={}s, bytes={}, time={}",
stream_id, user_id, duration_seconds, bytes_transferred, timestamp
);
self.stats.stream_stopped += 1;
}
MediaStreamEvent::QualityChanged {
stream_id,
user_id,
old_quality,
new_quality,
timestamp,
} => {
println!(
"Quality changed: id={}, user={}, old={:?}, new={:?}, time={}",
stream_id, user_id, old_quality, new_quality, timestamp
);
self.stats.quality_changed += 1;
}
MediaStreamEvent::BufferUnderrun {
stream_id,
user_id,
buffer_level,
timestamp,
} => {
println!(
"Buffer underrun: id={}, user={}, level={:.2}, time={}",
stream_id, user_id, buffer_level, timestamp
);
self.stats.buffer_underrun += 1;
}
MediaStreamEvent::ChunkDropped {
stream_id,
user_id,
sequence_number,
reason,
timestamp,
} => {
println!(
"Chunk dropped: id={}, user={}, seq={}, reason='{}', time={}",
stream_id, user_id, sequence_number, reason, timestamp
);
self.stats.chunk_dropped += 1;
}
}
self.stats.total_events += 1;
}
fn get_stats(&self) -> &EventStats {
&self.stats
}
}