1#![allow(
2 clippy::cast_possible_truncation,
3 clippy::cast_precision_loss,
4 clippy::cast_possible_wrap,
5 clippy::cast_sign_loss,
6 reason = "M175: file streaming — read positions bounded by file_length (u64); chunk lengths bounded by chunk_size (u32 by construction)"
7)]
8
9use std::io;
17use std::pin::Pin;
18use std::sync::Arc;
19use std::task::{Context, Poll};
20
21use bytes::Bytes;
22use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
23use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
24
25use irontide_core::Lengths;
26use irontide_storage::Bitfield;
27
28use crate::disk::{DiskHandle, DiskJobFlags};
29
30pub struct FileStreamHandle {
32 pub(crate) disk: DiskHandle,
33 pub(crate) lengths: Lengths,
34 pub(crate) file_index: usize,
35 pub(crate) file_offset: u64,
36 pub(crate) file_length: u64,
37 pub(crate) cursor_tx: watch::Sender<u64>,
38 pub(crate) piece_ready_rx: broadcast::Receiver<u32>,
39 pub(crate) have: watch::Receiver<Bitfield>,
40 pub(crate) read_permit: OwnedSemaphorePermit,
41}
42
43impl std::fmt::Debug for FileStreamHandle {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 f.debug_struct("FileStreamHandle")
46 .field("file_index", &self.file_index)
47 .field("file_offset", &self.file_offset)
48 .field("file_length", &self.file_length)
49 .finish_non_exhaustive()
50 }
51}
52
53pub(crate) struct StreamingCursor {
57 #[allow(dead_code)]
58 pub file_index: usize,
59 pub file_offset: u64,
60 pub cursor_piece: u32,
61 pub readahead_pieces: u32,
62 pub cursor_rx: watch::Receiver<u64>,
63}
64
65pub struct FileStream {
73 disk: DiskHandle,
74 lengths: Lengths,
75 #[allow(dead_code)]
76 file_index: usize,
77 file_offset: u64,
79 file_length: u64,
81 position: u64,
83 cursor_tx: watch::Sender<u64>,
85 piece_ready_rx: broadcast::Receiver<u32>,
87 have: watch::Receiver<Bitfield>,
89 pending_read:
91 Option<Pin<Box<dyn std::future::Future<Output = irontide_storage::Result<Bytes>> + Send>>>,
92 buffer: Bytes,
94 seek_result: Option<io::Result<u64>>,
96 _read_permit: OwnedSemaphorePermit,
98}
99
100impl FileStream {
101 #[must_use]
103 pub fn from_handle(h: FileStreamHandle) -> Self {
104 Self {
105 disk: h.disk,
106 lengths: h.lengths,
107 file_index: h.file_index,
108 file_offset: h.file_offset,
109 file_length: h.file_length,
110 position: 0,
111 cursor_tx: h.cursor_tx,
112 piece_ready_rx: h.piece_ready_rx,
113 have: h.have,
114 pending_read: None,
115 buffer: Bytes::new(),
116 seek_result: None,
117 _read_permit: h.read_permit,
118 }
119 }
120
121 pub fn file_length(&self) -> u64 {
123 self.file_length
124 }
125
126 pub fn position(&self) -> u64 {
128 self.position
129 }
130
131 fn current_piece_available(&self) -> bool {
133 let abs = self.file_offset + self.position;
134 if let Some(piece) = self.lengths.piece_index_for_byte(abs) {
135 let have = self.have.borrow();
136 have.get(piece)
137 } else {
138 false
139 }
140 }
141
142 fn remaining(&self) -> u64 {
144 self.file_length.saturating_sub(self.position)
145 }
146}
147
148impl AsyncRead for FileStream {
149 fn poll_read(
150 mut self: Pin<&mut Self>,
151 cx: &mut Context<'_>,
152 buf: &mut ReadBuf<'_>,
153 ) -> Poll<io::Result<()>> {
154 if self.position >= self.file_length {
156 return Poll::Ready(Ok(()));
157 }
158
159 if !self.buffer.is_empty() {
161 let to_copy = self.buffer.len().min(buf.remaining());
162 let to_copy = to_copy.min(self.remaining() as usize);
163 buf.put_slice(&self.buffer[..to_copy]);
164 self.buffer = self.buffer.slice(to_copy..);
165 self.position += to_copy as u64;
166 let _ = self.cursor_tx.send(self.position);
167 return Poll::Ready(Ok(()));
168 }
169
170 if let Some(ref mut fut) = self.pending_read {
172 match fut.as_mut().poll(cx) {
173 Poll::Ready(Ok(data)) => {
174 self.pending_read = None;
175 let to_copy = data.len().min(buf.remaining());
176 let to_copy = to_copy.min(self.remaining() as usize);
177 buf.put_slice(&data[..to_copy]);
178 if to_copy < data.len() {
179 self.buffer = data.slice(to_copy..);
180 }
181 self.position += to_copy as u64;
182 let _ = self.cursor_tx.send(self.position);
183 return Poll::Ready(Ok(()));
184 }
185 Poll::Ready(Err(e)) => {
186 self.pending_read = None;
187 return Poll::Ready(Err(io::Error::other(e.to_string())));
188 }
189 Poll::Pending => return Poll::Pending,
190 }
191 }
192
193 if !self.current_piece_available() {
195 let mut rx = self.piece_ready_rx.resubscribe();
198 let waker = cx.waker().clone();
199 tokio::spawn(async move {
200 let _ = rx.recv().await;
201 waker.wake();
202 });
203 return Poll::Pending;
204 }
205
206 let abs = self.file_offset + self.position;
208 let Some((piece, offset_in_piece)) = self.lengths.byte_to_piece_with_offset(abs) else {
209 return Poll::Ready(Ok(())); };
211
212 let piece_size = self.lengths.piece_size(piece);
214 let read_len = (piece_size - offset_in_piece)
215 .min(self.lengths.chunk_size())
216 .min(self.remaining() as u32);
217
218 let disk = self.disk.clone();
219 let fut = Box::pin(async move {
220 disk.read_chunk(piece, offset_in_piece, read_len, DiskJobFlags::SEQUENTIAL)
221 .await
222 });
223 self.pending_read = Some(fut);
224
225 let fut = self.pending_read.as_mut().unwrap();
227 match fut.as_mut().poll(cx) {
228 Poll::Ready(Ok(data)) => {
229 self.pending_read = None;
230 let to_copy = data.len().min(buf.remaining());
231 let to_copy = to_copy.min(self.remaining() as usize);
232 buf.put_slice(&data[..to_copy]);
233 if to_copy < data.len() {
234 self.buffer = data.slice(to_copy..);
235 }
236 self.position += to_copy as u64;
237 let _ = self.cursor_tx.send(self.position);
238 Poll::Ready(Ok(()))
239 }
240 Poll::Ready(Err(e)) => {
241 self.pending_read = None;
242 Poll::Ready(Err(io::Error::other(e.to_string())))
243 }
244 Poll::Pending => Poll::Pending,
245 }
246 }
247}
248
249impl AsyncSeek for FileStream {
250 fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
251 let new_pos = match pos {
252 io::SeekFrom::Start(n) => n as i64,
253 io::SeekFrom::End(n) => self.file_length as i64 + n,
254 io::SeekFrom::Current(n) => self.position as i64 + n,
255 };
256
257 if new_pos < 0 {
258 self.seek_result = Some(Err(io::Error::new(
259 io::ErrorKind::InvalidInput,
260 "seek to negative position",
261 )));
262 } else {
263 let new_pos = new_pos as u64;
264 self.position = new_pos;
265 self.buffer = Bytes::new();
266 self.pending_read = None;
267 let _ = self.cursor_tx.send(self.position);
268 self.seek_result = Some(Ok(new_pos));
269 }
270 Ok(())
271 }
272
273 fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
274 match self.seek_result.take() {
275 Some(result) => Poll::Ready(result),
276 None => Poll::Ready(Ok(self.position)),
277 }
278 }
279}
280
281pub(crate) fn stream_read_semaphore(max: usize) -> Arc<Semaphore> {
283 Arc::new(Semaphore::new(max))
284}
285
286#[cfg(test)]
287mod tests {
288 use super::*;
289
290 fn test_lengths() -> Lengths {
292 Lengths::new(262_144, 65536, 16384)
294 }
295
296 fn full_bitfield(num_pieces: u32) -> Bitfield {
298 let mut bf = Bitfield::new(num_pieces);
299 for i in 0..num_pieces {
300 bf.set(i);
301 }
302 bf
303 }
304
305 #[test]
306 fn seek_updates_cursor() {
307 use tokio::io::AsyncSeek;
308
309 let (cursor_tx, mut cursor_rx) = watch::channel(0u64);
311 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
312 let have_bf = full_bitfield(4);
313 let (have_tx, have_rx) = watch::channel(have_bf);
314 let _ = have_tx; let sem = Arc::new(Semaphore::new(1));
317 let permit = sem.try_acquire_owned().unwrap();
318
319 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
322 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
323
324 let handle = FileStreamHandle {
325 disk,
326 lengths: test_lengths(),
327 file_index: 0,
328 file_offset: 0,
329 file_length: 262_144,
330 cursor_tx,
331 piece_ready_rx: piece_rx,
332 have: have_rx,
333 read_permit: permit,
334 };
335
336 let mut stream = FileStream::from_handle(handle);
337
338 Pin::new(&mut stream)
340 .start_seek(io::SeekFrom::Start(100_000))
341 .unwrap();
342
343 assert!(cursor_rx.has_changed().unwrap());
345 assert_eq!(*cursor_rx.borrow_and_update(), 100_000);
346 assert_eq!(stream.position(), 100_000);
347 }
348
349 #[test]
350 fn seek_end_relative() {
351 use tokio::io::AsyncSeek;
352
353 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
354 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
355 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
356 let _ = have_tx;
357
358 let sem = Arc::new(Semaphore::new(1));
359 let permit = sem.try_acquire_owned().unwrap();
360 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
361 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
362
363 let handle = FileStreamHandle {
364 disk,
365 lengths: test_lengths(),
366 file_index: 0,
367 file_offset: 0,
368 file_length: 262_144,
369 cursor_tx,
370 piece_ready_rx: piece_rx,
371 have: have_rx,
372 read_permit: permit,
373 };
374
375 let mut stream = FileStream::from_handle(handle);
376
377 Pin::new(&mut stream)
379 .start_seek(io::SeekFrom::End(-1024))
380 .unwrap();
381 assert_eq!(stream.position(), 262_144 - 1024);
382 }
383
384 #[test]
385 fn seek_negative_errors() {
386 use tokio::io::AsyncSeek;
387
388 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
389 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
390 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
391 let _ = have_tx;
392
393 let sem = Arc::new(Semaphore::new(1));
394 let permit = sem.try_acquire_owned().unwrap();
395 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
396 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
397
398 let handle = FileStreamHandle {
399 disk,
400 lengths: test_lengths(),
401 file_index: 0,
402 file_offset: 0,
403 file_length: 262_144,
404 cursor_tx,
405 piece_ready_rx: piece_rx,
406 have: have_rx,
407 read_permit: permit,
408 };
409
410 let mut stream = FileStream::from_handle(handle);
411
412 Pin::new(&mut stream)
414 .start_seek(io::SeekFrom::Start(0))
415 .unwrap();
416 Pin::new(&mut stream)
417 .start_seek(io::SeekFrom::Current(-1))
418 .unwrap();
419
420 let rt = tokio::runtime::Builder::new_current_thread()
422 .build()
423 .unwrap();
424 let result = rt.block_on(async {
425 std::future::poll_fn(|cx| Pin::new(&mut stream).poll_complete(cx)).await
426 });
427 assert!(result.is_err());
428 }
429
430 #[tokio::test]
431 async fn eof_returns_zero_bytes() {
432 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
433 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
434 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
435 let _ = have_tx;
436
437 let sem = Arc::new(Semaphore::new(1));
438 let permit = sem.try_acquire_owned().unwrap();
439 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
440 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
441
442 let handle = FileStreamHandle {
443 disk,
444 lengths: test_lengths(),
445 file_index: 0,
446 file_offset: 0,
447 file_length: 262_144,
448 cursor_tx,
449 piece_ready_rx: piece_rx,
450 have: have_rx,
451 read_permit: permit,
452 };
453
454 let mut stream = FileStream::from_handle(handle);
455 stream.position = 262_144;
457
458 let mut buf = [0u8; 1024];
459 let mut read_buf = ReadBuf::new(&mut buf);
460 let result =
461 std::future::poll_fn(|cx| Pin::new(&mut stream).poll_read(cx, &mut read_buf)).await;
462 assert!(result.is_ok());
463 assert_eq!(read_buf.filled().len(), 0);
464 }
465
466 #[tokio::test]
467 async fn blocks_on_missing_piece_wakes_on_completion() {
468 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
469 let (piece_tx, piece_rx) = broadcast::channel::<u32>(16);
470 let empty_bf = Bitfield::new(4);
472 let (have_tx, have_rx) = watch::channel(empty_bf);
473
474 let sem = Arc::new(Semaphore::new(1));
475 let permit = sem.try_acquire_owned().unwrap();
476 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
477 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
478
479 let handle = FileStreamHandle {
480 disk,
481 lengths: test_lengths(),
482 file_index: 0,
483 file_offset: 0,
484 file_length: 262_144,
485 cursor_tx,
486 piece_ready_rx: piece_rx,
487 have: have_rx,
488 read_permit: permit,
489 };
490
491 let mut stream = FileStream::from_handle(handle);
492
493 let mut buf = [0u8; 1024];
495 let mut read_buf = ReadBuf::new(&mut buf);
496 let is_pending = std::future::poll_fn(|cx| {
497 let result = Pin::new(&mut stream).poll_read(cx, &mut read_buf);
498 match result {
499 Poll::Pending => Poll::Ready(true),
500 Poll::Ready(_) => Poll::Ready(false),
501 }
502 })
503 .await;
504 assert!(is_pending, "should be Pending when piece is missing");
505
506 let mut bf = Bitfield::new(4);
508 bf.set(0);
509 have_tx.send(bf).unwrap();
510 piece_tx.send(0).unwrap();
511
512 }
516}