enigma_sfu/
api.rs

1use crate::adapter::{NoopAdapter, SfuTransportAdapter};
2use crate::error::{Result, SfuError};
3use crate::events::{EventSink, RoomEvent};
4use crate::ids::{ParticipantId, RoomId, TrackId, TrackKind};
5use crate::state::{
6    ParticipantMeta, ParticipantState, RoomState, SfuState, TrackState,
7};
8use std::collections::HashSet;
9use std::sync::{Arc, RwLock, RwLockReadGuard, RwLockWriteGuard};
10
11#[derive(Clone)]
12pub struct Sfu {
13    pub(crate) inner: Arc<RwLock<SfuState>>,
14    sink: Arc<dyn EventSink + Send + Sync>,
15    adapter: Arc<dyn SfuTransportAdapter + Send + Sync>,
16}
17
18#[derive(Clone, Debug, PartialEq, Eq)]
19pub struct TrackSummary {
20    pub track_id: TrackId,
21    pub publisher: ParticipantId,
22    pub kind: TrackKind,
23    pub created_at_ms: u64,
24    pub codec_hint: Option<String>,
25}
26
27#[derive(Clone, Debug, PartialEq, Eq)]
28pub struct RoomInfo {
29    pub participants: Vec<ParticipantId>,
30    pub tracks: Vec<TrackSummary>,
31    pub subscriptions_count: usize,
32    pub created_at_ms: u64,
33}
34
35impl Sfu {
36    pub fn new(sink: impl EventSink + Send + Sync + 'static) -> Self {
37        Self::with_adapter(Arc::new(sink), Arc::new(NoopAdapter))
38    }
39
40    pub fn with_adapter(
41        sink: Arc<dyn EventSink + Send + Sync>,
42        adapter: Arc<dyn SfuTransportAdapter + Send + Sync>,
43    ) -> Self {
44        Self {
45            inner: Arc::new(RwLock::new(SfuState::default())),
46            sink,
47            adapter,
48        }
49    }
50
51    pub fn create_room(&self, room_id: RoomId, now_ms: u64) -> Result<()> {
52        let mut state = self.write_state()?;
53        if state.rooms.contains_key(&room_id) {
54            return Err(SfuError::AlreadyExists);
55        }
56        state
57            .rooms
58            .insert(room_id.clone(), RoomState::new(room_id.clone(), now_ms));
59        self.sink.emit(RoomEvent::RoomCreated { room_id });
60        Ok(())
61    }
62
63    pub fn delete_room(&self, room_id: RoomId) -> Result<()> {
64        let mut state = self.write_state()?;
65        if state.rooms.remove(&room_id).is_none() {
66            return Err(SfuError::RoomNotFound);
67        }
68        self.sink.emit(RoomEvent::RoomDeleted { room_id });
69        Ok(())
70    }
71
72    pub fn list_rooms(&self) -> Result<Vec<RoomId>> {
73        let state = self.read_state()?;
74        let mut rooms: Vec<RoomId> = state.rooms.keys().cloned().collect();
75        rooms.sort_by(|a, b| a.as_str().cmp(b.as_str()));
76        Ok(rooms)
77    }
78
79    pub fn room_info(&self, room_id: RoomId) -> Result<RoomInfo> {
80        let state = self.read_state()?;
81        let room = state.rooms.get(&room_id).ok_or(SfuError::RoomNotFound)?;
82        let mut participants: Vec<ParticipantId> = room.participants.keys().cloned().collect();
83        participants.sort_by(|a, b| a.as_str().cmp(b.as_str()));
84        let mut tracks: Vec<TrackSummary> = room
85            .tracks
86            .values()
87            .map(|track| TrackSummary {
88                track_id: track.track_id.clone(),
89                publisher: track.publisher.clone(),
90                kind: track.kind,
91                created_at_ms: track.created_at_ms,
92                codec_hint: track.codec_hint.clone(),
93            })
94            .collect();
95        tracks.sort_by(|a, b| a.track_id.to_string().cmp(&b.track_id.to_string()));
96        let subscriptions_count = room
97            .subscriptions
98            .values()
99            .map(HashSet::len)
100            .sum();
101        Ok(RoomInfo {
102            participants,
103            tracks,
104            subscriptions_count,
105            created_at_ms: room.created_at_ms,
106        })
107    }
108
109    pub fn join(
110        &self,
111        room_id: RoomId,
112        participant_id: ParticipantId,
113        meta: ParticipantMeta,
114        now_ms: u64,
115    ) -> Result<()> {
116        let mut state = self.write_state()?;
117        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
118        if room.participants.contains_key(&participant_id) {
119            return Err(SfuError::AlreadyExists);
120        }
121        let participant = ParticipantState {
122            participant_id: participant_id.clone(),
123            joined_at_ms: now_ms,
124            meta,
125        };
126        room.participants.insert(participant_id.clone(), participant);
127        room.subscriptions
128            .entry(participant_id.clone())
129            .or_insert_with(HashSet::new);
130        self.sink.emit(RoomEvent::ParticipantJoined {
131            room_id,
132            participant_id,
133        });
134        Ok(())
135    }
136
137    pub fn leave(&self, room_id: RoomId, participant_id: ParticipantId) -> Result<()> {
138        let mut state = self.write_state()?;
139        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
140        if !room.participants.contains_key(&participant_id) {
141            return Err(SfuError::ParticipantNotFound);
142        }
143        let track_ids: Vec<TrackId> = room
144            .tracks
145            .values()
146            .filter(|track| track.publisher == participant_id)
147            .map(|track| track.track_id.clone())
148            .collect();
149        for track_id in track_ids {
150            self.remove_track(room, &room_id, &track_id)?;
151        }
152        let subscriptions = room
153            .subscriptions
154            .get(&participant_id)
155            .cloned()
156            .unwrap_or_default();
157        room.participants.remove(&participant_id);
158        room.subscriptions.remove(&participant_id);
159        for track_id in subscriptions {
160            self.adapter
161                .on_unsubscribe(&room_id, &participant_id, &track_id);
162            self.sink.emit(RoomEvent::Unsubscribed {
163                room_id: room_id.clone(),
164                participant_id: participant_id.clone(),
165                track_id,
166            });
167        }
168        self.sink.emit(RoomEvent::ParticipantLeft {
169            room_id,
170            participant_id,
171        });
172        Ok(())
173    }
174
175    pub fn publish_track(
176        &self,
177        room_id: RoomId,
178        participant_id: ParticipantId,
179        kind: TrackKind,
180        codec_hint: Option<String>,
181        now_ms: u64,
182    ) -> Result<TrackId> {
183        let mut state = self.write_state()?;
184        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
185        if !room.participants.contains_key(&participant_id) {
186            return Err(SfuError::ParticipantNotFound);
187        }
188        let track_id = TrackId::new();
189        if room.tracks.contains_key(&track_id) {
190            return Err(SfuError::AlreadyExists);
191        }
192        let track_state = TrackState {
193            track_id: track_id.clone(),
194            publisher: participant_id.clone(),
195            kind,
196            created_at_ms: now_ms,
197            codec_hint,
198        };
199        room.tracks.insert(track_id.clone(), track_state.clone());
200        self.adapter
201            .on_track_published(&room_id, &track_state);
202        self.sink.emit(RoomEvent::TrackPublished {
203            room_id,
204            participant_id,
205            track_id: track_id.clone(),
206            kind,
207        });
208        Ok(track_id)
209    }
210
211    pub fn unpublish_track(&self, room_id: RoomId, track_id: TrackId) -> Result<()> {
212        let mut state = self.write_state()?;
213        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
214        self.remove_track(room, &room_id, &track_id)
215    }
216
217    pub fn subscribe(
218        &self,
219        room_id: RoomId,
220        participant_id: ParticipantId,
221        track_id: TrackId,
222    ) -> Result<()> {
223        let mut state = self.write_state()?;
224        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
225        if !room.participants.contains_key(&participant_id) {
226            return Err(SfuError::ParticipantNotFound);
227        }
228        if !room.tracks.contains_key(&track_id) {
229            return Err(SfuError::TrackNotFound);
230        }
231        let entry = room
232            .subscriptions
233            .entry(participant_id.clone())
234            .or_insert_with(HashSet::new);
235        if !entry.insert(track_id.clone()) {
236            return Err(SfuError::AlreadyExists);
237        }
238        self.adapter
239            .on_subscribe(&room_id, &participant_id, &track_id);
240        self.sink.emit(RoomEvent::Subscribed {
241            room_id,
242            participant_id,
243            track_id,
244        });
245        Ok(())
246    }
247
248    pub fn unsubscribe(
249        &self,
250        room_id: RoomId,
251        participant_id: ParticipantId,
252        track_id: TrackId,
253    ) -> Result<()> {
254        let mut state = self.write_state()?;
255        let room = state.rooms.get_mut(&room_id).ok_or(SfuError::RoomNotFound)?;
256        if !room.participants.contains_key(&participant_id) {
257            return Err(SfuError::ParticipantNotFound);
258        }
259        if !room.tracks.contains_key(&track_id) {
260            return Err(SfuError::TrackNotFound);
261        }
262        let set = room
263            .subscriptions
264            .get_mut(&participant_id)
265            .ok_or(SfuError::ParticipantNotFound)?;
266        if !set.remove(&track_id) {
267            return Err(SfuError::NotAllowed);
268        }
269        self.adapter
270            .on_unsubscribe(&room_id, &participant_id, &track_id);
271        self.sink.emit(RoomEvent::Unsubscribed {
272            room_id,
273            participant_id,
274            track_id,
275        });
276        Ok(())
277    }
278
279    pub fn subscriptions(&self, room_id: RoomId, participant_id: ParticipantId) -> Result<Vec<TrackId>> {
280        let state = self.read_state()?;
281        let room = state.rooms.get(&room_id).ok_or(SfuError::RoomNotFound)?;
282        if !room.participants.contains_key(&participant_id) {
283            return Err(SfuError::ParticipantNotFound);
284        }
285        let mut list: Vec<TrackId> = room
286            .subscriptions
287            .get(&participant_id)
288            .cloned()
289            .unwrap_or_default()
290            .into_iter()
291            .collect();
292        list.sort_by(|a, b| a.to_string().cmp(&b.to_string()));
293        Ok(list)
294    }
295
296    fn remove_track(
297        &self,
298        room: &mut RoomState,
299        room_id: &RoomId,
300        track_id: &TrackId,
301    ) -> Result<()> {
302        if room.tracks.remove(track_id).is_none() {
303            return Err(SfuError::TrackNotFound);
304        }
305        let mut unsubscribed = Vec::new();
306        for (participant, set) in room.subscriptions.iter_mut() {
307            if set.remove(track_id) {
308                unsubscribed.push(participant.clone());
309            }
310        }
311        self.adapter.on_track_unpublished(room_id, track_id);
312        for participant_id in unsubscribed {
313            self.adapter
314                .on_unsubscribe(room_id, &participant_id, track_id);
315            self.sink.emit(RoomEvent::Unsubscribed {
316                room_id: room_id.clone(),
317                participant_id,
318                track_id: track_id.clone(),
319            });
320        }
321        self.sink.emit(RoomEvent::TrackUnpublished {
322            room_id: room_id.clone(),
323            track_id: track_id.clone(),
324        });
325        Ok(())
326    }
327
328    fn read_state(&self) -> Result<RwLockReadGuard<'_, SfuState>> {
329        self.inner
330            .read()
331            .map_err(|_| SfuError::Internal("state poisoned".to_string()))
332    }
333
334    fn write_state(&self) -> Result<RwLockWriteGuard<'_, SfuState>> {
335        self.inner
336            .write()
337            .map_err(|_| SfuError::Internal("state poisoned".to_string()))
338    }
339}