use std::{
io::{Read, Seek},
sync::Arc,
time::Duration,
};
use bytes::Bytes;
use switchy_async::sync::Mutex;
use symphonia::core::io::MediaSource;
use tokio_stream::StreamExt as _;
use super::track::TrackBytes;
pub struct TrackBytesMediaSource {
pub id: usize,
inner: Arc<Mutex<TrackBytes>>,
started: bool,
finished: bool,
buf: Vec<u8>,
sender: flume::Sender<Bytes>,
receiver: flume::Receiver<Bytes>,
}
impl TrackBytesMediaSource {
#[must_use]
pub fn new(track_bytes: TrackBytes) -> Self {
let (sender, receiver) = flume::unbounded();
Self {
id: track_bytes.id,
inner: Arc::new(Mutex::new(track_bytes)),
started: false,
finished: false,
buf: vec![],
sender,
receiver,
}
}
fn start_listening(&self) {
let bytes = self.inner.clone();
let sender = self.sender.clone();
let id = self.id;
switchy_async::runtime::Handle::current().spawn_with_name(
"files: TrackBytesMediaSource",
async move {
log::trace!("Starting stream listen for track bytes for writer id={id}");
loop {
log::trace!("Acquiring lock for inner bytes for writer id={id}");
let mut bytes = bytes.lock().await;
log::trace!("Acquired lock for inner bytes for writer id={id}");
switchy_async::select!(
() = switchy_async::time::sleep(Duration::from_secs(15)) => {
moosicbox_assert::die_or_error!(
"Timed out waiting for bytes from stream for writer id={id}"
);
break;
}
response = bytes.stream.next() => {
match response {
Some(Ok(Ok(bytes))) => {
log::trace!("Sending {} bytes to writer id={id}", bytes.len());
if sender.send_async(bytes).await.is_err() {
log::debug!("Receiver has dropped for writer id={id}");
break;
}
}
None => {
log::trace!("Sending empty bytes to writer id={id}");
if sender.send_async(Bytes::new()).await.is_err() {
log::debug!("Receiver has dropped for writer id={id}");
}
break;
}
Some(Err(err) | Ok(Err(err))) => {
moosicbox_assert::die_or_error!(
"Byte stream returned error: writer id={id} {err:?}"
);
}
}
}
);
drop(bytes);
}
},
);
}
}
impl Seek for TrackBytesMediaSource {
fn seek(&mut self, _pos: std::io::SeekFrom) -> std::io::Result<u64> {
moosicbox_assert::assert!(false, "Seeking is not allowed for writer id={}", self.id);
panic!("Seeking is not allowed for writer id={}", self.id)
}
}
impl Read for TrackBytesMediaSource {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.finished {
return Ok(0);
}
if !self.started {
self.started = true;
self.start_listening();
}
if !self.buf.is_empty() {
let end = std::cmp::min(buf.len(), self.buf.len());
buf[..end].copy_from_slice(&self.buf.drain(..end).collect::<Vec<_>>());
return Ok(end);
}
let bytes = self
.receiver
.recv_timeout(Duration::from_secs(15))
.map_err(|e| {
moosicbox_assert::die_or_error!(
"Timed out waiting for bytes buf.len={} self.buf.len={} writer id={}: {e:?}",
buf.len(),
self.buf.len(),
self.id,
);
std::io::Error::new(std::io::ErrorKind::TimedOut, e)
})?;
let end = std::cmp::min(buf.len(), bytes.len());
buf[..end].copy_from_slice(&bytes[..end]);
self.buf.extend_from_slice(&bytes[end..]);
log::trace!(
"TrackBytesMediaSource::read end={end} bytes.len={} buf.len={} self.buf.len={} writer id={}",
bytes.len(),
buf.len(),
self.buf.len(),
self.id,
);
if bytes.is_empty() && self.buf.is_empty() {
self.finished = true;
}
Ok(end)
}
}
impl Drop for TrackBytesMediaSource {
fn drop(&mut self) {
log::debug!("Dropping TrackBytesMediaSource writer id={}", self.id);
}
}
impl MediaSource for TrackBytesMediaSource {
fn is_seekable(&self) -> bool {
false
}
fn byte_len(&self) -> Option<u64> {
None
}
}