use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use livekit::prelude::{DataTrackFrame, LocalDataTrack, LocalParticipant, PublishError};
use tokio::runtime::Handle;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error};
use crate::{ChannelId, Metadata};
const FRAME_HEADER_SIZE: usize = 8;
pub(crate) struct DataTrack {
track: Arc<OnceLock<LocalDataTrack>>,
close: CancellationToken,
task: Option<JoinHandle<()>>,
sequence: AtomicU32,
drop_throttler: parking_lot::Mutex<crate::throttler::Throttler>,
}
impl DataTrack {
pub fn publish(
runtime: &Handle,
local_participant: LocalParticipant,
channel_id: ChannelId,
session_cancel: CancellationToken,
) -> Self {
let track = Arc::new(OnceLock::new());
let track_clone = Arc::clone(&track);
let close = CancellationToken::new();
let close_clone = close.clone();
let name = format!("data-ch-{}", u64::from(channel_id));
let task = runtime.spawn(async move {
const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
const MAX_BACKOFF: Duration = Duration::from_secs(3);
let mut backoff = INITIAL_BACKOFF;
loop {
if close_clone.is_cancelled() {
return;
}
let result = tokio::select! {
() = session_cancel.cancelled() => return,
result = local_participant.publish_data_track(name.clone()) => result,
};
match result {
Ok(published) => {
track_clone.set(published).ok();
debug!("data track {name} published");
return;
}
Err(PublishError::DuplicateName) => {
debug!(
"data track {name} still being unpublished at SFU, \
retrying in {backoff:?}"
);
}
Err(e) => {
error!(
"failed to publish data track {name}: {e:?}, \
retrying in {backoff:?}"
);
}
}
tokio::select! {
() = close_clone.cancelled() => return,
() = session_cancel.cancelled() => return,
() = tokio::time::sleep(backoff) => {}
}
backoff = (backoff * 2).min(MAX_BACKOFF);
}
});
Self {
track,
close,
task: Some(task),
sequence: AtomicU32::new(0),
drop_throttler: parking_lot::Mutex::new(crate::throttler::Throttler::new(
Duration::from_secs(30),
)),
}
}
pub fn log(&self, channel_id: ChannelId, msg: &[u8], metadata: &Metadata) {
let Some(track) = self.track.get() else {
if self.drop_throttler.lock().try_acquire() {
debug!("data track not ready, dropping message for channel {channel_id:?}");
}
return;
};
let seq = self.sequence.fetch_add(1, Ordering::Relaxed);
let mut payload = Vec::with_capacity(FRAME_HEADER_SIZE + msg.len());
payload.extend_from_slice(&0u16.to_le_bytes());
payload.extend_from_slice(&(FRAME_HEADER_SIZE as u16).to_le_bytes()); payload.extend_from_slice(&seq.to_le_bytes());
payload.extend_from_slice(msg);
let frame = DataTrackFrame::new(payload).with_user_timestamp(metadata.log_time);
if let Err(e) = track.try_push(frame) {
if self.drop_throttler.lock().try_acquire() {
debug!("data track message dropped for channel {channel_id:?}: {e:?}");
}
}
}
pub async fn close(&mut self) {
self.close.cancel();
if let Some(task) = self.task.take() {
_ = task.await;
}
if let Some(track) = self.track.get() {
debug!("unpublishing data track {}", track.info().name());
track.unpublish();
}
}
}
impl Drop for DataTrack {
fn drop(&mut self) {
self.close.cancel();
}
}