mod builder;
mod driver;
pub use builder::EngineBuilder;
use crate::auth::{Credentials, StreamAuthenticator};
use crate::bus::{
Application, EventBus, PlaybackRegistry, PublishRegistry, StreamEvent, StreamHandle,
};
use crate::observe::Observer;
use crate::{AppName, Result, StreamError, StreamId, StreamKey};
use async_trait::async_trait;
use dashmap::DashMap;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;
/// Declarative description of one application to be hosted by the engine.
///
/// The three fields are handed to `Application::new` unchanged when the
/// application is instantiated.
#[derive(Debug, Clone)]
pub struct AppSpec {
    /// Name under which the application is registered and looked up.
    pub name: AppName,
    /// Capacity value forwarded to the application (defaults to 4096).
    // NOTE(review): presumably sizes the per-stream broadcast channel; if it
    // feeds `tokio::sync::broadcast::channel`, a value of 0 would panic —
    // confirm against `Application::new`.
    pub broadcast_capacity: usize,
    /// GOP cache size in frames, forwarded to the application (defaults to 0).
    // NOTE(review): 0 presumably disables GOP caching — confirm.
    pub gop_capacity: usize,
}

impl AppSpec {
    /// Creates a spec for `name` with a broadcast capacity of 4096 and a
    /// GOP capacity of 0.
    pub fn new(name: impl Into<AppName>) -> Self {
        let name = name.into();
        Self {
            name,
            broadcast_capacity: 4096,
            gop_capacity: 0,
        }
    }

    /// Returns the spec with `broadcast_capacity` replaced by `n`.
    pub fn broadcast_capacity(self, n: usize) -> Self {
        Self {
            broadcast_capacity: n,
            ..self
        }
    }

    /// Returns the spec with `gop_capacity` replaced by `frames`.
    pub fn gop_cache(self, frames: usize) -> Self {
        Self {
            gop_capacity: frames,
            ..self
        }
    }
}
/// Engine-wide tunables.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Upper bound on simultaneously active publishers across all
    /// applications; publish attempts beyond it are rejected.
    pub max_publishers: usize,
    /// Optional idle timeout; `None` by default.
    // NOTE(review): not consumed anywhere in this part of the file —
    // presumably enforced by the driver; confirm.
    pub idle_timeout: Option<std::time::Duration>,
}

