Skip to main content

kino_core/
analytics.rs

//! Analytics event emission
//!
//! Captures playback events for:
//! - Quality of Experience (QoE) metrics
//! - Error tracking
//! - Usage analytics
//! - A/B testing
8
use crate::types::*;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
use uuid::Uuid;
15
16/// Analytics event types
17#[derive(Debug, Clone, Serialize, Deserialize)]
18#[serde(tag = "event", rename_all = "snake_case")]
19pub enum AnalyticsEvent {
20    /// Content loaded
21    Load {
22        url: String,
23        is_live: bool,
24    },
25
26    /// Playback started
27    Play {
28        position: f64,
29    },
30
31    /// Playback paused
32    Pause {
33        position: f64,
34    },
35
36    /// Seek performed
37    Seek {
38        from: f64,
39        to: f64,
40    },
41
42    /// Rebuffering started
43    Rebuffer {
44        position: f64,
45        buffer_level: f64,
46    },
47
48    /// Rebuffering ended
49    RebufferEnd {
50        position: f64,
51        duration: f64,
52    },
53
54    /// Quality change
55    QualityChange {
56        from_bitrate: u64,
57        to_bitrate: u64,
58        from_resolution: Option<Resolution>,
59        to_resolution: Option<Resolution>,
60        reason: QualityChangeReason,
61    },
62
63    /// State change
64    StateChange {
65        from: PlayerState,
66        to: PlayerState,
67        position: f64,
68    },
69
70    /// Playback ended
71    End {
72        position: f64,
73        watch_time: f64,
74    },
75
76    /// Error occurred
77    Error {
78        code: String,
79        message: String,
80        fatal: bool,
81        position: f64,
82    },
83
84    /// Heartbeat (periodic)
85    Heartbeat {
86        position: f64,
87        buffer_level: f64,
88        bitrate: u64,
89        dropped_frames: u64,
90        decoded_frames: u64,
91    },
92
93    /// Custom event
94    Custom {
95        name: String,
96        data: serde_json::Value,
97    },
98}
99
100/// Reason for quality change
101#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
102#[serde(rename_all = "snake_case")]
103pub enum QualityChangeReason {
104    /// ABR algorithm decision
105    Abr,
106    /// User manual selection
107    Manual,
108    /// Buffer-based downgrade
109    Buffer,
110    /// Initial selection
111    Initial,
112}
113
114/// Analytics event with metadata
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct AnalyticsEventRecord {
117    /// Unique event ID
118    pub id: Uuid,
119    /// Session ID
120    pub session_id: SessionId,
121    /// Timestamp
122    pub timestamp: DateTime<Utc>,
123    /// Sequence number
124    pub sequence: u64,
125    /// The event
126    #[serde(flatten)]
127    pub event: AnalyticsEvent,
128}
129
130/// Analytics emitter
131pub struct AnalyticsEmitter {
132    /// Session ID
133    session_id: SessionId,
134    /// Event sequence counter
135    sequence: RwLock<u64>,
136    /// Event buffer
137    buffer: RwLock<Vec<AnalyticsEventRecord>>,
138    /// Maximum buffer size before flush
139    max_buffer_size: usize,
140    /// Event channel for async processing
141    event_tx: mpsc::Sender<AnalyticsEventRecord>,
142    /// Beacon endpoint (if configured)
143    beacon_url: Option<String>,
144}
145
146impl AnalyticsEmitter {
147    /// Create a new analytics emitter
148    pub fn new() -> Self {
149        let (event_tx, mut event_rx) = mpsc::channel::<AnalyticsEventRecord>(1000);
150
151        // Spawn background processor
152        tokio::spawn(async move {
153            while let Some(event) = event_rx.recv().await {
154                // In production, batch and send to analytics endpoint
155                debug!(
156                    event_id = %event.id,
157                    event = ?event.event,
158                    "Analytics event"
159                );
160            }
161        });
162
163        Self {
164            session_id: SessionId::new(),
165            sequence: RwLock::new(0),
166            buffer: RwLock::new(Vec::new()),
167            max_buffer_size: 50,
168            event_tx,
169            beacon_url: None,
170        }
171    }
172
173    /// Create with beacon endpoint
174    pub fn with_beacon(beacon_url: String) -> Self {
175        let mut emitter = Self::new();
176        emitter.beacon_url = Some(beacon_url);
177        emitter
178    }
179
180    /// Emit an analytics event
181    pub async fn emit(&self, event: AnalyticsEvent) {
182        let mut seq = self.sequence.write().await;
183        *seq += 1;
184        let sequence = *seq;
185
186        let record = AnalyticsEventRecord {
187            id: Uuid::new_v4(),
188            session_id: self.session_id,
189            timestamp: Utc::now(),
190            sequence,
191            event,
192        };
193
194        // Add to buffer
195        let mut buffer = self.buffer.write().await;
196        buffer.push(record.clone());
197
198        // Flush if buffer is full
199        if buffer.len() >= self.max_buffer_size {
200            let events: Vec<_> = buffer.drain(..).collect();
201            drop(buffer);
202            self.flush_events(events).await;
203        }
204
205        // Send to channel for async processing
206        let _ = self.event_tx.send(record).await;
207    }
208
209    /// Flush buffered events
210    async fn flush_events(&self, events: Vec<AnalyticsEventRecord>) {
211        if events.is_empty() {
212            return;
213        }
214
215        info!(count = events.len(), "Flushing analytics events");
216
217        // In production, send to analytics endpoint
218        if let Some(ref url) = self.beacon_url {
219            // Use reqwest to send events
220            // This is fire-and-forget for beacons
221            let client = reqwest::Client::new();
222            let _ = client.post(url)
223                .json(&events)
224                .send()
225                .await;
226        }
227    }
228
229    /// Get all buffered events
230    pub async fn get_events(&self) -> Vec<AnalyticsEventRecord> {
231        self.buffer.read().await.clone()
232    }
233
234    /// Clear buffer
235    pub async fn clear(&self) {
236        self.buffer.write().await.clear();
237    }
238
239    /// Set beacon endpoint
240    pub fn set_beacon_url(&mut self, url: String) {
241        self.beacon_url = Some(url);
242    }
243}
244
245impl Default for AnalyticsEmitter {
246    fn default() -> Self {
247        Self::new()
248    }
249}
250
/// Accumulates playback measurements used to compute a Quality of Experience
/// (QoE) score.
pub struct QoeCalculator {
    /// Time spent buffering before playback started.
    initial_buffer_time: f64,
    /// Number of rebuffer events recorded.
    rebuffer_count: u32,
    /// Sum of the durations of all recorded rebuffer events.
    rebuffer_duration: f64,
    /// Playback start time (initialized to zero; not read by the calculator).
    _start_time: f64,
    /// Recorded quality switches as `(timestamp, bitrate)` pairs.
    quality_switches: Vec<(f64, u64)>,
    /// Bitrate samples as `(duration, bitrate)` pairs; the duration is the
    /// weight of the sample in the average bitrate.
    bitrate_samples: Vec<(f64, u64)>,
}
266
267impl QoeCalculator {
268    pub fn new() -> Self {
269        Self {
270            initial_buffer_time: 0.0,
271            rebuffer_count: 0,
272            rebuffer_duration: 0.0,
273            _start_time: 0.0,
274            quality_switches: Vec::new(),
275            bitrate_samples: Vec::new(),
276        }
277    }
278
279    /// Record initial buffering time
280    pub fn record_initial_buffer(&mut self, duration: f64) {
281        self.initial_buffer_time = duration;
282    }
283
284    /// Record rebuffer event
285    pub fn record_rebuffer(&mut self, duration: f64) {
286        self.rebuffer_count += 1;
287        self.rebuffer_duration += duration;
288    }
289
290    /// Record quality switch
291    pub fn record_quality_switch(&mut self, timestamp: f64, bitrate: u64) {
292        self.quality_switches.push((timestamp, bitrate));
293    }
294
295    /// Record bitrate sample
296    pub fn record_bitrate(&mut self, duration: f64, bitrate: u64) {
297        self.bitrate_samples.push((duration, bitrate));
298    }
299
300    /// Calculate QoE score (0-100)
301    pub fn calculate_qoe(&self) -> f64 {
302        // MOS-like scoring based on:
303        // - Initial buffer time (startup delay)
304        // - Rebuffer frequency and duration
305        // - Average quality
306        // - Quality stability
307
308        let mut score = 100.0;
309
310        // Penalize initial buffer time
311        // > 2s starts reducing score
312        if self.initial_buffer_time > 2.0 {
313            score -= (self.initial_buffer_time - 2.0) * 5.0;
314        }
315
316        // Penalize rebuffers heavily
317        // Each rebuffer costs 10 points
318        score -= self.rebuffer_count as f64 * 10.0;
319
320        // Penalize rebuffer duration
321        // Each second of rebuffering costs 5 points
322        score -= self.rebuffer_duration * 5.0;
323
324        // Penalize quality switches
325        // Each switch costs 2 points
326        score -= self.quality_switches.len() as f64 * 2.0;
327
328        // Bonus for high average bitrate
329        let avg_bitrate = self.average_bitrate();
330        if avg_bitrate > 5_000_000 {
331            score += 5.0;
332        } else if avg_bitrate > 2_000_000 {
333            score += 2.0;
334        }
335
336        score.clamp(0.0, 100.0)
337    }
338
339    /// Calculate average bitrate
340    fn average_bitrate(&self) -> u64 {
341        if self.bitrate_samples.is_empty() {
342            return 0;
343        }
344
345        let total_duration: f64 = self.bitrate_samples.iter().map(|(d, _)| d).sum();
346        if total_duration == 0.0 {
347            return 0;
348        }
349
350        let weighted_sum: f64 = self.bitrate_samples
351            .iter()
352            .map(|(d, b)| d * *b as f64)
353            .sum();
354
355        (weighted_sum / total_duration) as u64
356    }
357
358    /// Get QoE breakdown
359    pub fn breakdown(&self) -> QoeBreakdown {
360        QoeBreakdown {
361            score: self.calculate_qoe(),
362            initial_buffer_time: self.initial_buffer_time,
363            rebuffer_count: self.rebuffer_count,
364            rebuffer_duration: self.rebuffer_duration,
365            quality_switches: self.quality_switches.len() as u32,
366            average_bitrate: self.average_bitrate(),
367        }
368    }
369}
370
371impl Default for QoeCalculator {
372    fn default() -> Self {
373        Self::new()
374    }
375}
376
377/// QoE score breakdown
378#[derive(Debug, Clone, Serialize, Deserialize)]
379pub struct QoeBreakdown {
380    pub score: f64,
381    pub initial_buffer_time: f64,
382    pub rebuffer_count: u32,
383    pub rebuffer_duration: f64,
384    pub quality_switches: u32,
385    pub average_bitrate: u64,
386}
387
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance used when comparing floating-point QoE scores.
    const SCORE_TOLERANCE: f64 = 0.1;

    /// Asserts that `actual` is within `SCORE_TOLERANCE` of `expected`.
    fn assert_score_close(actual: f64, expected: f64) {
        assert!((actual - expected).abs() < SCORE_TOLERANCE);
    }

    /// A calculator without any measurements yields the maximum score.
    #[test]
    fn test_qoe_perfect() {
        let score = QoeCalculator::new().calculate_qoe();
        assert_eq!(score, 100.0);
    }

    /// Two rebuffers totalling three seconds: 100 - 2*10 - 3*5 = 65.
    #[test]
    fn test_qoe_with_rebuffers() {
        let mut calc = QoeCalculator::new();
        for stall in [1.0, 2.0].iter() {
            calc.record_rebuffer(*stall);
        }

        assert_score_close(calc.calculate_qoe(), 65.0);
    }

    /// Startup three seconds over the threshold: 100 - 3*5 = 85.
    #[test]
    fn test_qoe_with_initial_buffer() {
        let mut calc = QoeCalculator::new();
        calc.record_initial_buffer(5.0);

        assert_score_close(calc.calculate_qoe(), 85.0);
    }

    /// Emitted events remain in the buffer until it is flushed.
    #[tokio::test]
    async fn test_analytics_emitter() {
        let emitter = AnalyticsEmitter::new();

        let events = vec![
            AnalyticsEvent::Play { position: 0.0 },
            AnalyticsEvent::Pause { position: 10.0 },
        ];
        for event in events {
            emitter.emit(event).await;
        }

        let buffered = emitter.get_events().await;
        assert_eq!(buffered.len(), 2);
    }
}
427}