Skip to main content

irontide_session/
streaming.rs

1#![allow(
2    clippy::cast_possible_truncation,
3    clippy::cast_precision_loss,
4    clippy::cast_possible_wrap,
5    clippy::cast_sign_loss,
6    reason = "M175: file streaming — read positions bounded by file_length (u64); chunk lengths bounded by chunk_size (u32 by construction)"
7)]
8
9//! File streaming — `AsyncRead` + `AsyncSeek` over individual torrent files.
10//!
11//! Provides [`FileStream`], which lets consumers read a file within a torrent
12//! as if it were a regular seekable byte stream. The stream blocks on pieces
13//! that haven't been downloaded yet and resumes automatically when the piece
14//! arrives.
15
16use std::io;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use bytes::Bytes;
22use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
23use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
24
25use irontide_core::Lengths;
26use irontide_storage::Bitfield;
27
28use crate::disk::{DiskHandle, DiskJobFlags};
29
30/// Internal handle passed from the `TorrentActor` to construct a [`FileStream`].
31pub struct FileStreamHandle {
32    pub(crate) disk: DiskHandle,
33    pub(crate) lengths: Lengths,
34    pub(crate) file_index: usize,
35    pub(crate) file_offset: u64,
36    pub(crate) file_length: u64,
37    pub(crate) cursor_tx: watch::Sender<u64>,
38    pub(crate) piece_ready_rx: broadcast::Receiver<u32>,
39    pub(crate) have: watch::Receiver<Bitfield>,
40    pub(crate) read_permit: OwnedSemaphorePermit,
41}
42
43impl std::fmt::Debug for FileStreamHandle {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        f.debug_struct("FileStreamHandle")
46            .field("file_index", &self.file_index)
47            .field("file_offset", &self.file_offset)
48            .field("file_length", &self.file_length)
49            .finish_non_exhaustive()
50    }
51}
52
53/// Streaming cursor tracked by the `TorrentActor`.
54///
55/// The actor polls this to update streaming piece priorities.
56pub(crate) struct StreamingCursor {
57    #[allow(dead_code)]
58    pub file_index: usize,
59    pub file_offset: u64,
60    pub cursor_piece: u32,
61    pub readahead_pieces: u32,
62    pub cursor_rx: watch::Receiver<u64>,
63}
64
65/// Async reader/seeker over a single file within a torrent.
66///
67/// Created via [`crate::TorrentHandle::open_file()`]. Implements [`AsyncRead`] and
68/// [`AsyncSeek`], blocking on pieces that haven't been downloaded yet.
69///
70/// The stream updates a cursor position that the `TorrentActor` uses to
71/// prioritise downloading the pieces around the read head.
72pub struct FileStream {
73    disk: DiskHandle,
74    lengths: Lengths,
75    #[allow(dead_code)]
76    file_index: usize,
77    /// Absolute byte offset of the file within the torrent data.
78    file_offset: u64,
79    /// Length of this file in bytes.
80    file_length: u64,
81    /// Current read position relative to start of file.
82    position: u64,
83    /// Cursor channel to notify actor of read position changes.
84    cursor_tx: watch::Sender<u64>,
85    /// Broadcast receiver for piece completion notifications.
86    piece_ready_rx: broadcast::Receiver<u32>,
87    /// Watch receiver for the have-bitfield.
88    have: watch::Receiver<Bitfield>,
89    /// In-progress read future (piece, begin, length).
90    pending_read:
91        Option<Pin<Box<dyn std::future::Future<Output = irontide_storage::Result<Bytes>> + Send>>>,
92    /// Buffered data from the last disk read (partially consumed).
93    buffer: Bytes,
94    /// Pending seek result (set by `start_seek`, consumed by `poll_complete`).
95    seek_result: Option<io::Result<u64>>,
96    /// Semaphore permit — held for the lifetime of the stream.
97    _read_permit: OwnedSemaphorePermit,
98}
99
100impl FileStream {
101    /// Construct a `FileStream` from an actor-provided handle.
102    #[must_use]
103    pub fn from_handle(h: FileStreamHandle) -> Self {
104        Self {
105            disk: h.disk,
106            lengths: h.lengths,
107            file_index: h.file_index,
108            file_offset: h.file_offset,
109            file_length: h.file_length,
110            position: 0,
111            cursor_tx: h.cursor_tx,
112            piece_ready_rx: h.piece_ready_rx,
113            have: h.have,
114            pending_read: None,
115            buffer: Bytes::new(),
116            seek_result: None,
117            _read_permit: h.read_permit,
118        }
119    }
120
121    /// File length in bytes.
122    pub fn file_length(&self) -> u64 {
123        self.file_length
124    }
125
126    /// Current read position within the file.
127    pub fn position(&self) -> u64 {
128        self.position
129    }
130
131    /// Check whether the piece containing the current read position is available.
132    fn current_piece_available(&self) -> bool {
133        let abs = self.file_offset + self.position;
134        if let Some(piece) = self.lengths.piece_index_for_byte(abs) {
135            let have = self.have.borrow();
136            have.get(piece)
137        } else {
138            false
139        }
140    }
141
142    /// How many bytes remain from `position` to end-of-file.
143    fn remaining(&self) -> u64 {
144        self.file_length.saturating_sub(self.position)
145    }
146}
147
148impl AsyncRead for FileStream {
149    fn poll_read(
150        mut self: Pin<&mut Self>,
151        cx: &mut Context<'_>,
152        buf: &mut ReadBuf<'_>,
153    ) -> Poll<io::Result<()>> {
154        // EOF check
155        if self.position >= self.file_length {
156            return Poll::Ready(Ok(()));
157        }
158
159        // If we have buffered data, drain it first.
160        if !self.buffer.is_empty() {
161            let to_copy = self.buffer.len().min(buf.remaining());
162            let to_copy = to_copy.min(self.remaining() as usize);
163            buf.put_slice(&self.buffer[..to_copy]);
164            self.buffer = self.buffer.slice(to_copy..);
165            self.position += to_copy as u64;
166            let _ = self.cursor_tx.send(self.position);
167            return Poll::Ready(Ok(()));
168        }
169
170        // If we have a pending disk read, poll it.
171        if let Some(ref mut fut) = self.pending_read {
172            match fut.as_mut().poll(cx) {
173                Poll::Ready(Ok(data)) => {
174                    self.pending_read = None;
175                    let to_copy = data.len().min(buf.remaining());
176                    let to_copy = to_copy.min(self.remaining() as usize);
177                    buf.put_slice(&data[..to_copy]);
178                    if to_copy < data.len() {
179                        self.buffer = data.slice(to_copy..);
180                    }
181                    self.position += to_copy as u64;
182                    let _ = self.cursor_tx.send(self.position);
183                    return Poll::Ready(Ok(()));
184                }
185                Poll::Ready(Err(e)) => {
186                    self.pending_read = None;
187                    return Poll::Ready(Err(io::Error::other(e.to_string())));
188                }
189                Poll::Pending => return Poll::Pending,
190            }
191        }
192
193        // Check if the current piece is available.
194        if !self.current_piece_available() {
195            // Register waker with piece_ready_rx: when any piece completes,
196            // we wake and re-check.
197            let mut rx = self.piece_ready_rx.resubscribe();
198            let waker = cx.waker().clone();
199            tokio::spawn(async move {
200                let _ = rx.recv().await;
201                waker.wake();
202            });
203            return Poll::Pending;
204        }
205
206        // Piece is available — issue a disk read.
207        let abs = self.file_offset + self.position;
208        let Some((piece, offset_in_piece)) = self.lengths.byte_to_piece_with_offset(abs) else {
209            return Poll::Ready(Ok(())); // past end
210        };
211
212        // Read one chunk from the piece at the current offset.
213        let piece_size = self.lengths.piece_size(piece);
214        let read_len = (piece_size - offset_in_piece)
215            .min(self.lengths.chunk_size())
216            .min(self.remaining() as u32);
217
218        let disk = self.disk.clone();
219        let fut = Box::pin(async move {
220            disk.read_chunk(piece, offset_in_piece, read_len, DiskJobFlags::SEQUENTIAL)
221                .await
222        });
223        self.pending_read = Some(fut);
224
225        // Poll the newly created future immediately.
226        let fut = self.pending_read.as_mut().unwrap();
227        match fut.as_mut().poll(cx) {
228            Poll::Ready(Ok(data)) => {
229                self.pending_read = None;
230                let to_copy = data.len().min(buf.remaining());
231                let to_copy = to_copy.min(self.remaining() as usize);
232                buf.put_slice(&data[..to_copy]);
233                if to_copy < data.len() {
234                    self.buffer = data.slice(to_copy..);
235                }
236                self.position += to_copy as u64;
237                let _ = self.cursor_tx.send(self.position);
238                Poll::Ready(Ok(()))
239            }
240            Poll::Ready(Err(e)) => {
241                self.pending_read = None;
242                Poll::Ready(Err(io::Error::other(e.to_string())))
243            }
244            Poll::Pending => Poll::Pending,
245        }
246    }
247}
248
249impl AsyncSeek for FileStream {
250    fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
251        let new_pos = match pos {
252            io::SeekFrom::Start(n) => n as i64,
253            io::SeekFrom::End(n) => self.file_length as i64 + n,
254            io::SeekFrom::Current(n) => self.position as i64 + n,
255        };
256
257        if new_pos < 0 {
258            self.seek_result = Some(Err(io::Error::new(
259                io::ErrorKind::InvalidInput,
260                "seek to negative position",
261            )));
262        } else {
263            let new_pos = new_pos as u64;
264            self.position = new_pos;
265            self.buffer = Bytes::new();
266            self.pending_read = None;
267            let _ = self.cursor_tx.send(self.position);
268            self.seek_result = Some(Ok(new_pos));
269        }
270        Ok(())
271    }
272
273    fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
274        match self.seek_result.take() {
275            Some(result) => Poll::Ready(result),
276            None => Poll::Ready(Ok(self.position)),
277        }
278    }
279}
280
281/// Create a semaphore for limiting concurrent stream reads.
282pub(crate) fn stream_read_semaphore(max: usize) -> Arc<Semaphore> {
283    Arc::new(Semaphore::new(max))
284}
285
#[cfg(test)]
mod tests {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::task::{Wake, Waker};

