1use crate::adapter::NoopAdapter;
2use crate::api::Sfu;
3use crate::config::SfuConfig;
4use crate::error::Result;
5use crate::events::{EventSink, VecEventSink};
6use crate::ids::RoomId;
7use crate::state::ParticipantMeta;
8use crate::types::{
9 CreateRoomResponse, JoinRequest, JoinResponse, LeaveRequest, ListRoomsResponse, RoomOptions,
10 RoomSummary, TrickledIce,
11};
12use std::collections::HashMap;
13use std::sync::{Arc, RwLock};
14use uuid::Uuid;
15
16pub struct SfuEngine {
17 pub config: SfuConfig,
18 pub sink: Arc<dyn EventSink + Send + Sync>,
19 sfu: Sfu,
20 room_options: RwLock<HashMap<RoomId, RoomOptions>>,
21}
22
23impl SfuEngine {
24 pub fn new(config: SfuConfig) -> Arc<Self> {
25 let sink: Arc<dyn EventSink + Send + Sync> = Arc::new(VecEventSink::new());
26 Arc::new(Self::with_sink(config, sink))
27 }
28
29 pub fn with_sink(
30 config: SfuConfig,
31 sink: Arc<dyn EventSink + Send + Sync>,
32 ) -> Self {
33 let sfu = Sfu::with_adapter(sink.clone(), Arc::new(NoopAdapter));
34 Self {
35 config,
36 sink,
37 sfu,
38 room_options: RwLock::new(HashMap::new()),
39 }
40 }
41
42 pub async fn create_room(&self, options: RoomOptions) -> Result<CreateRoomResponse> {
43 let room_id = RoomId::from_uuid(Uuid::new_v4());
44 self.sfu
45 .create_room(room_id.clone(), options.created_at_ms)?;
46 if let Ok(mut guard) = self.room_options.write() {
47 guard.insert(room_id.clone(), options);
48 }
49 Ok(CreateRoomResponse { room_id })
50 }
51
52 pub async fn delete_room(&self, room_id: RoomId) -> Result<()> {
53 self.sfu.delete_room(room_id.clone())?;
54 if let Ok(mut guard) = self.room_options.write() {
55 guard.remove(&room_id);
56 }
57 Ok(())
58 }
59
60 pub async fn list_rooms(&self) -> ListRoomsResponse {
61 let rooms = self.sfu.list_rooms().unwrap_or_default();
62 let options = self.room_options.read().ok();
63 let summaries = rooms
64 .into_iter()
65 .map(|room_id| {
66 let info = self.sfu.room_info(room_id.clone()).ok();
67 let name = options
68 .as_ref()
69 .and_then(|map| map.get(&room_id).and_then(|opt| opt.name.clone()));
70 RoomSummary {
71 room_id: room_id.clone(),
72 name,
73 peers: info.as_ref().map(|r| r.participants.len()).unwrap_or(0),
74 created_at_ms: info.as_ref().map(|r| r.created_at_ms).unwrap_or(0),
75 }
76 })
77 .collect();
78 ListRoomsResponse { rooms: summaries }
79 }
80
81 pub async fn join_room(&self, req: JoinRequest) -> Result<JoinResponse> {
82 let meta = ParticipantMeta {
83 display_name: req.peer.display.clone(),
84 tags: HashMap::new(),
85 };
86 self.sfu.join(
87 req.room_id.clone(),
88 req.peer.peer_id.clone(),
89 meta,
90 req.peer.joined_at_ms,
91 )?;
92 Ok(JoinResponse {
93 answer_sdp: String::new(),
94 peer_id: req.peer.peer_id,
95 })
96 }
97
98 pub async fn trickle_ice(&self, ice: TrickledIce) -> Result<()> {
99 self.sfu
100 .subscriptions(ice.room_id.clone(), ice.peer_id.clone())
101 .map(|_| ())
102 }
103
104 pub async fn leave(&self, req: LeaveRequest) -> Result<()> {
105 self.sfu.leave(req.room_id, req.peer_id)
106 }
107
108 pub async fn remove_idle(&self, now_ms: u64) -> Result<()> {
109 let retention = self.config.retention_room_idle_secs * 1000;
110 let rooms = self.sfu.list_rooms()?;
111 for room_id in rooms {
112 if let Ok(info) = self.sfu.room_info(room_id.clone()) {
113 if now_ms.saturating_sub(info.created_at_ms) > retention {
114 let _ = self.delete_room(room_id).await;
115 }
116 }
117 }
118 Ok(())
119 }
120
121 pub fn inner(&self) -> Sfu {
122 self.sfu.clone()
123 }
124}