1use std::collections::BTreeMap;
6
7use anyhow::Context;
8use opentalk_roomserver_signaling::{
9 module_context::ModuleContext,
10 participant_state::ParticipantState,
11 signaling_module::{
12 ModuleJoinData, ModuleSwitchData, NoOp, SignalingModule, SignalingModuleDescription,
13 SignalingModuleFeatureDescription, SignalingModuleInitData,
14 },
15};
16use opentalk_roomserver_types::{
17 client_parameters::ClientKind,
18 connection_id::ConnectionId,
19 room_kind::RoomKind,
20 signaling::module_error::{FatalError, SignalingModuleError},
21};
22use opentalk_roomserver_types_transcription::{
23 TRANSCRIPTION_FEATURE_ID, TRANSCRIPTION_MODULE_ID,
24 command::TranscriptionCommand,
25 event::{TranscriptionError, TranscriptionEvent},
26 service::{command::TranscriptionServiceCommand, event::TranscriptionServiceEvent},
27 settings::TranscriptionSettings,
28 state::{TranscriptionState, TranscriptionStatus},
29};
30use opentalk_transcription_web_api::v1::TranscriptionTarget;
31use opentalk_types_common::{features::ModuleFeatureId, modules::ModuleId};
32use opentalk_types_signaling::ParticipantId;
33
34pub struct TranscriptionModule {
35 settings: TranscriptionSettings,
36 http_client: reqwest::Client,
37 transcription_states: BTreeMap<RoomKind, TranscriptionRoomState>,
38}
39
/// Per-room transcription lifecycle tracked by the transcription module.
///
/// The absence of an entry for a room means no transcription is requested or running there.
#[derive(Clone, Debug, Eq, PartialEq)]
enum TranscriptionRoomState {
    /// The start request was issued; the transcription service has not reported `Started` yet.
    Requested,
    /// The transcription service reported that it has started.
    Running,
}
46
/// Events produced by background tasks of the transcription module and fed back to it.
#[derive(Debug)]
pub enum LoopBackEvent {
    /// The start request to the transcription service could not be built, could not be sent,
    /// or was answered with a non-success HTTP status.
    TranscriptionRequestFailed,
}
50
51impl SignalingModuleDescription for TranscriptionModule {
52 const MODULE_ID: ModuleId = TRANSCRIPTION_MODULE_ID;
53 const DESCRIPTION: &'static str = "Live transcription for meetings";
54 const FEATURES: &[SignalingModuleFeatureDescription] = &[SignalingModuleFeatureDescription {
55 feature_id: TRANSCRIPTION_FEATURE_ID,
56 description: "Allows to create transcriptions for meetings",
57 }];
58}
59
60impl SignalingModule for TranscriptionModule {
61 const NAMESPACE: ModuleId = TRANSCRIPTION_MODULE_ID;
62
63 type Incoming = TranscriptionCommand;
64 type Outgoing = TranscriptionEvent;
65 type Internal = NoOp;
66 type Loopback = LoopBackEvent;
67 type JoinInfo = TranscriptionState;
68 type PeerJoinInfo = ();
69 type Error = TranscriptionError;
70
71 fn init(init_data: SignalingModuleInitData) -> Option<Self> {
72 let tariff = &init_data.room_parameters.tariff;
73 if tariff.disabled_features.contains(&ModuleFeatureId {
74 module: TRANSCRIPTION_MODULE_ID,
75 feature: TRANSCRIPTION_FEATURE_ID,
76 }) {
77 return None;
78 }
79
80 let settings = init_data
81 .room_parameters
82 .module_settings
83 .get::<TranscriptionSettings>()
84 .ok()??;
85
86 Some(Self {
87 settings,
88 http_client: reqwest::Client::new(),
89 transcription_states: BTreeMap::new(),
90 })
91 }
92
93 fn on_participant_joined(
94 &mut self,
95 ctx: &mut ModuleContext<'_, Self>,
96 _participant_id: ParticipantId,
97 _connection_id: ConnectionId,
98 _is_first_connection: bool,
99 ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
100 Ok(ModuleJoinData {
101 join_success: Some(self.build_module_state(ctx.room)),
102 ..Default::default()
103 })
104 }
105
106 fn on_participant_disconnected(
107 &mut self,
108 ctx: &mut ModuleContext<'_, Self>,
109 participant_id: ParticipantId,
110 _connection_id: ConnectionId,
111 ) -> Result<(), SignalingModuleError<Self::Error>> {
112 if is_transcription_service_for_current_room(ctx, participant_id)?
113 && self.transcription_states.remove(&ctx.room).is_some()
114 {
115 ctx.send_ws_message(
118 ctx.participants.in_room(ctx.room).connected().ids(),
119 TranscriptionError::ServiceDisconnected.into(),
120 )?;
121
122 ctx.send_ws_message(
123 ctx.participants.in_room(ctx.room).connected().ids(),
124 TranscriptionEvent::StateUpdated {
125 status: TranscriptionStatus::Inactive,
126 },
127 )?;
128 }
129
130 Ok(())
131 }
132
133 fn on_websocket_message(
134 &mut self,
135 ctx: &mut ModuleContext<'_, Self>,
136 participant_id: ParticipantId,
137 _connection_id: ConnectionId,
138 command: Self::Incoming,
139 ) -> Result<(), SignalingModuleError<Self::Error>> {
140 match command {
141 TranscriptionCommand::Start { language } => {
142 self.start_transcription(ctx, participant_id, language)
143 }
144 TranscriptionCommand::Stop => {
145 if !ctx.is_moderator(participant_id) {
146 return Err(TranscriptionError::InsufficientPermissions.into());
147 }
148
149 self.stop_transcription(ctx)
150 }
151 TranscriptionCommand::TranscriptionServiceEvent { event } => {
152 self.handle_transcription_service_event(ctx, event, participant_id)
153 }
154 }
155 }
156
157 fn on_breakout_switch(
158 &mut self,
159 ctx: &mut ModuleContext<'_, Self>,
160 participant_id: ParticipantId,
161 _old_room: RoomKind,
162 new_room: RoomKind,
163 ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
164 let state = self.build_module_state(new_room);
165
166 let switch_success = ctx
167 .participant_state(participant_id)
168 .with_context(|| format!("Missing state for participant {participant_id}"))?
169 .connections()
170 .map(|con| (con, Some(state.clone())))
171 .collect();
172
173 Ok(ModuleSwitchData {
174 switch_success,
175 ..Default::default()
176 })
177 }
178
179 fn on_loopback_event(
180 &mut self,
181 ctx: &mut ModuleContext<'_, Self>,
182 event: Self::Loopback,
183 ) -> Result<(), SignalingModuleError<Self::Error>> {
184 match event {
185 LoopBackEvent::TranscriptionRequestFailed => {
186 ctx.send_ws_message(
187 ctx.participants.in_room(ctx.room).connected().ids(),
188 TranscriptionError::ServiceRequestFailed.into(),
189 )?;
190
191 self.stop_transcription(ctx)?;
192 }
193 }
194
195 Ok(())
196 }
197}
198
199impl TranscriptionModule {
200 fn start_transcription(
201 &mut self,
202 ctx: &mut ModuleContext<'_, TranscriptionModule>,
203 participant_id: ParticipantId,
204 language: Option<String>,
205 ) -> Result<(), SignalingModuleError<TranscriptionError>> {
206 if !ctx.is_moderator(participant_id) {
207 return Err(TranscriptionError::InsufficientPermissions.into());
208 }
209
210 if self.transcription_states.contains_key(&ctx.room) {
211 return Err(TranscriptionError::AlreadyActive.into());
212 };
213
214 self.request_transcription(ctx, language)?;
215
216 ctx.send_ws_message(
217 ctx.participants.in_room(ctx.room).connected().ids(),
218 TranscriptionEvent::StateUpdated {
219 status: TranscriptionStatus::Requested,
220 },
221 )?;
222
223 Ok(())
224 }
225
226 fn request_transcription(
227 &mut self,
228 ctx: &mut ModuleContext<'_, TranscriptionModule>,
229 language: Option<String>,
230 ) -> Result<(), SignalingModuleError<TranscriptionError>> {
231 let transcription_config = self.settings.clone();
232
233 let jwt_bearer_token = transcription_config
234 .api_key
235 .generate_jwt()
236 .context("Failed to generate JWT from transcription api_key")
237 .map_err(FatalError)?;
238
239 let http_client = self.http_client.clone();
240
241 let body = TranscriptionTarget {
242 room_id: ctx.room_id,
243 breakout_room: match ctx.room {
244 RoomKind::Main => None,
245 RoomKind::Breakout(breakout_id) => Some(breakout_id.into()),
246 },
247 language,
248 };
249
250 ctx.spawn_optional(async move {
251 let url = match transcription_config.url.join("v1/init") {
252 Ok(url) => url,
253 Err(err) => {
254 tracing::error!(
255 "Failed to build transcription init url from base_url: {}, {err}",
256 transcription_config.url,
257 );
258
259 return Some(LoopBackEvent::TranscriptionRequestFailed);
260 }
261 };
262
263 tracing::debug!("Sending transcription start request to {url} with body {body:#?}");
264
265 let response = http_client
266 .post(url)
267 .bearer_auth(jwt_bearer_token)
268 .json(&body)
269 .send()
270 .await;
271
272 let response = match response {
273 Ok(response) => response,
274 Err(err) => {
275 tracing::error!(
276 "Failed to send init request to transcription service, {err:?}"
277 );
278 return Some(LoopBackEvent::TranscriptionRequestFailed);
279 }
280 };
281
282 let response_status = response.status();
283 let response_body = response
284 .bytes()
285 .await
286 .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
287
288 if response_status.is_success() {
289 tracing::debug!(
290 "Got transcription start response status={} body={:#?}",
291 response_status,
292 response_body,
293 );
294
295 None
296 } else {
297 tracing::error!(
298 "Got non-success response to transcription start request status={} body={:#?}",
299 response_status,
300 response_body,
301 );
302 Some(LoopBackEvent::TranscriptionRequestFailed)
303 }
304 });
305
306 self.transcription_states
307 .insert(ctx.room, TranscriptionRoomState::Requested);
308 Ok(())
309 }
310
311 fn stop_transcription(
317 &mut self,
318 ctx: &mut ModuleContext<'_, TranscriptionModule>,
319 ) -> Result<(), SignalingModuleError<TranscriptionError>> {
320 if let Some((transcription, _)) = find_transcription_service(ctx) {
321 ctx.send_ws_message(
322 [*transcription],
323 TranscriptionEvent::ServiceCommand {
324 command: TranscriptionServiceCommand::Stop,
325 },
326 )?;
327 } else {
328 if self.transcription_states.remove(&ctx.room).is_none() {
334 return Err(TranscriptionError::NotActive.into());
335 };
336
337 ctx.send_ws_message(
340 ctx.participants.in_room(ctx.room).connected().ids(),
341 TranscriptionEvent::StateUpdated {
342 status: TranscriptionStatus::Inactive,
343 },
344 )?;
345 }
346
347 Ok(())
348 }
349
350 fn handle_transcription_service_event(
352 &mut self,
353 ctx: &mut ModuleContext<'_, Self>,
354 service_event: TranscriptionServiceEvent,
355 participant_id: ParticipantId,
356 ) -> Result<(), SignalingModuleError<TranscriptionError>> {
357 if !is_transcription_service_for_current_room(ctx, participant_id)? {
358 return Err(TranscriptionError::InsufficientPermissions.into());
359 }
360
361 match service_event {
362 TranscriptionServiceEvent::Started => {
363 self.transcription_states
364 .insert(ctx.room, TranscriptionRoomState::Running);
365
366 ctx.send_ws_message(
367 ctx.participants.in_room(ctx.room).connected().ids(),
368 TranscriptionEvent::StateUpdated {
369 status: TranscriptionStatus::Running,
370 },
371 )?;
372 }
373 TranscriptionServiceEvent::Stopped => {
374 self.transcription_states.remove(&ctx.room);
375
376 ctx.send_ws_message(
377 ctx.participants.in_room(ctx.room).connected().ids(),
378 TranscriptionEvent::StateUpdated {
379 status: TranscriptionStatus::Inactive,
380 },
381 )?;
382 }
383 TranscriptionServiceEvent::Segment(transcription_segment) => {
384 ctx.send_ws_message(
385 ctx.participants.in_room(ctx.room).connected().ids(),
386 TranscriptionEvent::Segment(transcription_segment),
387 )?;
388 }
389 };
390
391 Ok(())
392 }
393
394 fn build_module_state(&mut self, room: RoomKind) -> TranscriptionState {
395 let status = match self.transcription_states.get(&room) {
396 Some(TranscriptionRoomState::Requested) => TranscriptionStatus::Requested,
397 Some(TranscriptionRoomState::Running) => TranscriptionStatus::Running,
398 None => TranscriptionStatus::Inactive,
399 };
400
401 TranscriptionState { status }
402 }
403}
404
405fn is_transcription_service_for_current_room(
407 ctx: &mut ModuleContext<'_, TranscriptionModule>,
408 participant_id: ParticipantId,
409) -> Result<bool, SignalingModuleError<TranscriptionError>> {
410 let participant_state = ctx
411 .participant_state(participant_id)
412 .context("Failed to get state for disconnected participant")?;
413
414 Ok(participant_state.kind == ClientKind::Transcription { room: ctx.room })
415}
416
417fn find_transcription_service<'a>(
419 ctx: &'a ModuleContext<'a, TranscriptionModule>,
420) -> Option<(&'a ParticipantId, &'a ParticipantState)> {
421 ctx.participants.in_room(ctx.room).iter().find(|(_, p)| {
422 p.is_connected() && matches!(p.kind, ClientKind::Transcription { room } if room == ctx.room)
423 })
424}