1use std::io;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::task::{Context, Poll};
12
13use bytes::Bytes;
14use tokio::io::{AsyncRead, AsyncSeek, ReadBuf};
15use tokio::sync::{OwnedSemaphorePermit, Semaphore, broadcast, watch};
16
17use irontide_core::Lengths;
18use irontide_storage::Bitfield;
19
20use crate::disk::{DiskHandle, DiskJobFlags};
21
22pub struct FileStreamHandle {
24 pub(crate) disk: DiskHandle,
25 pub(crate) lengths: Lengths,
26 pub(crate) file_index: usize,
27 pub(crate) file_offset: u64,
28 pub(crate) file_length: u64,
29 pub(crate) cursor_tx: watch::Sender<u64>,
30 pub(crate) piece_ready_rx: broadcast::Receiver<u32>,
31 pub(crate) have: watch::Receiver<Bitfield>,
32 pub(crate) read_permit: OwnedSemaphorePermit,
33}
34
35impl std::fmt::Debug for FileStreamHandle {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("FileStreamHandle")
38 .field("file_index", &self.file_index)
39 .field("file_offset", &self.file_offset)
40 .field("file_length", &self.file_length)
41 .finish_non_exhaustive()
42 }
43}
44
45pub(crate) struct StreamingCursor {
49 #[allow(dead_code)]
50 pub file_index: usize,
51 pub file_offset: u64,
52 pub cursor_piece: u32,
53 pub readahead_pieces: u32,
54 pub cursor_rx: watch::Receiver<u64>,
55}
56
57pub struct FileStream {
65 disk: DiskHandle,
66 lengths: Lengths,
67 #[allow(dead_code)]
68 file_index: usize,
69 file_offset: u64,
71 file_length: u64,
73 position: u64,
75 cursor_tx: watch::Sender<u64>,
77 piece_ready_rx: broadcast::Receiver<u32>,
79 have: watch::Receiver<Bitfield>,
81 pending_read:
83 Option<Pin<Box<dyn std::future::Future<Output = irontide_storage::Result<Bytes>> + Send>>>,
84 buffer: Bytes,
86 seek_result: Option<io::Result<u64>>,
88 _read_permit: OwnedSemaphorePermit,
90}
91
92impl FileStream {
93 pub fn from_handle(h: FileStreamHandle) -> Self {
95 Self {
96 disk: h.disk,
97 lengths: h.lengths,
98 file_index: h.file_index,
99 file_offset: h.file_offset,
100 file_length: h.file_length,
101 position: 0,
102 cursor_tx: h.cursor_tx,
103 piece_ready_rx: h.piece_ready_rx,
104 have: h.have,
105 pending_read: None,
106 buffer: Bytes::new(),
107 seek_result: None,
108 _read_permit: h.read_permit,
109 }
110 }
111
112 pub fn file_length(&self) -> u64 {
114 self.file_length
115 }
116
117 pub fn position(&self) -> u64 {
119 self.position
120 }
121
122 fn current_piece_available(&self) -> bool {
124 let abs = self.file_offset + self.position;
125 if let Some((piece, _)) = self.lengths.byte_to_piece(abs) {
126 let have = self.have.borrow();
127 have.get(piece)
128 } else {
129 false
130 }
131 }
132
133 fn remaining(&self) -> u64 {
135 self.file_length.saturating_sub(self.position)
136 }
137}
138
139impl AsyncRead for FileStream {
140 fn poll_read(
141 mut self: Pin<&mut Self>,
142 cx: &mut Context<'_>,
143 buf: &mut ReadBuf<'_>,
144 ) -> Poll<io::Result<()>> {
145 if self.position >= self.file_length {
147 return Poll::Ready(Ok(()));
148 }
149
150 if !self.buffer.is_empty() {
152 let to_copy = self.buffer.len().min(buf.remaining());
153 let to_copy = to_copy.min(self.remaining() as usize);
154 buf.put_slice(&self.buffer[..to_copy]);
155 self.buffer = self.buffer.slice(to_copy..);
156 self.position += to_copy as u64;
157 let _ = self.cursor_tx.send(self.position);
158 return Poll::Ready(Ok(()));
159 }
160
161 if let Some(ref mut fut) = self.pending_read {
163 match fut.as_mut().poll(cx) {
164 Poll::Ready(Ok(data)) => {
165 self.pending_read = None;
166 let to_copy = data.len().min(buf.remaining());
167 let to_copy = to_copy.min(self.remaining() as usize);
168 buf.put_slice(&data[..to_copy]);
169 if to_copy < data.len() {
170 self.buffer = data.slice(to_copy..);
171 }
172 self.position += to_copy as u64;
173 let _ = self.cursor_tx.send(self.position);
174 return Poll::Ready(Ok(()));
175 }
176 Poll::Ready(Err(e)) => {
177 self.pending_read = None;
178 return Poll::Ready(Err(io::Error::other(e.to_string())));
179 }
180 Poll::Pending => return Poll::Pending,
181 }
182 }
183
184 if !self.current_piece_available() {
186 let mut rx = self.piece_ready_rx.resubscribe();
189 let waker = cx.waker().clone();
190 tokio::spawn(async move {
191 let _ = rx.recv().await;
192 waker.wake();
193 });
194 return Poll::Pending;
195 }
196
197 let abs = self.file_offset + self.position;
199 let Some((piece, offset_in_piece)) = self.lengths.byte_to_piece(abs) else {
200 return Poll::Ready(Ok(())); };
202
203 let piece_size = self.lengths.piece_size(piece);
205 let read_len = (piece_size - offset_in_piece)
206 .min(self.lengths.chunk_size())
207 .min(self.remaining() as u32);
208
209 let disk = self.disk.clone();
210 let fut = Box::pin(async move {
211 disk.read_chunk(piece, offset_in_piece, read_len, DiskJobFlags::SEQUENTIAL)
212 .await
213 });
214 self.pending_read = Some(fut);
215
216 let fut = self.pending_read.as_mut().unwrap();
218 match fut.as_mut().poll(cx) {
219 Poll::Ready(Ok(data)) => {
220 self.pending_read = None;
221 let to_copy = data.len().min(buf.remaining());
222 let to_copy = to_copy.min(self.remaining() as usize);
223 buf.put_slice(&data[..to_copy]);
224 if to_copy < data.len() {
225 self.buffer = data.slice(to_copy..);
226 }
227 self.position += to_copy as u64;
228 let _ = self.cursor_tx.send(self.position);
229 Poll::Ready(Ok(()))
230 }
231 Poll::Ready(Err(e)) => {
232 self.pending_read = None;
233 Poll::Ready(Err(io::Error::other(e.to_string())))
234 }
235 Poll::Pending => Poll::Pending,
236 }
237 }
238}
239
240impl AsyncSeek for FileStream {
241 fn start_seek(mut self: Pin<&mut Self>, pos: io::SeekFrom) -> io::Result<()> {
242 let new_pos = match pos {
243 io::SeekFrom::Start(n) => n as i64,
244 io::SeekFrom::End(n) => self.file_length as i64 + n,
245 io::SeekFrom::Current(n) => self.position as i64 + n,
246 };
247
248 if new_pos < 0 {
249 self.seek_result = Some(Err(io::Error::new(
250 io::ErrorKind::InvalidInput,
251 "seek to negative position",
252 )));
253 } else {
254 let new_pos = new_pos as u64;
255 self.position = new_pos;
256 self.buffer = Bytes::new();
257 self.pending_read = None;
258 let _ = self.cursor_tx.send(self.position);
259 self.seek_result = Some(Ok(new_pos));
260 }
261 Ok(())
262 }
263
264 fn poll_complete(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
265 match self.seek_result.take() {
266 Some(result) => Poll::Ready(result),
267 None => Poll::Ready(Ok(self.position)),
268 }
269 }
270}
271
272pub(crate) fn stream_read_semaphore(max: usize) -> Arc<Semaphore> {
274 Arc::new(Semaphore::new(max))
275}
276
277#[cfg(test)]
278mod tests {
279 use super::*;
280
281 fn test_lengths() -> Lengths {
283 Lengths::new(262144, 65536, 16384)
285 }
286
287 fn full_bitfield(num_pieces: u32) -> Bitfield {
289 let mut bf = Bitfield::new(num_pieces);
290 for i in 0..num_pieces {
291 bf.set(i);
292 }
293 bf
294 }
295
296 #[test]
297 fn seek_updates_cursor() {
298 let (cursor_tx, mut cursor_rx) = watch::channel(0u64);
300 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
301 let have_bf = full_bitfield(4);
302 let (have_tx, have_rx) = watch::channel(have_bf);
303 let _ = have_tx; let sem = Arc::new(Semaphore::new(1));
306 let permit = sem.try_acquire_owned().unwrap();
307
308 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
311 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
312
313 let handle = FileStreamHandle {
314 disk,
315 lengths: test_lengths(),
316 file_index: 0,
317 file_offset: 0,
318 file_length: 262144,
319 cursor_tx,
320 piece_ready_rx: piece_rx,
321 have: have_rx,
322 read_permit: permit,
323 };
324
325 let mut stream = FileStream::from_handle(handle);
326
327 use tokio::io::AsyncSeek;
329 Pin::new(&mut stream)
330 .start_seek(io::SeekFrom::Start(100000))
331 .unwrap();
332
333 assert!(cursor_rx.has_changed().unwrap());
335 assert_eq!(*cursor_rx.borrow_and_update(), 100000);
336 assert_eq!(stream.position(), 100000);
337 }
338
339 #[test]
340 fn seek_end_relative() {
341 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
342 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
343 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
344 let _ = have_tx;
345
346 let sem = Arc::new(Semaphore::new(1));
347 let permit = sem.try_acquire_owned().unwrap();
348 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
349 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
350
351 let handle = FileStreamHandle {
352 disk,
353 lengths: test_lengths(),
354 file_index: 0,
355 file_offset: 0,
356 file_length: 262144,
357 cursor_tx,
358 piece_ready_rx: piece_rx,
359 have: have_rx,
360 read_permit: permit,
361 };
362
363 let mut stream = FileStream::from_handle(handle);
364
365 use tokio::io::AsyncSeek;
367 Pin::new(&mut stream)
368 .start_seek(io::SeekFrom::End(-1024))
369 .unwrap();
370 assert_eq!(stream.position(), 262144 - 1024);
371 }
372
373 #[test]
374 fn seek_negative_errors() {
375 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
376 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
377 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
378 let _ = have_tx;
379
380 let sem = Arc::new(Semaphore::new(1));
381 let permit = sem.try_acquire_owned().unwrap();
382 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
383 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
384
385 let handle = FileStreamHandle {
386 disk,
387 lengths: test_lengths(),
388 file_index: 0,
389 file_offset: 0,
390 file_length: 262144,
391 cursor_tx,
392 piece_ready_rx: piece_rx,
393 have: have_rx,
394 read_permit: permit,
395 };
396
397 let mut stream = FileStream::from_handle(handle);
398
399 use tokio::io::AsyncSeek;
401 Pin::new(&mut stream)
402 .start_seek(io::SeekFrom::Start(0))
403 .unwrap();
404 Pin::new(&mut stream)
405 .start_seek(io::SeekFrom::Current(-1))
406 .unwrap();
407
408 let rt = tokio::runtime::Builder::new_current_thread()
410 .build()
411 .unwrap();
412 let result = rt.block_on(async {
413 use futures::FutureExt;
414 std::future::poll_fn(|cx| Pin::new(&mut stream).poll_complete(cx)).await
415 });
416 assert!(result.is_err());
417 }
418
419 #[tokio::test]
420 async fn eof_returns_zero_bytes() {
421 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
422 let (_piece_tx, piece_rx) = broadcast::channel::<u32>(16);
423 let (have_tx, have_rx) = watch::channel(full_bitfield(4));
424 let _ = have_tx;
425
426 let sem = Arc::new(Semaphore::new(1));
427 let permit = sem.try_acquire_owned().unwrap();
428 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
429 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
430
431 let handle = FileStreamHandle {
432 disk,
433 lengths: test_lengths(),
434 file_index: 0,
435 file_offset: 0,
436 file_length: 262144,
437 cursor_tx,
438 piece_ready_rx: piece_rx,
439 have: have_rx,
440 read_permit: permit,
441 };
442
443 let mut stream = FileStream::from_handle(handle);
444 stream.position = 262144;
446
447 let mut buf = [0u8; 1024];
448 let mut read_buf = ReadBuf::new(&mut buf);
449 let result =
450 std::future::poll_fn(|cx| Pin::new(&mut stream).poll_read(cx, &mut read_buf)).await;
451 assert!(result.is_ok());
452 assert_eq!(read_buf.filled().len(), 0);
453 }
454
455 #[tokio::test]
456 async fn blocks_on_missing_piece_wakes_on_completion() {
457 let (cursor_tx, _cursor_rx) = watch::channel(0u64);
458 let (piece_tx, piece_rx) = broadcast::channel::<u32>(16);
459 let empty_bf = Bitfield::new(4);
461 let (have_tx, have_rx) = watch::channel(empty_bf);
462
463 let sem = Arc::new(Semaphore::new(1));
464 let permit = sem.try_acquire_owned().unwrap();
465 let (disk_tx, _disk_rx) = tokio::sync::mpsc::channel(1);
466 let disk = DiskHandle::new(disk_tx, irontide_core::Id20::ZERO);
467
468 let handle = FileStreamHandle {
469 disk,
470 lengths: test_lengths(),
471 file_index: 0,
472 file_offset: 0,
473 file_length: 262144,
474 cursor_tx,
475 piece_ready_rx: piece_rx,
476 have: have_rx,
477 read_permit: permit,
478 };
479
480 let mut stream = FileStream::from_handle(handle);
481
482 let mut buf = [0u8; 1024];
484 let mut read_buf = ReadBuf::new(&mut buf);
485 let is_pending = std::future::poll_fn(|cx| {
486 let result = Pin::new(&mut stream).poll_read(cx, &mut read_buf);
487 match result {
488 Poll::Pending => Poll::Ready(true),
489 Poll::Ready(_) => Poll::Ready(false),
490 }
491 })
492 .await;
493 assert!(is_pending, "should be Pending when piece is missing");
494
495 let mut bf = Bitfield::new(4);
497 bf.set(0);
498 have_tx.send(bf).unwrap();
499 piece_tx.send(0).unwrap();
500
501 }
505}