// opentalk_compositor/lib.rs

1// SPDX-FileCopyrightText: OpenTalk GmbH <mail@opentalk.eu>
2//
3// SPDX-License-Identifier: EUPL-1.2
4
5#![allow(clippy::module_name_repetitions)]
6
7use std::sync::Mutex as StdMutex;
8use std::{collections::HashMap, sync::Arc, time::Instant};
9
10use anyhow::{bail, Context, Result};
11use audio::{audio_mixer_task, NativeAudioStreamSource, Silence};
12use audio_nodes::{AudioConvert, AudioMixer};
13use ezk::nodes::{Access, AccessHandle};
14use ezk_image::{ColorInfo, ColorPrimaries, ColorSpace, ColorTransfer, YuvColorInfo};
15use futures::StreamExt;
16use livekit::webrtc::prelude::I420Buffer;
17use livekit::{
18    prelude::*,
19    webrtc::{audio_stream::native::NativeAudioStream, video_stream::native::NativeVideoStream},
20};
21use livekit_api::access_token::{AccessToken, AccessTokenError, VideoGrants};
22use tokio::{
23    sync::{broadcast, mpsc, Mutex},
24    task::JoinHandle,
25};
26use video::{VideoPipeline, VideoStreamCommand};
27
28pub mod audio;
29pub mod font;
30pub mod image;
31pub mod sinks;
32pub mod video;
33
34#[cfg(feature = "gstreamer")]
35pub mod debug;
36#[cfg(feature = "gstreamer")]
37pub mod elements;
38#[cfg(feature = "gstreamer")]
39pub mod gst_with_context;
40#[cfg(feature = "gstreamer")]
41pub mod gstreamer;
42#[cfg(feature = "gstreamer")]
43pub mod pipeline_watched;
44
45#[cfg(feature = "gstreamer")]
46pub use gst_with_context::*;
47pub use sinks::*;
48
49pub use livekit;
50
51#[macro_use]
52extern crate log;
53
54pub use livekit::id::ParticipantIdentity;
55
/// Format string for the rendered clock (e.g. the default `"%x %X %Z"`,
/// i.e. locale date, locale time and timezone — strftime-like syntax).
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ClockFormat(String);
58
59impl Default for ClockFormat {
60    fn default() -> Self {
61        Self(String::from("%x %X %Z"))
62    }
63}
64
65impl AsRef<str> for ClockFormat {
66    fn as_ref(&self) -> &str {
67        &self.0
68    }
69}
70
/// Output canvas width in pixels (Full HD).
pub const WIDTH: usize = 1920;
/// Output canvas height in pixels (Full HD).
pub const HEIGHT: usize = 1080;
/// Default frame rate of the composited video.
pub const FRAMES_PER_SECOND: usize = 25;

/// The amount of pixels for borders
pub(crate) const BORDER: usize = 4;

/// Color description for I420 frames handled by the compositor:
/// BT.709 primaries/matrix, linear transfer, studio (limited) range.
pub const I420_COLOR: ColorInfo = ColorInfo::YUV(YuvColorInfo {
    transfer: ColorTransfer::Linear,
    primaries: ColorPrimaries::BT709,
    space: ColorSpace::BT709,
    full_range: false,
});

/// General layout padding in pixels.
pub const PADDING: usize = 16;
/// Pixels reserved at the top of the canvas (offset of the content area).
pub const OFFSET_TOP: usize = 40;
87
/// Composites the audio and video tracks of a LiveKit room and delivers the
/// mixed result to every registered [`Sink`].
pub struct Mixer {
    // Creation time of the mixer; passed to GStreamer sinks on creation.
    #[cfg(feature = "gstreamer")]
    start: Instant,

    // When true, participants are added (and their tracks subscribed)
    // automatically; see `run()` and the `ParticipantConnected` handling.
    auto_subscribe: bool,

    // Output sinks by name; shared with the audio and video mixer tasks.
    sinks: Arc<Mutex<HashMap<String, Box<dyn Sink>>>>,

    room: Room,
    // LiveKitRoom events
    room_events: mpsc::UnboundedReceiver<RoomEvent>,

