Skip to main content

opentalk_roomserver_module_transcription/
lib.rs

1// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5use std::collections::BTreeMap;
6
7use anyhow::Context;
8use opentalk_roomserver_signaling::{
9    module_context::ModuleContext,
10    participant_state::ParticipantState,
11    signaling_module::{
12        ModuleJoinData, ModuleSwitchData, NoOp, SignalingModule, SignalingModuleDescription,
13        SignalingModuleFeatureDescription, SignalingModuleInitData,
14    },
15};
16use opentalk_roomserver_types::{
17    client_parameters::ClientKind,
18    connection_id::ConnectionId,
19    room_kind::RoomKind,
20    signaling::module_error::{FatalError, SignalingModuleError},
21};
22use opentalk_roomserver_types_transcription::{
23    TRANSCRIPTION_FEATURE_ID, TRANSCRIPTION_MODULE_ID,
24    command::TranscriptionCommand,
25    event::{TranscriptionError, TranscriptionEvent},
26    service::{command::TranscriptionServiceCommand, event::TranscriptionServiceEvent},
27    settings::TranscriptionSettings,
28    state::{TranscriptionState, TranscriptionStatus},
29};
30use opentalk_transcription_web_api::v1::TranscriptionTarget;
31use opentalk_types_common::{features::ModuleFeatureId, modules::ModuleId};
32use opentalk_types_signaling::ParticipantId;
33
34pub struct TranscriptionModule {
35    settings: TranscriptionSettings,
36    http_client: reqwest::Client,
37    transcription_states: BTreeMap<RoomKind, TranscriptionRoomState>,
38}
39
/// Internal per-room transcription state
///
/// A room that has no entry in the state map is treated as inactive, so there is no
/// dedicated variant for that case.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TranscriptionRoomState {
    /// The init request was issued, but the service has not reported `Started` yet.
    Requested,
    /// The transcription service reported that it started transcribing.
    Running,
}
46
/// Events that background tasks spawned by the transcription module send back to it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LoopBackEvent {
    /// The init request to the transcription service could not be built, could not be sent,
    /// or was answered with a non-success status.
    TranscriptionRequestFailed,
}
50
51impl SignalingModuleDescription for TranscriptionModule {
52    const MODULE_ID: ModuleId = TRANSCRIPTION_MODULE_ID;
53    const DESCRIPTION: &'static str = "Live transcription for meetings";
54    const FEATURES: &[SignalingModuleFeatureDescription] = &[SignalingModuleFeatureDescription {
55        feature_id: TRANSCRIPTION_FEATURE_ID,
56        description: "Allows to create transcriptions for meetings",
57    }];
58}
59
60impl SignalingModule for TranscriptionModule {
61    const NAMESPACE: ModuleId = TRANSCRIPTION_MODULE_ID;
62
63    type Incoming = TranscriptionCommand;
64    type Outgoing = TranscriptionEvent;
65    type Internal = NoOp;
66    type Loopback = LoopBackEvent;
67    type JoinInfo = TranscriptionState;
68    type PeerJoinInfo = ();
69    type Error = TranscriptionError;
70
71    fn init(init_data: SignalingModuleInitData) -> Option<Self> {
72        let tariff = &init_data.room_parameters.tariff;
73        if tariff.disabled_features.contains(&ModuleFeatureId {
74            module: TRANSCRIPTION_MODULE_ID,
75            feature: TRANSCRIPTION_FEATURE_ID,
76        }) {
77            return None;
78        }
79
80        let settings = init_data
81            .room_parameters
82            .module_settings
83            .get::<TranscriptionSettings>()
84            .ok()??;
85
86        Some(Self {
87            settings,
88            http_client: reqwest::Client::new(),
89            transcription_states: BTreeMap::new(),
90        })
91    }
92
93    fn on_participant_joined(
94        &mut self,
95        ctx: &mut ModuleContext<'_, Self>,
96        _participant_id: ParticipantId,
97        _connection_id: ConnectionId,
98        _is_first_connection: bool,
99    ) -> Result<ModuleJoinData<Self>, SignalingModuleError<Self::Error>> {
100        Ok(ModuleJoinData {
101            join_success: Some(self.build_module_state(ctx.room)),
102            ..Default::default()
103        })
104    }
105
106    fn on_participant_disconnected(
107        &mut self,
108        ctx: &mut ModuleContext<'_, Self>,
109        participant_id: ParticipantId,
110        _connection_id: ConnectionId,
111    ) -> Result<(), SignalingModuleError<Self::Error>> {
112        if is_transcription_service_for_current_room(ctx, participant_id)?
113            && self.transcription_states.remove(&ctx.room).is_some()
114        {
115            // We had an active transcription but the transcription service disconnected
116            // unexpectedly
117            ctx.send_ws_message(
118                ctx.participants.in_room(ctx.room).connected().ids(),
119                TranscriptionError::ServiceDisconnected.into(),
120            )?;
121
122            ctx.send_ws_message(
123                ctx.participants.in_room(ctx.room).connected().ids(),
124                TranscriptionEvent::StateUpdated {
125                    status: TranscriptionStatus::Inactive,
126                },
127            )?;
128        }
129
130        Ok(())
131    }
132
133    fn on_websocket_message(
134        &mut self,
135        ctx: &mut ModuleContext<'_, Self>,
136        participant_id: ParticipantId,
137        _connection_id: ConnectionId,
138        command: Self::Incoming,
139    ) -> Result<(), SignalingModuleError<Self::Error>> {
140        match command {
141            TranscriptionCommand::Start { language } => {
142                self.start_transcription(ctx, participant_id, language)
143            }
144            TranscriptionCommand::Stop => {
145                if !ctx.is_moderator(participant_id) {
146                    return Err(TranscriptionError::InsufficientPermissions.into());
147                }
148
149                self.stop_transcription(ctx)
150            }
151            TranscriptionCommand::TranscriptionServiceEvent { event } => {
152                self.handle_transcription_service_event(ctx, event, participant_id)
153            }
154        }
155    }
156
157    fn on_breakout_switch(
158        &mut self,
159        ctx: &mut ModuleContext<'_, Self>,
160        participant_id: ParticipantId,
161        _old_room: RoomKind,
162        new_room: RoomKind,
163    ) -> Result<ModuleSwitchData<Self>, SignalingModuleError<Self::Error>> {
164        let state = self.build_module_state(new_room);
165
166        let switch_success = ctx
167            .participant_state(participant_id)
168            .with_context(|| format!("Missing state for participant {participant_id}"))?
169            .connections()
170            .map(|con| (con, Some(state.clone())))
171            .collect();
172
173        Ok(ModuleSwitchData {
174            switch_success,
175            ..Default::default()
176        })
177    }
178
179    fn on_loopback_event(
180        &mut self,
181        ctx: &mut ModuleContext<'_, Self>,
182        event: Self::Loopback,
183    ) -> Result<(), SignalingModuleError<Self::Error>> {
184        match event {
185            LoopBackEvent::TranscriptionRequestFailed => {
186                ctx.send_ws_message(
187                    ctx.participants.in_room(ctx.room).connected().ids(),
188                    TranscriptionError::ServiceRequestFailed.into(),
189                )?;
190
191                self.stop_transcription(ctx)?;
192            }
193        }
194
195        Ok(())
196    }
197}
198
199impl TranscriptionModule {
200    fn start_transcription(
201        &mut self,
202        ctx: &mut ModuleContext<'_, TranscriptionModule>,
203        participant_id: ParticipantId,
204        language: Option<String>,
205    ) -> Result<(), SignalingModuleError<TranscriptionError>> {
206        if !ctx.is_moderator(participant_id) {
207            return Err(TranscriptionError::InsufficientPermissions.into());
208        }
209
210        if self.transcription_states.contains_key(&ctx.room) {
211            return Err(TranscriptionError::AlreadyActive.into());
212        };
213
214        self.request_transcription(ctx, language)?;
215
216        ctx.send_ws_message(
217            ctx.participants.in_room(ctx.room).connected().ids(),
218            TranscriptionEvent::StateUpdated {
219                status: TranscriptionStatus::Requested,
220            },
221        )?;
222
223        Ok(())
224    }
225
226    fn request_transcription(
227        &mut self,
228        ctx: &mut ModuleContext<'_, TranscriptionModule>,
229        language: Option<String>,
230    ) -> Result<(), SignalingModuleError<TranscriptionError>> {
231        let transcription_config = self.settings.clone();
232
233        let jwt_bearer_token = transcription_config
234            .api_key
235            .generate_jwt()
236            .context("Failed to generate JWT from transcription api_key")
237            .map_err(FatalError)?;
238
239        let http_client = self.http_client.clone();
240
241        let body = TranscriptionTarget {
242            room_id: ctx.room_id,
243            breakout_room: match ctx.room {
244                RoomKind::Main => None,
245                RoomKind::Breakout(breakout_id) => Some(breakout_id.into()),
246            },
247            language,
248        };
249
250        ctx.spawn_optional(async move {
251            let url = match transcription_config.url.join("v1/init") {
252                Ok(url) => url,
253                Err(err) => {
254                    tracing::error!(
255                        "Failed to build transcription init url from base_url: {}, {err}",
256                        transcription_config.url,
257                    );
258
259                    return Some(LoopBackEvent::TranscriptionRequestFailed);
260                }
261            };
262
263            tracing::debug!("Sending transcription start request to {url} with body {body:#?}");
264
265            let response = http_client
266                .post(url)
267                .bearer_auth(jwt_bearer_token)
268                .json(&body)
269                .send()
270                .await;
271
272            let response = match response {
273                Ok(response) => response,
274                Err(err) => {
275                    tracing::error!(
276                        "Failed to send init request to transcription service, {err:?}"
277                    );
278                    return Some(LoopBackEvent::TranscriptionRequestFailed);
279                }
280            };
281
282            let response_status = response.status();
283            let response_body = response
284                .bytes()
285                .await
286                .map(|bytes| String::from_utf8_lossy(&bytes).into_owned());
287
288            if response_status.is_success() {
289                tracing::debug!(
290                    "Got transcription start response status={} body={:#?}",
291                    response_status,
292                    response_body,
293                );
294
295                None
296            } else {
297                tracing::error!(
298                    "Got non-success response to transcription start request status={} body={:#?}",
299                    response_status,
300                    response_body,
301                );
302                Some(LoopBackEvent::TranscriptionRequestFailed)
303            }
304        });
305
306        self.transcription_states
307            .insert(ctx.room, TranscriptionRoomState::Requested);
308        Ok(())
309    }
310
311    /// Send a stop command to the transcription service if the transcription state is requested or
312    /// active
313    ///
314    /// Once the transcription service stops and disconnects, other participants will receive the
315    /// `Inactive` state update
316    fn stop_transcription(
317        &mut self,
318        ctx: &mut ModuleContext<'_, TranscriptionModule>,
319    ) -> Result<(), SignalingModuleError<TranscriptionError>> {
320        if let Some((transcription, _)) = find_transcription_service(ctx) {
321            ctx.send_ws_message(
322                [*transcription],
323                TranscriptionEvent::ServiceCommand {
324                    command: TranscriptionServiceCommand::Stop,
325                },
326            )?;
327        } else {
328            // The transcription service is not yet connected, but a stop command was issued
329            //
330            // Remove the transcription state so that if the transcription service connects later it
331            // will notice the missing state and immediately disconnect without starting
332            // the transcription
333            if self.transcription_states.remove(&ctx.room).is_none() {
334                return Err(TranscriptionError::NotActive.into());
335            };
336
337            // In this scenario, we don't need to wait until the transcription service disconnects
338            // to update the state for other participants
339            ctx.send_ws_message(
340                ctx.participants.in_room(ctx.room).connected().ids(),
341                TranscriptionEvent::StateUpdated {
342                    status: TranscriptionStatus::Inactive,
343                },
344            )?;
345        }
346
347        Ok(())
348    }
349
350    /// Handle events that were sent by the transcription service
351    fn handle_transcription_service_event(
352        &mut self,
353        ctx: &mut ModuleContext<'_, Self>,
354        service_event: TranscriptionServiceEvent,
355        participant_id: ParticipantId,
356    ) -> Result<(), SignalingModuleError<TranscriptionError>> {
357        if !is_transcription_service_for_current_room(ctx, participant_id)? {
358            return Err(TranscriptionError::InsufficientPermissions.into());
359        }
360
361        match service_event {
362            TranscriptionServiceEvent::Started => {
363                self.transcription_states
364                    .insert(ctx.room, TranscriptionRoomState::Running);
365
366                ctx.send_ws_message(
367                    ctx.participants.in_room(ctx.room).connected().ids(),
368                    TranscriptionEvent::StateUpdated {
369                        status: TranscriptionStatus::Running,
370                    },
371                )?;
372            }
373            TranscriptionServiceEvent::Stopped => {
374                self.transcription_states.remove(&ctx.room);
375
376                ctx.send_ws_message(
377                    ctx.participants.in_room(ctx.room).connected().ids(),
378                    TranscriptionEvent::StateUpdated {
379                        status: TranscriptionStatus::Inactive,
380                    },
381                )?;
382            }
383            TranscriptionServiceEvent::Segment(transcription_segment) => {
384                ctx.send_ws_message(
385                    ctx.participants.in_room(ctx.room).connected().ids(),
386                    TranscriptionEvent::Segment(transcription_segment),
387                )?;
388            }
389        };
390
391        Ok(())
392    }
393
394    fn build_module_state(&mut self, room: RoomKind) -> TranscriptionState {
395        let status = match self.transcription_states.get(&room) {
396            Some(TranscriptionRoomState::Requested) => TranscriptionStatus::Requested,
397            Some(TranscriptionRoomState::Running) => TranscriptionStatus::Running,
398            None => TranscriptionStatus::Inactive,
399        };
400
401        TranscriptionState { status }
402    }
403}
404
405/// Check if the given participant is the transcription service for the current room
406fn is_transcription_service_for_current_room(
407    ctx: &mut ModuleContext<'_, TranscriptionModule>,
408    participant_id: ParticipantId,
409) -> Result<bool, SignalingModuleError<TranscriptionError>> {
410    let participant_state = ctx
411        .participant_state(participant_id)
412        .context("Failed to get state for disconnected participant")?;
413
414    Ok(participant_state.kind == ClientKind::Transcription { room: ctx.room })
415}
416
417/// Find the connected transcription service participant for the current room
418fn find_transcription_service<'a>(
419    ctx: &'a ModuleContext<'a, TranscriptionModule>,
420) -> Option<(&'a ParticipantId, &'a ParticipantState)> {
421    ctx.participants.in_room(ctx.room).iter().find(|(_, p)| {
422        p.is_connected() && matches!(p.kind, ClientKind::Transcription { room } if room == ctx.room)
423    })
424}