#![allow(dead_code)]
use std::collections::HashMap;
use std::fmt;
/// Opaque identifier for a tracked session.
///
/// A thin `Copy` newtype around a `u64`. Its [`fmt::Display`] output is the
/// raw value prefixed with `session-`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct SessionId(u64);

impl SessionId {
    /// Wraps a raw numeric value in a `SessionId`; no validation is performed.
    pub const fn from_raw(id: u64) -> Self {
        SessionId(id)
    }

    /// Returns the numeric value wrapped by this identifier.
    pub const fn raw(self) -> u64 {
        let SessionId(value) = self;
        value
    }
}

impl fmt::Display for SessionId {
    /// Writes `session-` followed by the raw value in decimal.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("session-{}", self.raw()))
    }
}
/// Lifecycle phase of a session.
///
/// [`SessionState::is_alive`] splits the variants into two groups: `Closed`
/// and `Failed` are not alive, every other variant is.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SessionState {
    /// Counts as alive.
    Connecting,
    /// Counts as alive.
    Active,
    /// Counts as alive.
    Paused,
    /// Counts as alive.
    Reconnecting,
    /// Does not count as alive.
    Closed,
    /// Does not count as alive.
    Failed,
}

impl fmt::Display for SessionState {
    /// Writes the variant name exactly as it is spelled in the enum.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match *self {
            SessionState::Connecting => f.write_str("Connecting"),
            SessionState::Active => f.write_str("Active"),
            SessionState::Paused => f.write_str("Paused"),
            SessionState::Reconnecting => f.write_str("Reconnecting"),
            SessionState::Closed => f.write_str("Closed"),
            SessionState::Failed => f.write_str("Failed"),
        }
    }
}

impl SessionState {
    /// Returns `false` for `Closed` and `Failed`, and `true` for every other state.
    pub fn is_alive(self) -> bool {
        // Spelled out exhaustively so a newly added variant must be classified here.
        match self {
            SessionState::Closed | SessionState::Failed => false,
            SessionState::Connecting
            | SessionState::Active
            | SessionState::Paused
            | SessionState::Reconnecting => true,
        }
    }
}
/// Traffic and lifecycle counters for a single session.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionStats {
    /// Number of bytes recorded as sent.
    pub bytes_sent: u64,
    /// Number of bytes recorded as received.
    pub bytes_received: u64,
    /// Session duration in milliseconds.
    pub duration_ms: u64,
    /// Number of state transitions recorded for the session.
    pub transition_count: u32,
}

impl SessionStats {
    /// Returns the sum of sent and received bytes.
    ///
    /// The sum saturates at `u64::MAX` rather than overflowing, so the result
    /// is the same in every build profile and the call never panics.
    #[must_use]
    pub const fn total_bytes(&self) -> u64 {
        self.bytes_sent.saturating_add(self.bytes_received)
    }

    /// Returns the average throughput in kilobits per second
    /// (total bits divided by elapsed milliseconds).
    ///
    /// Returns `0.0` when `duration_ms` is zero, avoiding a division by zero.
    // The u64 -> f64 casts lose precision above 2^53; acceptable for a rate estimate.
    #[allow(clippy::cast_precision_loss)]
    #[must_use]
    pub fn throughput_kbps(&self) -> f64 {
        if self.duration_ms == 0 {
            return 0.0;
        }
        let total_bits = self.total_bytes() as f64 * 8.0;
        total_bits / self.duration_ms as f64
    }
}
/// Per-session entry stored as the value type of `SessionTracker`'s session map.
#[derive(Debug, Clone)]
struct SessionRecord {
/// Identifier of the session; `SessionTracker::open` also uses it as the map key.
id: SessionId,
/// Current lifecycle state; set to `Connecting` by `open` and overwritten by `transition`.
state: SessionState,
/// Caller-supplied label given to `open`.
label: String,
/// Counters for this session; all zero when the session is opened.
stats: SessionStats,
}
#[derive(Debug)]
pub struct SessionTracker {
sessions: HashMap<SessionId, SessionRecord>,
next_id: u64,
}
impl SessionTracker {
pub fn new() -> Self {
Self {
sessions: HashMap::new(),
next_id: 1,
}
}
pub fn open(&mut self, label: impl Into<String>) -> SessionId {
let id = SessionId(self.next_id);
self.next_id += 1;
let record = SessionRecord {
id,
state: SessionState::Connecting,
label: label.into(),
stats: SessionStats {
bytes_sent: 0,
bytes_received: 0,
duration_ms: 0,
transition_count: 0,
},
};
self.sessions.insert(id, record);
id
}
pub fn state(&self, id: SessionId) -> Option<SessionState> {
self.sessions.get(&id).map(|r| r.state)
}
pub fn transition(&mut self, id: SessionId, new_state: SessionState) -> bool {
if let Some(rec) = self.sessions.get_mut(&id) {
rec.state = new_state;
rec.stats.transition_count += 1;
true
} else {
false
}
}
pub fn record_sent(&mut self, id: SessionId, bytes: u64) {
if let Some(rec) = self.sessions.get_mut(&id) {
rec.stats.bytes_sent += bytes;
}
}
pub fn record_received(&mut self, id: SessionId, bytes: u64) {
if let Some(rec) = self.sessions.get_mut(&id) {
rec.stats.bytes_received += bytes;
}
}
pub fn update_duration(&mut self, id: SessionId, duration_ms: u64) {
if let Some(rec) = self.sessions.get_mut(&id) {
rec.stats.duration_ms = duration_ms;
}
}
pub fn stats(&self, id: SessionId) -> Option<SessionStats> {
self.sessions.get(&id).map(|r| r.stats.clone())
}
pub fn label(&self, id: SessionId) -> Option<&str> {
self.sessions.get(&id).map(|r| r.label.as_str())
}
pub fn total_sessions(&self) -> usize {
self.sessions.len()
}
pub fn active_count(&self) -> usize {
self.sessions
.values()
.filter(|r| r.state.is_alive())
.count()
}
pub fn purge_dead(&mut self) -> usize {
let before = self.sessions.len();
self.sessions.retain(|_, r| r.state.is_alive());
before - self.sessions.len()
}
pub fn session_ids(&self) -> Vec<SessionId> {
self.sessions.keys().copied().collect()
}
}
impl Default for SessionTracker {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a stats value with the given counters and no recorded transitions.
    fn stats_fixture(bytes_sent: u64, bytes_received: u64, duration_ms: u64) -> SessionStats {
        SessionStats {
            bytes_sent,
            bytes_received,
            duration_ms,
            transition_count: 0,
        }
    }

