1#![allow(clippy::module_name_repetitions)]
6
7use std::sync::Mutex as StdMutex;
8use std::{collections::HashMap, sync::Arc, time::Instant};
9
10use anyhow::{bail, Context, Result};
11use audio::{audio_mixer_task, NativeAudioStreamSource, Silence};
12use audio_nodes::{AudioConvert, AudioMixer};
13use ezk::nodes::{Access, AccessHandle};
14use ezk_image::{ColorInfo, ColorPrimaries, ColorSpace, ColorTransfer, YuvColorInfo};
15use futures::StreamExt;
16use livekit::webrtc::prelude::I420Buffer;
17use livekit::{
18 prelude::*,
19 webrtc::{audio_stream::native::NativeAudioStream, video_stream::native::NativeVideoStream},
20};
21use livekit_api::access_token::{AccessToken, AccessTokenError, VideoGrants};
22use tokio::{
23 sync::{broadcast, mpsc, Mutex},
24 task::JoinHandle,
25};
26use video::{VideoPipeline, VideoStreamCommand};
27
28pub mod audio;
29pub mod font;
30pub mod image;
31pub mod sinks;
32pub mod video;
33
34#[cfg(feature = "gstreamer")]
35pub mod debug;
36#[cfg(feature = "gstreamer")]
37pub mod elements;
38#[cfg(feature = "gstreamer")]
39pub mod gst_with_context;
40#[cfg(feature = "gstreamer")]
41pub mod gstreamer;
42#[cfg(feature = "gstreamer")]
43pub mod pipeline_watched;
44
45#[cfg(feature = "gstreamer")]
46pub use gst_with_context::*;
47pub use sinks::*;
48
49pub use livekit;
50
51#[macro_use]
52extern crate log;
53
54pub use livekit::id::ParticipantIdentity;
55
56#[derive(Debug, Clone, serde::Deserialize)]
57pub struct ClockFormat(String);
58
59impl Default for ClockFormat {
60 fn default() -> Self {
61 Self(String::from("%x %X %Z"))
62 }
63}
64
65impl AsRef<str> for ClockFormat {
66 fn as_ref(&self) -> &str {
67 &self.0
68 }
69}
70
71pub const WIDTH: usize = 1920;
72pub const HEIGHT: usize = 1080;
73pub const FRAMES_PER_SECOND: usize = 25;
74
75pub(crate) const BORDER: usize = 4;
77
78pub const I420_COLOR: ColorInfo = ColorInfo::YUV(YuvColorInfo {
79 transfer: ColorTransfer::Linear,
80 primaries: ColorPrimaries::BT709,
81 space: ColorSpace::BT709,
82 full_range: false,
83});
84
85pub const PADDING: usize = 16;
86pub const OFFSET_TOP: usize = 40;
87
88pub struct Mixer {
89 #[cfg(feature = "gstreamer")]
90 start: Instant,
91
92 auto_subscribe: bool,
93
94 sinks: Arc<Mutex<HashMap<String, Box<dyn Sink>>>>,
95
96 room: Room,
97 room_events: mpsc::UnboundedReceiver<RoomEvent>,
99
100 http_client: reqwest::Client,
101
102 #[allow(clippy::type_complexity)]
103 external_room_event_handler: Vec<Box<dyn FnMut(&RoomEvent) + Send>>,
104
105 shared: Arc<StdMutex<Shared>>,
107
108 audio_mixer_handle: Arc<parking_lot::Mutex<AccessHandle<AudioMixer>>>,
110 audio_mixer_task: JoinHandle<()>,
111
112 video_stream_tx: mpsc::UnboundedSender<VideoStreamCommand>,
114 video_task: Option<JoinHandle<()>>,
115
116 shutdown_tx: broadcast::Sender<()>,
117}
118
119#[derive(Debug, Clone)]
120struct SpeakingState {
121 is_speaking: bool,
122 last_event: Instant,
123}
124
125#[derive(Debug)]
126struct Shared {
127 participants: HashMap<ParticipantIdentity, Participant>,
128 speakers: HashMap<ParticipantIdentity, SpeakingState>,
129
130 clock_format: ClockFormat,
131 event_title: Option<String>,
132
133 render_frames: bool,
134}
135
136impl std::fmt::Debug for Mixer {
138 fn fmt(&self, _f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139 Ok(())
140 }
141}
142
143#[derive(Debug)]
144pub(crate) struct Participant {
145 display_name: String,
146 avatar: Option<I420Buffer>,
148}
149
150pub struct MixerParameters {
151 pub auto_subscribe: bool,
152 pub clock_format: ClockFormat,
153 pub livekit_url: String,
154 pub livekit_token: String,
155 pub target_fps: u16,
156}
157
158impl Mixer {
159 #[allow(clippy::missing_errors_doc)]
161 #[allow(clippy::field_reassign_with_default)]
163 pub async fn new(parameters: MixerParameters) -> Result<Self> {
164 #[cfg(feature = "gstreamer")]
165 {
166 use anyhow::Context;
167
168 elements::register_all().context("Unable to register all custom GStreamer Elements")?;
169 }
170
171 let mut room_options = RoomOptions::default();
172 room_options.auto_subscribe = false;
173
174 let (room, room_events) = Room::connect(
175 ¶meters.livekit_url,
176 ¶meters.livekit_token,
177 room_options,
178 )
179 .await?;
180
181 let shared = Arc::new(StdMutex::new(Shared {
182 participants: HashMap::default(),
183 speakers: HashMap::new(),
184 clock_format: parameters.clock_format,
185 event_title: None,
186 render_frames: true,
187 }));
188
189 #[cfg(feature = "gstreamer")]
190 let start = Instant::now();
191
192 let (access, audio_mixer_handle) =
194 Access::new(AudioMixer::new(AudioConvert::new(Silence::default())));
195 let audio_mixer_handle = Arc::new(parking_lot::Mutex::new(audio_mixer_handle));
196 let sinks = Arc::new(Mutex::new(HashMap::default()));
197 let audio_mixer_task = tokio::spawn(audio_mixer_task(access, sinks.clone()));
198
199 let (shutdown_tx, shutdown_rx) = broadcast::channel(1);
201 let (video_stream_tx, video_task) = VideoPipeline::create(
202 sinks.clone(),
203 shared.clone(),
204 shutdown_rx,
205 parameters.target_fps,
206 )
207 .context("Failed to create video pipeline")?;
208
209 let mixer = Mixer {
210 #[cfg(feature = "gstreamer")]
211 start,
212 auto_subscribe: parameters.auto_subscribe,
213 sinks,
214 room,
215 room_events,
216 http_client: reqwest::Client::new(),
217 external_room_event_handler: vec![],
218 shared,
219 audio_mixer_handle,
220 audio_mixer_task,
221 video_stream_tx,
222 video_task: Some(video_task),
223 shutdown_tx,
224 };
225
226 Ok(mixer)
227 }
228
229 pub fn add_livekit_event_handler<F>(&mut self, event_handler: F)
230 where
231 F: FnMut(&RoomEvent) + Send + 'static,
232 {
233 self.external_room_event_handler
234 .push(Box::new(event_handler));
235 }
236
237 #[must_use]
238 pub fn local_participant(&self) -> LocalParticipant {
239 self.room.local_participant()
240 }
241
242 pub async fn run(&mut self) -> DisconnectReason {
248 if self.auto_subscribe {
249 for participant in self.room.remote_participants().values() {
250 self.add_participant(&participant.identity(), participant.name());
251 }
252 }
253
254 while let Some(event) = self.room_events.recv().await {
255 for handler in &mut self.external_room_event_handler {
256 handler(&event);
257 }
258
259 if let Some(disconnect_reason) = self.handle_livekit_event(event) {
260 return disconnect_reason;
261 }
262 }
263
264 DisconnectReason::UnknownReason
265 }
266
267 pub fn set_target_fps(&mut self, target_fps: u16) {
273 self.video_stream_tx
274 .send(VideoStreamCommand::SetTargetFps(target_fps))
275 .expect("unable to send set target fps event");
276 }
277
278 fn handle_livekit_event(&mut self, event: RoomEvent) -> Option<DisconnectReason> {
279 log::debug!("LiveKit event received: {event:?}");
280 match event {
281 RoomEvent::TrackSubscribed {
282 track,
283 publication: _,
284 participant,
285 } => match track {
286 RemoteTrack::Audio(audio_track) => {
287 self.add_audio_track(participant.identity(), audio_track);
288 }
289 RemoteTrack::Video(video_track) => {
290 self.add_video_track(participant, video_track);
291 }
292 },
293 RoomEvent::TrackUnsubscribed {
294 track,
295 publication: _,
296 participant: _,
297 } => {
298 self.video_stream_tx
299 .send(VideoStreamCommand::RemoveTrack(track.sid()))
300 .expect("unable to send video stream remove track event");
301 }
302 RoomEvent::ActiveSpeakersChanged { speakers } => {
303 self.handle_active_speakers_changed(speakers);
304 }
305 RoomEvent::TrackMuted {
306 participant: _,
307 publication,
308 } => {
309 self.video_stream_tx
310 .send(VideoStreamCommand::Mute(publication.sid()))
311 .expect("unable to send video stream mute event");
312 }
313 RoomEvent::TrackUnmuted {
314 participant: _,
315 publication,
316 } => {
317 self.video_stream_tx
318 .send(VideoStreamCommand::Unmute(publication.sid()))
319 .expect("unable to send video stream unmute event");
320 }
321 RoomEvent::TrackPublished {
322 publication,
323 participant,
324 } => {
325 let shared = self.shared.lock().unwrap();
326
327 if !shared.render_frames && matches!(publication.kind(), TrackKind::Video) {
328 return None;
330 }
331
332 if shared.participants.contains_key(&participant.identity()) {
333 publication.set_subscribed(true);
334 }
335 }
336 RoomEvent::TrackUnpublished {
337 publication,
338 participant: _,
339 } => {
340 let track = publication.track()?;
341
342 self.video_stream_tx
343 .send(VideoStreamCommand::RemoveTrack(track.sid()))
344 .expect("unable to send video stream remove track event");
345 }
346 RoomEvent::ParticipantConnected(participant) => {
347 if self.auto_subscribe {
348 self.add_participant(&participant.identity(), participant.name());
349 }
350 }
351 RoomEvent::ParticipantDisconnected(participant) => {
352 self.remove_participant(&participant.identity());
353 }
354 RoomEvent::Disconnected { reason } => return Some(reason.into()),
355 _ => {}
356 }
357
358 None
359 }
360
361 fn handle_active_speakers_changed(&mut self, speakers: Vec<livekit::participant::Participant>) {
367 let shared = &mut *self.shared.lock().unwrap();
368
369 for state in shared.speakers.values_mut() {
370 state.is_speaking = false;
371 }
372
373 for participant in speakers {
374 shared.speakers.insert(
375 participant.identity(),
376 SpeakingState {
377 is_speaking: true,
378 last_event: Instant::now(),
379 },
380 );
381 }
382 }
383
384 #[allow(clippy::missing_errors_doc)]
386 #[cfg(feature = "gstreamer")]
387 pub async fn link_gstreamer_sink(
388 &mut self,
389 name: &str,
390 sink: impl gstreamer::GStreamerSink,
391 ) -> Result<()> {
392 trace!("link sink, name: {name}, sink: {sink:?}");
393
394 let mut sinks = self.sinks.lock().await;
395 if sinks.contains_key(name) {
396 bail!("a stream with the name '{name}' already exists");
397 }
398
399 let active_sink = gstreamer::GStreamerActiveSink::new(self.start, name, sink)?;
400
401 sinks.insert(name.to_owned(), Box::new(active_sink));
402
403 Ok(())
404 }
405
406 #[allow(clippy::missing_errors_doc)]
408 pub async fn link_sink(&mut self, name: &str, sink: Box<dyn Sink>) -> Result<()> {
409 trace!("link sink, name: {name}, sink: {sink:?}");
410
411 let mut sinks = self.sinks.lock().await;
412 if sinks.contains_key(name) {
413 bail!("a stream with the name '{name}' already exists");
414 }
415
416 sinks.insert(name.to_owned(), sink);
417
418 Ok(())
419 }
420
421 pub async fn release_sink(&mut self, name: &String) {
422 trace!("release_sink {name}");
423 self.sinks.lock().await.remove(name);
424 }
425
426 pub fn set_event_title(&mut self, title: String) {
432 self.shared.lock().unwrap().event_title = Some(title);
433 }
434
435 fn add_audio_track(
436 &mut self,
437 participant_identity: ParticipantIdentity,
438 audio_track: RemoteAudioTrack,
439 ) {
440 let rtc_track = audio_track.rtc_track();
441 self.audio_mixer_handle
442 .lock()
443 .access_no_wait(move |audio_mixer| {
444 audio_mixer.add_source(AudioConvert::new(NativeAudioStreamSource {
445 stream: NativeAudioStream::new(rtc_track, 48_000, 2),
446 timestamp: 0,
447 }));
448 });
449
450 self.video_stream_tx
451 .send(VideoStreamCommand::AddAudioTrack((
452 participant_identity,
453 audio_track,
454 )))
455 .expect("unable to send add event to video_stream_tx");
456 }
457
458 fn add_video_track(&mut self, participant: RemoteParticipant, video_track: RemoteVideoTrack) {
459 self.video_stream_tx
460 .send(VideoStreamCommand::AddVideoTrack((
461 participant.identity(),
462 video_track.clone(),
463 Box::pin(
464 NativeVideoStream::new(video_track.rtc_track()).map(move |frame| {
465 (
466 participant.identity(),
467 video_track.sid(),
468 frame.buffer.to_i420(),
469 )
470 }),
471 ),
472 )))
473 .expect("unable to send add event to video_stream_tx");
474 }
475
476 pub fn add_participant(&mut self, identity: &ParticipantIdentity, display_name: String) {
482 log::debug!("Add participant {identity:?}");
483
484 self.shared
485 .lock()
486 .unwrap()
487 .participants
488 .entry(identity.clone())
489 .and_modify(|p| p.display_name.clone_from(&display_name))
490 .or_insert_with(|| Participant {
491 display_name,
492 avatar: None,
493 });
494
495 if let Some(remote_participant) = self.room.remote_participants().get(identity) {
496 for (_track_sid, track_publication) in remote_participant.track_publications() {
497 track_publication.set_subscribed(true);
498 }
499 }
500 }
501
502 pub fn set_participant_avatar_url(
510 &mut self,
511 identity: &ParticipantIdentity,
512 avatar_url: String,
513 ) {
514 async fn load_avatar_from_url(
515 http_client: reqwest::Client,
516 shared: Arc<StdMutex<Shared>>,
517 identity: ParticipantIdentity,
518 avatar_url: &str,
519 ) -> Result<()> {
520 let response = http_client
521 .get(avatar_url)
522 .send()
523 .await
524 .context("Failed to send a HTTP request")?;
525
526 if !response.status().is_success() {
527 bail!("Got unexpected {} response", response.status());
528 }
529
530 let bytes = response
531 .bytes()
532 .await
533 .context("Failed to receive HTTP response")?;
534
535 let avatar = ::image::load_from_memory(&bytes)
536 .context("Failed to load image from HTTP response")?;
537
538 let mut shared = shared.lock().unwrap();
539
540 if let Some(participant) = shared.participants.get_mut(&identity) {
541 participant.avatar = Some(
542 video::placeholder::avatar_to_placeholder(&avatar)
543 .context("Failed to convert received avatar to I420Buffer")?,
544 );
545 }
546
547 Ok(())
548 }
549
550 if !self
552 .shared
553 .lock()
554 .unwrap()
555 .participants
556 .contains_key(identity)
557 {
558 return;
559 }
560
561 let http_client = self.http_client.clone();
562 let shared = self.shared.clone();
563 let identity = identity.clone();
564
565 tokio::spawn(async move {
566 if let Err(e) = load_avatar_from_url(http_client, shared, identity, &avatar_url).await {
567 log::warn!("Failed to fetch avatar from url={avatar_url}, error={e:?}");
568 }
569 });
570 }
571
572 pub fn remove_participant(&mut self, identity: &ParticipantIdentity) {
576 log::debug!("Remove participant {identity:?}");
577
578 self.video_stream_tx
579 .send(VideoStreamCommand::RemoveParticipant(identity.to_owned()))
580 .expect("unable to send add remove event to video_stream_tx");
581 }
582
583 pub fn set_video_support(&mut self, enabled: bool) {
589 let mut shared = self.shared.lock().unwrap();
590
591 shared.render_frames = enabled;
592
593 for (pid, remote_participant) in self.room.remote_participants() {
595 if !shared.participants.contains_key(&pid) {
596 continue;
598 }
599
600 for publication in remote_participant.track_publications().into_values() {
601 if matches!(publication.kind(), TrackKind::Video) {
602 publication.set_subscribed(enabled);
603 }
604 }
605 }
606 }
607}
608
609impl Drop for Mixer {
610 fn drop(&mut self) {
611 log::debug!("Drop Mixer");
612
613 tokio::task::block_in_place(move || {
614 tokio::runtime::Handle::current().block_on(async move {
615 if let Err(e) = self.room.close().await {
616 log::warn!("Failed to close livekit room, {e:?}");
617 }
618
619 log::debug!("Send shutdown to all tasks");
620 self.shutdown_tx.send(()).ok();
621
622 if let Some(video_task) = self.video_task.take() {
623 if !video_task.is_finished() {
624 log::debug!("Wait for video task to be finished");
625 video_task.await.expect("unable to await video task");
626 }
627 }
628
629 log::debug!("Drop all active sinks");
630 self.sinks.lock().await.drain();
631
632 self.audio_mixer_task.abort();
633 });
634 });
635 }
636}
637
638pub fn create_token(
644 api_key: &str,
645 api_secret: &str,
646 room: &str,
647 name: &str,
648) -> Result<String, AccessTokenError> {
649 AccessToken::with_api_key(api_key, api_secret)
650 .with_identity(uuid::Uuid::new_v4().to_string().as_str())
651 .with_name(name)
652 .with_grants(VideoGrants {
653 room_join: true,
654 room: room.to_string(),
655 hidden: false,
656 recorder: true,
657 ..Default::default()
658 })
659 .to_jwt()
660}