#![allow(
clippy::cast_possible_truncation,
clippy::cast_precision_loss,
clippy::cast_possible_wrap,
clippy::cast_sign_loss,
reason = "M175: file streaming — read positions bounded by file_length (u64); chunk lengths bounded by chunk_size (u32 by construction)"
)]
use std::io;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
use irontide_core::Lengths;
use irontide_storage::Bitfield;
use crate::disk::{DiskHandle, DiskJobFlags};
/// Bundle of resources from which a [`FileStream`] is built.
///
/// Consumed field-by-field by [`FileStream::from_handle`].
pub struct FileStreamHandle {
/// Disk handle the stream clones to issue `read_chunk` jobs.
pub(crate) disk: DiskHandle,
/// Piece/chunk geometry used to map byte offsets to pieces and chunk sizes.
pub(crate) lengths: Lengths,
/// Index of the streamed file (presumably within the torrent's file list — confirm).
pub(crate) file_index: usize,
/// Added to the stream position to form the byte offset given to `Lengths` lookups.
pub(crate) file_offset: u64,
/// File size in bytes; the stream reports EOF at or beyond this position.
pub(crate) file_length: u64,
/// Publishes the stream position after every read and every successful seek.
pub(crate) cursor_tx: watch::Sender<u64>,
/// Broadcast the stream waits on while the piece under the cursor is missing
/// (values look like piece indices — confirm against the sender).
pub(crate) piece_ready_rx: broadcast::Receiver<u32>,
/// Latest bitfield, consulted to decide whether the piece under the cursor can be read.
pub(crate) have: watch::Receiver<Bitfield>,
/// Semaphore permit that the resulting stream holds for its whole lifetime.
pub(crate) read_permit: OwnedSemaphorePermit,
}
/// Debug output lists only the plain-data fields; the channel, disk and
/// permit handles are omitted and the struct is marked non-exhaustive.
impl std::fmt::Debug for FileStreamHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            file_index,
            file_offset,
            file_length,
            ..
        } = self;
        let mut out = f.debug_struct("FileStreamHandle");
        out.field("file_index", file_index);
        out.field("file_offset", file_offset);
        out.field("file_length", file_length);
        out.finish_non_exhaustive()
    }
}
/// Per-stream cursor record.
///
/// NOTE(review): nothing in this file constructs or reads this type, so the
/// field notes below are inferred from names and types only — confirm them
/// against the code that consumes it.
pub(crate) struct StreamingCursor {
/// Presumably the index of the streamed file — confirm.
#[allow(dead_code)]
pub file_index: usize,
/// Presumably the file's starting byte offset, mirroring `FileStream` — confirm.
pub file_offset: u64,
/// Presumably the piece currently under the read cursor — confirm.
pub cursor_piece: u32,
/// Presumably how many pieces ahead of the cursor to prioritise — confirm.
pub readahead_pieces: u32,
/// Watch receiver of a `u64`; its type matches the position channel that
/// `FileStream` publishes to (TODO confirm it is the same channel).
pub cursor_rx: watch::Receiver<u64>,
}
/// Seekable async reader over a single file, implementing `AsyncRead` and
/// `AsyncSeek`.
///
/// Reads wait until the piece under the cursor is marked in `have`, then
/// fetch one chunk at a time through `disk`.
pub struct FileStream {
/// Disk handle cloned into each chunk-read future.
disk: DiskHandle,
/// Piece/chunk geometry used to translate byte offsets.
lengths: Lengths,
/// Index of the streamed file; stored but not read in this file.
#[allow(dead_code)]
file_index: usize,
/// Added to `position` to form the byte offset given to `Lengths` lookups.
file_offset: u64,
/// File size in bytes; EOF is reported once `position` reaches it.
file_length: u64,
/// Current read position, relative to the start of the file.
position: u64,
/// Publishes `position` after every read and every successful seek.
cursor_tx: watch::Sender<u64>,
/// Resubscribed to whenever a read has to wait for a missing piece.
piece_ready_rx: broadcast::Receiver<u32>,
/// Latest bitfield, checked before a disk read is issued.
have: watch::Receiver<Bitfield>,
/// Disk read currently in flight, if any; cleared on completion and on seek.
pending_read:
Option<Pin<Box<dyn std::future::Future<Output = irontide_storage::Result<Bytes>> + Send>>>,
/// Bytes from a finished disk read that did not fit into the caller's buffer.
buffer: Bytes,
/// Outcome of the most recent `start_seek`, handed out once by `poll_complete`.
seek_result: Option<io::Result<u64>>,
/// Held only so the permit is released when the stream is dropped.
_read_permit: OwnedSemaphorePermit,
}
impl FileStream {
#[must_use]
pub fn from_handle(h: FileStreamHandle) -> Self {
Self {
disk: h.disk,
lengths: h.lengths,
file_index: h.file_index,
file_offset: h.file_offset,
file_length: h.file_length,
position: 0,
cursor_tx: h.cursor_tx,
piece_ready_rx: h.piece_ready_rx,
have: h.have,
pending_read: None,
buffer: Bytes::new(),
seek_result: None,
_read_permit: h.read_permit,
}
}
pub fn file_length(&self) -> u64 {
self.file_length
}
pub fn position(&self) -> u64 {
self.position
}
fn current_piece_available(&self) -> bool {
let abs = self.file_offset + self.position;
if let Some(piece) = self.lengths.piece_index_for_byte(abs) {
let have = self.have.borrow();
have.get(piece)
} else {
false
}
}
fn remaining(&self) -> u64 {
self.file_length.saturating_sub(self.position)
}
}
impl AsyncRead for FileStream {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
if self.position >= self.file_length {
return Poll::Ready(Ok(()));
}
if !self.buffer.is_empty() {
let to_copy = self.buffer.len().min(buf.remaining());
let to_copy = to_copy.min(self.remaining() as usize);
buf.put_slice(&self.buffer[..to_copy]);
self.buffer = self.buffer.slice(to_copy..);
self.position += to_copy as u64;
let _ = self.cursor_tx.send(self.position);
return Poll::Ready(Ok(()));
}
if let Some(ref mut fut) = self.pending_read {
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(data)) => {
self.pending_read = None;
let to_copy = data.len().min(buf.remaining());
let to_copy = to_copy.min(self.remaining() as usize);
buf.put_slice(&data[..to_copy]);
if to_copy < data.len() {
self.buffer = data.slice(to_copy..);
}
self.position += to_copy as u64;
let _ = self.cursor_tx.send(self.position);
return Poll::Ready(Ok(()));
}
Poll::Ready(Err(e)) => {
self.pending_read = None;
return Poll::Ready(Err(io::Error::other(e.to_string())));
}
Poll::Pending => return Poll::Pending,
}
}
if !self.current_piece_available() {
let mut rx = self.piece_ready_rx.resubscribe();
let waker = cx.waker().clone();
tokio::spawn(async move {
let _ = rx.recv().await;
waker.wake();
});
return Poll::Pending;
}
let abs = self.file_offset + self.position;
let Some((piece, offset_in_piece)) = self.lengths.byte_to_piece_with_offset(abs) else {
return Poll::Ready(Ok(())); };
let piece_size = self.lengths.piece_size(piece);
let read_len = (piece_size - offset_in_piece)
.min(self.lengths.chunk_size())
.min(self.remaining() as u32);
let disk = self.disk.clone();
let fut = Box::pin(async move {
disk.read_chunk(piece, offset_in_piece, read_len, DiskJobFlags::SEQUENTIAL)
.await
});
self.pending_read = Some(fut);
let fut = self.pending_read.as_mut().unwrap();
match fut.as_mut().poll(cx) {
Poll::Ready(Ok(data)) => {
self.pending_read = None;
let to_copy = data.len().min(buf.remaining());
let to_copy = to_copy.min(self.remaining() as usize);
buf.put_slice(&data[..to_copy]);
if to_copy < data.len() {
self.buffer = data.slice(to_copy..);
}
self.position += to_copy as u64;
let _ = self.cursor_tx.send(self.position);
Poll::Ready(Ok(()))
}
Poll::Ready(Err(e)) => {
self.pending_read = None;
Poll::Ready(Err(io::Error::other(e.to_string())))
}
Poll::Pending => Poll::Pending,
}
}
}
/// Seeking is purely in-memory: it moves the cursor, discards buffered data
/// and any in-flight read, and publishes the new position.
impl AsyncSeek for FileStream {
    /// Computes the target position and records the outcome for
    /// [`poll_complete`](AsyncSeek::poll_complete).
    ///
    /// Always returns `Ok(())`; an invalid target (before the start of the
    /// file, or one that does not fit in `u64` once `file_offset` is added)
    /// is reported as `InvalidInput` by the following `poll_complete`, and
    /// leaves the stream state untouched. Seeking past the end of the file
    /// is allowed; subsequent reads report EOF.
    fn start_seek(self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
        let this = self.get_mut();
        let (base, delta) = match pos {
            io::SeekFrom::Start(n) => (n, 0_i64),
            io::SeekFrom::End(n) => (this.file_length, n),
            io::SeekFrom::Current(n) => (this.position, n),
        };

        // Checked arithmetic in u64: casting through i64 misreports
        // `Start(n)` with n > i64::MAX as negative and can overflow for
        // extreme `End`/`Current` offsets.
        let file_offset = this.file_offset;
        let target = base
            .checked_add_signed(delta)
            // Keep `file_offset + position` representable for later reads.
            .filter(|candidate| file_offset.checked_add(*candidate).is_some());

        match target {
            Some(new_pos) => {
                this.position = new_pos;
                this.buffer = Bytes::new();
                this.pending_read = None;
                // A send error only means nobody is watching the cursor.
                let _ = this.cursor_tx.send(new_pos);
                this.seek_result = Some(Ok(new_pos));
            }
            None => {
                let reason = if delta < 0 {
                    "seek to negative position"
                } else {
                    "seek position overflows"
                };
                this.seek_result =
                    Some(Err(io::Error::new(io::ErrorKind::InvalidInput, reason)));
            }
        }
        Ok(())
    }

