use super::*;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecordingStatus {
Recording,
Paused,
Finished,
Failed,
}
#[derive(Debug, Clone)]
pub struct RecordingSession {
pub id: u64,
pub stream_key: String,
pub destination: String,
pub status: RecordingStatus,
pub bytes_written: u64,
pub packet_count: u64,
pub started_at: u64,
pub ended_at: u64,
pub first_pts: Option<u32>,
pub last_pts: Option<u32>,
}
impl RecordingSession {
#[must_use]
pub fn new(
id: u64,
stream_key: impl Into<String>,
destination: impl Into<String>,
started_at: u64,
) -> Self {
Self {
id,
stream_key: stream_key.into(),
destination: destination.into(),
status: RecordingStatus::Recording,
bytes_written: 0,
packet_count: 0,
started_at,
ended_at: 0,
first_pts: None,
last_pts: None,
}
}
#[must_use]
pub fn duration_ms(&self) -> Option<u32> {
match (self.first_pts, self.last_pts) {
(Some(first), Some(last)) => Some(last.wrapping_sub(first)),
_ => None,
}
}
pub fn ingest(&mut self, packet: &MediaPacket) {
if self.status != RecordingStatus::Recording {
return;
}
self.bytes_written += packet.data.len() as u64;
self.packet_count += 1;
if self.first_pts.is_none() {
self.first_pts = Some(packet.timestamp);
}
self.last_pts = Some(packet.timestamp);
}
pub fn finish(&mut self, ended_at: u64) {
self.status = RecordingStatus::Finished;
self.ended_at = ended_at;
}
pub fn fail(&mut self, ended_at: u64) {
self.status = RecordingStatus::Failed;
self.ended_at = ended_at;
}
pub fn pause(&mut self) {
if self.status == RecordingStatus::Recording {
self.status = RecordingStatus::Paused;
}
}
pub fn resume(&mut self) {
if self.status == RecordingStatus::Paused {
self.status = RecordingStatus::Recording;
}
}
}
pub struct RecordingRegistry {
sessions: Arc<RwLock<HashMap<u64, RecordingSession>>>,
next_id: Arc<RwLock<u64>>,
}
impl RecordingRegistry {
#[must_use]
pub fn new() -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
next_id: Arc::new(RwLock::new(1)),
}
}
pub async fn start_session(
&self,
stream_key: impl Into<String>,
destination: impl Into<String>,
now_secs: u64,
) -> u64 {
let id = {
let mut next = self.next_id.write().await;
let id = *next;
*next += 1;
id
};
let session = RecordingSession::new(id, stream_key, destination, now_secs);
let mut sessions = self.sessions.write().await;
sessions.insert(id, session);
id
}
pub async fn ingest(&self, session_id: u64, packet: &MediaPacket) {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.ingest(packet);
}
}
pub async fn finish_session(&self, session_id: u64, now_secs: u64) {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.finish(now_secs);
}
}
pub async fn fail_session(&self, session_id: u64, now_secs: u64) {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.fail(now_secs);
}
}
pub async fn pause_session(&self, session_id: u64) {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.pause();
}
}
pub async fn resume_session(&self, session_id: u64) {
let mut sessions = self.sessions.write().await;
if let Some(session) = sessions.get_mut(&session_id) {
session.resume();
}
}
pub async fn get_session(&self, session_id: u64) -> Option<RecordingSession> {
let sessions = self.sessions.read().await;
sessions.get(&session_id).cloned()
}
pub async fn sessions_for_stream(&self, stream_key: &str) -> Vec<RecordingSession> {
let sessions = self.sessions.read().await;
sessions
.values()
.filter(|s| s.stream_key == stream_key)
.cloned()
.collect()
}
pub async fn active_count(&self) -> usize {
let sessions = self.sessions.read().await;
sessions
.values()
.filter(|s| {
s.status == RecordingStatus::Recording || s.status == RecordingStatus::Paused
})
.count()
}
pub async fn prune_completed(&self) {
let mut sessions = self.sessions.write().await;
sessions.retain(|_, s| {
s.status == RecordingStatus::Recording || s.status == RecordingStatus::Paused
});
}
}
impl Default for RecordingRegistry {
fn default() -> Self {
Self::new()
}
}