    /// A freshly built tracker holds no sessions.
    #[test]
    fn test_new_tracker_empty() {
        let tracker = SessionTracker::new();
        assert_eq!(tracker.total_sessions(), 0);
        assert_eq!(tracker.active_count(), 0);
    }

    /// Newly opened sessions start out in `Connecting`.
    #[test]
    fn test_open_creates_connecting() {
        let mut tracker = SessionTracker::new();
        let session = tracker.open("rtmp://live");
        let observed = tracker.state(session);
        assert_eq!(observed, Some(SessionState::Connecting));
    }

    /// Consecutive `open` calls hand out consecutive raw ids.
    #[test]
    fn test_sequential_ids() {
        let mut tracker = SessionTracker::new();
        let first = tracker.open("a");
        let second = tracker.open("b");
        let gap = second.raw() - first.raw();
        assert_eq!(gap, 1);
    }

    /// A transition on a known session succeeds and is observable.
    #[test]
    fn test_transition() {
        let mut tracker = SessionTracker::new();
        let session = tracker.open("test");
        let applied = tracker.transition(session, SessionState::Active);
        assert!(applied);
        assert_eq!(tracker.state(session), Some(SessionState::Active));
    }

    /// A transition on an unknown session reports failure.
    #[test]
    fn test_transition_unknown() {
        let mut tracker = SessionTracker::new();
        let missing = SessionId::from_raw(999);
        let applied = tracker.transition(missing, SessionState::Active);
        assert!(!applied);
    }

    /// Sent bytes accumulate across calls.
    #[test]
    fn test_record_sent() {
        let mut tracker = SessionTracker::new();
        let session = tracker.open("test");
        for &chunk in [1000_u64, 2000].iter() {
            tracker.record_sent(session, chunk);
        }
        let snapshot = tracker.stats(session).expect("should succeed in test");
        assert_eq!(snapshot.bytes_sent, 3000);
    }

    /// Received bytes are recorded on the session.
    #[test]
    fn test_record_received() {
        let mut tracker = SessionTracker::new();
        let session = tracker.open("test");
        tracker.record_received(session, 500);
        let snapshot = tracker.stats(session).expect("should succeed in test");
        assert_eq!(snapshot.bytes_received, 500);
    }

    /// `total_bytes` adds both directions.
    #[test]
    fn test_total_bytes() {
        let stats = stats_fixture(100, 200, 1000);
        assert_eq!(stats.total_bytes(), 300);
    }

    /// 1000 bytes over 1000 ms is 8 kbps.
    #[test]
    fn test_throughput_kbps() {
        let stats = stats_fixture(1000, 0, 1000);
        let error = (stats.throughput_kbps() - 8.0).abs();
        assert!(error < 1e-9);
    }

    /// A zero duration yields zero throughput rather than a division by zero.
    #[test]
    fn test_throughput_zero_duration() {
        let stats = stats_fixture(1000, 0, 0);
        assert_eq!(stats.throughput_kbps(), 0.0);
    }

    /// Closed sessions drop out of the active count.
    #[test]
    fn test_active_count() {
        let mut tracker = SessionTracker::new();
        let closed = tracker.open("a");
        let _open = tracker.open("b");
        tracker.transition(closed, SessionState::Closed);
        assert_eq!(tracker.active_count(), 1);
    }

    /// Purging removes failed sessions and reports how many were dropped.
    #[test]
    fn test_purge_dead() {
        let mut tracker = SessionTracker::new();
        let failed = tracker.open("a");
        let _survivor = tracker.open("b");
        tracker.transition(failed, SessionState::Failed);
        let removed = tracker.purge_dead();
        assert_eq!(removed, 1);
        assert_eq!(tracker.total_sessions(), 1);
    }

    /// The label passed to `open` can be read back unchanged.
    #[test]
    fn test_label() {
        let mut tracker = SessionTracker::new();
        let session = tracker.open("srt://host:9000");
        assert_eq!(tracker.label(session), Some("srt://host:9000"));
    }

    /// Ids render as `session-<raw>`.
    #[test]
    fn test_session_id_display() {
        let rendered = SessionId::from_raw(42).to_string();
        assert_eq!(rendered, "session-42");
    }

    /// States render as their variant names.
    #[test]
    fn test_session_state_display() {
        assert_eq!(SessionState::Active.to_string(), "Active");
        assert_eq!(SessionState::Reconnecting.to_string(), "Reconnecting");
    }

    /// Only `Closed` and `Failed` count as dead among the states checked here.
    #[test]
    fn test_session_state_is_alive() {
        for &state in [SessionState::Active, SessionState::Paused].iter() {
            assert!(state.is_alive());
        }
        for &state in [SessionState::Closed, SessionState::Failed].iter() {
            assert!(!state.is_alive());
        }
    }
}