    // Used to fetch participant avatars over HTTP.
    http_client: reqwest::Client,

    #[allow(clippy::type_complexity)]
    // Callbacks invoked for every incoming RoomEvent before internal handling.
    external_room_event_handler: Vec<Box<dyn FnMut(&RoomEvent) + Send>>,

    // Shared Data for Audio and Video Mixer
    shared: Arc<StdMutex<Shared>>,

    // Audio
    // Handle to mutate the running audio mixer (add sources etc.).
    audio_mixer_handle: Arc<parking_lot::Mutex<AccessHandle<AudioMixer>>>,
    audio_mixer_task: JoinHandle<()>,

    // Video
    // Command channel into the video pipeline task.
    video_stream_tx: mpsc::UnboundedSender<VideoStreamCommand>,
    // Taken (and awaited) on drop; None afterwards.
    video_task: Option<JoinHandle<()>>,

    // Broadcasts a shutdown signal to background tasks on drop.
    shutdown_tx: broadcast::Sender<()>,
}
118
// Per-participant speaking state derived from `ActiveSpeakersChanged` events.
#[derive(Debug, Clone)]
struct SpeakingState {
    // Whether the participant is currently an active speaker.
    is_speaking: bool,
    // When this state was last updated.
    last_event: Instant,
}
124
// State shared (behind a std Mutex) between the Mixer and the audio/video
// mixer tasks.
#[derive(Debug)]
struct Shared {
    // Participants that are part of the mix, keyed by LiveKit identity.
    participants: HashMap<ParticipantIdentity, Participant>,
    // Speaking state per participant; updated by handle_active_speakers_changed.
    speakers: HashMap<ParticipantIdentity, SpeakingState>,

    // Format for the clock display (rendering happens in the video code).
    clock_format: ClockFormat,
    // Optional title for the mix; None until set_event_title() is called.
    event_title: Option<String>,

