1mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16 Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27#[derive(Debug, Clone)]
29pub struct AppSpec {
30 pub name: AppName,
32 pub broadcast_capacity: usize,
34 pub gop_capacity: usize,
37}
38
39impl AppSpec {
40 pub fn new(name: impl Into<AppName>) -> Self {
42 Self {
43 name: name.into(),
44 broadcast_capacity: 4096,
45 gop_capacity: 0,
46 }
47 }
48
49 pub fn broadcast_capacity(mut self, n: usize) -> Self {
51 self.broadcast_capacity = n;
52 self
53 }
54
55 pub fn gop_cache(mut self, frames: usize) -> Self {
59 self.gop_capacity = frames;
60 self
61 }
62}
63
/// Engine-wide limits.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on the number of simultaneously active publishers across
    /// all applications.
    pub max_publishers: usize,
    /// Optional idle timeout.
    // NOTE(review): not read in this part of the file — presumably enforced by
    // the connection drivers; confirm where it is consumed.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for EngineConfig {
    /// Returns a configuration allowing 10 000 publishers with no idle timeout.
    fn default() -> Self {
        const DEFAULT_MAX_PUBLISHERS: usize = 10_000;

        EngineConfig {
            max_publishers: DEFAULT_MAX_PUBLISHERS,
            idle_timeout: Option::None,
        }
    }
}
83
84pub struct Engine {
87 apps: DashMap<AppName, Arc<Application>>,
88 config: EngineConfig,
89 active_publishers: AtomicUsize,
91 observer: Arc<dyn Observer>,
92 authenticator: Arc<dyn StreamAuthenticator>,
93 pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
97}
98
99impl Engine {
100 pub fn builder() -> EngineBuilder {
102 EngineBuilder::new()
103 }
104
105 pub(crate) fn from_parts(
107 config: EngineConfig,
108 specs: Vec<AppSpec>,
109 observer: Arc<dyn Observer>,
110 authenticator: Arc<dyn StreamAuthenticator>,
111 protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
112 ) -> Arc<Self> {
113 let apps = DashMap::new();
114 for spec in specs {
115 let app = Application::new(
116 spec.name.clone(),
117 spec.broadcast_capacity,
118 spec.gop_capacity,
119 Arc::clone(&observer),
120 );
121 apps.insert(spec.name.clone(), app);
122 info!(app = %spec.name, "Application registered");
123 }
124 Arc::new(Self {
125 apps,
126 config,
127 active_publishers: AtomicUsize::new(0),
128 observer,
129 authenticator,
130 pending_protocols: std::sync::Mutex::new(protocols),
131 })
132 }
133
134 pub fn register_app(&self, spec: AppSpec) -> Result<()> {
143 use dashmap::mapref::entry::Entry;
144 match self.apps.entry(spec.name.clone()) {
145 Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
146 Entry::Vacant(slot) => {
147 let app = Application::new(
148 spec.name.clone(),
149 spec.broadcast_capacity,
150 spec.gop_capacity,
151 Arc::clone(&self.observer),
152 );
153 info!(app = %spec.name, "Application registered");
154 slot.insert(app);
155 Ok(())
156 }
157 }
158 }
159
160 pub fn list_apps(&self) -> Vec<AppName> {
162 self.apps.iter().map(|r| r.key().clone()).collect()
163 }
164
165 pub fn total_stream_count(&self) -> usize {
167 self.active_publishers.load(Ordering::Acquire)
168 }
169
170 fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
171 self.apps
172 .get(app)
173 .map(|r| r.clone())
174 .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
175 }
176
177 pub async fn start_publish_authorized(
182 &self,
183 key: &StreamKey,
184 creds: &Credentials,
185 ) -> Result<StreamHandle> {
186 self.authenticator.authorize_publish(key, creds).await?;
187 self.start_publish(key).await
188 }
189
190 pub async fn open_playback_authorized(
193 &self,
194 key: &StreamKey,
195 creds: &Credentials,
196 ) -> Result<StreamHandle> {
197 self.authenticator.authorize_play(key, creds).await?;
198 self.get_stream(key)
199 }
200}
201
202#[async_trait]
203impl PublishRegistry for Engine {
204 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
205 async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
206 let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
209 if prev >= self.config.max_publishers {
210 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
211 return Err(StreamError::PublisherLimitReached {
212 limit: self.config.max_publishers,
213 });
214 }
215
216 let application = match self.get_app(&key.app) {
217 Ok(app) => app,
218 Err(e) => {
219 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
220 return Err(e);
221 }
222 };
223
224 match application.start_publish(key.stream_id.clone()).await {
225 Ok(handle) => Ok(handle),
226 Err(e) => {
227 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
228 Err(e)
229 }
230 }
231 }
232
233 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
234 async fn end_publish(&self, key: &StreamKey) -> Result<()> {
235 let application = self.get_app(&key.app)?;
236 if application.end_publish(&key.stream_id).await? {
237 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
238 }
239 Ok(())
240 }
241}
242
243impl PlaybackRegistry for Engine {
244 fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
245 let application = self.get_app(&key.app)?;
246 application
247 .get_stream(&key.stream_id)
248 .ok_or_else(|| StreamError::StreamNotFound {
249 app: key.app.to_string(),
250 stream_id: key.stream_id.to_string(),
251 })
252 }
253
254 fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
255 Ok(self.get_app(app)?.active_streams())
256 }
257}
258
259impl EventBus for Engine {
260 fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
261 Ok(self.get_app(app)?.subscribe_events())
262 }
263}
264
265impl std::fmt::Debug for Engine {
266 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
267 f.debug_struct("Engine")
268 .field("app_count", &self.apps.len())
269 .field("active_publishers", &self.total_stream_count())
270 .finish()
271 }
272}