1use crate::adapter::{NoopAdapter, SfuTransportAdapter};
2use crate::error::{Result, SfuError};
3use crate::events::{EventSink, RoomEvent};
4use crate::ids::{ParticipantId, RoomId, TrackId, TrackKind};
5use crate::state::{
6 ParticipantMeta, ParticipantState, RoomState, SfuState, TrackState,
7};
8use std::collections::HashSet;
9use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
10
11#[derive(Clone)]
12pub struct Sfu {
13 pub(crate) inner: Arc<RwLock<SfuState>>,
14 sink: Arc<dyn EventSink + Send + Sync>,
15 adapter: Arc<dyn SfuTransportAdapter + Send + Sync>,
16}
17
18#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct TrackSummary {
20 pub track_id: TrackId,
21 pub publisher: ParticipantId,
22 pub kind: TrackKind,
23 pub created_at_ms: u64,
24 pub codec_hint: Option<String>,
25}
26
27#[derive(Clone, Debug, PartialEq, Eq)]
28pub struct RoomInfo {
29 pub participants: Vec<ParticipantId>,
30 pub tracks: Vec<TrackSummary>,
31 pub subscriptions_count: usize,
32 pub created_at_ms: u64,
33}
34
35impl Sfu {
36 pub fn new(sink: impl EventSink + Send + Sync + 'static) -> Self {
37 Self::with_adapter(Arc::new(sink), Arc::new(NoopAdapter))
38 }
39
40 pub fn with_adapter(
41 sink: Arc<dyn EventSink + Send + Sync>,
42 adapter: Arc<dyn SfuTransportAdapter + Send + Sync>,
43 ) -> Self {
44 Self {
45 inner: Arc::new(RwLock::new(SfuState::default())),
46 sink,
47 adapter,
48 }
49 }
50
51 pub fn create_room(&self, room_id: RoomId, now_ms: u64) -> Result<()> {
52 let mut state = self.write_state()?;
53 if state.rooms.contains_key(&room_id) {
54 return Err(SfuError::AlreadyExists);
55 }
56 state
57 .rooms
58 .insert(room_id.clone(), RoomState::new(room_id.clone(), now_ms));
59 self.sink.emit(RoomEvent::RoomCreated { room_id });
60 Ok(())
61 }
62
63 pub fn delete_room(&self, room_id: RoomId) -> Result<()> {
64 let mut state = self.write_state()?;
65 if state.rooms.remove(&room_id).is_none() {
66 return Err(SfuError::RoomNotFound);
67 }
68 self.sink.emit(RoomEvent::RoomDeleted { room_id });
69 Ok(())
70 }
71
72 pub fn list_rooms(&self) -> Result<Vec<RoomId>> {
73 let state = self.read_state()?;
74 let mut rooms: Vec<RoomId> = state.rooms.keys().cloned().collect();
75 rooms.sort_by(|a, b| a.as_str().cmp(b.as_str()));
76 Ok(rooms)
77 }
78
79 pub fn room_info(&self, room_id: RoomId) -> Result<RoomInfo> {
80 let state = self.read_state()?;
81 let room = state.rooms.get(&room_id).ok_or(SfuError::RoomNotFound)?;
82 let mut participants: Vec<ParticipantId> = room.participants.keys().cloned().collect();
83 participants.sort_by(|a, b| a.as_str().cmp(b.as_str()));
84 let mut tracks: Vec<TrackSummary> = room
85 .tracks
86 .values()
87 .map(|track| TrackSummary {
88 track_id: track.track_id.clone(),
89 publisher: track.publisher.clone(),
90 kind: track.kind,
91 created_at_ms: track.created_at_ms,
92 codec_hint: track.codec_hint.clone(),
93 })
94 .collect();
95 tracks.sort_by(|a, b| a.track_id.to_string().cmp(&b.track_id.to_string()));
96 let subscriptions_count = room
97 .subscriptions
98 .values()
99 .map(HashSet::len)
100 .sum();
101 Ok(RoomInfo {
102 participants,
103 tracks,
104 subscriptions_count,
105 created_at_ms: room.created_at_ms,
106 })
107 }
108
109 pub fn join(
110 &self,
111 room_id: RoomId,
112 participant_id: ParticipantId,
113 meta: ParticipantMeta,
114 now_ms: u64,
115 ) -> Result<()> {
116 let mut state = self.write_state()?;
117 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
118 if room.participants.contains_key(&participant_id) {
119 return Err(SfuError::AlreadyExists);
120 }
121 let participant = ParticipantState {
122 participant_id: participant_id.clone(),
123 joined_at_ms: now_ms,
124 meta,
125 };
126 room.participants.insert(participant_id.clone(), participant);
127 room.subscriptions
128 .entry(participant_id.clone())
129 .or_insert_with(HashSet::new);
130 self.sink.emit(RoomEvent::ParticipantJoined {
131 room_id,
132 participant_id,
133 });
134 Ok(())
135 }
136
137 pub fn leave(&self, room_id: RoomId, participant_id: ParticipantId) -> Result<()> {
138 let mut state = self.write_state()?;
139 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
140 if !room.participants.contains_key(&participant_id) {
141 return Err(SfuError::ParticipantNotFound);
142 }
143 let track_ids: Vec<TrackId> = room
144 .tracks
145 .values()
146 .filter(|track| track.publisher == participant_id)
147 .map(|track| track.track_id.clone())
148 .collect();
149 for track_id in track_ids {
150 self.remove_track(room, &room_id, &track_id)?;
151 }
152 let subscriptions = room
153 .subscriptions
154 .get(&participant_id)
155 .cloned()
156 .unwrap_or_default();
157 room.participants.remove(&participant_id);
158 room.subscriptions.remove(&participant_id);
159 for track_id in subscriptions {
160 self.adapter
161 .on_unsubscribe(&room_id, &participant_id, &track_id);
162 self.sink.emit(RoomEvent::Unsubscribed {
163 room_id: room_id.clone(),
164 participant_id: participant_id.clone(),
165 track_id,
166 });
167 }
168 self.sink.emit(RoomEvent::ParticipantLeft {
169 room_id,
170 participant_id,
171 });
172 Ok(())
173 }
174
175 pub fn publish_track(
176 &self,
177 room_id: RoomId,
178 participant_id: ParticipantId,
179 kind: TrackKind,
180 codec_hint: Option<String>,
181 now_ms: u64,
182 ) -> Result<TrackId> {
183 let mut state = self.write_state()?;
184 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
185 if !room.participants.contains_key(&participant_id) {
186 return Err(SfuError::ParticipantNotFound);
187 }
188 let track_id = TrackId::new();
189 if room.tracks.contains_key(&track_id) {
190 return Err(SfuError::AlreadyExists);
191 }
192 let track_state = TrackState {
193 track_id: track_id.clone(),
194 publisher: participant_id.clone(),
195 kind,
196 created_at_ms: now_ms,
197 codec_hint,
198 };
199 room.tracks.insert(track_id.clone(), track_state.clone());
200 self.adapter
201 .on_track_published(&room_id, &track_state);
202 self.sink.emit(RoomEvent::TrackPublished {
203 room_id,
204 participant_id,
205 track_id: track_id.clone(),
206 kind,
207 });
208 Ok(track_id)
209 }
210
211 pub fn unpublish_track(&self, room_id: RoomId, track_id: TrackId) -> Result<()> {
212 let mut state = self.write_state()?;
213 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
214 self.remove_track(room, &room_id, &track_id)
215 }
216
217 pub fn subscribe(
218 &self,
219 room_id: RoomId,
220 participant_id: ParticipantId,
221 track_id: TrackId,
222 ) -> Result<()> {
223 let mut state = self.write_state()?;
224 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
225 if !room.participants.contains_key(&participant_id) {
226 return Err(SfuError::ParticipantNotFound);
227 }
228 if !room.tracks.contains_key(&track_id) {
229 return Err(SfuError::TrackNotFound);
230 }
231 let entry = room
232 .subscriptions
233 .entry(participant_id.clone())
234 .or_insert_with(HashSet::new);
235 if !entry.insert(track_id.clone()) {
236 return Err(SfuError::AlreadyExists);
237 }
238 self.adapter
239 .on_subscribe(&room_id, &participant_id, &track_id);
240 self.sink.emit(RoomEvent::Subscribed {
241 room_id,
242 participant_id,
243 track_id,
244 });
245 Ok(())
246 }
247
248 pub fn unsubscribe(
249 &self,
250 room_id: RoomId,
251 participant_id: ParticipantId,
252 track_id: TrackId,
253 ) -> Result<()> {
254 let mut state = self.write_state()?;
255 let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
256 if !room.participants.contains_key(&participant_id) {
257 return Err(SfuError::ParticipantNotFound);
258 }
259 if !room.tracks.contains_key(&track_id) {
260 return Err(SfuError::TrackNotFound);
261 }
262 let set = room
263 .subscriptions
264 .get_mut(&participant_id)
265 .ok_or(SfuError::ParticipantNotFound)?;
266 if !set.remove(&track_id) {
267 return Err(SfuError::NotAllowed);
268 }
269 self.adapter
270 .on_unsubscribe(&room_id, &participant_id, &track_id);
271 self.sink.emit(RoomEvent::Unsubscribed {
272 room_id,
273 participant_id,
274 track_id,
275 });
276 Ok(())
277 }
278
279 pub fn subscriptions(&self, room_id: RoomId, participant_id: ParticipantId) -> Result<Vec<TrackId>> {
280 let state = self.read_state()?;
281 let room = state.rooms.get(&room_id).ok_or(SfuError::RoomNotFound)?;
282 if !room.participants.contains_key(&participant_id) {
283 return Err(SfuError::ParticipantNotFound);
284 }
285 let mut list: Vec<TrackId> = room
286 .subscriptions
287 .get(&participant_id)
288 .cloned()
289 .unwrap_or_default()
290 .into_iter()
291 .collect();
292 list.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
293 Ok(list)
294 }
295
296 fn remove_track(
297 &self,
298 room: &mut RoomState,
299 room_id: &RoomId,
300 track_id: &TrackId,
301 ) -> Result<()> {
302 if room.tracks.remove(track_id).is_none() {
303 return Err(SfuError::TrackNotFound);
304 }
305 let mut unsubscribed = Vec::new();
306 for (participant, set) in room.subscriptions.iter_mut() {
307 if set.remove(track_id) {
308 unsubscribed.push(participant.clone());
309 }
310 }
311 self.adapter.on_track_unpublished(room_id, track_id);
312 for participant_id in unsubscribed {
313 self.adapter
314 .on_unsubscribe(room_id, &participant_id, track_id);
315 self.sink.emit(RoomEvent::Unsubscribed {
316 room_id: room_id.clone(),
317 participant_id,
318 track_id: track_id.clone(),
319 });
320 }
321 self.sink.emit(RoomEvent::TrackUnpublished {
322 room_id: room_id.clone(),
323 track_id: track_id.clone(),
324 });
325 Ok(())
326 }
327
328 fn read_state(&self) -> Result<RwLockReadGuard<'_, SfuState>> {
329 self.inner
330 .read()
331 .map_err(|_| SfuError::Internal("state poisoned".to_string()))
332 }
333
334 fn write_state(&self) -> Result<RwLockWriteGuard<'_, SfuState>> {
335 self.inner
336 .write()
337 .map_err(|_| SfuError::Internal("state poisoned".to_string()))
338 }
339}