Skip to main content

kino_core/
session.rs

1//! Player Session - Main orchestrator for playback
2//!
3//! Coordinates:
4//! - Manifest loading and parsing
5//! - Segment fetching and buffering
6//! - ABR selection
7//! - State machine transitions
8//! - Analytics events
9
10use crate::{
11    abr::{AbrContext, AbrEngine},
12    analytics::{AnalyticsEmitter, AnalyticsEvent},
13    buffer::{BufferConfig, BufferManager},
14    Error,
15    manifest::{create_parser, Manifest},
16    types::*,
17    Result,
18};
19use reqwest::Client;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22use tokio::sync::{RwLock, watch};
23use tracing::{debug, info, instrument, warn};
24use url::Url;
25
/// Player session managing a single playback.
///
/// Holds the per-session state (player state, position, duration, manifest,
/// selected rendition, quality metrics) together with the collaborators used
/// to drive playback: the buffer manager, the ABR engine, the HTTP client and
/// the optional analytics emitter. Mutable state is wrapped in
/// `Arc<RwLock<..>>` and accessed through `&self` async methods.
pub struct PlayerSession {
    /// Unique session ID, generated in `new`.
    id: SessionId,
    /// Session configuration supplied by the caller at construction time.
    config: PlayerConfig,
    /// Current player state; starts as `PlayerState::Idle`.
    state: Arc<RwLock<PlayerState>>,
    /// State change broadcaster; receivers are handed out by `subscribe_state`.
    state_tx: watch::Sender<PlayerState>,
    /// Buffer manager, configured from the buffering fields of `config`.
    buffer: Arc<BufferManager>,
    /// ABR engine used for rendition selection and bandwidth measurements.
    abr: Arc<RwLock<AbrEngine>>,
    /// HTTP client used to fetch segments; built with the configured request timeout.
    client: Client,
    /// Current manifest (`None` until content is loaded, and again after `stop`).
    manifest: Arc<RwLock<Option<Manifest>>>,
    /// Currently selected rendition (`None` until one has been selected).
    current_rendition: Arc<RwLock<Option<Rendition>>>,
    /// Playback position; presumably seconds, matching `duration` — TODO confirm.
    position: Arc<RwLock<f64>>,
    /// Content duration in seconds (if known); set from the manifest during `load`.
    duration: Arc<RwLock<Option<f64>>>,
    /// Quality metrics (stall count, dropped and decoded frame counters).
    metrics: Arc<RwLock<QualityMetrics>>,
    /// Analytics emitter; `None` when analytics are disabled in the configuration.
    analytics: Option<Arc<AnalyticsEmitter>>,
    /// Instant at which the session was created; used to report elapsed time on `stop`.
    start_time: Instant,
}
57
58impl PlayerSession {
59    /// Create a new player session
60    pub fn new(config: PlayerConfig) -> Self {
61        let (state_tx, _) = watch::channel(PlayerState::Idle);
62
63        let buffer_config = BufferConfig {
64            min_buffer_time: config.min_buffer_time,
65            max_buffer_time: config.max_buffer_time,
66            rebuffer_threshold: config.rebuffer_threshold,
67            prefetch_enabled: config.prefetch_enabled,
68            ..Default::default()
69        };
70
71        let analytics = if config.analytics_enabled {
72            Some(Arc::new(AnalyticsEmitter::new()))
73        } else {
74            None
75        };
76
77        Self {
78            id: SessionId::new(),
79            config: config.clone(),
80            state: Arc::new(RwLock::new(PlayerState::Idle)),
81            state_tx,
82            buffer: Arc::new(BufferManager::new(buffer_config)),
83            abr: Arc::new(RwLock::new(AbrEngine::new(config.abr_algorithm))),
84            client: Client::builder()
85                .timeout(Duration::from_millis(config.request_timeout_ms))
86                .build()
87                .expect("Failed to create HTTP client"),
88            manifest: Arc::new(RwLock::new(None)),
89            current_rendition: Arc::new(RwLock::new(None)),
90            position: Arc::new(RwLock::new(0.0)),
91            duration: Arc::new(RwLock::new(None)),
92            metrics: Arc::new(RwLock::new(QualityMetrics::default())),
93            analytics,
94            start_time: Instant::now(),
95        }
96    }
97
    /// Get session ID.
    ///
    /// Returns a copy of the identifier assigned when the session was created.
    pub fn id(&self) -> SessionId {
        self.id
    }
102
103    /// Get current state
104    pub async fn state(&self) -> PlayerState {
105        *self.state.read().await
106    }
107
    /// Subscribe to state changes.
    ///
    /// Returns a new `watch::Receiver` created from the session's state
    /// broadcaster; each call produces an independent receiver.
    pub fn subscribe_state(&self) -> watch::Receiver<PlayerState> {
        self.state_tx.subscribe()
    }
112
113    /// Transition to new state
114    async fn set_state(&self, new_state: PlayerState) -> Result<()> {
115        let current = *self.state.read().await;
116
117        if !current.can_transition_to(new_state) {
118            return Err(Error::InvalidStateTransition {
119                from: current.to_string(),
120                to: new_state.to_string(),
121            });
122        }
123
124        *self.state.write().await = new_state;
125        let _ = self.state_tx.send(new_state);
126
127        // Emit analytics event
128        if let Some(ref analytics) = self.analytics {
129            analytics.emit(AnalyticsEvent::StateChange {
130                from: current,
131                to: new_state,
132                position: *self.position.read().await,
133            }).await;
134        }
135
136        info!(from = %current, to = %new_state, "State transition");
137
138        Ok(())
139    }
140
141    /// Load content from URL
142    #[instrument(skip(self))]
143    pub async fn load(&self, url: &Url) -> Result<()> {
144        info!(url = %url, session_id = %self.id, "Loading content");
145
146        self.set_state(PlayerState::Loading).await?;
147
148        // Parse manifest
149        let parser = create_parser(url);
150        let manifest = parser.parse(url).await?;
151
152        info!(
153            renditions = manifest.renditions.len(),
154            is_live = manifest.is_live,
155            "Manifest parsed"
156        );
157
158        // Store manifest
159        *self.manifest.write().await = Some(manifest.clone());
160
161        // Set duration if VOD
162        if let Some(duration) = manifest.duration {
163            *self.duration.write().await = Some(duration.as_secs_f64());
164        }
165
166        // Select initial rendition
167        let context = self.create_abr_context().await;
168        let mut abr = self.abr.write().await;
169        if let Some(rendition) = abr.select_rendition(&manifest.renditions, &context) {
170            *self.current_rendition.write().await = Some(rendition.clone());
171            info!(rendition = %rendition.id, bandwidth = rendition.bandwidth, "Initial rendition selected");
172        }
173
174        // Emit load event
175        if let Some(ref analytics) = self.analytics {
176            analytics.emit(AnalyticsEvent::Load {
177                url: url.to_string(),
178                is_live: manifest.is_live,
179            }).await;
180        }
181
182        // Transition to buffering
183        self.set_state(PlayerState::Buffering).await?;
184
185        Ok(())
186    }
187
188    /// Start playback
189    #[instrument(skip(self))]
190    pub async fn play(&self) -> Result<()> {
191        let current_state = self.state().await;
192
193        match current_state {
194            PlayerState::Buffering => {
195                // Wait for buffer
196                if self.buffer.can_start_playback().await {
197                    self.set_state(PlayerState::Playing).await?;
198                }
199            }
200            PlayerState::Paused => {
201                self.set_state(PlayerState::Playing).await?;
202            }
203            PlayerState::Ended => {
204                // Restart from beginning
205                self.seek(0.0).await?;
206                self.set_state(PlayerState::Playing).await?;
207            }
208            _ => {
209                warn!(state = %current_state, "Cannot play from current state");
210            }
211        }
212
213        // Emit play event
214        if let Some(ref analytics) = self.analytics {
215            analytics.emit(AnalyticsEvent::Play {
216                position: *self.position.read().await,
217            }).await;
218        }
219
220        Ok(())
221    }
222
223    /// Pause playback
224    #[instrument(skip(self))]
225    pub async fn pause(&self) -> Result<()> {
226        if self.state().await == PlayerState::Playing {
227            self.set_state(PlayerState::Paused).await?;
228
229            // Emit pause event
230            if let Some(ref analytics) = self.analytics {
231                analytics.emit(AnalyticsEvent::Pause {
232                    position: *self.position.read().await,
233                }).await;
234            }
235        }
236        Ok(())
237    }
238
239    /// Seek to position
240    #[instrument(skip(self))]
241    pub async fn seek(&self, position: f64) -> Result<()> {
242        let duration = self.duration.read().await;
243
244        // Clamp position
245        let clamped = if let Some(dur) = *duration {
246            position.clamp(0.0, dur)
247        } else {
248            position.max(0.0)
249        };
250
251        info!(from = *self.position.read().await, to = clamped, "Seeking");
252
253        // Update state
254        let was_playing = self.state().await == PlayerState::Playing;
255        self.set_state(PlayerState::Seeking).await?;
256
257        // Check if position is buffered
258        let is_buffered = self.buffer.seek(clamped).await?;
259
260        // Update position
261        *self.position.write().await = clamped;
262
263        // Emit seek event
264        if let Some(ref analytics) = self.analytics {
265            analytics.emit(AnalyticsEvent::Seek {
266                from: *self.position.read().await,
267                to: clamped,
268            }).await;
269        }
270
271        if is_buffered && was_playing {
272            self.set_state(PlayerState::Playing).await?;
273        } else {
274            self.set_state(PlayerState::Buffering).await?;
275        }
276
277        Ok(())
278    }
279
280    /// Stop playback and reset
281    #[instrument(skip(self))]
282    pub async fn stop(&self) -> Result<()> {
283        info!("Stopping playback");
284
285        self.buffer.clear().await;
286        *self.position.write().await = 0.0;
287        *self.manifest.write().await = None;
288        *self.current_rendition.write().await = None;
289
290        // Force state to Idle
291        *self.state.write().await = PlayerState::Idle;
292        let _ = self.state_tx.send(PlayerState::Idle);
293
294        // Emit end event
295        if let Some(ref analytics) = self.analytics {
296            analytics.emit(AnalyticsEvent::End {
297                position: *self.position.read().await,
298                watch_time: self.start_time.elapsed().as_secs_f64(),
299            }).await;
300        }
301
302        Ok(())
303    }
304
305    /// Get current position
306    pub async fn position(&self) -> f64 {
307        *self.position.read().await
308    }
309
310    /// Get content duration
311    pub async fn duration(&self) -> Option<f64> {
312        *self.duration.read().await
313    }
314
315    /// Get current rendition
316    pub async fn current_rendition(&self) -> Option<Rendition> {
317        self.current_rendition.read().await.clone()
318    }
319
    /// Get buffer level.
    ///
    /// Delegates to the buffer manager and returns its reported level as-is.
    pub async fn buffer_level(&self) -> f64 {
        self.buffer.buffer_level().await
    }
324
325    /// Get quality metrics
326    pub async fn metrics(&self) -> QualityMetrics {
327        self.metrics.read().await.clone()
328    }
329
    /// Get buffered ranges.
    ///
    /// Delegates to the buffer manager and returns its list of `(f64, f64)`
    /// pairs unchanged; presumably each pair is a `(start, end)` range —
    /// TODO confirm against `BufferManager::buffered_ranges`.
    pub async fn buffered_ranges(&self) -> Vec<(f64, f64)> {
        self.buffer.buffered_ranges().await
    }
334
335    /// Create ABR context from current state
336    async fn create_abr_context(&self) -> AbrContext {
337        let manifest = self.manifest.read().await;
338        let is_live = manifest.as_ref().map(|m| m.is_live).unwrap_or(false);
339
340        AbrContext {
341            buffer_level: self.buffer.buffer_level().await,
342            target_buffer: self.config.max_buffer_time,
343            playback_rate: 1.0,
344            is_live,
345            screen_width: None,
346            max_bitrate: self.config.max_bitrate,
347            network: NetworkInfo {
348                bandwidth_estimate: self.abr.read().await.bandwidth_estimate(),
349                ..Default::default()
350            },
351        }
352    }
353
354    /// Fetch next segment
355    #[instrument(skip(self))]
356    pub async fn fetch_segment(&self, segment: &Segment) -> Result<bytes::Bytes> {
357        let start = Instant::now();
358
359        let response = self
360            .client
361            .get(segment.uri.clone())
362            .send()
363            .await
364            .map_err(|e| Error::SegmentFetch {
365                url: segment.uri.to_string(),
366                source: e,
367            })?;
368
369        let data = response
370            .bytes()
371            .await
372            .map_err(|e| Error::SegmentFetch {
373                url: segment.uri.to_string(),
374                source: e,
375            })?;
376
377        let duration = start.elapsed();
378        let bytes = data.len();
379
380        // Record bandwidth measurement
381        self.abr.write().await.record_measurement(bytes, duration);
382
383        debug!(
384            segment = segment.number,
385            bytes = bytes,
386            duration_ms = duration.as_millis(),
387            "Segment fetched"
388        );
389
390        Ok(data)
391    }
392
393    /// Update playback position (called by renderer)
394    pub async fn update_position(&self, position: f64) {
395        *self.position.write().await = position;
396        self.buffer.update_position(position).await;
397
398        // Check for end of content
399        if let Some(duration) = *self.duration.read().await {
400            if position >= duration - 0.5 {
401                let _ = self.set_state(PlayerState::Ended).await;
402            }
403        }
404
405        // Check buffer health
406        if self.state().await == PlayerState::Playing && !self.buffer.is_buffer_healthy().await {
407            let mut metrics = self.metrics.write().await;
408            metrics.stall_count += 1;
409            let _ = self.set_state(PlayerState::Buffering).await;
410
411            // Emit rebuffer event
412            if let Some(ref analytics) = self.analytics {
413                analytics.emit(AnalyticsEvent::Rebuffer {
414                    position,
415                    buffer_level: self.buffer.buffer_level().await,
416                }).await;
417            }
418        }
419    }
420
421    /// Report dropped frame
422    pub async fn report_dropped_frame(&self) {
423        let mut metrics = self.metrics.write().await;
424        metrics.dropped_frames += 1;
425    }
426
427    /// Report decoded frame
428    pub async fn report_decoded_frame(&self) {
429        let mut metrics = self.metrics.write().await;
430        metrics.decoded_frames += 1;
431    }
432}
433
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built session is idle and positioned at zero.
    #[tokio::test]
    async fn test_session_creation() {
        let session = PlayerSession::new(PlayerConfig::default());

        assert_eq!(session.state().await, PlayerState::Idle);
        assert_eq!(session.position().await, 0.0);
    }

    /// Walks the first two steps of the happy path: Idle -> Loading -> Buffering.
    #[tokio::test]
    async fn test_state_transitions() {
        let session = PlayerSession::new(PlayerConfig::default());

        // Valid: Idle -> Loading
        let to_loading = session.set_state(PlayerState::Loading).await;
        assert!(to_loading.is_ok());
        assert_eq!(session.state().await, PlayerState::Loading);

        // Valid: Loading -> Buffering
        let to_buffering = session.set_state(PlayerState::Buffering).await;
        assert!(to_buffering.is_ok());

        // Rejected transitions are not asserted here. The expected route to
        // Ended is Buffering -> Playing -> Ended.
    }
}