1mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16 Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27#[derive(Debug, Clone)]
42pub struct AppSpec {
43 pub name: AppName,
45 pub broadcast_capacity: usize,
47 pub gop_capacity: usize,
50}
51
52impl AppSpec {
53 pub fn new(name: impl Into<AppName>) -> Self {
55 Self {
56 name: name.into(),
57 broadcast_capacity: 4096,
58 gop_capacity: 0,
59 }
60 }
61
62 pub fn broadcast_capacity(mut self, n: usize) -> Self {
64 self.broadcast_capacity = n;
65 self
66 }
67
68 pub fn gop_cache(mut self, frames: usize) -> Self {
72 self.gop_capacity = frames;
73 self
74 }
75}
76
/// Engine-wide settings that apply across all applications.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on concurrently active publishers; `start_publish`
    /// rejects new publishers with `PublisherLimitReached` once it is hit.
    pub max_publishers: usize,
    /// Optional idle timeout.
    /// NOTE(review): not read anywhere in this part of the file — confirm
    /// where it is enforced and what counts as "idle".
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for EngineConfig {
    /// Allows up to 10 000 publishers and sets no idle timeout.
    fn default() -> Self {
        const DEFAULT_MAX_PUBLISHERS: usize = 10_000;

        Self {
            max_publishers: DEFAULT_MAX_PUBLISHERS,
            idle_timeout: None,
        }
    }
}
96
/// Central registry that owns every application and enforces the global
/// publisher limit.
///
/// Built through [`Engine::builder`]; the registry traits implemented for it
/// (`PublishRegistry`, `PlaybackRegistry`, `EventBus`) all resolve the target
/// application by name and delegate to it.
pub struct Engine {
    /// Registered applications, keyed by name.
    apps: DashMap<AppName, Arc<Application>>,
    /// Engine-wide settings; `max_publishers` is checked on every
    /// `start_publish`.
    config: EngineConfig,
    /// Number of publisher slots currently reserved across all applications.
    active_publishers: AtomicUsize,
    /// Observer handed to every `Application` created by this engine.
    observer: Arc<dyn Observer>,
    /// Authorizer consulted by the `*_authorized` entry points.
    authenticator: Arc<dyn StreamAuthenticator>,
    /// Inbound protocols received at construction time.
    /// NOTE(review): never read in this part of the file — presumably drained
    /// by the `driver` module when the engine starts; confirm.
    pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
}
111
112impl Engine {
113 pub fn builder() -> EngineBuilder {
115 EngineBuilder::new()
116 }
117
118 pub(crate) fn from_parts(
120 config: EngineConfig,
121 specs: Vec<AppSpec>,
122 observer: Arc<dyn Observer>,
123 authenticator: Arc<dyn StreamAuthenticator>,
124 protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
125 ) -> Arc<Self> {
126 let apps = DashMap::new();
127 for spec in specs {
128 let app = Application::new(
129 spec.name.clone(),
130 spec.broadcast_capacity,
131 spec.gop_capacity,
132 Arc::clone(&observer),
133 );
134 apps.insert(spec.name.clone(), app);
135 info!(app = %spec.name, "Application registered");
136 }
137 Arc::new(Self {
138 apps,
139 config,
140 active_publishers: AtomicUsize::new(0),
141 observer,
142 authenticator,
143 pending_protocols: std::sync::Mutex::new(protocols),
144 })
145 }
146
147 pub fn register_app(&self, spec: AppSpec) -> Result<()> {
156 use dashmap::mapref::entry::Entry;
157 match self.apps.entry(spec.name.clone()) {
158 Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
159 Entry::Vacant(slot) => {
160 let app = Application::new(
161 spec.name.clone(),
162 spec.broadcast_capacity,
163 spec.gop_capacity,
164 Arc::clone(&self.observer),
165 );
166 info!(app = %spec.name, "Application registered");
167 slot.insert(app);
168 Ok(())
169 }
170 }
171 }
172
173 pub fn list_apps(&self) -> Vec<AppName> {
175 self.apps.iter().map(|r| r.key().clone()).collect()
176 }
177
178 pub fn total_stream_count(&self) -> usize {
180 self.active_publishers.load(Ordering::Acquire)
181 }
182
183 fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
184 self.apps
185 .get(app)
186 .map(|r| r.clone())
187 .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
188 }
189
190 pub async fn start_publish_authorized(
195 &self,
196 key: &StreamKey,
197 creds: &Credentials,
198 ) -> Result<StreamHandle> {
199 self.authenticator.authorize_publish(key, creds).await?;
200 self.start_publish(key).await
201 }
202
203 pub async fn open_playback_authorized(
206 &self,
207 key: &StreamKey,
208 creds: &Credentials,
209 ) -> Result<StreamHandle> {
210 self.authenticator.authorize_play(key, creds).await?;
211 self.get_stream(key)
212 }
213}
214
215#[async_trait]
216impl PublishRegistry for Engine {
217 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
218 async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
219 let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
222 if prev >= self.config.max_publishers {
223 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
224 return Err(StreamError::PublisherLimitReached {
225 limit: self.config.max_publishers,
226 });
227 }
228
229 let application = match self.get_app(&key.app) {
230 Ok(app) => app,
231 Err(e) => {
232 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
233 return Err(e);
234 }
235 };
236
237 match application.start_publish(key.stream_id.clone()).await {
238 Ok(handle) => Ok(handle),
239 Err(e) => {
240 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
241 Err(e)
242 }
243 }
244 }
245
246 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
247 async fn end_publish(&self, key: &StreamKey) -> Result<()> {
248 let application = self.get_app(&key.app)?;
249 if application.end_publish(&key.stream_id).await? {
250 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
251 }
252 Ok(())
253 }
254}
255
256impl PlaybackRegistry for Engine {
257 fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
258 let application = self.get_app(&key.app)?;
259 application
260 .get_stream(&key.stream_id)
261 .ok_or_else(|| StreamError::StreamNotFound {
262 app: key.app.to_string(),
263 stream_id: key.stream_id.to_string(),
264 })
265 }
266
267 fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
268 Ok(self.get_app(app)?.active_streams())
269 }
270}
271
272impl EventBus for Engine {
273 fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
274 Ok(self.get_app(app)?.subscribe_events())
275 }
276}
277
278impl std::fmt::Debug for Engine {
279 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280 f.debug_struct("Engine")
281 .field("app_count", &self.apps.len())
282 .field("active_publishers", &self.total_stream_count())
283 .finish()
284 }
285}