// arcly_stream/engine/mod.rs

//! The composition root — the analogue of `arcly-http`'s `App`.
//!
//! [`Engine`] replaces `stream-center`'s `ApplicationRegistry::from_config`.
//! It is built by a plain-Rust [`EngineBuilder`] (no TOML, no
//! `StreamCenterConfig`) and implements all three bus contracts
//! ([`PublishRegistry`], [`PlaybackRegistry`], [`EventBus`]), so it drops
//! straight into protocol code written against those traits.
8
9mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16    Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27/// Plain-Rust application descriptor — a named namespace of streams with its own
28/// fan-out capacity and GOP-replay policy. Build one fluently and register it via
29/// [`EngineBuilder::application`](crate::EngineBuilder::application).
30///
31/// ```
32/// use arcly_stream::prelude::*;
33///
34/// let live = AppSpec::new("live")
35///     .gop_cache(120)          // ~4s at 30fps → instant-start replay
36///     .broadcast_capacity(8192); // deeper buffer for very slow joiners
37///
38/// let engine = Engine::builder().application(live).build();
39/// assert_eq!(engine.list_apps()[0].as_str(), "live");
40/// ```
41#[derive(Debug, Clone)]
42pub struct AppSpec {
43    /// Application name (e.g. `"live"`).
44    pub name: AppName,
45    /// Per-stream broadcast channel capacity (frames buffered for slow joiners).
46    pub broadcast_capacity: usize,
47    /// Keyframe-anchored GOP replay buffer size, in frames (0 disables it).
48    /// Enables sub-second playback start for late joiners.
49    pub gop_capacity: usize,
50}
51
52impl AppSpec {
53    /// A new app with default capacities and GOP caching disabled.
54    pub fn new(name: impl Into<AppName>) -> Self {
55        Self {
56            name: name.into(),
57            broadcast_capacity: 4096,
58            gop_capacity: 0,
59        }
60    }
61
62    /// Override the per-stream broadcast channel capacity.
63    pub fn broadcast_capacity(mut self, n: usize) -> Self {
64        self.broadcast_capacity = n;
65        self
66    }
67
68    /// Enable the keyframe-anchored GOP replay buffer, bounded to `frames`
69    /// (e.g. `fps × gop_seconds`). Late joiners receive the cached configs plus
70    /// the current GOP and start decoding immediately.
71    pub fn gop_cache(mut self, frames: usize) -> Self {
72        self.gop_capacity = frames;
73        self
74    }
75}
76
/// Engine-wide configuration — replaces `sc-config::ServerConfig` knobs the
/// engine actually needs.
///
/// A plain value type: cloneable and comparable, so a freshly loaded
/// configuration can be checked against the active one with `==`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EngineConfig {
    /// Hard cap on concurrent publishing streams across all applications.
    pub max_publishers: usize,
    /// If set, a stream that has not received a frame within this window is
    /// reaped by [`Engine::reap_idle`] / the background idle reaper.
    /// `None` disables idle reaping.
    pub idle_timeout: Option<std::time::Duration>,
}
87
88impl Default for EngineConfig {
89    fn default() -> Self {
90        Self {
91            max_publishers: 10_000,
92            idle_timeout: None,
93        }
94    }
95}
96
97/// The engine. Many instances may coexist in one process (ideal for tests).
98/// Cheap to `Arc`-share.
99pub struct Engine {
100    apps: DashMap<AppName, Arc<Application>>,
101    config: EngineConfig,
102    /// Running count of active publishers across all apps.
103    active_publishers: AtomicUsize,
104    observer: Arc<dyn Observer>,
105    authenticator: Arc<dyn StreamAuthenticator>,
106    /// Protocol workers registered on the builder via
107    /// [`EngineBuilder::protocol`], consumed once by
108    /// [`serve_registered`](Engine::serve_registered).
109    pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
110}
111
112impl Engine {
113    /// Start building an engine.
114    pub fn builder() -> EngineBuilder {
115        EngineBuilder::new()
116    }
117
118    /// Internal constructor used by [`EngineBuilder::build`].
119    pub(crate) fn from_parts(
120        config: EngineConfig,
121        specs: Vec<AppSpec>,
122        observer: Arc<dyn Observer>,
123        authenticator: Arc<dyn StreamAuthenticator>,
124        protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
125    ) -> Arc<Self> {
126        let apps = DashMap::new();
127        for spec in specs {
128            let app = Application::new(
129                spec.name.clone(),
130                spec.broadcast_capacity,
131                spec.gop_capacity,
132                Arc::clone(&observer),
133            );
134            apps.insert(spec.name.clone(), app);
135            info!(app = %spec.name, "Application registered");
136        }
137        Arc::new(Self {
138            apps,
139            config,
140            active_publishers: AtomicUsize::new(0),
141            observer,
142            authenticator,
143            pending_protocols: std::sync::Mutex::new(protocols),
144        })
145    }
146
147    /// Register an application after construction (e.g. on config reload).
148    ///
149    /// Rejects a name that is already registered with
150    /// [`StreamError::AppAlreadyRegistered`] rather than silently replacing the
151    /// live [`Application`] — an overwrite would orphan that app's active
152    /// streams and leak the engine-wide publisher count that gates
153    /// [`StreamError::PublisherLimitReached`]. To change an app's settings,
154    /// drain and remove it first (a future `remove_app`), then re-register.
155    pub fn register_app(&self, spec: AppSpec) -> Result<()> {
156        use dashmap::mapref::entry::Entry;
157        match self.apps.entry(spec.name.clone()) {
158            Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
159            Entry::Vacant(slot) => {
160                let app = Application::new(
161                    spec.name.clone(),
162                    spec.broadcast_capacity,
163                    spec.gop_capacity,
164                    Arc::clone(&self.observer),
165                );
166                info!(app = %spec.name, "Application registered");
167                slot.insert(app);
168                Ok(())
169            }
170        }
171    }
172
173    /// List registered application names.
174    pub fn list_apps(&self) -> Vec<AppName> {
175        self.apps.iter().map(|r| r.key().clone()).collect()
176    }
177
178    /// Total active publishers across all applications (single atomic load).
179    pub fn total_stream_count(&self) -> usize {
180        self.active_publishers.load(Ordering::Acquire)
181    }
182
183    fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
184        self.apps
185            .get(app)
186            .map(|r| r.clone())
187            .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
188    }
189
190    /// Claim a publish slot only if the injected [`StreamAuthenticator`] permits
191    /// `creds` to publish `key`. Protocol handlers should call this rather than
192    /// [`start_publish`](PublishRegistry::start_publish) directly so the auth
193    /// policy is enforced uniformly across every transport.
194    pub async fn start_publish_authorized(
195        &self,
196        key: &StreamKey,
197        creds: &Credentials,
198    ) -> Result<StreamHandle> {
199        self.authenticator.authorize_publish(key, creds).await?;
200        self.start_publish(key).await
201    }
202
203    /// Resolve a live stream for playback only if the authenticator permits
204    /// `creds` to play `key`.
205    pub async fn open_playback_authorized(
206        &self,
207        key: &StreamKey,
208        creds: &Credentials,
209    ) -> Result<StreamHandle> {
210        self.authenticator.authorize_play(key, creds).await?;
211        self.get_stream(key)
212    }
213}
214
215#[async_trait]
216impl PublishRegistry for Engine {
217    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
218    async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
219        // Optimistically claim a publisher slot before taking the per-app lock.
220        // Roll back the increment if the app-level check later rejects the stream.
221        let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
222        if prev >= self.config.max_publishers {
223            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
224            return Err(StreamError::PublisherLimitReached {
225                limit: self.config.max_publishers,
226            });
227        }
228
229        let application = match self.get_app(&key.app) {
230            Ok(app) => app,
231            Err(e) => {
232                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
233                return Err(e);
234            }
235        };
236
237        match application.start_publish(key.stream_id.clone()).await {
238            Ok(handle) => Ok(handle),
239            Err(e) => {
240                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
241                Err(e)
242            }
243        }
244    }
245
246    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
247    async fn end_publish(&self, key: &StreamKey) -> Result<()> {
248        let application = self.get_app(&key.app)?;
249        if application.end_publish(&key.stream_id).await? {
250            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
251        }
252        Ok(())
253    }
254}
255
256impl PlaybackRegistry for Engine {
257    fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
258        let application = self.get_app(&key.app)?;
259        application
260            .get_stream(&key.stream_id)
261            .ok_or_else(|| StreamError::StreamNotFound {
262                app: key.app.to_string(),
263                stream_id: key.stream_id.to_string(),
264            })
265    }
266
267    fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
268        Ok(self.get_app(app)?.active_streams())
269    }
270}
271
272impl EventBus for Engine {
273    fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
274        Ok(self.get_app(app)?.subscribe_events())
275    }
276}
277
278impl std::fmt::Debug for Engine {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        f.debug_struct("Engine")
281            .field("app_count", &self.apps.len())
282            .field("active_publishers", &self.total_stream_count())
283            .finish()
284    }
285}