1use crate::{
11 abr::{AbrContext, AbrEngine},
12 analytics::{AnalyticsEmitter, AnalyticsEvent},
13 buffer::{BufferConfig, BufferManager},
14 Error,
15 manifest::{create_parser, Manifest},
16 types::*,
17 Result,
18};
19use reqwest::Client;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{RwLock, watch};
23use tracing::{debug, info, instrument, warn};
24use url::Url;
25
/// A single playback session.
///
/// Bundles the player state machine, buffer manager, ABR engine, HTTP client
/// and quality metrics behind `&self` methods; the mutable pieces are wrapped
/// in `Arc<RwLock<..>>`.
pub struct PlayerSession {
    /// Identifier obtained from `SessionId::new()` at construction.
    id: SessionId,
    /// Configuration this session was created with.
    config: PlayerConfig,
    /// Current player state; initialised to `PlayerState::Idle`.
    state: Arc<RwLock<PlayerState>>,
    /// Sender side of the watch channel that `subscribe_state` creates receivers from.
    state_tx: watch::Sender<PlayerState>,
    /// Buffer manager built from the buffering settings in `config`.
    buffer: Arc<BufferManager>,
    /// ABR engine used to pick renditions and to record download measurements.
    abr: Arc<RwLock<AbrEngine>>,
    /// HTTP client used for segment downloads; its timeout comes from
    /// `config.request_timeout_ms`.
    client: Client,
    /// Manifest stored by `load`; `None` before loading and after `stop`.
    manifest: Arc<RwLock<Option<Manifest>>>,
    /// Rendition chosen during `load`; `None` before loading and after `stop`.
    current_rendition: Arc<RwLock<Option<Rendition>>>,
    /// Playback position; compared against `duration`, which is kept in seconds.
    position: Arc<RwLock<f64>>,
    /// Content duration in seconds, set by `load` when the manifest provides one.
    duration: Arc<RwLock<Option<f64>>>,
    /// Counters for stalls and dropped/decoded frames.
    metrics: Arc<RwLock<QualityMetrics>>,
    /// Analytics emitter; present only when `config.analytics_enabled` was set.
    analytics: Option<Arc<AnalyticsEmitter>>,
    /// Instant at which the session was constructed; used to report
    /// `watch_time` in `stop`.
    start_time: Instant,
}
57
58impl PlayerSession {
59 pub fn new(config: PlayerConfig) -> Self {
61 let (state_tx, _) = watch::channel(PlayerState::Idle);
62
63 let buffer_config = BufferConfig {
64 min_buffer_time: config.min_buffer_time,
65 max_buffer_time: config.max_buffer_time,
66 rebuffer_threshold: config.rebuffer_threshold,
67 prefetch_enabled: config.prefetch_enabled,
68 ..Default::default()
69 };
70
71 let analytics = if config.analytics_enabled {
72 Some(Arc::new(AnalyticsEmitter::new()))
73 } else {
74 None
75 };
76
77 Self {
78 id: SessionId::new(),
79 config: config.clone(),
80 state: Arc::new(RwLock::new(PlayerState::Idle)),
81 state_tx,
82 buffer: Arc::new(BufferManager::new(buffer_config)),
83 abr: Arc::new(RwLock::new(AbrEngine::new(config.abr_algorithm))),
84 client: Client::builder()
85 .timeout(Duration::from_millis(config.request_timeout_ms))
86 .build()
87 .expect("Failed to create HTTP client"),
88 manifest: Arc::new(RwLock::new(None)),
89 current_rendition: Arc::new(RwLock::new(None)),
90 position: Arc::new(RwLock::new(0.0)),
91 duration: Arc::new(RwLock::new(None)),
92 metrics: Arc::new(RwLock::new(QualityMetrics::default())),
93 analytics,
94 start_time: Instant::now(),
95 }
96 }
97
/// Returns this session's identifier.
pub fn id(&self) -> SessionId {
    self.id
}
102
/// Returns a copy of the current player state, read under the state lock.
pub async fn state(&self) -> PlayerState {
    *self.state.read().await
}
107
/// Returns a new receiver on the state watch channel.
pub fn subscribe_state(&self) -> watch::Receiver<PlayerState> {
    self.state_tx.subscribe()
}
112
113 async fn set_state(&self, new_state: PlayerState) -> Result<()> {
115 let current = *self.state.read().await;
116
117 if !current.can_transition_to(new_state) {
118 return Err(Error::InvalidStateTransition {
119 from: current.to_string(),
120 to: new_state.to_string(),
121 });
122 }
123
124 *self.state.write().await = new_state;
125 let _ = self.state_tx.send(new_state);
126
127 if let Some(ref analytics) = self.analytics {
129 analytics.emit(AnalyticsEvent::StateChange {
130 from: current,
131 to: new_state,
132 position: *self.position.read().await,
133 }).await;
134 }
135
136 info!(from = %current, to = %new_state, "State transition");
137
138 Ok(())
139 }
140
141 #[instrument(skip(self))]
143 pub async fn load(&self, url: &Url) -> Result<()> {
144 info!(url = %url, session_id = %self.id, "Loading content");
145
146 self.set_state(PlayerState::Loading).await?;
147
148 let parser = create_parser(url);
150 let manifest = parser.parse(url).await?;
151
152 info!(
153 renditions = manifest.renditions.len(),
154 is_live = manifest.is_live,
155 "Manifest parsed"
156 );
157
158 *self.manifest.write().await = Some(manifest.clone());
160
161 if let Some(duration) = manifest.duration {
163 *self.duration.write().await = Some(duration.as_secs_f64());
164 }
165
166 let context = self.create_abr_context().await;
168 let mut abr = self.abr.write().await;
169 if let Some(rendition) = abr.select_rendition(&manifest.renditions, &context) {
170 *self.current_rendition.write().await = Some(rendition.clone());
171 info!(rendition = %rendition.id, bandwidth = rendition.bandwidth, "Initial rendition selected");
172 }
173
174 if let Some(ref analytics) = self.analytics {
176 analytics.emit(AnalyticsEvent::Load {
177 url: url.to_string(),
178 is_live: manifest.is_live,
179 }).await;
180 }
181
182 self.set_state(PlayerState::Buffering).await?;
184
185 Ok(())
186 }
187
188 #[instrument(skip(self))]
190 pub async fn play(&self) -> Result<()> {
191 let current_state = self.state().await;
192
193 match current_state {
194 PlayerState::Buffering => {
195 if self.buffer.can_start_playback().await {
197 self.set_state(PlayerState::Playing).await?;
198 }
199 }
200 PlayerState::Paused => {
201 self.set_state(PlayerState::Playing).await?;
202 }
203 PlayerState::Ended => {
204 self.seek(0.0).await?;
206 self.set_state(PlayerState::Playing).await?;
207 }
208 _ => {
209 warn!(state = %current_state, "Cannot play from current state");
210 }
211 }
212
213 if let Some(ref analytics) = self.analytics {
215 analytics.emit(AnalyticsEvent::Play {
216 position: *self.position.read().await,
217 }).await;
218 }
219
220 Ok(())
221 }
222
223 #[instrument(skip(self))]
225 pub async fn pause(&self) -> Result<()> {
226 if self.state().await == PlayerState::Playing {
227 self.set_state(PlayerState::Paused).await?;
228
229 if let Some(ref analytics) = self.analytics {
231 analytics.emit(AnalyticsEvent::Pause {
232 position: *self.position.read().await,
233 }).await;
234 }
235 }
236 Ok(())
237 }
238
239 #[instrument(skip(self))]
241 pub async fn seek(&self, position: f64) -> Result<()> {
242 let duration = self.duration.read().await;
243
244 let clamped = if let Some(dur) = *duration {
246 position.clamp(0.0, dur)
247 } else {
248 position.max(0.0)
249 };
250
251 info!(from = *self.position.read().await, to = clamped, "Seeking");
252
253 let was_playing = self.state().await == PlayerState::Playing;
255 self.set_state(PlayerState::Seeking).await?;
256
257 let is_buffered = self.buffer.seek(clamped).await?;
259
260 *self.position.write().await = clamped;
262
263 if let Some(ref analytics) = self.analytics {
265 analytics.emit(AnalyticsEvent::Seek {
266 from: *self.position.read().await,
267 to: clamped,
268 }).await;
269 }
270
271 if is_buffered && was_playing {
272 self.set_state(PlayerState::Playing).await?;
273 } else {
274 self.set_state(PlayerState::Buffering).await?;
275 }
276
277 Ok(())
278 }
279
280 #[instrument(skip(self))]
282 pub async fn stop(&self) -> Result<()> {
283 info!("Stopping playback");
284
285 self.buffer.clear().await;
286 *self.position.write().await = 0.0;
287 *self.manifest.write().await = None;
288 *self.current_rendition.write().await = None;
289
290 *self.state.write().await = PlayerState::Idle;
292 let _ = self.state_tx.send(PlayerState::Idle);
293
294 if let Some(ref analytics) = self.analytics {
296 analytics.emit(AnalyticsEvent::End {
297 position: *self.position.read().await,
298 watch_time: self.start_time.elapsed().as_secs_f64(),
299 }).await;
300 }
301
302 Ok(())
303 }
304
/// Returns the position last stored by `seek`, `update_position` or `stop`.
pub async fn position(&self) -> f64 {
    *self.position.read().await
}
309
/// Returns the content duration in seconds, or `None` when no duration
/// has been recorded.
pub async fn duration(&self) -> Option<f64> {
    *self.duration.read().await
}
314
/// Returns a clone of the currently selected rendition, if any.
pub async fn current_rendition(&self) -> Option<Rendition> {
    self.current_rendition.read().await.clone()
}
319
/// Returns the buffer level reported by the buffer manager.
pub async fn buffer_level(&self) -> f64 {
    self.buffer.buffer_level().await
}
324
/// Returns a snapshot (clone) of the session's quality metrics.
pub async fn metrics(&self) -> QualityMetrics {
    self.metrics.read().await.clone()
}
329
/// Returns the buffered ranges reported by the buffer manager as pairs of
/// `f64` values.
pub async fn buffered_ranges(&self) -> Vec<(f64, f64)> {
    self.buffer.buffered_ranges().await
}
334
335 async fn create_abr_context(&self) -> AbrContext {
337 let manifest = self.manifest.read().await;
338 let is_live = manifest.as_ref().map(|m| m.is_live).unwrap_or(false);
339
340 AbrContext {
341 buffer_level: self.buffer.buffer_level().await,
342 target_buffer: self.config.max_buffer_time,
343 playback_rate: 1.0,
344 is_live,
345 screen_width: None,
346 max_bitrate: self.config.max_bitrate,
347 network: NetworkInfo {
348 bandwidth_estimate: self.abr.read().await.bandwidth_estimate(),
349 ..Default::default()
350 },
351 }
352 }
353
354 #[instrument(skip(self))]
356 pub async fn fetch_segment(&self, segment: &Segment) -> Result<bytes::Bytes> {
357 let start = Instant::now();
358
359 let response = self
360 .client
361 .get(segment.uri.clone())
362 .send()
363 .await
364 .map_err(|e| Error::SegmentFetch {
365 url: segment.uri.to_string(),
366 source: e,
367 })?;
368
369 let data = response
370 .bytes()
371 .await
372 .map_err(|e| Error::SegmentFetch {
373 url: segment.uri.to_string(),
374 source: e,
375 })?;
376
377 let duration = start.elapsed();
378 let bytes = data.len();
379
380 self.abr.write().await.record_measurement(bytes, duration);
382
383 debug!(
384 segment = segment.number,
385 bytes = bytes,
386 duration_ms = duration.as_millis(),
387 "Segment fetched"
388 );
389
390 Ok(data)
391 }
392
393 pub async fn update_position(&self, position: f64) {
395 *self.position.write().await = position;
396 self.buffer.update_position(position).await;
397
398 if let Some(duration) = *self.duration.read().await {
400 if position >= duration - 0.5 {
401 let _ = self.set_state(PlayerState::Ended).await;
402 }
403 }
404
405 if self.state().await == PlayerState::Playing && !self.buffer.is_buffer_healthy().await {
407 let mut metrics = self.metrics.write().await;
408 metrics.stall_count += 1;
409 let _ = self.set_state(PlayerState::Buffering).await;
410
411 if let Some(ref analytics) = self.analytics {
413 analytics.emit(AnalyticsEvent::Rebuffer {
414 position,
415 buffer_level: self.buffer.buffer_level().await,
416 }).await;
417 }
418 }
419 }
420
421 pub async fn report_dropped_frame(&self) {
423 let mut metrics = self.metrics.write().await;
424 metrics.dropped_frames += 1;
425 }
426
427 pub async fn report_decoded_frame(&self) {
429 let mut metrics = self.metrics.write().await;
430 metrics.decoded_frames += 1;
431 }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed session is idle and positioned at zero.
    #[tokio::test]
    async fn test_session_creation() {
        let config = PlayerConfig::default();
        let session = PlayerSession::new(config);

        assert_eq!(session.state().await, PlayerState::Idle);
        assert_eq!(session.position().await, 0.0);
    }

    /// `Idle -> Loading -> Buffering` is accepted, and each accepted
    /// transition is observable through `state()`.
    #[tokio::test]
    async fn test_state_transitions() {
        let config = PlayerConfig::default();
        let session = PlayerSession::new(config);

        assert!(session.set_state(PlayerState::Loading).await.is_ok());
        assert_eq!(session.state().await, PlayerState::Loading);

        assert!(session.set_state(PlayerState::Buffering).await.is_ok());
        assert_eq!(session.state().await, PlayerState::Buffering);
    }
}