use std::io::{Read, Seek};
use std::sync::atomic::AtomicBool;
use flume::{Receiver, Sender, bounded};
use log::debug;
use rangemap::RangeSet;
use switchy_http::Client;
use symphonia::core::io::MediaSource;
pub static IS_STREAM_BUFFERING: AtomicBool = AtomicBool::new(false);
const CHUNK_SIZE: usize = 1024 * 128;
const FETCH_OFFSET: usize = CHUNK_SIZE / 2;
pub struct StreamableFileAsync {
url: String,
buffer: Vec<u8>,
read_position: usize,
downloaded: RangeSet<usize>,
requested: RangeSet<usize>,
#[allow(clippy::type_complexity)]
receivers: Vec<(u128, Receiver<(usize, Vec<u8>)>)>,
}
impl StreamableFileAsync {
#[must_use]
pub async fn new(url: String) -> Self {
let mut res = Client::new().head(&url).send().await.unwrap();
let header = res.headers().get("Content-Length").unwrap();
let size: usize = header.parse().unwrap();
Self {
url,
buffer: vec![0; size],
read_position: 0,
downloaded: RangeSet::new(),
requested: RangeSet::new(),
receivers: Vec::new(),
}
}
async fn read_chunk(tx: Sender<(usize, Vec<u8>)>, url: String, start: usize, file_size: usize) {
let end = (start + CHUNK_SIZE).min(file_size);
let chunk = Client::new()
.get(&url)
.header(
switchy_http::Header::Range.as_ref(),
&format!("bytes={start}-{end}"),
)
.send()
.await
.unwrap()
.bytes()
.await
.unwrap()
.to_vec();
tx.send_async((start, chunk)).await.unwrap();
}
fn try_write_chunk(&mut self, should_buffer: bool) {
let mut completed_downloads = Vec::new();
for (id, rx) in &self.receivers {
let result = if self.downloaded.is_empty() || should_buffer {
rx.recv().ok()
} else {
rx.try_recv().ok()
};
match result {
None => (),
Some((position, chunk)) => {
let end = (position + chunk.len()).min(self.buffer.len());
if position != end {
self.buffer[position..end].copy_from_slice(chunk.as_slice());
self.downloaded.insert(position..end);
}
completed_downloads.push(*id);
}
}
}
self.receivers
.retain(|(id, _)| !completed_downloads.contains(id));
}
fn should_get_chunk(&self, buf_len: usize) -> (bool, usize) {
let closest_range = self.downloaded.get(&self.read_position);
if closest_range.is_none() {
return (true, self.read_position);
}
let closest_range = closest_range.unwrap();
let is_already_downloading = self.requested.contains(&(self.read_position + CHUNK_SIZE));
let should_get_chunk = self.read_position + buf_len >= closest_range.end - FETCH_OFFSET
&& !is_already_downloading
&& closest_range.end != self.buffer.len();
(should_get_chunk, closest_range.end)
}
}
#[cfg_attr(feature = "profiling", profiling::all_functions)]
impl Read for StreamableFileAsync {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
if self.read_position >= self.buffer.len() {
return Ok(0);
}
let read_max = (self.read_position + buf.len()).min(self.buffer.len());
let (should_get_chunk, chunk_write_pos) = self.should_get_chunk(buf.len());
debug!(
"Read: read_pos[{}] read_max[{read_max}] buf[{}] write_pos[{chunk_write_pos}] download[{should_get_chunk}]",
self.read_position,
buf.len()
);
if should_get_chunk {
#[allow(clippy::range_plus_one)]
self.requested
.insert(chunk_write_pos..chunk_write_pos + CHUNK_SIZE + 1);
let url = self.url.clone();
let file_size = self.buffer.len();
let (tx, rx) = bounded(1);
let id = switchy_time::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
self.receivers.push((id, rx));
moosicbox_task::spawn(
"audio_decoder: StreamableFileAsync read_chunk",
async move {
Self::read_chunk(tx, url, chunk_write_pos, file_size).await;
},
);
}
let should_buffer = !self.downloaded.contains(&self.read_position);
IS_STREAM_BUFFERING.store(should_buffer, std::sync::atomic::Ordering::SeqCst);
self.try_write_chunk(should_buffer);
let bytes = &self.buffer[self.read_position..read_max];
buf[0..bytes.len()].copy_from_slice(bytes);
self.read_position += bytes.len();
Ok(bytes.len())
}
}
#[cfg_attr(feature = "profiling", profiling::all_functions)]
impl Seek for StreamableFileAsync {
fn seek(&mut self, pos: std::io::SeekFrom) -> std::io::Result<u64> {
let seek_position: usize = match pos {
#[allow(clippy::cast_possible_truncation)]
std::io::SeekFrom::Start(pos) => pos as usize,
std::io::SeekFrom::Current(pos) => {
#[allow(clippy::cast_possible_wrap)]
let pos = self.read_position as i64 + pos;
pos.try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Invalid seek: {pos}"),
)
})?
}
std::io::SeekFrom::End(pos) => {
#[allow(clippy::cast_possible_wrap)]
let pos = self.buffer.len() as i64 + pos;
pos.try_into().map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Invalid seek: {pos}"),
)
})?
}
};
if seek_position > self.buffer.len() {
debug!("Seek position {seek_position} > file size");
return Ok(self.read_position as u64);
}
debug!("Seeking: pos[{seek_position}] type[{pos:?}]");
self.read_position = seek_position;
Ok(seek_position as u64)
}
}
impl MediaSource for StreamableFileAsync {
fn is_seekable(&self) -> bool {
true
}
fn byte_len(&self) -> Option<u64> {
Some(self.buffer.len() as u64)
}
}