    use super::*;

    /// Size of the single synthetic file every test streams.
    const FILE_LEN: u64 = 262_144;

    /// Number of pieces in the synthetic torrent.
    const NUM_PIECES: u32 = 4;

    /// Helper: create a Lengths for testing.
    fn test_lengths() -> Lengths {
        // 4 pieces of 64KB, 16KB chunks = 256KB total
        Lengths::new(262_144, 65536, 16384)
    }

    /// Helper: create a fully-available bitfield.
    fn full_bitfield(num_pieces: u32) -> Bitfield {
        let mut bf = Bitfield::new(num_pieces);
        for i in 0..num_pieces {
            bf.set(i);
        }
        bf
    }

    /// Waker that records whether it has been woken.
    #[derive(Default)]
    struct WakeFlag(AtomicBool);

    impl WakeFlag {
        fn is_set(&self) -> bool {
            self.0.load(Ordering::SeqCst)
        }
    }

    impl Wake for WakeFlag {
        fn wake(self: Arc<Self>) {
            self.0.store(true, Ordering::SeqCst);
        }
    }

    /// A stream plus the channel ends a test needs to drive or observe it.
    struct Fixture {
        stream: FileStream,
        cursor_rx: watch::Receiver<u64>,
        piece_tx: broadcast::Sender<u32>,
        have_tx: watch::Sender<Bitfield>,
    }

