Skip to main content

arcly_stream/engine/
builder.rs

1//! The engine builder — replaces `ApplicationRegistry::from_config`.
2
3use super::{AppSpec, Engine, EngineConfig};
4use crate::auth::{AllowAll, StreamAuthenticator};
5use crate::inbound::InboundProtocol;
6use crate::observe::{NoopObserver, Observer};
7use std::sync::Arc;
8
9/// Fluent builder for an [`Engine`]. Takes plain owned specs instead of a
10/// deserialized config schema, so consumers never adopt `stream-center`'s TOML.
11///
12/// ```
13/// use arcly_stream::prelude::*;
14///
15/// let engine = Engine::builder()
16///     .max_publishers(1_000)
17///     .application(AppSpec::new("live").broadcast_capacity(8192))
18///     .build();
19/// assert_eq!(engine.list_apps().len(), 1);
20/// ```
21pub struct EngineBuilder {
22    config: EngineConfig,
23    apps: Vec<AppSpec>,
24    observer: Option<Arc<dyn Observer>>,
25    authenticator: Option<Arc<dyn StreamAuthenticator>>,
26    protocols: Vec<Box<dyn InboundProtocol>>,
27}
28
29impl EngineBuilder {
30    /// A fresh builder with default config, no apps, and permit-all auth.
31    pub fn new() -> Self {
32        Self {
33            config: EngineConfig::default(),
34            apps: Vec::new(),
35            observer: None,
36            authenticator: None,
37            protocols: Vec::new(),
38        }
39    }
40
41    /// Set the hard cap on concurrent publishing streams.
42    pub fn max_publishers(mut self, n: usize) -> Self {
43        self.config.max_publishers = n;
44        self
45    }
46
47    /// Replace the whole engine config.
48    pub fn config(mut self, config: EngineConfig) -> Self {
49        self.config = config;
50        self
51    }
52
53    /// Register an application.
54    pub fn application(mut self, app: AppSpec) -> Self {
55        self.apps.push(app);
56        self
57    }
58
59    /// Register many applications at once.
60    pub fn applications(mut self, apps: impl IntoIterator<Item = AppSpec>) -> Self {
61        self.apps.extend(apps);
62        self
63    }
64
65    /// Install a telemetry observer. Defaults to [`NoopObserver`] if unset.
66    pub fn observer<O: Observer>(mut self, observer: O) -> Self {
67        self.observer = Some(Arc::new(observer));
68        self
69    }
70
71    /// Install a pre-shared observer (when the host already holds an `Arc`).
72    pub fn observer_arc(mut self, observer: Arc<dyn Observer>) -> Self {
73        self.observer = Some(observer);
74        self
75    }
76
77    /// Install a stream authenticator. Defaults to [`AllowAll`] if unset.
78    pub fn authenticator<A: StreamAuthenticator>(mut self, auth: A) -> Self {
79        self.authenticator = Some(Arc::new(auth));
80        self
81    }
82
83    /// Install a pre-shared authenticator (when the host already holds an `Arc`).
84    pub fn authenticator_arc(mut self, auth: Arc<dyn StreamAuthenticator>) -> Self {
85        self.authenticator = Some(auth);
86        self
87    }
88
89    /// Set the idle timeout after which frame-less streams are reaped.
90    pub fn idle_timeout(mut self, timeout: std::time::Duration) -> Self {
91        self.config.idle_timeout = Some(timeout);
92        self
93    }
94
95    /// Register an inbound protocol worker (RTMP, or any custom
96    /// [`InboundProtocol`] from your own crate). Workers registered here are run
97    /// by [`Engine::serve_registered`](Engine::serve_registered).
98    ///
99    /// ```no_run
100    /// # use arcly_stream::prelude::*;
101    /// # use arcly_stream::protocol::rtmp::RtmpHandler;
102    /// # async fn run() -> arcly_stream::Result<()> {
103    /// let engine = Engine::builder()
104    ///     .application(AppSpec::new("live").gop_cache(120))
105    ///     .protocol(RtmpHandler::new("0.0.0.0:1935".parse().unwrap()))
106    ///     // .protocol(MyCustomProtocol::new())   // your own InboundProtocol
107    ///     .build();
108    /// engine.serve_registered(CancellationToken::new()).await
109    /// # }
110    /// ```
111    pub fn protocol<P: InboundProtocol>(mut self, protocol: P) -> Self {
112        self.protocols.push(Box::new(protocol));
113        self
114    }
115
116    /// Register a pre-boxed protocol worker (when the host already holds a
117    /// `Box<dyn InboundProtocol>`, e.g. built dynamically from config).
118    pub fn protocol_boxed(mut self, protocol: Box<dyn InboundProtocol>) -> Self {
119        self.protocols.push(protocol);
120        self
121    }
122
123    /// Build the engine. Any protocol workers registered via
124    /// [`protocol`](Self::protocol) are carried into the engine and run by
125    /// [`Engine::serve_registered`](Engine::serve_registered).
126    pub fn build(self) -> Arc<Engine> {
127        let observer = self.observer.unwrap_or_else(|| Arc::new(NoopObserver));
128        let authenticator = self.authenticator.unwrap_or_else(|| Arc::new(AllowAll));
129        Engine::from_parts(
130            self.config,
131            self.apps,
132            observer,
133            authenticator,
134            self.protocols,
135        )
136    }
137}
138
139impl Default for EngineBuilder {
140    fn default() -> Self {
141        Self::new()
142    }
143}