    // When false, video tracks are not subscribed (see set_video_support and
    // the TrackPublished handling); presumably rendering is paused as well.
    render_frames: bool,
}
135
136// FIXME
137impl std::fmt::Debug for Mixer {
138    fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139        Ok(())
140    }
141}
142
// Data the compositor tracks for a participant included in the mix.
#[derive(Debug)]
pub(crate) struct Participant {
    // Name rendered for the participant.
    display_name: String,
    // Optional avatar to show instead of the placeholder image
    avatar: Option<I420Buffer>,
}
149
/// Configuration for [`Mixer::new`].
pub struct MixerParameters {
    /// Automatically add (and subscribe to) every remote participant.
    pub auto_subscribe: bool,
    /// Format for the rendered clock, e.g. the default `"%x %X %Z"`.
    pub clock_format: ClockFormat,
    /// URL of the LiveKit server to connect to.
    pub livekit_url: String,
    /// Access token used to join the room (see [`create_token`]).
    pub livekit_token: String,
    /// Initial target frame rate of the video pipeline.
    pub target_fps: u16,
}
157
158impl Mixer {
    // TODO: This will be fixed later on
    #[allow(clippy::missing_errors_doc)]
    // RoomOptions in future livekits will have #[non_exhaustive] making the struct initialization impossible
    #[allow(clippy::field_reassign_with_default)]
    /// Connects to the LiveKit room described by `parameters` and spawns the
    /// audio- and video-mixer background tasks.
    ///
    /// LiveKit-level auto-subscription is always disabled here; subscription
    /// is managed manually based on `parameters.auto_subscribe` (see
    /// [`Mixer::run`] and [`Mixer::add_participant`]).
    pub async fn new(parameters: MixerParameters) -> Result<Self> {
        #[cfg(feature = "gstreamer")]
        {
            use anyhow::Context;

            elements::register_all().context("Unable to register all custom GStreamer Elements")?;
        }

        let mut room_options = RoomOptions::default();
        room_options.auto_subscribe = false;

        let (room, room_events) = Room::connect(
            &parameters.livekit_url,
            &parameters.livekit_token,
            room_options,
        )
        .await?;

        let shared = Arc::new(StdMutex::new(Shared {
            participants: HashMap::default(),
            speakers: HashMap::new(),
            clock_format: parameters.clock_format,
            event_title: None,
            render_frames: true,
        }));

        #[cfg(feature = "gstreamer")]
        let start = Instant::now();

        // Initialize Audio Mixer
        // The mixer starts with silence as its only source; remote audio
        // tracks are added later via `add_audio_track`.
        let (access, audio_mixer_handle) =
            Access::new(AudioMixer::new(AudioConvert::new(Silence::default())));
        let audio_mixer_handle = Arc::new(parking_lot::Mutex::new(audio_mixer_handle));
        let sinks = Arc::new(Mutex::new(HashMap::default()));
        let audio_mixer_task = tokio::spawn(audio_mixer_task(access, sinks.clone()));

        // Initialize Video Mixer
        let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
        let (video_stream_tx, video_task) = VideoPipeline::create(
            sinks.clone(),
            shared.clone(),
            shutdown_rx,
            parameters.target_fps,
        )
        .context("Failed to create video pipeline")?;

        let mixer = Mixer {
            #[cfg(feature = "gstreamer")]
            start,
            auto_subscribe: parameters.auto_subscribe,
            sinks,
            room,
            room_events,
            http_client: reqwest::Client::new(),
            external_room_event_handler: vec![],
            shared,
            audio_mixer_handle,
            audio_mixer_task,
            video_stream_tx,
            video_task: Some(video_task),
            shutdown_tx,
        };

        Ok(mixer)
    }
228
229    pub fn add_livekit_event_handler<F>(&mut self, event_handler: F)
230    where
231        F: FnMut(&RoomEvent) + Send + 'static,
232    {
233        self.external_room_event_handler
234            .push(Box::new(event_handler));
235    }
236
    /// Returns the local participant of the connected LiveKit room.
    #[must_use]
    pub fn local_participant(&self) -> LocalParticipant {
        self.room.local_participant()
    }
241
    /// Run the compositor event loop
    ///
    /// Returns once the client was disconnected from livekit
    ///
    /// This function is cancel safe
    pub async fn run(&mut self) -> DisconnectReason {
        if self.auto_subscribe {
            // Add everyone who is already in the room before processing events.
            for participant in self.room.remote_participants().values() {
                self.add_participant(&participant.identity(), participant.name());
            }
        }

        while let Some(event) = self.room_events.recv().await {
            // External handlers observe every event first ...
            for handler in &mut self.external_room_event_handler {
                handler(&event);
            }

            // ... then the mixer reacts; Some(_) means we were disconnected.
            if let Some(disconnect_reason) = self.handle_livekit_event(event) {
                return disconnect_reason;
            }
        }

        // The event channel closed without an explicit Disconnected event.
        DisconnectReason::UnknownReason
    }
266
267    /// Sets the target fps of this [`Mixer`].
268    ///
269    /// # Panics
270    ///
271    /// Panics if the background render thread has exited
272    pub fn set_target_fps(&mut self, target_fps: u16) {
273        self.video_stream_tx
274            .send(VideoStreamCommand::SetTargetFps(target_fps))
275            .expect("unable to send set target fps event");
276    }
277
    // Reacts to a single LiveKit room event.
    //
    // Returns `Some(reason)` once the room was disconnected (terminating the
    // `run()` loop); `None` for every other event.
    fn handle_livekit_event(&mut self, event: RoomEvent) -> Option<DisconnectReason> {
        log::debug!("LiveKit event received: {event:?}");
        match event {
            // A subscribed track starts flowing: route it to the matching mixer.
            RoomEvent::TrackSubscribed {
                track,
                publication: _,
                participant,
            } => match track {
                RemoteTrack::Audio(audio_track) => {
                    self.add_audio_track(participant.identity(), audio_track);
                }
                RemoteTrack::Video(video_track) => {
                    self.add_video_track(participant, video_track);
                }
            },
            RoomEvent::TrackUnsubscribed {
                track,
                publication: _,
                participant: _,
            } => {
                self.video_stream_tx
                    .send(VideoStreamCommand::RemoveTrack(track.sid()))
                    .expect("unable to send video stream remove track event");
            }
            RoomEvent::ActiveSpeakersChanged { speakers } => {
                self.handle_active_speakers_changed(speakers);
            }
            RoomEvent::TrackMuted {
                participant: _,
                publication,
            } => {
                self.video_stream_tx
                    .send(VideoStreamCommand::Mute(publication.sid()))
                    .expect("unable to send video stream mute event");
            }
            RoomEvent::TrackUnmuted {
                participant: _,
                publication,
            } => {
                self.video_stream_tx
                    .send(VideoStreamCommand::Unmute(publication.sid()))
                    .expect("unable to send video stream unmute event");
            }
            RoomEvent::TrackPublished {
                publication,
                participant,
            } => {
                let shared = self.shared.lock().unwrap();

                if !shared.render_frames && matches!(publication.kind(), TrackKind::Video) {
                    // do not subscribe video while not rendering frames
                    return None;
                }

                // Only subscribe to tracks of participants we are tracking.
                if shared.participants.contains_key(&participant.identity()) {
                    publication.set_subscribed(true);
                }
            }
            RoomEvent::TrackUnpublished {
                publication,
                participant: _,
            } => {
                // `?` on purpose: a publication without a track means there is
                // nothing to remove, so return `None` early.
                let track = publication.track()?;

                self.video_stream_tx
                    .send(VideoStreamCommand::RemoveTrack(track.sid()))
                    .expect("unable to send video stream remove track event");
            }
            RoomEvent::ParticipantConnected(participant) => {
                if self.auto_subscribe {
                    self.add_participant(&participant.identity(), participant.name());
                }
            }
            RoomEvent::ParticipantDisconnected(participant) => {
                self.remove_participant(&participant.identity());
            }
            RoomEvent::Disconnected { reason } => return Some(reason.into()),
            // All other events are irrelevant for mixing.
            _ => {}
        }

        None
    }
360
361    /// Changes the active Speaker for this [`Mixer`].
362    ///
363    /// # Panics
364    ///
365    /// Panics if the [`Shared`] lock couldn't be acquired.
366    fn handle_active_speakers_changed(&mut self, speakers: Vec<livekit::participant::Participant>) {
367        let shared = &mut *self.shared.lock().unwrap();
368
369        for state in shared.speakers.values_mut() {
370            state.is_speaking = false;
371        }
372
373        for participant in speakers {
374            shared.speakers.insert(
375                participant.identity(),
376                SpeakingState {
377                    is_speaking: true,
378                    last_event: Instant::now(),
379                },
380            );
381        }
382    }
383
    // TODO: This will be fixed later on
    #[allow(clippy::missing_errors_doc)]
    #[cfg(feature = "gstreamer")]
    /// Wraps `sink` in a [`gstreamer::GStreamerActiveSink`] and registers it
    /// under `name`.
    ///
    /// Fails if a sink with the same name is already registered or if the
    /// active sink could not be created.
    pub async fn link_gstreamer_sink(
        &mut self,
        name: &str,
        sink: impl gstreamer::GStreamerSink,
    ) -> Result<()> {
        trace!("link sink, name: {name}, sink: {sink:?}");

        let mut sinks = self.sinks.lock().await;
        if sinks.contains_key(name) {
            bail!("a stream with the name '{name}' already exists");
        }

        // NOTE(review): `self.start` is presumably the sink's time base —
        // confirm against GStreamerActiveSink::new.
        let active_sink = gstreamer::GStreamerActiveSink::new(self.start, name, sink)?;

        sinks.insert(name.to_owned(), Box::new(active_sink));

        Ok(())
    }
405
406    // TODO: This will be fixed later on
407    #[allow(clippy::missing_errors_doc)]
408    pub async fn link_sink(&mut self, name: &str, sink: Box<dyn Sink>) -> Result<()> {
409        trace!("link sink, name: {name}, sink: {sink:?}");
410
411        let mut sinks = self.sinks.lock().await;
412        if sinks.contains_key(name) {
413            bail!("a stream with the name '{name}' already exists");
414        }
415
416        sinks.insert(name.to_owned(), sink);
417
418        Ok(())
419    }
420
421    pub async fn release_sink(&mut self, name: &String) {
422        trace!("release_sink {name}");
423        self.sinks.lock().await.remove(name);
424    }
425
426    /// Sets the event title of this [`Mixer`].
427    ///
428    /// # Panics
429    ///
430    /// Panics if the lock for the [`Shared`] object could not be acquired.
431    pub fn set_event_title(&mut self, title: String) {
432        self.shared.lock().unwrap().event_title = Some(title);
433    }
434
    // Feeds a newly subscribed remote audio track into the audio mixer and
    // announces it to the video pipeline (see
    // `VideoStreamCommand::AddAudioTrack`; handling lives in the `video` module).
    fn add_audio_track(
        &mut self,
        participant_identity: ParticipantIdentity,
        audio_track: RemoteAudioTrack,
    ) {
        let rtc_track = audio_track.rtc_track();
        self.audio_mixer_handle
            .lock()
            .access_no_wait(move |audio_mixer| {
                // Wrap the 48 kHz / 2-channel native stream; AudioConvert
                // presumably adapts it to the mixer's format — see audio_nodes.
                audio_mixer.add_source(AudioConvert::new(NativeAudioStreamSource {
                    stream: NativeAudioStream::new(rtc_track, 48_000, 2),
                    timestamp: 0,
                }));
            });

        self.video_stream_tx
            .send(VideoStreamCommand::AddAudioTrack((
                participant_identity,
                audio_track,
            )))
            .expect("unable to send add event to video_stream_tx");
    }
457
    // Hands a newly subscribed remote video track to the video pipeline,
    // together with a stream of its frames converted to I420.
    fn add_video_track(&mut self, participant: RemoteParticipant, video_track: RemoteVideoTrack) {
        self.video_stream_tx
            .send(VideoStreamCommand::AddVideoTrack((
                participant.identity(),
                video_track.clone(),
                // Each frame is tagged with its participant identity and track
                // sid so the pipeline can route it.
                Box::pin(
                    NativeVideoStream::new(video_track.rtc_track()).map(move |frame| {
                        (
                            participant.identity(),
                            video_track.sid(),
                            frame.buffer.to_i420(),
                        )
                    }),
                ),
            )))
            .expect("unable to send add event to video_stream_tx");
    }
475
476    /// Adds a Participant to the [`Shared`] list.
477    ///
478    /// # Panics
479    ///
480    /// Panics if the [`Shared`] object lock couldn't be acquired.
481    pub fn add_participant(&mut self, identity: &ParticipantIdentity, display_name: String) {
482        log::debug!("Add participant {identity:?}");
483
484        self.shared
485            .lock()
486            .unwrap()
487            .participants
488            .entry(identity.clone())
489            .and_modify(|p| p.display_name.clone_from(&display_name))
490            .or_insert_with(|| Participant {
491                display_name,
492                avatar: None,
493            });
494
495        if let Some(remote_participant) = self.room.remote_participants().get(identity) {
496            for (_track_sid, track_publication) in remote_participant.track_publications() {
497                track_publication.set_subscribed(true);
498            }
499        }
500    }
501
    /// Set an image http url for the participant
    ///
    /// The image is loaded asynchronously and used when the participant has no camera or screenshare enabled
    ///
    /// # Panics
    ///
    /// Panics if the [`Shared`] object lock couldn't be acquired.
    pub fn set_participant_avatar_url(
        &mut self,
        identity: &ParticipantIdentity,
        avatar_url: String,
    ) {
        // Fetches the avatar, decodes it and stores the converted placeholder
        // frame on the participant (if the participant still exists by then).
        async fn load_avatar_from_url(
            http_client: reqwest::Client,
            shared: Arc<StdMutex<Shared>>,
            identity: ParticipantIdentity,
            avatar_url: &str,
        ) -> Result<()> {
            let response = http_client
                .get(avatar_url)
                .send()
                .await
                .context("Failed to send a HTTP request")?;

            if !response.status().is_success() {
                bail!("Got unexpected {} response", response.status());
            }

            let bytes = response
                .bytes()
                .await
                .context("Failed to receive HTTP response")?;

            // `::image` = the external image crate, not the local `image` module.
            let avatar = ::image::load_from_memory(&bytes)
                .context("Failed to load image from HTTP response")?;

            let mut shared = shared.lock().unwrap();

            // The participant may have disappeared while the download ran; in
            // that case the avatar is silently discarded.
            if let Some(participant) = shared.participants.get_mut(&identity) {
                participant.avatar = Some(
                    video::placeholder::avatar_to_placeholder(&avatar)
                        .context("Failed to convert received avatar to I420Buffer")?,
                );
            }

            Ok(())
        }

        // Return early if the participant doesn't exist
        if !self
            .shared
            .lock()
            .unwrap()
            .participants
            .contains_key(identity)
        {
            return;
        }

        let http_client = self.http_client.clone();
        let shared = self.shared.clone();
        let identity = identity.clone();

        // Load in the background; failures are only logged.
        tokio::spawn(async move {
            if let Err(e) = load_avatar_from_url(http_client, shared, identity, &avatar_url).await {
                log::warn!("Failed to fetch avatar from url={avatar_url}, error={e:?}");
            }
        });
    }
571
572    /// # Panics
573    ///
574    /// This can fail if the event could not be send to internal the channel.
575    pub fn remove_participant(&mut self, identity: &ParticipantIdentity) {
576        log::debug!("Remove participant {identity:?}");
577
578        self.video_stream_tx
579            .send(VideoStreamCommand::RemoveParticipant(identity.to_owned()))
580            .expect("unable to send add remove event to video_stream_tx");
581    }
582
583    /// Sets the video support of this [`Mixer`].
584    ///
585    /// # Panics
586    ///
587    /// Panics if the [`Shared`] lock couldn't be acquired.
588    pub fn set_video_support(&mut self, enabled: bool) {
589        let mut shared = self.shared.lock().unwrap();
590
591        shared.render_frames = enabled;
592
593        // set subscription state of all participant publications
594        for (pid, remote_participant) in self.room.remote_participants() {
595            if !shared.participants.contains_key(&pid) {
596                // not tracking this participant, skip
597                continue;
598            }
599
600            for publication in remote_participant.track_publications().into_values() {
601                if matches!(publication.kind(), TrackKind::Video) {
602                    publication.set_subscribed(enabled);
603                }
604            }
605        }
606    }
607}
608
impl Drop for Mixer {
    // Tears down the room connection, signals all background tasks to shut
    // down and waits for the video task before dropping the sinks.
    fn drop(&mut self) {
        log::debug!("Drop Mixer");

        // NOTE(review): `block_in_place` + `Handle::current()` require being
        // dropped from within a multi-threaded tokio runtime — confirm all
        // drop sites satisfy this, otherwise this panics.
        tokio::task::block_in_place(move || {
            tokio::runtime::Handle::current().block_on(async move {
                if let Err(e) = self.room.close().await {
                    log::warn!("Failed to close livekit room, {e:?}");
                }

                log::debug!("Send shutdown to all tasks");
                self.shutdown_tx.send(()).ok();

                // Wait for the video task to finish; the audio task is simply
                // aborted below.
                if let Some(video_task) = self.video_task.take() {
                    if !video_task.is_finished() {
                        log::debug!("Wait for video task to be finished");
                        video_task.await.expect("unable to await video task");
                    }
                }

                log::debug!("Drop all active sinks");
                // Dropping the `Drain` iterator removes (and drops) all sinks.
                self.sinks.lock().await.drain();

                self.audio_mixer_task.abort();
            });
        });
    }
}
637
638/// Create a livekit token
639///
640/// # Errors
641///
642/// If the given strings are empty this function will fail
643pub fn create_token(
644    api_key: &str,
645    api_secret: &str,
646    room: &str,
647    name: &str,
648) -> Result<String, AccessTokenError> {
649    AccessToken::with_api_key(api_key, api_secret)
650        .with_identity(uuid::Uuid::new_v4().to_string().as_str())
651        .with_name(name)
652        .with_grants(VideoGrants {
653            room_join: true,
654            room: room.to_string(),
655            hidden: false,
656            recorder: true,
657            ..Default::default()
658        })
659        .to_jwt()
660}