arcly_stream/engine/
mod.rs1mod builder;
10mod driver;
11
12pub use builder::EngineBuilder;
13
14use crate::auth::{Credentials, StreamAuthenticator};
15use crate::bus::{
16 Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
17};
18use crate::observe::Observer;
19use crate::{AppName, Result, StreamError, StreamId, StreamKey};
20use async_trait::async_trait;
21use dashmap::DashMap;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::sync::Arc;
24use tokio::sync::broadcast;
25use tracing::info;
26
27#[derive(Debug, Clone)]
29pub struct AppSpec {
30 pub name: AppName,
32 pub broadcast_capacity: usize,
34 pub gop_capacity: usize,
37}
38
39impl AppSpec {
40 pub fn new(name: impl Into<AppName>) -> Self {
42 Self {
43 name: name.into(),
44 broadcast_capacity: 4096,
45 gop_capacity: 0,
46 }
47 }
48
49 pub fn broadcast_capacity(mut self, n: usize) -> Self {
51 self.broadcast_capacity = n;
52 self
53 }
54
55 pub fn gop_cache(mut self, frames: usize) -> Self {
59 self.gop_capacity = frames;
60 self
61 }
62}
63
64#[derive(Debug, Clone)]
67pub struct EngineConfig {
68 pub max_publishers: usize,
70 pub idle_timeout: Option<std::time::Duration>,
73}
74
75impl Default for EngineConfig {
76 fn default() -> Self {
77 Self {
78 max_publishers: 10_000,
79 idle_timeout: None,
80 }
81 }
82}
83
84pub struct Engine {
87 apps: DashMap<AppName, Arc<Application>>,
88 config: EngineConfig,
89 active_publishers: AtomicUsize,
91 observer: Arc<dyn Observer>,
92 authenticator: Arc<dyn StreamAuthenticator>,
93}
94
95impl Engine {
96 pub fn builder() -> EngineBuilder {
98 EngineBuilder::new()
99 }
100
101 pub(crate) fn from_parts(
103 config: EngineConfig,
104 specs: Vec<AppSpec>,
105 observer: Arc<dyn Observer>,
106 authenticator: Arc<dyn StreamAuthenticator>,
107 ) -> Arc<Self> {
108 let apps = DashMap::new();
109 for spec in specs {
110 let app = Application::new(
111 spec.name.clone(),
112 spec.broadcast_capacity,
113 spec.gop_capacity,
114 Arc::clone(&observer),
115 );
116 apps.insert(spec.name.clone(), app);
117 info!(app = %spec.name, "Application registered");
118 }
119 Arc::new(Self {
120 apps,
121 config,
122 active_publishers: AtomicUsize::new(0),
123 observer,
124 authenticator,
125 })
126 }
127
128 pub fn register_app(&self, spec: AppSpec) -> Result<()> {
137 use dashmap::mapref::entry::Entry;
138 match self.apps.entry(spec.name.clone()) {
139 Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
140 Entry::Vacant(slot) => {
141 let app = Application::new(
142 spec.name.clone(),
143 spec.broadcast_capacity,
144 spec.gop_capacity,
145 Arc::clone(&self.observer),
146 );
147 info!(app = %spec.name, "Application registered");
148 slot.insert(app);
149 Ok(())
150 }
151 }
152 }
153
154 pub fn list_apps(&self) -> Vec<AppName> {
156 self.apps.iter().map(|r| r.key().clone()).collect()
157 }
158
159 pub fn total_stream_count(&self) -> usize {
161 self.active_publishers.load(Ordering::Acquire)
162 }
163
164 fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
165 self.apps
166 .get(app)
167 .map(|r| r.clone())
168 .ok_or_else(|| StreamError::AppNotFound(app.to_string()))
169 }
170
171 pub async fn start_publish_authorized(
176 &self,
177 key: &StreamKey,
178 creds: &Credentials,
179 ) -> Result<StreamHandle> {
180 self.authenticator.authorize_publish(key, creds).await?;
181 self.start_publish(key).await
182 }
183
184 pub async fn open_playback_authorized(
187 &self,
188 key: &StreamKey,
189 creds: &Credentials,
190 ) -> Result<StreamHandle> {
191 self.authenticator.authorize_play(key, creds).await?;
192 self.get_stream(key)
193 }
194}
195
196#[async_trait]
197impl PublishRegistry for Engine {
198 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
199 async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
200 let prev = self.active_publishers.fetch_add(1, Ordering::AcqRel);
203 if prev >= self.config.max_publishers {
204 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
205 return Err(StreamError::PublisherLimitReached {
206 limit: self.config.max_publishers,
207 });
208 }
209
210 let application = match self.get_app(&key.app) {
211 Ok(app) => app,
212 Err(e) => {
213 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
214 return Err(e);
215 }
216 };
217
218 match application.start_publish(key.stream_id.clone()).await {
219 Ok(handle) => Ok(handle),
220 Err(e) => {
221 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
222 Err(e)
223 }
224 }
225 }
226
227 #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
228 async fn end_publish(&self, key: &StreamKey) -> Result<()> {
229 let application = self.get_app(&key.app)?;
230 if application.end_publish(&key.stream_id).await? {
231 self.active_publishers.fetch_sub(1, Ordering::AcqRel);
232 }
233 Ok(())
234 }
235}
236
237impl PlaybackRegistry for Engine {
238 fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
239 let application = self.get_app(&key.app)?;
240 application
241 .get_stream(&key.stream_id)
242 .ok_or_else(|| StreamError::StreamNotFound {
243 app: key.app.to_string(),
244 stream_id: key.stream_id.to_string(),
245 })
246 }
247
248 fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
249 Ok(self.get_app(app)?.active_streams())
250 }
251}
252
253impl EventBus for Engine {
254 fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
255 Ok(self.get_app(app)?.subscribe_events())
256 }
257}
258
259impl std::fmt::Debug for Engine {
260 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
261 f.debug_struct("Engine")
262 .field("app_count", &self.apps.len())
263 .field("active_publishers", &self.total_stream_count())
264 .finish()
265 }
266}