Skip to main content

arcly_stream/engine/
mod.rs

1//! The composition root — the analogue of `arcly-http`'s `App`.
2//!
3//! [`Engine`] replaces `stream-center`'s `ApplicationRegistry::from_config`.
4//! It is built by a plain-Rust [`EngineBuilder`] (no TOML, no
5//! `StreamCenterConfig`) and implements all three bus contracts
6//! ([`PublishRegistry`], [`PlaybackRegistry`], [`EventBus`]), so it drops
7//! straight into protocol code written against those traits.
8
9mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16    Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27/// Plain-Rust application descriptor — replaces `sc-config::AppConfig`.
28#[derive(Debug, Clone)]
29pub struct AppSpec {
30    /// Application name (e.g. `"live"`).
31    pub name: AppName,
32    /// Per-stream broadcast channel capacity (frames buffered for slow joiners).
33    pub broadcast_capacity: usize,
34    /// Keyframe-anchored GOP replay buffer size, in frames (0 disables it).
35    /// Enables sub-second playback start for late joiners.
36    pub gop_capacity: usize,
37}
38
39impl AppSpec {
40    /// A new app with default capacities and GOP caching disabled.
41    pub fn new(name: impl Into<AppName>) -> Self {
42        Self {
43            name: name.into(),
44            broadcast_capacity: 4096,
45            gop_capacity: 0,
46        }
47    }
48
49    /// Override the per-stream broadcast channel capacity.
50    pub fn broadcast_capacity(mut self, n: usize) -> Self {
51        self.broadcast_capacity = n;
52        self
53    }
54
55    /// Enable the keyframe-anchored GOP replay buffer, bounded to `frames`
56    /// (e.g. `fps × gop_seconds`). Late joiners receive the cached configs plus
57    /// the current GOP and start decoding immediately.
58    pub fn gop_cache(mut self, frames: usize) -> Self {
59        self.gop_capacity = frames;
60        self
61    }
62}
63
/// Engine-wide configuration — the subset of the old `sc-config::ServerConfig`
/// knobs that the engine itself consumes.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on streams being published at the same time, summed over
    /// every application.
    pub max_publishers: usize,
    /// When `Some`, a stream that has gone this long without receiving a frame
    /// is reaped by [`Engine::reap_idle`] / the background idle reaper.
    /// `None` leaves idle reaping off.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for EngineConfig {
    /// Defaults: at most 10 000 concurrent publishers and no idle timeout.
    fn default() -> Self {
        const DEFAULT_MAX_PUBLISHERS: usize = 10_000;

        EngineConfig {
            idle_timeout: None,
            max_publishers: DEFAULT_MAX_PUBLISHERS,
        }
    }
}
83
84/// The engine. Many instances may coexist in one process (ideal for tests).
85/// Cheap to `Arc`-share.
86pub struct Engine {
87    apps: DashMap<AppName, Arc<Application>>,
88    config: EngineConfig,
89    /// Running count of active publishers across all apps.
90    active_publishers: AtomicUsize,
91    observer: Arc<dyn Observer>,
92    authenticator: Arc<dyn StreamAuthenticator>,
93}
94
95impl Engine {
96    /// Start building an engine.
97    pub fn builder() -> EngineBuilder {
98        EngineBuilder::new()
99    }
100
101    /// Internal constructor used by [`EngineBuilder::build`].
102    pub(crate) fn from_parts(
103        config: EngineConfig,
104        specs: Vec<AppSpec>,
105        observer: Arc<dyn Observer>,
106        authenticator: Arc<dyn StreamAuthenticator>,
107    ) -> Arc<Self> {
108        let apps = DashMap::new();
109        for spec in specs {
110            let app = Application::new(
111                spec.name.clone(),
112                spec.broadcast_capacity,
113                spec.gop_capacity,
114                Arc::clone(&observer),
115            );
116            apps.insert(spec.name.clone(), app);
117            info!(app = %spec.name, "Application registered");
118        }
119        Arc::new(Self {
120            apps,
121            config,
122            active_publishers: AtomicUsize::new(0),
123            observer,
124            authenticator,
125        })
126    }
127
128    /// Register an application after construction (e.g. on config reload).
129    ///
130    /// Rejects a name that is already registered with
131    /// [`StreamError::AppAlreadyRegistered`] rather than silently replacing the
132    /// live [`Application`] — an overwrite would orphan that app's active
133    /// streams and leak the engine-wide publisher count that gates
134    /// [`StreamError::PublisherLimitReached`]. To change an app's settings,
135    /// drain and remove it first (a future `remove_app`), then re-register.
136    pub fn register_app(&self, spec: AppSpec) -> Result<()> {
137        use dashmap::mapref::entry::Entry;
138        match self.apps.entry(spec.name.clone()) {
139            Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
140            Entry::Vacant(slot) => {
141                let app = Application::new(
142                    spec.name.clone(),
143                    spec.broadcast_capacity,
144                    spec.gop_capacity,
145                    Arc::clone(&self.observer),
146                );
147                info!(app = %spec.name, "Application registered");
148                slot.insert(app);
149                Ok(())
150            }
151        }
152    }
153
154    /// List registered application names.
155    pub fn list_apps(&self) -> Vec<AppName> {
156        self.apps.iter().map(|r| r.key().clone()).collect()
157    }
158
159    /// Total active publishers across all applications (single atomic load).
160    pub fn total_stream_count(&self) -> usize {
161        self.active_publishers.load(Ordering::Acquire)
162    }
163
164    fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
165        self.apps
166            .get(app)
167            .map(|r| r.clone())
168            .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
169    }
170
171    /// Claim a publish slot only if the injected [`StreamAuthenticator`] permits
172    /// `creds` to publish `key`. Protocol handlers should call this rather than
173    /// [`start_publish`](PublishRegistry::start_publish) directly so the auth
174    /// policy is enforced uniformly across every transport.
175    pub async fn start_publish_authorized(
176        &self,
177        key: &StreamKey,
178        creds: &Credentials,
179    ) -> Result<StreamHandle> {
180        self.authenticator.authorize_publish(key, creds).await?;
181        self.start_publish(key).await
182    }
183
184    /// Resolve a live stream for playback only if the authenticator permits
185    /// `creds` to play `key`.
186    pub async fn open_playback_authorized(
187        &self,
188        key: &StreamKey,
189        creds: &Credentials,
190    ) -> Result<StreamHandle> {
191        self.authenticator.authorize_play(key, creds).await?;
192        self.get_stream(key)
193    }
194}
195
196#[async_trait]
197impl PublishRegistry for Engine {
198    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
199    async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
200        // Optimistically claim a publisher slot before taking the per-app lock.
201        // Roll back the increment if the app-level check later rejects the stream.
202        let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
203        if prev >= self.config.max_publishers {
204            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
205            return Err(StreamError::PublisherLimitReached {
206                limit: self.config.max_publishers,
207            });
208        }
209
210        let application = match self.get_app(&key.app) {
211            Ok(app) => app,
212            Err(e) => {
213                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
214                return Err(e);
215            }
216        };
217
218        match application.start_publish(key.stream_id.clone()).await {
219            Ok(handle) => Ok(handle),
220            Err(e) => {
221                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
222                Err(e)
223            }
224        }
225    }
226
227    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
228    async fn end_publish(&self, key: &StreamKey) -> Result<()> {
229        let application = self.get_app(&key.app)?;
230        if application.end_publish(&key.stream_id).await? {
231            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
232        }
233        Ok(())
234    }
235}
236
237impl PlaybackRegistry for Engine {
238    fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
239        let application = self.get_app(&key.app)?;
240        application
241            .get_stream(&key.stream_id)
242            .ok_or_else(|| StreamError::StreamNotFound {
243                app: key.app.to_string(),
244                stream_id: key.stream_id.to_string(),
245            })
246    }
247
248    fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
249        Ok(self.get_app(app)?.active_streams())
250    }
251}
252
253impl EventBus for Engine {
254    fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
255        Ok(self.get_app(app)?.subscribe_events())
256    }
257}
258
259impl std::fmt::Debug for Engine {
260    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261        f.debug_struct("Engine")
262            .field("app_count", &self.apps.len())
263            .field("active_publishers", &self.total_stream_count())
264            .finish()
265    }
266}