Skip to main content

arcly_stream/engine/
mod.rs

1//! The composition root — the analogue of `arcly-http`'s `App`.
2//!
3//! [`Engine`] replaces `stream-center`'s `ApplicationRegistry::from_config`.
4//! It is built by a plain-Rust [`EngineBuilder`] (no TOML, no
5//! `StreamCenterConfig`) and implements all three bus contracts
6//! ([`PublishRegistry`], [`PlaybackRegistry`], [`EventBus`]), so it drops
7//! straight into protocol code written against those traits.
8
9mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16    Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27/// Plain-Rust application descriptor — replaces `sc-config::AppConfig`.
28#[derive(Debug, Clone)]
29pub struct AppSpec {
30    /// Application name (e.g. `"live"`).
31    pub name: AppName,
32    /// Per-stream broadcast channel capacity (frames buffered for slow joiners).
33    pub broadcast_capacity: usize,
34    /// Keyframe-anchored GOP replay buffer size, in frames (0 disables it).
35    /// Enables sub-second playback start for late joiners.
36    pub gop_capacity: usize,
37}
38
39impl AppSpec {
40    /// A new app with default capacities and GOP caching disabled.
41    pub fn new(name: impl Into<AppName>) -> Self {
42        Self {
43            name: name.into(),
44            broadcast_capacity: 4096,
45            gop_capacity: 0,
46        }
47    }
48
49    /// Override the per-stream broadcast channel capacity.
50    pub fn broadcast_capacity(mut self, n: usize) -> Self {
51        self.broadcast_capacity = n;
52        self
53    }
54
55    /// Enable the keyframe-anchored GOP replay buffer, bounded to `frames`
56    /// (e.g. `fps × gop_seconds`). Late joiners receive the cached configs plus
57    /// the current GOP and start decoding immediately.
58    pub fn gop_cache(mut self, frames: usize) -> Self {
59        self.gop_capacity = frames;
60        self
61    }
62}
63
/// Settings that apply to the engine as a whole — the subset of
/// `sc-config::ServerConfig` knobs the engine actually needs.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on streams publishing at the same time, counted across
    /// every application.
    pub max_publishers: usize,
    /// When `Some`, a stream that has gone this long without receiving a
    /// frame is reaped by [`Engine::reap_idle`] / the background idle reaper.
    pub idle_timeout: Option<std::time::Duration>,
}
74
75impl Default for EngineConfig {
76    fn default() -> Self {
77        Self {
78            max_publishers: 10_000,
79            idle_timeout: None,
80        }
81    }
82}
83
84/// The engine. Many instances may coexist in one process (ideal for tests).
85/// Cheap to `Arc`-share.
86pub struct Engine {
87    apps: DashMap<AppName, Arc<Application>>,
88    config: EngineConfig,
89    /// Running count of active publishers across all apps.
90    active_publishers: AtomicUsize,
91    observer: Arc<dyn Observer>,
92    authenticator: Arc<dyn StreamAuthenticator>,
93    /// Protocol workers registered on the builder via
94    /// [`EngineBuilder::protocol`], consumed once by
95    /// [`serve_registered`](Engine::serve_registered).
96    pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
97}
98
99impl Engine {
100    /// Start building an engine.
101    pub fn builder() -> EngineBuilder {
102        EngineBuilder::new()
103    }
104
105    /// Internal constructor used by [`EngineBuilder::build`].
106    pub(crate) fn from_parts(
107        config: EngineConfig,
108        specs: Vec<AppSpec>,
109        observer: Arc<dyn Observer>,
110        authenticator: Arc<dyn StreamAuthenticator>,
111        protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
112    ) -> Arc<Self> {
113        let apps = DashMap::new();
114        for spec in specs {
115            let app = Application::new(
116                spec.name.clone(),
117                spec.broadcast_capacity,
118                spec.gop_capacity,
119                Arc::clone(&observer),
120            );
121            apps.insert(spec.name.clone(), app);
122            info!(app = %spec.name, "Application registered");
123        }
124        Arc::new(Self {
125            apps,
126            config,
127            active_publishers: AtomicUsize::new(0),
128            observer,
129            authenticator,
130            pending_protocols: std::sync::Mutex::new(protocols),
131        })
132    }
133
134    /// Register an application after construction (e.g. on config reload).
135    ///
136    /// Rejects a name that is already registered with
137    /// [`StreamError::AppAlreadyRegistered`] rather than silently replacing the
138    /// live [`Application`] — an overwrite would orphan that app's active
139    /// streams and leak the engine-wide publisher count that gates
140    /// [`StreamError::PublisherLimitReached`]. To change an app's settings,
141    /// drain and remove it first (a future `remove_app`), then re-register.
142    pub fn register_app(&self, spec: AppSpec) -> Result<()> {
143        use dashmap::mapref::entry::Entry;
144        match self.apps.entry(spec.name.clone()) {
145            Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
146            Entry::Vacant(slot) => {
147                let app = Application::new(
148                    spec.name.clone(),
149                    spec.broadcast_capacity,
150                    spec.gop_capacity,
151                    Arc::clone(&self.observer),
152                );
153                info!(app = %spec.name, "Application registered");
154                slot.insert(app);
155                Ok(())
156            }
157        }
158    }
159
160    /// List registered application names.
161    pub fn list_apps(&self) -> Vec<AppName> {
162        self.apps.iter().map(|r| r.key().clone()).collect()
163    }
164
165    /// Total active publishers across all applications (single atomic load).
166    pub fn total_stream_count(&self) -> usize {
167        self.active_publishers.load(Ordering::Acquire)
168    }
169
170    fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
171        self.apps
172            .get(app)
173            .map(|r| r.clone())
174            .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
175    }
176
177    /// Claim a publish slot only if the injected [`StreamAuthenticator`] permits
178    /// `creds` to publish `key`. Protocol handlers should call this rather than
179    /// [`start_publish`](PublishRegistry::start_publish) directly so the auth
180    /// policy is enforced uniformly across every transport.
181    pub async fn start_publish_authorized(
182        &self,
183        key: &StreamKey,
184        creds: &Credentials,
185    ) -> Result<StreamHandle> {
186        self.authenticator.authorize_publish(key, creds).await?;
187        self.start_publish(key).await
188    }
189
190    /// Resolve a live stream for playback only if the authenticator permits
191    /// `creds` to play `key`.
192    pub async fn open_playback_authorized(
193        &self,
194        key: &StreamKey,
195        creds: &Credentials,
196    ) -> Result<StreamHandle> {
197        self.authenticator.authorize_play(key, creds).await?;
198        self.get_stream(key)
199    }
200}
201
202#[async_trait]
203impl PublishRegistry for Engine {
204    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
205    async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
206        // Optimistically claim a publisher slot before taking the per-app lock.
207        // Roll back the increment if the app-level check later rejects the stream.
208        let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
209        if prev >= self.config.max_publishers {
210            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
211            return Err(StreamError::PublisherLimitReached {
212                limit: self.config.max_publishers,
213            });
214        }
215
216        let application = match self.get_app(&key.app) {
217            Ok(app) => app,
218            Err(e) => {
219                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
220                return Err(e);
221            }
222        };
223
224        match application.start_publish(key.stream_id.clone()).await {
225            Ok(handle) => Ok(handle),
226            Err(e) => {
227                self.active_publishers.fetch_sub(1, Ordering::AcqRel);
228                Err(e)
229            }
230        }
231    }
232
233    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
234    async fn end_publish(&self, key: &StreamKey) -> Result<()> {
235        let application = self.get_app(&key.app)?;
236        if application.end_publish(&key.stream_id).await? {
237            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
238        }
239        Ok(())
240    }
241}
242
243impl PlaybackRegistry for Engine {
244    fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
245        let application = self.get_app(&key.app)?;
246        application
247            .get_stream(&key.stream_id)
248            .ok_or_else(|| StreamError::StreamNotFound {
249                app: key.app.to_string(),
250                stream_id: key.stream_id.to_string(),
251            })
252    }
253
254    fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
255        Ok(self.get_app(app)?.active_streams())
256    }
257}
258
259impl EventBus for Engine {
260    fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
261        Ok(self.get_app(app)?.subscribe_events())
262    }
263}
264
265impl std::fmt::Debug for Engine {
266    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267        f.debug_struct("Engine")
268            .field("app_count", &self.apps.len())
269            .field("active_publishers", &self.total_stream_count())
270            .finish()
271    }
272}