enigma_sfu/
engine.rs

1use crate::adapter::NoopAdapter;
2use crate::api::Sfu;
3use crate::config::SfuConfig;
4use crate::error::Result;
5use crate::events::{EventSink, VecEventSink};
6use crate::ids::RoomId;
7use crate::state::ParticipantMeta;
8use crate::types::{
9    CreateRoomResponse, JoinRequest, JoinResponse, LeaveRequest, ListRoomsResponse, RoomOptions,
10    RoomSummary, TrickledIce,
11};
12use std::collections::HashMap;
13use std::sync::{Arc, RwLock};
14use uuid::Uuid;
15
16pub struct SfuEngine {
17    pub config: SfuConfig,
18    pub sink: Arc<dyn EventSink + Send + Sync>,
19    sfu: Sfu,
20    room_options: RwLock<HashMap<RoomId, RoomOptions>>,
21}
22
23impl SfuEngine {
24    pub fn new(config: SfuConfig) -> Arc<Self> {
25        let sink: Arc<dyn EventSink + Send + Sync> = Arc::new(VecEventSink::new());
26        Arc::new(Self::with_sink(config, sink))
27    }
28
29    pub fn with_sink(
30        config: SfuConfig,
31        sink: Arc<dyn EventSink + Send + Sync>,
32    ) -> Self {
33        let sfu = Sfu::with_adapter(sink.clone(), Arc::new(NoopAdapter));
34        Self {
35            config,
36            sink,
37            sfu,
38            room_options: RwLock::new(HashMap::new()),
39        }
40    }
41
42    pub async fn create_room(&self, options: RoomOptions) -> Result<CreateRoomResponse> {
43        let room_id = RoomId::from_uuid(Uuid::new_v4());
44        self.sfu
45            .create_room(room_id.clone(), options.created_at_ms)?;
46        if let Ok(mut guard) = self.room_options.write() {
47            guard.insert(room_id.clone(), options);
48        }
49        Ok(CreateRoomResponse { room_id })
50    }
51
52    pub async fn delete_room(&self, room_id: RoomId) -> Result<()> {
53        self.sfu.delete_room(room_id.clone())?;
54        if let Ok(mut guard) = self.room_options.write() {
55            guard.remove(&room_id);
56        }
57        Ok(())
58    }
59
60    pub async fn list_rooms(&self) -> ListRoomsResponse {
61        let rooms = self.sfu.list_rooms().unwrap_or_default();
62        let options = self.room_options.read().ok();
63        let summaries = rooms
64            .into_iter()
65            .map(|room_id| {
66                let info = self.sfu.room_info(room_id.clone()).ok();
67                let name = options
68                    .as_ref()
69                    .and_then(|map| map.get(&room_id).and_then(|opt| opt.name.clone()));
70                RoomSummary {
71                    room_id: room_id.clone(),
72                    name,
73                    peers: info.as_ref().map(|r| r.participants.len()).unwrap_or(0),
74                    created_at_ms: info.as_ref().map(|r| r.created_at_ms).unwrap_or(0),
75                }
76            })
77            .collect();
78        ListRoomsResponse { rooms: summaries }
79    }
80
81    pub async fn join_room(&self, req: JoinRequest) -> Result<JoinResponse> {
82        let meta = ParticipantMeta {
83            display_name: req.peer.display.clone(),
84            tags: HashMap::new(),
85        };
86        self.sfu.join(
87            req.room_id.clone(),
88            req.peer.peer_id.clone(),
89            meta,
90            req.peer.joined_at_ms,
91        )?;
92        Ok(JoinResponse {
93            answer_sdp: String::new(),
94            peer_id: req.peer.peer_id,
95        })
96    }
97
98    pub async fn trickle_ice(&self, ice: TrickledIce) -> Result<()> {
99        self.sfu
100            .subscriptions(ice.room_id.clone(), ice.peer_id.clone())
101            .map(|_| ())
102    }
103
104    pub async fn leave(&self, req: LeaveRequest) -> Result<()> {
105        self.sfu.leave(req.room_id, req.peer_id)
106    }
107
108    pub async fn remove_idle(&self, now_ms: u64) -> Result<()> {
109        let retention = self.config.retention_room_idle_secs * 1000;
110        let rooms = self.sfu.list_rooms()?;
111        for room_id in rooms {
112            if let Ok(info) = self.sfu.room_info(room_id.clone()) {
113                if now_ms.saturating_sub(info.created_at_ms) > retention {
114                    let _ = self.delete_room(room_id).await;
115                }
116            }
117        }
118        Ok(())
119    }
120
121    pub fn inner(&self) -> Sfu {
122        self.sfu.clone()
123    }
124}