    /// Completes immediately: yields the outcome stored by the last
    /// `start_seek` (once), or the current position when none is pending.
    fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
        let this = self.get_mut();
        let current = this.position;
        Poll::Ready(this.seek_result.take().unwrap_or(Ok(current)))
    }
}
/// Creates a shared semaphore with `max` permits; a stream carries one of its
/// permits as `FileStreamHandle::read_permit`.
pub(crate) fn stream_read_semaphore(max: usize) -> Arc<Semaphore> {
    let semaphore = Semaphore::new(max);
    Arc::new(semaphore)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::task::{Wake, Waker};

    /// Length of the single test file: four 64 KiB pieces.
    const FILE_LEN: u64 = 262_144;
    /// Number of pieces described by [`test_lengths`].
    const NUM_PIECES: u32 = 4;

    fn test_lengths() -> Lengths {
        Lengths::new(262_144, 65536, 16384)
    }

    /// Bitfield with every one of `num_pieces` bits set.
    fn full_bitfield(num_pieces: u32) -> Bitfield {
        let mut bf = Bitfield::new(num_pieces);
        for i in 0..num_pieces {
            bf.set(i);
        }
        bf
    }

    /// A stream under test plus the far ends of all its channels, so nothing
    /// is dropped (and no channel closes) while a test is running.
    struct Harness<R> {
        stream: FileStream,
        cursor_rx: watch::Receiver<u64>,
        piece_tx: broadcast::Sender<u32>,
        have_tx: watch::Sender<Bitfield>,
        /// Receiving side of the disk job queue; never serviced by these tests.
        _disk_rx: R,
    }

    /// Builds a stream over a `FILE_LEN`-byte file at offset 0 whose initial
    /// `have` bitfield is `have`.
    fn harness(have: Bitfield) -> Harness<impl Sized> {
        let (cursor_tx, cursor_rx) = watch::channel(0u64);
        let (piece_tx, piece_rx) = broadcast::channel::<u32>(16);
        let (have_tx, have_rx) = watch::channel(have);
        let sem = Arc::new(Semaphore::new(1));
        let permit = sem.try_acquire_owned().unwrap();
        let (disk_tx, disk_rx) = tokio::sync::mpsc::channel(1);
        let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
        let stream = FileStream::from_handle(FileStreamHandle {
            disk,
            lengths: test_lengths(),
            file_index: 0,
            file_offset: 0,
            file_length: FILE_LEN,
            cursor_tx,
            piece_ready_rx: piece_rx,
            have: have_rx,
            read_permit: permit,
        });
        Harness {
            stream,
            cursor_rx,
            piece_tx,
            have_tx,
            _disk_rx: disk_rx,
        }
    }

    #[test]
    fn seek_updates_cursor() {
        let mut h = harness(full_bitfield(NUM_PIECES));
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::Start(100_000))
            .unwrap();
        assert!(h.cursor_rx.has_changed().unwrap());
        assert_eq!(*h.cursor_rx.borrow_and_update(), 100_000);
        assert_eq!(h.stream.position(), 100_000);
    }

    #[test]
    fn seek_end_relative() {
        let mut h = harness(full_bitfield(NUM_PIECES));
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::End(-1024))
            .unwrap();
        assert_eq!(h.stream.position(), FILE_LEN - 1024);
    }

    #[test]
    fn seek_negative_errors() {
        let mut h = harness(full_bitfield(NUM_PIECES));
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::Start(0))
            .unwrap();
        // The invalid seek is accepted here and reported by `poll_complete`.
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::Current(-1))
            .unwrap();
        let rt = tokio::runtime::Builder::new_current_thread()
            .build()
            .unwrap();
        let result = rt.block_on(async {
            std::future::poll_fn(|cx| Pin::new(&mut h.stream).poll_complete(cx)).await
        });
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn eof_returns_zero_bytes() {
        let mut h = harness(full_bitfield(NUM_PIECES));
        h.stream.position = FILE_LEN;
        let mut buf = [0u8; 1024];
        let mut read_buf = ReadBuf::new(&mut buf);
        let result =
            std::future::poll_fn(|cx| Pin::new(&mut h.stream).poll_read(cx, &mut read_buf)).await;
        assert!(result.is_ok());
        assert_eq!(read_buf.filled().len(), 0);
    }

    #[tokio::test]
    async fn blocks_on_missing_piece_wakes_on_completion() {
        /// Waker that records whether it has been woken.
        struct WakeFlag(AtomicBool);
        impl Wake for WakeFlag {
            fn wake(self: Arc<Self>) {
                self.0.store(true, Ordering::SeqCst);
            }
        }

        let mut h = harness(Bitfield::new(NUM_PIECES));
        let flag = Arc::new(WakeFlag(AtomicBool::new(false)));
        let waker = Waker::from(Arc::clone(&flag));
        let mut buf = [0u8; 1024];
        let mut read_buf = ReadBuf::new(&mut buf);

        let first_poll = {
            let mut cx = Context::from_waker(&waker);
            Pin::new(&mut h.stream).poll_read(&mut cx, &mut read_buf)
        };
        assert!(
            first_poll.is_pending(),
            "should be Pending when piece is missing"
        );
        assert!(
            !flag.0.load(Ordering::SeqCst),
            "must not be woken before any piece completes"
        );

        // Publish piece 0: bitfield first, then the notification.
        let mut bf = Bitfield::new(4);
        bf.set(0);
        h.have_tx.send(bf).unwrap();
        h.piece_tx.send(0).unwrap();

        // Give the notification task spawned by `poll_read` a chance to run.
        for _ in 0..64 {
            if flag.0.load(Ordering::SeqCst) {
                break;
            }
            tokio::task::yield_now().await;
        }
        assert!(
            flag.0.load(Ordering::SeqCst),
            "reader should be woken once a piece completes"
        );
    }
}