Skip to main content

arcly_stream/bus/
application.rs

1//! One registered application and its live streams.
2
3use super::events::{StreamEvent, StreamEventKind};
4use super::handle::{StreamHandle, StreamState};
5use crate::observe::Observer;
6use crate::{AppName, Result, StreamError, StreamId};
7use dashmap::DashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use tokio::sync::broadcast;
11use tracing::info;
12
13/// One registered application (e.g. "live", "vod").
14///
15/// Decoupled from `stream-center`'s `sc-config::AppConfig`: it takes a plain
16/// `broadcast_capacity` and an injected [`Observer`] instead of reaching for a
17/// metrics singleton.
18pub struct Application {
19    /// Application name (e.g. `"live"`).
20    pub name: AppName,
21    /// Per-stream broadcast channel capacity.
22    pub broadcast_capacity: usize,
23    /// Keyframe-anchored GOP replay buffer size, in frames (0 disables it).
24    pub gop_capacity: usize,
25    streams: DashMap<StreamId, StreamHandle>,
26    /// Broadcast channel for stream lifecycle events within this application.
27    event_tx: broadcast::Sender<StreamEvent>,
28    /// Serializes concurrent start_publish calls to eliminate the check-then-insert
29    /// race condition (TOCTOU) that could allow two publishers to claim the same stream.
30    pub_lock: tokio::sync::Mutex<()>,
31    /// Lock-free count of active streams; updated on start/end publish.
32    stream_count: AtomicUsize,
33    /// Injected telemetry hook (no-op by default).
34    observer: Arc<dyn Observer>,
35}
36
37impl Application {
38    /// Create an application with the given per-stream capacities and observer.
39    pub fn new(
40        name: AppName,
41        broadcast_capacity: usize,
42        gop_capacity: usize,
43        observer: Arc<dyn Observer>,
44    ) -> Arc<Self> {
45        let (event_tx, _) = broadcast::channel(64);
46        Arc::new(Self {
47            name,
48            broadcast_capacity,
49            gop_capacity,
50            streams: DashMap::new(),
51            event_tx,
52            pub_lock: tokio::sync::Mutex::new(()),
53            stream_count: AtomicUsize::new(0),
54            observer,
55        })
56    }
57
58    /// Register a new publishing stream.  Fails if a stream with the same ID
59    /// is already in the `Publishing` or `Transcoding` state.
60    pub async fn start_publish(&self, stream_id: StreamId) -> Result<StreamHandle> {
61        // Hold this mutex for the duration of the check+insert to prevent two
62        // concurrent callers from both seeing the stream as free and both registering.
63        let _guard = self.pub_lock.lock().await;
64
65        if let Some(existing) = self.streams.get(&stream_id) {
66            let state = existing.current_state().await;
67            if matches!(state, StreamState::Publishing | StreamState::Transcoding) {
68                return Err(StreamError::StreamAlreadyPublishing {
69                    app: self.name.to_string(),
70                    stream_id: stream_id.to_string(),
71                });
72            }
73        }
74
75        let handle = StreamHandle::with_observer(
76            self.name.clone(),
77            stream_id.clone(),
78            self.broadcast_capacity,
79            self.gop_capacity,
80            Arc::clone(&self.observer),
81        );
82        handle.set_state(StreamState::Publishing).await;
83        let started_at_ms = super::handle::now_ms();
84        handle
85            .update_metadata(|m| m.started_at_ms = started_at_ms)
86            .await;
87        self.streams.insert(stream_id.clone(), handle.clone());
88        self.stream_count.fetch_add(1, Ordering::Relaxed);
89        // Release the lock before notifying the observer to avoid holding it
90        // across a potentially-blocking host callback.
91        drop(_guard);
92        self.observer.on_publish_started(self.name.as_str());
93
94        info!(app = %self.name, stream = %stream_id, "Stream publish started");
95        self.emit(stream_id.clone(), StreamEventKind::PublishStarted);
96
97        Ok(handle)
98    }
99
100    /// Mark a stream as ended and remove it from the active registry.
101    /// Returns `true` if the stream was present and removed, `false` if it
102    /// was already gone (idempotent).
103    pub async fn end_publish(&self, stream_id: &StreamId) -> Result<bool> {
104        if let Some((_, handle)) = self.streams.remove(stream_id) {
105            self.stream_count.fetch_sub(1, Ordering::Relaxed);
106            self.observer.on_publish_ended(self.name.as_str());
107            handle.set_state(StreamState::Ended).await;
108            // Drop the sole frame-bus sender so every subscriber's `recv`
109            // terminates with `Closed`, even if some consumer still holds a
110            // handle clone (e.g. a WHEP egress pump).
111            handle.close();
112            info!(app = %self.name, stream = %stream_id, "Stream publish ended");
113            self.emit(stream_id.clone(), StreamEventKind::PublishEnded);
114            return Ok(true);
115        }
116        Ok(false)
117    }
118
119    /// Look up a live stream handle for playback.
120    pub fn get_stream(&self, stream_id: &StreamId) -> Option<StreamHandle> {
121        self.streams.get(stream_id).map(|r| r.clone())
122    }
123
124    /// List all active stream IDs.
125    pub fn active_streams(&self) -> Vec<StreamId> {
126        self.streams.iter().map(|r| r.key().clone()).collect()
127    }
128
129    /// Snapshot of all live stream handles (used by the engine's idle reaper).
130    pub fn active_handles(&self) -> Vec<StreamHandle> {
131        self.streams.iter().map(|r| r.value().clone()).collect()
132    }
133
134    /// Subscribe to stream lifecycle events for this application.
135    pub fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent> {
136        self.event_tx.subscribe()
137    }
138
139    /// Number of streams currently active in this application.
140    pub fn stream_count(&self) -> usize {
141        self.stream_count.load(Ordering::Relaxed)
142    }
143
144    /// Emit a lifecycle event to both the broadcast channel and the observer.
145    fn emit(&self, stream_id: StreamId, kind: StreamEventKind) {
146        let event = StreamEvent {
147            app: self.name.clone(),
148            stream_id,
149            kind,
150        };
151        self.observer.on_event(&event);
152        let _ = self.event_tx.send(event);
153    }
154}
155
156impl std::fmt::Debug for Application {
157    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158        f.debug_struct("Application")
159            .field("name", &self.name)
160            .field("stream_count", &self.streams.len())
161            .finish()
162    }
163}