arcly_stream/bus/
application.rs1use super::events::{StreamEvent, StreamEventKind};
4use super::handle::{StreamHandle, StreamState};
5use crate::observe::Observer;
6use crate::{AppName, Result, StreamError, StreamId};
7use dashmap::DashMap;
8use std::sync::atomic::{AtomicUsize, Ordering};
9use std::sync::Arc;
10use tokio::sync::broadcast;
11use tracing::info;
12
13pub struct Application {
19 pub name: AppName,
21 pub broadcast_capacity: usize,
23 pub gop_capacity: usize,
25 streams: DashMap<StreamId, StreamHandle>,
26 event_tx: broadcast::Sender<StreamEvent>,
28 pub_lock: tokio::sync::Mutex<()>,
31 stream_count: AtomicUsize,
33 observer: Arc<dyn Observer>,
35}
36
37impl Application {
38 pub fn new(
40 name: AppName,
41 broadcast_capacity: usize,
42 gop_capacity: usize,
43 observer: Arc<dyn Observer>,
44 ) -> Arc<Self> {
45 let (event_tx, _) = broadcast::channel(64);
46 Arc::new(Self {
47 name,
48 broadcast_capacity,
49 gop_capacity,
50 streams: DashMap::new(),
51 event_tx,
52 pub_lock: tokio::sync::Mutex::new(()),
53 stream_count: AtomicUsize::new(0),
54 observer,
55 })
56 }
57
58 pub async fn start_publish(&self, stream_id: StreamId) -> Result<StreamHandle> {
61 let _guard = self.pub_lock.lock().await;
64
65 if let Some(existing) = self.streams.get(&stream_id) {
66 let state = existing.current_state().await;
67 if matches!(state, StreamState::Publishing | StreamState::Transcoding) {
68 return Err(StreamError::StreamAlreadyPublishing {
69 app: self.name.to_string(),
70 stream_id: stream_id.to_string(),
71 });
72 }
73 }
74
75 let handle = StreamHandle::with_observer(
76 self.name.clone(),
77 stream_id.clone(),
78 self.broadcast_capacity,
79 self.gop_capacity,
80 Arc::clone(&self.observer),
81 );
82 handle.set_state(StreamState::Publishing).await;
83 let started_at_ms = super::handle::now_ms();
84 handle
85 .update_metadata(|m| m.started_at_ms = started_at_ms)
86 .await;
87 self.streams.insert(stream_id.clone(), handle.clone());
88 self.stream_count.fetch_add(1, Ordering::Relaxed);
89 drop(_guard);
92 self.observer.on_publish_started(self.name.as_str());
93
94 info!(app = %self.name, stream = %stream_id, "Stream publish started");
95 self.emit(stream_id.clone(), StreamEventKind::PublishStarted);
96
97 Ok(handle)
98 }
99
100 pub async fn end_publish(&self, stream_id: &StreamId) -> Result<bool> {
104 if let Some((_, handle)) = self.streams.remove(stream_id) {
105 self.stream_count.fetch_sub(1, Ordering::Relaxed);
106 self.observer.on_publish_ended(self.name.as_str());
107 handle.set_state(StreamState::Ended).await;
108 handle.close();
112 info!(app = %self.name, stream = %stream_id, "Stream publish ended");
113 self.emit(stream_id.clone(), StreamEventKind::PublishEnded);
114 return Ok(true);
115 }
116 Ok(false)
117 }
118
119 pub fn get_stream(&self, stream_id: &StreamId) -> Option<StreamHandle> {
121 self.streams.get(stream_id).map(|r| r.clone())
122 }
123
124 pub fn active_streams(&self) -> Vec<StreamId> {
126 self.streams.iter().map(|r| r.key().clone()).collect()
127 }
128
129 pub fn active_handles(&self) -> Vec<StreamHandle> {
131 self.streams.iter().map(|r| r.value().clone()).collect()
132 }
133
134 pub fn subscribe_events(&self) -> broadcast::Receiver<StreamEvent> {
136 self.event_tx.subscribe()
137 }
138
139 pub fn stream_count(&self) -> usize {
141 self.stream_count.load(Ordering::Relaxed)
142 }
143
144 fn emit(&self, stream_id: StreamId, kind: StreamEventKind) {
146 let event = StreamEvent {
147 app: self.name.clone(),
148 stream_id,
149 kind,
150 };
151 self.observer.on_event(&event);
152 let _ = self.event_tx.send(event);
153 }
154}
155
156impl std::fmt::Debug for Application {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("Application")
159 .field("name", &self.name)
160 .field("stream_count", &self.streams.len())
161 .finish()
162 }
163}