    /// Build a stream over the whole synthetic file with `have` as the
    /// initial have-bitfield.
    ///
    /// The disk handle is a dummy: its job receiver is dropped when this
    /// function returns, so tests must not rely on a disk read completing.
    fn fixture(have: Bitfield) -> Fixture {
        let (cursor_tx, cursor_rx) = watch::channel(0u64);
        let (piece_tx, piece_ready_rx) = broadcast::channel::<u32>(16);
        let (have_tx, have_rx) = watch::channel(have);

        let read_permit = Arc::new(Semaphore::new(1)).try_acquire_owned().unwrap();

        let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
        let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);

        let stream = FileStream::from_handle(FileStreamHandle {
            disk,
            lengths: test_lengths(),
            file_index: 0,
            file_offset: 0,
            file_length: FILE_LEN,
            cursor_tx,
            piece_ready_rx,
            have: have_rx,
            read_permit,
        });

        Fixture {
            stream,
            cursor_rx,
            piece_tx,
            have_tx,
        }
    }

    #[test]
    fn seek_updates_cursor() {
        let mut fx = fixture(full_bitfield(NUM_PIECES));

        Pin::new(&mut fx.stream)
            .start_seek(io::SeekFrom::Start(100_000))
            .unwrap();

        // The new position must be visible both on the channel and the stream.
        assert!(fx.cursor_rx.has_changed().unwrap());
        assert_eq!(*fx.cursor_rx.borrow_and_update(), 100_000);
        assert_eq!(fx.stream.position(), 100_000);
    }

    #[test]
    fn seek_end_relative() {
        let mut fx = fixture(full_bitfield(NUM_PIECES));

        // Seek to 1024 bytes before end
        Pin::new(&mut fx.stream)
            .start_seek(io::SeekFrom::End(-1024))
            .unwrap();
        assert_eq!(fx.stream.position(), FILE_LEN - 1024);
    }

    #[test]
    fn seek_negative_errors() {
        let mut fx = fixture(full_bitfield(NUM_PIECES));

        // Go to the start, then try to step one byte before it.
        Pin::new(&mut fx.stream)
            .start_seek(io::SeekFrom::Start(0))
            .unwrap();
        Pin::new(&mut fx.stream)
            .start_seek(io::SeekFrom::Current(-1))
            .unwrap();

        // The failure is deferred: it surfaces from poll_complete.
        let waker = Waker::from(Arc::new(WakeFlag::default()));
        let mut cx = Context::from_waker(&waker);
        match Pin::new(&mut fx.stream).poll_complete(&mut cx) {
            Poll::Ready(Err(e)) => assert_eq!(e.kind(), io::ErrorKind::InvalidInput),
            other => panic!("expected a seek error, got {other:?}"),
        }
        assert_eq!(fx.stream.position(), 0, "failed seek must not move the cursor");
    }

    #[test]
    fn eof_returns_zero_bytes() {
        let mut fx = fixture(full_bitfield(NUM_PIECES));
        // Set position to EOF
        fx.stream.position = FILE_LEN;

        let mut storage = [0u8; 1024];
        let mut read_buf = ReadBuf::new(&mut storage);

        let waker = Waker::from(Arc::new(WakeFlag::default()));
        let mut cx = Context::from_waker(&waker);
        let outcome = Pin::new(&mut fx.stream).poll_read(&mut cx, &mut read_buf);

        assert!(matches!(outcome, Poll::Ready(Ok(()))));
        assert_eq!(read_buf.filled().len(), 0);
    }

    #[tokio::test]
    async fn blocks_on_missing_piece_wakes_on_completion() {
        // Start with empty bitfield — no pieces available
        let mut fx = fixture(Bitfield::new(NUM_PIECES));

        let flag = Arc::new(WakeFlag::default());
        {
            let waker = Waker::from(Arc::clone(&flag));
            let mut cx = Context::from_waker(&waker);

            let mut storage = [0u8; 1024];
            let mut read_buf = ReadBuf::new(&mut storage);

            // Piece 0 is missing, so the read has to park.
            let outcome = Pin::new(&mut fx.stream).poll_read(&mut cx, &mut read_buf);
            assert!(
                outcome.is_pending(),
                "should be Pending when piece is missing"
            );
            assert_eq!(read_buf.filled().len(), 0);
        }
        assert!(!flag.is_set(), "no wake-up expected before a piece completes");

        // Now mark piece 0 as available and broadcast
        let mut bf = Bitfield::new(NUM_PIECES);
        bf.set(0);
        fx.have_tx.send(bf).unwrap();
        fx.piece_tx.send(0).unwrap();

        // Give the notification task a chance to run, then check that the
        // reader's waker fired. The follow-up disk read is not exercised:
        // the fixture has no disk backend.
        for _ in 0..64 {
            if flag.is_set() {
                break;
            }
            tokio::task::yield_now().await;
        }
        assert!(flag.is_set(), "reader must be woken once a piece completes");
    }
}