use super::events::{StreamEvent, StreamEventKind};
use super::handle::{StreamHandle, StreamState};
use crate::observe::Observer;
use crate::{AppName, Result, StreamError, StreamId};
use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;
/// Registry of the streams that belong to one named application.
///
/// Stores the stream handles keyed by stream id, the sender side of the
/// application-level event channel, and the observer that is notified about
/// publish start/end and emitted events.
pub struct Application {
/// Name of this application; it is cloned into every handle and event created here.
pub name: AppName,
/// Value forwarded to `StreamHandle::with_observer` for each new stream
/// (presumably the per-stream broadcast channel size — TODO confirm in `handle`).
pub broadcast_capacity: usize,
/// Value forwarded to `StreamHandle::with_observer` for each new stream
/// (presumably the size of a GOP cache — TODO confirm in `handle`).
pub gop_capacity: usize,
/// Handles currently registered, keyed by stream id.
streams: DashMap<StreamId, StreamHandle>,
/// Sender for application-level stream events; receivers are created by `subscribe_events`.
event_tx: broadcast::Sender<StreamEvent>,
/// Async mutex taken by `start_publish` around its check-and-register sequence.
pub_lock: tokio::sync::Mutex<()>,
/// Counter of registered streams, maintained by `start_publish` and `end_publish`.
stream_count: AtomicUsize,
/// Observer that receives publish-started, publish-ended and event callbacks.
observer: Arc<dyn Observer>,
}
impl Application {
/// Creates an empty application registry and returns it behind an [`Arc`].
///
/// `broadcast_capacity` and `gop_capacity` are stored as-is and handed to every
/// stream handle created later by `start_publish`. `observer` receives the
/// lifecycle callbacks. The application-level event channel always has a
/// capacity of 64 events.
pub fn new(
    name: AppName,
    broadcast_capacity: usize,
    gop_capacity: usize,
    observer: Arc<dyn Observer>,
) -> Arc<Self> {
    // Capacity of the application-level lifecycle event channel.
    const EVENT_CHANNEL_CAPACITY: usize = 64;

    // The initial receiver is discarded; consumers get their own via `subscribe_events`.
    let (event_tx, _) = broadcast::channel(EVENT_CHANNEL_CAPACITY);

    let app = Self {
        name,
        broadcast_capacity,
        gop_capacity,
        streams: DashMap::new(),
        event_tx,
        pub_lock: tokio::sync::Mutex::new(()),
        stream_count: AtomicUsize::new(0),
        observer,
    };
    Arc::new(app)
}
/// Registers `stream_id` as a live publisher and returns its freshly created handle.
///
/// Calls are serialised through `pub_lock`, so the "already live?" check and the
/// insertion cannot interleave with another `start_publish`. The new handle is put
/// into the `Publishing` state and stamped with its start time before it becomes
/// visible in the registry. After registration the observer is notified and a
/// `PublishStarted` event is emitted.
///
/// # Errors
///
/// Returns [`StreamError::StreamAlreadyPublishing`] when a handle is already
/// registered under `stream_id` and reports `Publishing` or `Transcoding`.
pub async fn start_publish(&self, stream_id: StreamId) -> Result<StreamHandle> {
    let guard = self.pub_lock.lock().await;

    // Clone the handle out of the map *before* awaiting. Keeping the DashMap
    // entry guard alive across an `.await` holds a shard lock over a suspension
    // point, which can deadlock against a concurrent `remove`/`insert` on the
    // same shard.
    let existing = self
        .streams
        .get(&stream_id)
        .map(|entry| entry.value().clone());
    if let Some(existing) = existing {
        let state = existing.current_state().await;
        if matches!(state, StreamState::Publishing | StreamState::Transcoding) {
            return Err(StreamError::StreamAlreadyPublishing {
                app: self.name.to_string(),
                stream_id: stream_id.to_string(),
            });
        }
    }

    let handle = StreamHandle::with_observer(
        self.name.clone(),
        stream_id.clone(),
        self.broadcast_capacity,
        self.gop_capacity,
        Arc::clone(&self.observer),
    );
    handle.set_state(StreamState::Publishing).await;
    let started_at_ms = super::handle::now_ms();
    handle
        .update_metadata(|m| m.started_at_ms = started_at_ms)
        .await;

    // An entry in a non-live state may still be registered under this id.
    // Replacing it does not add a stream, so only count genuinely new entries;
    // otherwise the counter drifts away from the number of registered streams.
    // NOTE(review): a replaced handle is dropped here without `close()` —
    // confirm whether its subscribers need to be notified.
    if self
        .streams
        .insert(stream_id.clone(), handle.clone())
        .is_none()
    {
        self.stream_count.fetch_add(1, Ordering::Relaxed);
    }

    // Release the publish lock before running observer callbacks and logging.
    drop(guard);

    self.observer.on_publish_started(self.name.as_str());
    info!(app = %self.name, stream = %stream_id, "Stream publish started");
    self.emit(stream_id, StreamEventKind::PublishStarted);
    Ok(handle)
}
/// Ends publishing for `stream_id` and removes it from the registry.
///
/// Returns `Ok(true)` when a handle was registered under that id and has been
/// torn down, and `Ok(false)` when nothing was registered. On removal the
/// counter is decremented, the observer is notified, the handle is moved to
/// `Ended` and closed, and a `PublishEnded` event is emitted.
pub async fn end_publish(&self, stream_id: &StreamId) -> Result<bool> {
    // Nothing registered under this id: report that no work was done.
    let handle = match self.streams.remove(stream_id) {
        Some((_, removed)) => removed,
        None => return Ok(false),
    };

    self.stream_count.fetch_sub(1, Ordering::Relaxed);
    self.observer.on_publish_ended(self.name.as_str());

    handle.set_state(StreamState::Ended).await;
    handle.close();

    info!(app = %self.name, stream = %stream_id, "Stream publish ended");
    self.emit(stream_id.clone(), StreamEventKind::PublishEnded);
    Ok(true)
}
/// Looks up `stream_id` and returns a clone of its handle, or `None` if no
/// stream is registered under that id.
pub fn get_stream(&self, stream_id: &StreamId) -> Option<StreamHandle> {
    let entry = self.streams.get(stream_id)?;
    Some(entry.value().clone())
}
/// Returns the ids of all streams currently present in the registry.
///
/// The ids are cloned while iterating the map.
pub fn active_streams(&self) -> Vec<StreamId> {
    let mut ids = Vec::new();
    for entry in self.streams.iter() {
        ids.push(entry.key().clone());
    }
    ids
}
/// Returns a clone of every handle currently present in the registry.
pub fn active_handles(&self) -> Vec<StreamHandle> {
    let mut handles = Vec::new();
    for entry in self.streams.iter() {
        handles.push(entry.value().clone());
    }
    handles
}
/// Creates a new receiver for this application's stream events.
///
/// Events reach the channel through `emit`, which the publish start/end paths call.
pub fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent> {
    let sender = &self.event_tx;
    sender.subscribe()
}
/// Returns the current value of the stream counter.
///
/// The value comes from a relaxed atomic load of the counter that
/// `start_publish` and `end_publish` maintain; the map itself is not consulted.
pub fn stream_count(&self) -> usize {
    let counter = &self.stream_count;
    counter.load(Ordering::Relaxed)
}
/// Builds a [`StreamEvent`] for `stream_id` and delivers it.
///
/// The observer is given the event first; the event is then sent on the
/// application-level broadcast channel.
fn emit(&self, stream_id: StreamId, kind: StreamEventKind) {
    let app = self.name.clone();
    let event = StreamEvent {
        app,
        stream_id,
        kind,
    };

    self.observer.on_event(&event);

    // Best effort: a failed `send` (e.g. nobody is subscribed) is deliberately ignored.
    let _ = self.event_tx.send(event);
}
}
impl std::fmt::Debug for Application {
    /// Formats the application as `Application { name, stream_count }`.
    ///
    /// `stream_count` here is the number of entries in the stream map.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Application");
        out.field("name", &self.name);
        out.field("stream_count", &self.streams.len());
        out.finish()
    }
}