impl Default for EngineConfig {
    /// Allows up to 10 000 concurrent publishers and sets no idle timeout.
    fn default() -> Self {
        const DEFAULT_MAX_PUBLISHERS: usize = 10_000;
        Self {
            idle_timeout: None,
            max_publishers: DEFAULT_MAX_PUBLISHERS,
        }
    }
}
/// Central registry that owns every application and enforces the global
/// publisher limit.
pub struct Engine {
    /// Registered applications, keyed by name.
    apps: DashMap<AppName, Arc<Application>>,
    /// Engine-wide settings; `max_publishers` is read when a publish starts.
    config: EngineConfig,
    /// Number of publisher slots currently reserved; incremented when a
    /// publish starts and decremented on failure or when a publish ends.
    active_publishers: AtomicUsize,
    /// Observer handed to every application created by this engine.
    observer: Arc<dyn Observer>,
    /// Consulted by the `*_authorized` entry points before publish/playback.
    authenticator: Arc<dyn StreamAuthenticator>,
    /// Inbound protocols supplied at construction time.
    // NOTE(review): never read in this part of the file — presumably drained
    // by the driver when the engine starts; confirm.
    pending_protocols: std::sync::Mutex<Vec<Box<dyn crate::inbound::InboundProtocol>>>,
}
impl Engine {
pub fn builder() -> EngineBuilder {
EngineBuilder::new()
}
pub(crate) fn from_parts(
config: EngineConfig,
specs: Vec<AppSpec>,
observer: Arc<dyn Observer>,
authenticator: Arc<dyn StreamAuthenticator>,
protocols: Vec<Box<dyn crate::inbound::InboundProtocol>>,
) -> Arc<Self> {
let apps = DashMap::new();
for spec in specs {
let app = Application::new(
spec.name.clone(),
spec.broadcast_capacity,
spec.gop_capacity,
Arc::clone(&observer),
);
apps.insert(spec.name.clone(), app);
info!(app = %spec.name, "Application registered");
}
Arc::new(Self {
apps,
config,
active_publishers: AtomicUsize::new(0),
observer,
authenticator,
pending_protocols: std::sync::Mutex::new(protocols),
})
}
pub fn register_app(&self, spec: AppSpec) -> Result<()> {
use dashmap::mapref::entry::Entry;
match self.apps.entry(spec.name.clone()) {
Entry::Occupied(_) => Err(StreamError::AppAlreadyRegistered(spec.name.to_string())),
Entry::Vacant(slot) => {
let app = Application::new(
spec.name.clone(),
spec.broadcast_capacity,
spec.gop_capacity,
Arc::clone(&self.observer),
);
info!(app = %spec.name, "Application registered");
slot.insert(app);
Ok(())
}
}
}
pub fn list_apps(&self) -> Vec<AppName> {
self.apps.iter().map(|r| r.key().clone()).collect()
}
pub fn total_stream_count(&self) -> usize {
self.active_publishers.load(Ordering::Acquire)
}
fn get_app(&self, app: &AppName) -> Result<Arc<Application>> {
self.apps
.get(app)
.map(|r| r.clone())
.ok_or_else(|| StreamError::AppNotFound(app.to_string()))
}
pub async fn start_publish_authorized(
&self,
key: &StreamKey,
creds: &Credentials,
) -> Result<StreamHandle> {
self.authenticator.authorize_publish(key, creds).await?;
self.start_publish(key).await
}
pub async fn open_playback_authorized(
&self,
key: &StreamKey,
creds: &Credentials,
) -> Result<StreamHandle> {
self.authenticator.authorize_play(key, creds).await?;
self.get_stream(key)
}
}
#[async_trait]
impl PublishRegistry for Engine {
    /// Reserves a publisher slot and starts publishing `key` on its
    /// application.
    ///
    /// # Errors
    ///
    /// * [`StreamError::PublisherLimitReached`] when the engine-wide limit is
    ///   already exhausted.
    /// * [`StreamError::AppNotFound`] when `key.app` is not registered.
    /// * Any error reported by the application's own `start_publish`.
    ///
    /// On every error path the reserved slot is released again.
    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
    async fn start_publish(&self, key: &StreamKey) -> Result<StreamHandle> {
        let limit = self.config.max_publishers;

        // Reserve optimistically: bump the counter first, then undo the bump
        // if the value seen before the increment was already at the limit.
        if self.active_publishers.fetch_add(1, Ordering::AcqRel) >= limit {
            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
            return Err(StreamError::PublisherLimitReached { limit });
        }

        let outcome = match self.get_app(&key.app) {
            Ok(application) => application.start_publish(key.stream_id.clone()).await,
            Err(e) => Err(e),
        };

        // Lookup or publish failed: give the slot back.
        if outcome.is_err() {
            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
        }
        outcome
    }

    /// Ends publishing of `key` and releases its slot.
    ///
    /// The counter is decremented only when the application reports `true`
    /// from its `end_publish`.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when `key.app` is not registered, or any
    /// error reported by the application's `end_publish`.
    #[tracing::instrument(skip(self), fields(app = %key.app, stream = %key.stream_id))]
    async fn end_publish(&self, key: &StreamKey) -> Result<()> {
        let released = self
            .get_app(&key.app)?
            .end_publish(&key.stream_id)
            .await?;
        if released {
            self.active_publishers.fetch_sub(1, Ordering::AcqRel);
        }
        Ok(())
    }
}
impl PlaybackRegistry for Engine {
    /// Resolves `key` to a handle on a stream of its application.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when the application is unknown, or
    /// [`StreamError::StreamNotFound`] when the application has no stream
    /// with that id.
    fn get_stream(&self, key: &StreamKey) -> Result<StreamHandle> {
        let application = self.get_app(&key.app)?;
        match application.get_stream(&key.stream_id) {
            Some(handle) => Ok(handle),
            None => Err(StreamError::StreamNotFound {
                app: key.app.to_string(),
                stream_id: key.stream_id.to_string(),
            }),
        }
    }

    /// Returns the ids the application `app` reports as active streams.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when the application is unknown.
    fn list_streams(&self, app: &AppName) -> Result<Vec<StreamId>> {
        self.get_app(app)
            .map(|application| application.active_streams())
    }
}
impl EventBus for Engine {
    /// Subscribes to the event stream of the application `app`.
    ///
    /// # Errors
    ///
    /// [`StreamError::AppNotFound`] when the application is unknown.
    fn subscribe_events(&self, app: &AppName) -> Result<broadcast::Receiver<StreamEvent>> {
        let application = self.get_app(app)?;
        let receiver = application.subscribe_events();
        Ok(receiver)
    }
}
impl std::fmt::Debug for Engine {
    /// Prints a compact summary (application count and active publishers)
    /// rather than the full contents, since several fields are trait objects.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let app_count = self.apps.len();
        let active_publishers = self.total_stream_count();

        let mut summary = f.debug_struct("Engine");
        summary.field("app_count", &app_count);
        summary.field("active_publishers", &active_publishers);
        summary.finish()
    }
}