use crate::observe::{NoopObserver, Observer};
use crate::{frame::FrameFlags, AppName, MediaFrame, StreamId, StreamKey};
use arc_swap::{ArcSwap, ArcSwapOption};
use std::net::SocketAddr;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex as StdMutex, OnceLock};
use std::time::{Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, RwLock};
pub(crate) fn now_ms() -> u64 {
SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
pub(crate) fn mono_ms() -> u64 {
static EPOCH: OnceLock<Instant> = OnceLock::new();
EPOCH.get_or_init(Instant::now).elapsed().as_millis() as u64
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StreamState {
Idle,
Publishing,
Transcoding,
Recording,
Ended,
}
impl std::fmt::Display for StreamState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
StreamState::Idle => "idle",
StreamState::Publishing => "publishing",
StreamState::Transcoding => "transcoding",
StreamState::Recording => "recording",
StreamState::Ended => "ended",
})
}
}
#[derive(Debug, Clone)]
pub struct StreamMetadata {
pub key: StreamKey,
pub publisher_addr: Option<SocketAddr>,
pub width: u32,
pub height: u32,
pub fps: f64,
pub video_bitrate_bps: u64,
pub audio_bitrate_bps: u64,
pub started_at_ms: u64,
pub ingest_protocol: String,
}
impl StreamMetadata {
pub fn new(app: AppName, stream_id: StreamId) -> Self {
Self {
key: StreamKey::new(app, stream_id),
publisher_addr: None,
width: 0,
height: 0,
fps: 0.0,
video_bitrate_bps: 0,
audio_bitrate_bps: 0,
started_at_ms: 0,
ingest_protocol: String::new(),
}
}
}
#[derive(Debug, Clone, Copy, Default)]
pub struct Qos {
pub video_bitrate_bps: u64,
pub audio_bitrate_bps: u64,
pub fps: f64,
pub total_frames: u64,
pub total_bytes: u64,
}
#[derive(Default)]
struct QosCounters {
total_frames: AtomicU64,
total_bytes: AtomicU64,
window_start_ms: AtomicU64,
window_video_bytes: AtomicU64,
window_audio_bytes: AtomicU64,
window_video_frames: AtomicU64,
cur_video_bitrate: AtomicU64,
cur_audio_bitrate: AtomicU64,
cur_fps_milli: AtomicU64, last_frame_ms: AtomicU64,
}
struct GopBuffer {
frames: Vec<Arc<MediaFrame>>,
capacity: usize,
}
#[derive(Clone)]
pub struct StreamHandle {
metadata: Arc<RwLock<StreamMetadata>>,
state: Arc<RwLock<StreamState>>,
key: StreamKey,
tx: Arc<ArcSwapOption<broadcast::Sender<Arc<MediaFrame>>>>,
video_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
audio_config: Arc<ArcSwap<Option<Arc<MediaFrame>>>>,
gop: Arc<StdMutex<GopBuffer>>,
gop_capacity: usize,
qos: Arc<QosCounters>,
observer: Arc<dyn Observer>,
}
impl StreamHandle {
pub fn new(app: AppName, stream_id: StreamId, capacity: usize) -> Self {
Self::with_observer(app, stream_id, capacity, 0, Arc::new(NoopObserver))
}
pub fn with_observer(
app: AppName,
stream_id: StreamId,
capacity: usize,
gop_capacity: usize,
observer: Arc<dyn Observer>,
) -> Self {
let (tx, _) = broadcast::channel(capacity);
let qos = QosCounters::default();
qos.last_frame_ms.store(mono_ms(), Ordering::Relaxed);
Self {
metadata: Arc::new(RwLock::new(StreamMetadata::new(
app.clone(),
stream_id.clone(),
))),
state: Arc::new(RwLock::new(StreamState::Idle)),
key: StreamKey::new(app, stream_id),
tx: Arc::new(ArcSwapOption::new(Some(Arc::new(tx)))),
video_config: Arc::new(ArcSwap::new(Arc::new(None))),
audio_config: Arc::new(ArcSwap::new(Arc::new(None))),
gop: Arc::new(StdMutex::new(GopBuffer {
frames: Vec::new(),
capacity: gop_capacity,
})),
gop_capacity,
qos: Arc::new(qos),
observer,
}
}
pub fn key(&self) -> &StreamKey {
&self.key
}
pub fn publish_frame(&self, frame: MediaFrame) -> crate::Result<usize> {
self.observer.on_frame(&self.key, &frame);
let len = frame.data.len() as u64;
let is_audio = frame.is_audio();
let is_key = frame.is_keyframe();
let is_config = frame.flags.contains(FrameFlags::CONFIG);
let arc = Arc::new(frame);
if is_config {
if is_audio {
self.audio_config.store(Arc::new(Some(Arc::clone(&arc))));
} else {
self.video_config.store(Arc::new(Some(Arc::clone(&arc))));
}
}
if self.gop_capacity > 0 {
if let Ok(mut g) = self.gop.lock() {
if is_key {
g.frames.clear();
g.frames.push(Arc::clone(&arc));
} else if !is_config && !g.frames.is_empty() && g.frames.len() < g.capacity {
g.frames.push(Arc::clone(&arc));
}
}
}
self.record_qos(len, is_audio, is_key);
let count = match self.tx.load_full() {
Some(tx) => tx.send(arc).unwrap_or(0),
None => 0,
};
Ok(count)
}
fn record_qos(&self, len: u64, is_audio: bool, _is_key: bool) {
let q = &self.qos;
let now = mono_ms();
q.total_frames.fetch_add(1, Ordering::Relaxed);
q.total_bytes.fetch_add(len, Ordering::Relaxed);
q.last_frame_ms.store(now, Ordering::Relaxed);
if is_audio {
q.window_audio_bytes.fetch_add(len, Ordering::Relaxed);
} else {
q.window_video_bytes.fetch_add(len, Ordering::Relaxed);
q.window_video_frames.fetch_add(1, Ordering::Relaxed);
}
let ws = q.window_start_ms.load(Ordering::Relaxed);
if ws == 0 {
q.window_start_ms.store(now, Ordering::Relaxed);
} else if now.saturating_sub(ws) >= 1000 {
let elapsed = (now - ws) as f64 / 1000.0;
let vbytes = q.window_video_bytes.swap(0, Ordering::Relaxed);
let abytes = q.window_audio_bytes.swap(0, Ordering::Relaxed);
let vframes = q.window_video_frames.swap(0, Ordering::Relaxed);
q.cur_video_bitrate
.store((vbytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
q.cur_audio_bitrate
.store((abytes as f64 * 8.0 / elapsed) as u64, Ordering::Relaxed);
q.cur_fps_milli.store(
(vframes as f64 / elapsed * 1000.0) as u64,
Ordering::Relaxed,
);
q.window_start_ms.store(now, Ordering::Relaxed);
}
}
pub fn qos(&self) -> Qos {
let q = &self.qos;
Qos {
video_bitrate_bps: q.cur_video_bitrate.load(Ordering::Relaxed),
audio_bitrate_bps: q.cur_audio_bitrate.load(Ordering::Relaxed),
fps: q.cur_fps_milli.load(Ordering::Relaxed) as f64 / 1000.0,
total_frames: q.total_frames.load(Ordering::Relaxed),
total_bytes: q.total_bytes.load(Ordering::Relaxed),
}
}
pub fn last_frame_ms(&self) -> u64 {
self.qos.last_frame_ms.load(Ordering::Relaxed)
}
pub fn cached_configs(&self) -> (Option<Arc<MediaFrame>>, Option<Arc<MediaFrame>>) {
let video = (**self.video_config.load()).clone();
let audio = (**self.audio_config.load()).clone();
(video, audio)
}
pub fn replay_buffer(&self) -> Vec<Arc<MediaFrame>> {
let (vcfg, acfg) = self.cached_configs();
let mut out = Vec::new();
out.extend(vcfg);
out.extend(acfg);
if self.gop_capacity > 0 {
if let Ok(g) = self.gop.lock() {
for f in &g.frames {
if !out.iter().any(|c| Arc::ptr_eq(c, f)) {
out.push(Arc::clone(f));
}
}
}
}
out
}
pub fn subscribe(&self) -> broadcast::Receiver<Arc<MediaFrame>> {
match self.tx.load_full() {
Some(tx) => tx.subscribe(),
None => {
let (_, rx) = broadcast::channel(1);
rx
}
}
}
pub fn subscribe_resilient(&self) -> Subscription {
Subscription {
rx: self.subscribe(),
key: self.key.clone(),
observer: Arc::clone(&self.observer),
max_lag: None,
skipped: 0,
}
}
pub fn subscriber_count(&self) -> usize {
self.tx
.load_full()
.map(|tx| tx.receiver_count())
.unwrap_or(0)
}
pub fn close(&self) {
self.tx.store(None);
}
pub async fn set_state(&self, state: StreamState) {
let mut guard = self.state.write().await;
*guard = state;
}
pub async fn current_state(&self) -> StreamState {
self.state.read().await.clone()
}
pub async fn metadata_snapshot(&self) -> StreamMetadata {
let mut m = self.metadata.read().await.clone();
let q = self.qos();
m.video_bitrate_bps = q.video_bitrate_bps;
m.audio_bitrate_bps = q.audio_bitrate_bps;
if q.fps > 0.0 {
m.fps = q.fps;
}
m
}
pub async fn update_metadata(&self, f: impl FnOnce(&mut StreamMetadata)) {
let mut guard = self.metadata.write().await;
f(&mut guard);
}
}
impl std::fmt::Debug for StreamHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StreamHandle")
.field("key", &self.key)
.field("subscribers", &self.subscriber_count())
.finish()
}
}
pub struct Subscription {
rx: broadcast::Receiver<Arc<MediaFrame>>,
key: StreamKey,
observer: Arc<dyn Observer>,
max_lag: Option<u64>,
skipped: u64,
}
impl Subscription {
pub fn max_lag(mut self, max: u64) -> Self {
self.max_lag = Some(max);
self
}
pub fn dropped(&self) -> u64 {
self.skipped
}
pub async fn recv(&mut self) -> Option<Arc<MediaFrame>> {
loop {
match self.rx.recv().await {
Ok(frame) => return Some(frame),
Err(RecvError::Lagged(skipped)) => {
self.skipped = self.skipped.saturating_add(skipped);
self.observer.on_subscriber_lagged(&self.key, skipped);
if let Some(max) = self.max_lag {
if self.skipped > max {
self.observer.on_subscriber_evicted(&self.key);
return None;
}
}
continue;
}
Err(RecvError::Closed) => return None,
}
}
}
pub fn raw(&mut self) -> &mut broadcast::Receiver<Arc<MediaFrame>> {
&mut self.rx
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::CodecId;
fn video(pts: i64, key: bool) -> MediaFrame {
MediaFrame::new_video(
pts,
pts,
bytes::Bytes::from_static(&[0, 0, 0, 1, 0x65]),
CodecId::H264,
key,
)
}
#[tokio::test]
async fn close_terminates_recv_while_a_handle_clone_is_held() {
let handle = StreamHandle::new("live".into(), "show".into(), 16);
let mut sub = handle.subscribe_resilient();
let retained = handle.clone();
handle.publish_frame(video(0, true)).unwrap();
assert!(sub.recv().await.is_some(), "frame delivered before close");
retained.close();
let got = tokio::time::timeout(std::time::Duration::from_secs(5), sub.recv())
.await
.expect("recv resolved after close (no hang)");
assert!(got.is_none(), "recv returns None once the stream is closed");
assert_eq!(retained.publish_frame(video(1, false)).unwrap(), 0);
assert_eq!(retained.subscriber_count(), 0);
}
#[test]
fn mono_clock_is_monotonic_and_drives_last_frame() {
let a = mono_ms();
let b = mono_ms();
assert!(b >= a, "monotonic clock never goes backward");
let handle = StreamHandle::new("live".into(), "m".into(), 4);
let before = mono_ms();
handle.publish_frame(video(0, true)).unwrap();
assert!(
handle.last_frame_ms() >= before,
"last_frame_ms advances on the monotonic clock"
);
}
}