Skip to main content

irontide_session/
streaming.rs

//! File streaming — [`AsyncRead`] + [`AsyncSeek`] over individual torrent files.
//!
//! Provides [`FileStream`], which lets consumers read a file within a torrent
//! as if it were a regular seekable byte stream. A read that reaches a piece
//! which hasn't been downloaded yet waits (returns `Poll::Pending`) instead of
//! blocking the thread, and resumes automatically once pieces arrive.
7
8use std::io;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13use bytes::Bytes;
14use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
15use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
16
17use irontide_core::Lengths;
18use irontide_storage::Bitfield;
19
20use crate::disk::{DiskHandle, DiskJobFlags};
21
/// Internal handle passed from the TorrentActor to construct a [`FileStream`].
///
/// [`FileStream::from_handle`] moves every field of this handle into the new
/// stream.
pub struct FileStreamHandle {
    /// Disk handle the stream issues chunk reads through.
    pub(crate) disk: DiskHandle,
    /// Piece/chunk geometry of the torrent; used to map byte offsets to pieces.
    pub(crate) lengths: Lengths,
    /// Index of the file this handle refers to.
    pub(crate) file_index: usize,
    /// Absolute byte offset of the file within the torrent data.
    pub(crate) file_offset: u64,
    /// Length of the file in bytes.
    pub(crate) file_length: u64,
    /// Where the stream publishes its read position (relative to the start of the file).
    pub(crate) cursor_tx: watch::Sender<u64>,
    /// Broadcast of piece-completion notifications.
    pub(crate) piece_ready_rx: broadcast::Receiver<u32>,
    /// Watch receiver for the torrent's have-bitfield.
    pub(crate) have: watch::Receiver<Bitfield>,
    /// Read permit; the stream holds it for its whole lifetime.
    pub(crate) read_permit: OwnedSemaphorePermit,
}
34
35impl std::fmt::Debug for FileStreamHandle {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("FileStreamHandle")
38            .field("file_index", &self.file_index)
39            .field("file_offset", &self.file_offset)
40            .field("file_length", &self.file_length)
41            .finish_non_exhaustive()
42    }
43}
44
/// Streaming cursor tracked by the TorrentActor.
///
/// The actor polls this to update streaming piece priorities.
pub(crate) struct StreamingCursor {
    /// Index of the streamed file.
    // NOTE(review): the `allow` suggests this is never read anywhere in the
    // crate — confirm whether it is still needed.
    #[allow(dead_code)]
    pub file_index: usize,
    /// Byte offset of the streamed file — presumably absolute within the
    /// torrent data, like `FileStreamHandle::file_offset`; confirm.
    pub file_offset: u64,
    /// Presumably the piece under the last reported cursor position — TODO confirm.
    pub cursor_piece: u32,
    /// Presumably how many pieces past the cursor get prioritised — TODO confirm.
    pub readahead_pieces: u32,
    /// Read-position updates; presumably the receiving end of a stream's
    /// `cursor_tx` (position relative to the start of the file) — verify.
    pub cursor_rx: watch::Receiver<u64>,
}
56
/// Async reader/seeker over a single file within a torrent.
///
/// Created via [`crate::TorrentHandle::open_file()`]. Implements [`AsyncRead`] and
/// [`AsyncSeek`]; a read that needs a piece which hasn't been downloaded yet
/// returns `Poll::Pending` until pieces arrive.
///
/// The stream updates a cursor position that the TorrentActor uses to
/// prioritise downloading the pieces around the read head.
pub struct FileStream {
    /// Disk handle used to issue chunk reads.
    disk: DiskHandle,
    /// Piece/chunk geometry used to map byte offsets to pieces.
    lengths: Lengths,
    // Stored from the handle but not read by the stream itself, hence the `allow`.
    #[allow(dead_code)]
    file_index: usize,
    /// Absolute byte offset of the file within the torrent data.
    file_offset: u64,
    /// Length of this file in bytes.
    file_length: u64,
    /// Current read position relative to start of file.
    position: u64,
    /// Cursor channel to notify actor of read position changes.
    cursor_tx: watch::Sender<u64>,
    /// Broadcast receiver for piece completion notifications.
    piece_ready_rx: broadcast::Receiver<u32>,
    /// Watch receiver for the have-bitfield.
    have: watch::Receiver<Bitfield>,
    /// In-flight disk read for the chunk at `position`; resolves to that
    /// chunk's bytes. Dropped (cancelled) by a seek.
    pending_read:
        Option<Pin<Box<dyn std::future::Future<Output = irontide_storage::Result<Bytes>> + Send>>>,
    /// Bytes from the last disk read that have not been handed to a caller yet.
    buffer: Bytes,
    /// Pending seek result (set by start_seek, consumed by poll_complete).
    seek_result: Option<io::Result<u64>>,
    /// Semaphore permit — held for the lifetime of the stream.
    _read_permit: OwnedSemaphorePermit,
}
91
92impl FileStream {
93    /// Construct a `FileStream` from an actor-provided handle.
94    pub fn from_handle(h: FileStreamHandle) -> Self {
95        Self {
96            disk: h.disk,
97            lengths: h.lengths,
98            file_index: h.file_index,
99            file_offset: h.file_offset,
100            file_length: h.file_length,
101            position: 0,
102            cursor_tx: h.cursor_tx,
103            piece_ready_rx: h.piece_ready_rx,
104            have: h.have,
105            pending_read: None,
106            buffer: Bytes::new(),
107            seek_result: None,
108            _read_permit: h.read_permit,
109        }
110    }
111
112    /// File length in bytes.
113    pub fn file_length(&self) -> u64 {
114        self.file_length
115    }
116
117    /// Current read position within the file.
118    pub fn position(&self) -> u64 {
119        self.position
120    }
121
122    /// Check whether the piece containing the current read position is available.
123    fn current_piece_available(&self) -> bool {
124        let abs = self.file_offset + self.position;
125        if let Some((piece, _)) = self.lengths.byte_to_piece(abs) {
126            let have = self.have.borrow();
127            have.get(piece)
128        } else {
129            false
130        }
131    }
132
133    /// How many bytes remain from `position` to end-of-file.
134    fn remaining(&self) -> u64 {
135        self.file_length.saturating_sub(self.position)
136    }
137}
138
139impl AsyncRead for FileStream {
140    fn poll_read(
141        mut self: Pin<&mut Self>,
142        cx: &mut Context<'_>,
143        buf: &mut ReadBuf<'_>,
144    ) -> Poll<io::Result<()>> {
145        // EOF check
146        if self.position >= self.file_length {
147            return Poll::Ready(Ok(()));
148        }
149
150        // If we have buffered data, drain it first.
151        if !self.buffer.is_empty() {
152            let to_copy = self.buffer.len().min(buf.remaining());
153            let to_copy = to_copy.min(self.remaining() as usize);
154            buf.put_slice(&self.buffer[..to_copy]);
155            self.buffer = self.buffer.slice(to_copy..);
156            self.position += to_copy as u64;
157            let _ = self.cursor_tx.send(self.position);
158            return Poll::Ready(Ok(()));
159        }
160
161        // If we have a pending disk read, poll it.
162        if let Some(ref mut fut) = self.pending_read {
163            match fut.as_mut().poll(cx) {
164                Poll::Ready(Ok(data)) => {
165                    self.pending_read = None;
166                    let to_copy = data.len().min(buf.remaining());
167                    let to_copy = to_copy.min(self.remaining() as usize);
168                    buf.put_slice(&data[..to_copy]);
169                    if to_copy < data.len() {
170                        self.buffer = data.slice(to_copy..);
171                    }
172                    self.position += to_copy as u64;
173                    let _ = self.cursor_tx.send(self.position);
174                    return Poll::Ready(Ok(()));
175                }
176                Poll::Ready(Err(e)) => {
177                    self.pending_read = None;
178                    return Poll::Ready(Err(io::Error::other(e.to_string())));
179                }
180                Poll::Pending => return Poll::Pending,
181            }
182        }
183
184        // Check if the current piece is available.
185        if !self.current_piece_available() {
186            // Register waker with piece_ready_rx: when any piece completes,
187            // we wake and re-check.
188            let mut rx = self.piece_ready_rx.resubscribe();
189            let waker = cx.waker().clone();
190            tokio::spawn(async move {
191                let _ = rx.recv().await;
192                waker.wake();
193            });
194            return Poll::Pending;
195        }
196
197        // Piece is available — issue a disk read.
198        let abs = self.file_offset + self.position;
199        let Some((piece, offset_in_piece)) = self.lengths.byte_to_piece(abs) else {
200            return Poll::Ready(Ok(())); // past end
201        };
202
203        // Read one chunk from the piece at the current offset.
204        let piece_size = self.lengths.piece_size(piece);
205        let read_len = (piece_size - offset_in_piece)
206            .min(self.lengths.chunk_size())
207            .min(self.remaining() as u32);
208
209        let disk = self.disk.clone();
210        let fut = Box::pin(async move {
211            disk.read_chunk(piece, offset_in_piece, read_len, DiskJobFlags::SEQUENTIAL)
212                .await
213        });
214        self.pending_read = Some(fut);
215
216        // Poll the newly created future immediately.
217        let fut = self.pending_read.as_mut().unwrap();
218        match fut.as_mut().poll(cx) {
219            Poll::Ready(Ok(data)) => {
220                self.pending_read = None;
221                let to_copy = data.len().min(buf.remaining());
222                let to_copy = to_copy.min(self.remaining() as usize);
223                buf.put_slice(&data[..to_copy]);
224                if to_copy < data.len() {
225                    self.buffer = data.slice(to_copy..);
226                }
227                self.position += to_copy as u64;
228                let _ = self.cursor_tx.send(self.position);
229                Poll::Ready(Ok(()))
230            }
231            Poll::Ready(Err(e)) => {
232                self.pending_read = None;
233                Poll::Ready(Err(io::Error::other(e.to_string())))
234            }
235            Poll::Pending => Poll::Pending,
236        }
237    }
238}
239
240impl AsyncSeek for FileStream {
241    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
242        let new_pos = match pos {
243            io::SeekFrom::Start(n) => n as i64,
244            io::SeekFrom::End(n) => self.file_length as i64 + n,
245            io::SeekFrom::Current(n) => self.position as i64 + n,
246        };
247
248        if new_pos < 0 {
249            self.seek_result = Some(Err(io::Error::new(
250                io::ErrorKind::InvalidInput,
251                "seek to negative position",
252            )));
253        } else {
254            let new_pos = new_pos as u64;
255            self.position = new_pos;
256            self.buffer = Bytes::new();
257            self.pending_read = None;
258            let _ = self.cursor_tx.send(self.position);
259            self.seek_result = Some(Ok(new_pos));
260        }
261        Ok(())
262    }
263
264    fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
265        match self.seek_result.take() {
266            Some(result) => Poll::Ready(result),
267            None => Poll::Ready(Ok(self.position)),
268        }
269    }
270}
271
272/// Create a semaphore for limiting concurrent stream reads.
273pub(crate) fn stream_read_semaphore(max: usize) -> Arc<Semaphore> {
274    Arc::new(Semaphore::new(max))
275}
276
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::task::{Wake, Waker};

    use super::*;

    /// Length of the single test file; it spans the whole 4-piece torrent.
    const FILE_LEN: u64 = 262_144;

    /// Helper: create a Lengths for testing.
    fn test_lengths() -> Lengths {
        // 4 pieces of 64 KiB, 16 KiB chunks = 256 KiB total
        Lengths::new(262144, 65536, 16384)
    }

    /// Helper: create a fully-available bitfield.
    fn full_bitfield(num_pieces: u32) -> Bitfield {
        let mut bf = Bitfield::new(num_pieces);
        for i in 0..num_pieces {
            bf.set(i);
        }
        bf
    }

    /// A [`FileStream`] over the whole test torrent, plus the far ends of the
    /// channels it talks to. Holding them here keeps every channel open for as
    /// long as the harness lives.
    struct Harness<D> {
        stream: FileStream,
        cursor_rx: watch::Receiver<u64>,
        piece_tx: broadcast::Sender<u32>,
        have_tx: watch::Sender<Bitfield>,
        /// Receiving end of the disk job channel. Nothing services it, so no
        /// test here may rely on a disk read completing.
        _disk_rx: D,
    }

    /// Build a [`Harness`] whose have-bitfield starts as `have`.
    fn harness(have: Bitfield) -> Harness<impl Sized> {
        let (cursor_tx, cursor_rx) = watch::channel(0u64);
        let (piece_tx, piece_rx) = broadcast::channel::<u32>(16);
        let (have_tx, have_rx) = watch::channel(have);

        // An owned permit keeps its semaphore alive by itself.
        let permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();

        let (disk_tx, disk_rx) = tokio::sync::mpsc::channel(1);
        let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);

        let stream = FileStream::from_handle(FileStreamHandle {
            disk,
            lengths: test_lengths(),
            file_index: 0,
            file_offset: 0,
            file_length: FILE_LEN,
            cursor_tx,
            piece_ready_rx: piece_rx,
            have: have_rx,
            read_permit: permit,
        });

        Harness {
            stream,
            cursor_rx,
            piece_tx,
            have_tx,
            _disk_rx: disk_rx,
        }
    }

    /// Waker that records whether it has been woken.
    struct WakeFlag(AtomicBool);

    impl Wake for WakeFlag {
        fn wake(self: Arc<Self>) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    #[test]
    fn seek_updates_cursor() {
        let mut h = harness(full_bitfield(4));

        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::Start(100_000))
            .unwrap();

        // The actor-facing cursor follows the seek immediately.
        assert!(h.cursor_rx.has_changed().unwrap());
        assert_eq!(*h.cursor_rx.borrow_and_update(), 100_000);
        assert_eq!(h.stream.position(), 100_000);
    }

    #[tokio::test]
    async fn seek_end_relative() {
        let mut h = harness(full_bitfield(4));

        // Seek to 1024 bytes before end
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::End(-1024))
            .unwrap();
        let result =
            std::future::poll_fn(|cx| Pin::new(&mut h.stream).poll_complete(cx)).await;

        assert_eq!(result.unwrap(), FILE_LEN - 1024);
        assert_eq!(h.stream.position(), FILE_LEN - 1024);
    }

    #[tokio::test]
    async fn seek_negative_errors() {
        let mut h = harness(full_bitfield(4));

        // The stream starts at 0, so one byte back is before the file start.
        Pin::new(&mut h.stream)
            .start_seek(io::SeekFrom::Current(-1))
            .unwrap();
        let result =
            std::future::poll_fn(|cx| Pin::new(&mut h.stream).poll_complete(cx)).await;

        assert_eq!(result.unwrap_err().kind(), io::ErrorKind::InvalidInput);
        assert_eq!(h.stream.position(), 0, "a failed seek must not move the stream");
    }

    #[tokio::test]
    async fn eof_returns_zero_bytes() {
        let mut h = harness(full_bitfield(4));
        h.stream.position = FILE_LEN;

        let mut buf = [0u8; 1024];
        let mut read_buf = ReadBuf::new(&mut buf);
        let result =
            std::future::poll_fn(|cx| Pin::new(&mut h.stream).poll_read(cx, &mut read_buf))
                .await;
        assert!(result.is_ok());
        assert_eq!(read_buf.filled().len(), 0);
    }

    #[tokio::test]
    async fn blocks_on_missing_piece_wakes_on_completion() {
        // Start with an empty bitfield — no pieces available.
        let mut h = harness(Bitfield::new(4));

        let flag = Arc::new(WakeFlag(AtomicBool::new(false)));
        let waker = Waker::from(Arc::clone(&flag));
        let mut cx = Context::from_waker(&waker);

        let mut buf = [0u8; 1024];
        let mut read_buf = ReadBuf::new(&mut buf);
        let poll = Pin::new(&mut h.stream).poll_read(&mut cx, &mut read_buf);
        assert!(poll.is_pending(), "should be Pending when piece is missing");
        assert!(!flag.0.load(Ordering::SeqCst), "nothing has completed yet");

        // Mark piece 0 as available, then announce it.
        let mut bf = Bitfield::new(4);
        bf.set(0);
        h.have_tx.send(bf).unwrap();
        h.piece_tx.send(0).unwrap();

        // The watcher runs on this runtime; yield until it has seen the announcement.
        for _ in 0..100 {
            if flag.0.load(Ordering::SeqCst) {
                break;
            }
            tokio::task::yield_now().await;
        }
        assert!(
            flag.0.load(Ordering::SeqCst),
            "reader should be woken once a piece completes"
        );
    }
}