1pub mod paged;
4mod read;
5mod tip;
6mod write;
7
8pub use read::Read;
9pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13 use super::*;
14 use crate::{
15 deterministic, Blob as _, BufMut, Clock, Error, IoBufMut, IoBufs, IoBufsMut, Runner,
16 Spawner, Storage,
17 };
18 use commonware_macros::test_traced;
19 use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize};
20 use futures::{pin_mut, FutureExt};
21 use std::{sync::Arc, time::Duration};
22
23 struct BlockingReadGate {
24 read_started: Option<oneshot::Sender<()>>,
25 release_read: Option<oneshot::Receiver<()>>,
26 }
27
28 #[derive(Clone)]
32 struct BlockingReadBlob {
33 data: Arc<Mutex<Vec<u8>>>,
34 gate: Arc<Mutex<BlockingReadGate>>,
35 }
36
37 impl BlockingReadBlob {
38 fn new(data: Vec<u8>) -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) {
39 let (read_started_tx, read_started_rx) = oneshot::channel();
40 let (release_read_tx, release_read_rx) = oneshot::channel();
41 (
42 Self {
43 data: Arc::new(Mutex::new(data)),
44 gate: Arc::new(Mutex::new(BlockingReadGate {
45 read_started: Some(read_started_tx),
46 release_read: Some(release_read_rx),
47 })),
48 },
49 read_started_rx,
50 release_read_tx,
51 )
52 }
53
54 async fn block_once_on_read(&self) {
55 let rx = {
56 let mut gate = self.gate.lock();
57 gate.read_started.take().map(|read_started| {
58 let _ = read_started.send(());
59 gate.release_read.take().expect("release signal missing")
60 })
61 };
62 if let Some(rx) = rx {
63 let _ = rx.await;
64 }
65 }
66 }
67
68 impl crate::Blob for BlockingReadBlob {
69 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
70 self.read_at_buf(offset, len, IoBufMut::default()).await
71 }
72
73 async fn read_at_buf(
74 &self,
75 offset: u64,
76 len: usize,
77 buf: impl Into<IoBufsMut> + Send,
78 ) -> Result<IoBufsMut, Error> {
79 self.block_once_on_read().await;
80
81 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
82 let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
83 let data = self.data.lock();
84 if end > data.len() {
85 return Err(Error::BlobInsufficientLength);
86 }
87
88 let mut out = buf.into();
89 out.put_slice(&data[start..end]);
90 Ok(out)
91 }
92
93 async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
94 let buf = buf.into().coalesce();
95 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
96 let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
97
98 let mut data = self.data.lock();
99 if end > data.len() {
100 data.resize(end, 0);
101 }
102 data[start..end].copy_from_slice(buf.as_ref());
103 Ok(())
104 }
105
106 async fn resize(&self, len: u64) -> Result<(), Error> {
107 let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
108 self.data.lock().resize(len, 0);
109 Ok(())
110 }
111
112 async fn sync(&self) -> Result<(), Error> {
113 Ok(())
114 }
115 }
116
117 #[test_traced]
118 fn test_read_basic() {
119 let executor = deterministic::Runner::default();
120 executor.start(|context| async move {
121 let data = b"Hello, world! This is a test.";
123 let (blob, size) = context.open("partition", b"test").await.unwrap();
124 assert_eq!(size, 0);
125 blob.write_at(0, data).await.unwrap();
126 let size = data.len() as u64;
127
128 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
130
131 let read = reader.read(5).await.unwrap().coalesce();
133 assert_eq!(read.as_ref(), b"Hello");
134
135 let read = reader.read(14).await.unwrap().coalesce();
137 assert_eq!(read.as_ref(), b", world! This ");
138
139 assert_eq!(reader.position(), 19);
141
142 let read = reader.read(7).await.unwrap().coalesce();
144 assert_eq!(read.as_ref(), b"is a te");
145
146 let result = reader.read(5).await;
148 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
149 });
150 }
151
152 #[test_traced]
153 fn test_read_cross_boundary() {
154 let executor = deterministic::Runner::default();
155 executor.start(|context| async move {
156 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
158 let (blob, size) = context.open("partition", b"test").await.unwrap();
159 assert_eq!(size, 0);
160 blob.write_at(0, data).await.unwrap();
161 let size = data.len() as u64;
162
163 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
165
166 let read = reader.read(15).await.unwrap().coalesce();
168 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNO");
169
170 assert_eq!(reader.position(), 15);
172
173 let read = reader.read(11).await.unwrap().coalesce();
175 assert_eq!(read.as_ref(), b"PQRSTUVWXYZ");
176
177 assert_eq!(reader.position(), 26);
179 assert_eq!(reader.blob_remaining(), 0);
180 });
181 }
182
183 #[test_traced]
185 fn test_read_to_end_then_rewind_and_read_again() {
186 let executor = deterministic::Runner::default();
187 executor.start(|context| async move {
188 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
189 let (blob, size) = context.open("partition", b"test").await.unwrap();
190 assert_eq!(size, 0);
191 blob.write_at(0, data).await.unwrap();
192 let size = data.len() as u64;
193
194 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
195
196 let read = reader.read(21).await.unwrap().coalesce();
198 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
199
200 assert_eq!(reader.position(), 21);
202
203 let read = reader.read(5).await.unwrap().coalesce();
205 assert_eq!(read.as_ref(), b"VWXYZ");
206
207 reader.seek_to(0).unwrap();
209 let read = reader.read(21).await.unwrap().coalesce();
210 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
211 });
212 }
213
214 #[test_traced]
215 fn test_read_with_known_size() {
216 let executor = deterministic::Runner::default();
217 executor.start(|context| async move {
218 let data = b"This is a test with known size limitations.";
220 let (blob, size) = context.open("partition", b"test").await.unwrap();
221 assert_eq!(size, 0);
222 blob.write_at(0, data).await.unwrap();
223 let size = data.len() as u64;
224
225 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
227
228 assert_eq!(reader.blob_remaining(), size);
230
231 let read = reader.read(5).await.unwrap().coalesce();
233 assert_eq!(read.as_ref(), b"This ");
234
235 assert_eq!(reader.blob_remaining(), size - 5);
237
238 let read = reader.read((size - 5) as usize).await.unwrap().coalesce();
240 assert_eq!(read.as_ref(), b"is a test with known size limitations.");
241
242 assert_eq!(reader.blob_remaining(), 0);
244
245 let result = reader.read(1).await;
247 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
248 });
249 }
250
251 #[test_traced]
252 fn test_read_oversized_request_does_not_consume_buffered_bytes() {
253 let executor = deterministic::Runner::default();
254 executor.start(|context| async move {
255 let data = b"abcdefghij";
256 let (blob, size) = context
257 .open("partition", b"double-count-regression")
258 .await
259 .unwrap();
260 assert_eq!(size, 0);
261 blob.write_at(0, data).await.unwrap();
262
263 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(8));
264
265 let first = reader.read(6).await.unwrap().coalesce();
267 assert_eq!(first.as_ref(), b"abcdef");
268 assert_eq!(reader.position(), 6);
269
270 let err = reader.read(5).await.unwrap_err();
272 assert!(matches!(err, Error::BlobInsufficientLength));
273 assert_eq!(reader.position(), 6);
274
275 let tail = reader.read(4).await.unwrap().coalesce();
277 assert_eq!(tail.as_ref(), b"ghij");
278 assert_eq!(reader.position(), 10);
279 });
280 }
281
282 #[test_traced]
283 fn test_read_large_data() {
284 let executor = deterministic::Runner::default();
285 executor.start(|context| async move {
286 let data_size = 1024 * 256; let data = vec![0x42; data_size];
289 let (blob, size) = context.open("partition", b"test").await.unwrap();
290 assert_eq!(size, 0);
291 blob.write_at(0, data.clone()).await.unwrap();
292 let size = data.len() as u64;
293
294 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(64 * 1024));
296
297 let mut total_read = 0;
299 let chunk_size = 8 * 1024; while total_read < data_size {
302 let to_read = std::cmp::min(chunk_size, data_size - total_read);
303 let read = reader.read(to_read).await.unwrap().coalesce();
304
305 assert!(
307 read.as_ref().iter().all(|&b| b == 0x42),
308 "Data at position {total_read} is not correct"
309 );
310
311 total_read += to_read;
312 }
313
314 assert_eq!(total_read, data_size);
316
317 let result = reader.read(1).await;
319 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
320 });
321 }
322
323 #[test_traced]
324 fn test_read_exact_size_reads() {
325 let executor = deterministic::Runner::default();
326 executor.start(|context| async move {
327 let buffer_size = 1024;
329 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
331
332 let (blob, size) = context.open("partition", b"test").await.unwrap();
333 assert_eq!(size, 0);
334 blob.write_at(0, data.clone()).await.unwrap();
335 let size = data.len() as u64;
336
337 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer_size));
338
339 let read = reader.read(buffer_size).await.unwrap().coalesce();
341 assert!(read.as_ref().iter().all(|&b| b == 0x37));
342
343 let read = reader.read(buffer_size).await.unwrap().coalesce();
345 assert!(read.as_ref().iter().all(|&b| b == 0x37));
346
347 let half_buffer = buffer_size / 2;
349 let read = reader.read(half_buffer).await.unwrap().coalesce();
350 assert!(read.as_ref().iter().all(|&b| b == 0x37));
351
352 assert_eq!(reader.blob_remaining(), 0);
354 assert_eq!(reader.position(), size);
355 });
356 }
357
358 #[test_traced]
359 fn test_read_structure_single_vs_chunked() {
360 let executor = deterministic::Runner::default();
361 executor.start(|context| async move {
362 let data = b"ABCDEFGHIJKL";
363 let (blob, size) = context.open("partition", b"structural").await.unwrap();
364 assert_eq!(size, 0);
365 blob.write_at(0, data).await.unwrap();
366
367 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(5));
368
369 let first = reader.read(3).await.unwrap();
371 assert!(first.is_single());
372 assert_eq!(first.coalesce().as_ref(), b"ABC");
373
374 let second = reader.read(7).await.unwrap();
376 assert!(!second.is_single());
377 assert_eq!(second.coalesce().as_ref(), b"DEFGHIJ");
378 });
379 }
380
381 #[test_traced]
382 fn test_read_seek_to() {
383 let executor = deterministic::Runner::default();
384 executor.start(|context| async move {
385 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
387 let (blob, size) = context.open("partition", b"test").await.unwrap();
388 assert_eq!(size, 0);
389 blob.write_at(0, data).await.unwrap();
390 let size = data.len() as u64;
391
392 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
394
395 let read = reader.read(5).await.unwrap().coalesce();
397 assert_eq!(read.as_ref(), b"ABCDE");
398 assert_eq!(reader.position(), 5);
399
400 reader.seek_to(10).unwrap();
402 assert_eq!(reader.position(), 10);
403
404 let read = reader.read(5).await.unwrap().coalesce();
406 assert_eq!(read.as_ref(), b"KLMNO");
407
408 reader.seek_to(0).unwrap();
410 assert_eq!(reader.position(), 0);
411
412 let read = reader.read(5).await.unwrap().coalesce();
413 assert_eq!(read.as_ref(), b"ABCDE");
414
415 reader.seek_to(size).unwrap();
417 assert_eq!(reader.position(), size);
418
419 let result = reader.read(1).await;
421 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
422
423 let result = reader.seek_to(size + 10);
425 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
426 });
427 }
428
429 #[test_traced]
430 fn test_read_seek_with_refill() {
431 let executor = deterministic::Runner::default();
432 executor.start(|context| async move {
433 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
436 assert_eq!(size, 0);
437 blob.write_at(0, data.clone()).await.unwrap();
438 let size = data.len() as u64;
439
440 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
442
443 let _ = reader.read(5).await.unwrap().coalesce();
445
446 reader.seek_to(500).unwrap();
448
449 let read = reader.read(5).await.unwrap().coalesce();
451 assert_eq!(read.as_ref(), b"AAAAA"); assert_eq!(reader.position(), 505);
453
454 reader.seek_to(100).unwrap();
456
457 let _ = reader.read(5).await.unwrap().coalesce();
459 assert_eq!(reader.position(), 105);
460 });
461 }
462
463 #[test_traced]
464 fn test_read_seek_within_buffered_range() {
465 let executor = deterministic::Runner::default();
466 executor.start(|context| async move {
467 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
468 let (blob, size) = context.open("partition", b"test").await.unwrap();
469 assert_eq!(size, 0);
470 blob.write_at(0, data).await.unwrap();
471
472 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
473
474 let read = reader.read(6).await.unwrap().coalesce();
476 assert_eq!(read.as_ref(), b"ABCDEF");
477 assert_eq!(reader.position(), 6);
478 assert_eq!(reader.buffer_remaining(), 4);
479
480 reader.seek_to(3).unwrap();
482 assert_eq!(reader.position(), 3);
483 assert_eq!(reader.buffer_remaining(), 7);
484
485 let read = reader.read(5).await.unwrap().coalesce();
486 assert_eq!(read.as_ref(), b"DEFGH");
487 assert_eq!(reader.position(), 8);
488 assert_eq!(reader.buffer_remaining(), 2);
489 });
490 }
491
492 #[test_traced]
493 fn test_read_seek_within_unread_buffer_does_not_refill() {
494 let executor = deterministic::Runner::default();
495 executor.start(|context| async move {
496 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
497 let (blob, size) = context
498 .open("partition", b"seek_unread_no_refill")
499 .await
500 .unwrap();
501 assert_eq!(size, 0);
502 blob.write_at(0, data).await.unwrap();
503
504 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
505
506 let first = reader.read(6).await.unwrap();
508 assert_eq!(first.coalesce().as_ref(), b"ABCDEF");
509 assert_eq!(reader.position(), 6);
510 assert_eq!(reader.buffer_remaining(), 4);
511
512 reader.seek_to(7).unwrap();
514 assert_eq!(reader.position(), 7);
515 assert_eq!(reader.buffer_remaining(), 3);
516
517 let second = reader.read(3).await.unwrap();
519 assert_eq!(second.coalesce().as_ref(), b"HIJ");
520 assert_eq!(reader.position(), 10);
521 assert_eq!(reader.buffer_remaining(), 0);
522
523 let third = reader.read(1).await.unwrap();
525 assert_eq!(third.coalesce().as_ref(), b"K");
526 assert_eq!(reader.position(), 11);
527 assert_eq!(reader.buffer_remaining(), 9);
528 });
529 }
530
531 #[test_traced]
532 fn test_read_resize() {
533 let executor = deterministic::Runner::default();
534 executor.start(|context| async move {
535 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
537 let (blob, size) = context.open("partition", b"test").await.unwrap();
538 assert_eq!(size, 0);
539 blob.write_at(0, data).await.unwrap();
540 let data_len = data.len() as u64;
541
542 let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
544
545 let resize_len = data_len / 2;
547 reader.resize(resize_len).await.unwrap();
548
549 let (blob, size) = context.open("partition", b"test").await.unwrap();
551 assert_eq!(size, resize_len, "Blob should be resized to half size");
552
553 let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
555
556 let read = new_reader.read(size as usize).await.unwrap().coalesce();
558 assert_eq!(
559 read.as_ref(),
560 b"ABCDEFGHIJKLM",
561 "Resized content should match"
562 );
563
564 let result = new_reader.read(1).await;
566 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
567
568 new_reader.resize(data_len * 2).await.unwrap();
570
571 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
573 assert_eq!(new_size, data_len * 2);
574
575 let mut new_reader = Read::from_pooler(&context, blob, new_size, NZUsize!(10));
577 let read = new_reader.read(new_size as usize).await.unwrap().coalesce();
578 assert_eq!(&read.as_ref()[..size as usize], b"ABCDEFGHIJKLM");
579 assert_eq!(
580 &read.as_ref()[size as usize..],
581 vec![0u8; new_size as usize - size as usize]
582 );
583 });
584 }
585
586 #[test_traced]
587 fn test_read_resize_to_zero() {
588 let executor = deterministic::Runner::default();
589 executor.start(|context| async move {
590 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
592 let data_len = data.len() as u64;
593 let (blob, size) = context.open("partition", b"test").await.unwrap();
594 assert_eq!(size, 0);
595 blob.write_at(0, data).await.unwrap();
596
597 let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
599
600 reader.resize(0).await.unwrap();
602
603 let (blob, size) = context.open("partition", b"test").await.unwrap();
605 assert_eq!(size, 0, "Blob should be resized to zero");
606
607 let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
609
610 let result = new_reader.read(1).await;
612 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
613 });
614 }
615
616 #[test_traced]
617 fn test_write_basic() {
618 let executor = deterministic::Runner::default();
619 executor.start(|context| async move {
620 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
622 assert_eq!(size, 0);
623
624 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(8));
625 writer.write_at(0, b"hello").await.unwrap();
626 assert_eq!(writer.size().await, 5);
627 writer.sync().await.unwrap();
628 assert_eq!(writer.size().await, 5);
629
630 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
632 assert_eq!(size, 5);
633 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
634 let read = reader.read(5).await.unwrap().coalesce();
635 assert_eq!(read.as_ref(), b"hello");
636 });
637 }
638
639 #[test_traced]
640 fn test_write_multiple_flushes() {
641 let executor = deterministic::Runner::default();
642 executor.start(|context| async move {
643 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
645 assert_eq!(size, 0);
646
647 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
648 writer.write_at(0, b"abc").await.unwrap();
649 assert_eq!(writer.size().await, 3);
650 writer.write_at(3, b"defg").await.unwrap();
651 assert_eq!(writer.size().await, 7);
652 writer.sync().await.unwrap();
653
654 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
656 assert_eq!(size, 7);
657 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
658 let read = reader.read(7).await.unwrap().coalesce();
659 assert_eq!(read.as_ref(), b"abcdefg");
660 });
661 }
662
663 #[test_traced]
664 fn test_write_large_data() {
665 let executor = deterministic::Runner::default();
666 executor.start(|context| async move {
667 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
669 assert_eq!(size, 0);
670
671 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
672 writer.write_at(0, b"abc").await.unwrap();
673 assert_eq!(writer.size().await, 3);
674 writer
675 .write_at(3, b"defghijklmnopqrstuvwxyz")
676 .await
677 .unwrap();
678 assert_eq!(writer.size().await, 26);
679 writer.sync().await.unwrap();
680 assert_eq!(writer.size().await, 26);
681
682 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
684 assert_eq!(size, 26);
685 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
686 let read = reader.read(26).await.unwrap().coalesce();
687 assert_eq!(read.as_ref(), b"abcdefghijklmnopqrstuvwxyz");
688 });
689 }
690
691 #[test_traced]
692 fn test_write_append_to_buffer() {
693 let executor = deterministic::Runner::default();
694 executor.start(|context| async move {
695 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
697 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
698
699 writer.write_at(0, b"hello").await.unwrap();
701 assert_eq!(writer.size().await, 5);
702
703 writer.write_at(5, b" world").await.unwrap();
705 writer.sync().await.unwrap();
706 assert_eq!(writer.size().await, 11);
707
708 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
710 assert_eq!(size, 11);
711 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
712 let read = reader.read(11).await.unwrap().coalesce();
713 assert_eq!(read.as_ref(), b"hello world");
714 });
715 }
716
717 #[test_traced]
718 fn test_write_into_middle_of_buffer() {
719 let executor = deterministic::Runner::default();
720 executor.start(|context| async move {
721 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
723 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
724
725 writer.write_at(0, b"abcdefghij").await.unwrap();
727 assert_eq!(writer.size().await, 10);
728
729 writer.write_at(2, b"01234").await.unwrap();
731 assert_eq!(writer.size().await, 10);
732 writer.sync().await.unwrap();
733
734 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
736 assert_eq!(size, 10);
737 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
738 let read = reader.read(10).await.unwrap().coalesce();
739 assert_eq!(read.as_ref(), b"ab01234hij");
740
741 writer.write_at(10, b"klmnopqrst").await.unwrap();
743 assert_eq!(writer.size().await, 20);
744 writer.write_at(9, b"wxyz").await.unwrap();
745 assert_eq!(writer.size().await, 20);
746 writer.sync().await.unwrap();
747
748 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
750 assert_eq!(size, 20);
751 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
752 let read = reader.read(20).await.unwrap().coalesce();
753 assert_eq!(read.as_ref(), b"ab01234hiwxyznopqrst");
754 });
755 }
756
757 #[test_traced]
758 fn test_write_before_buffer() {
759 let executor = deterministic::Runner::default();
760 executor.start(|context| async move {
761 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
763 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
764
765 writer.write_at(10, b"0123456789").await.unwrap();
767 assert_eq!(writer.size().await, 20);
768
769 writer.write_at(0, b"abcde").await.unwrap();
771 assert_eq!(writer.size().await, 20);
772 writer.sync().await.unwrap();
773
774 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
776 assert_eq!(size, 20);
777 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
778 let read = reader.read(20).await.unwrap().coalesce();
779 let mut expected = vec![0u8; 20];
780 expected[0..5].copy_from_slice("abcde".as_bytes());
781 expected[10..20].copy_from_slice("0123456789".as_bytes());
782 assert_eq!(read.as_ref(), expected.as_slice());
783
784 writer.write_at(5, b"fghij").await.unwrap();
786 assert_eq!(writer.size().await, 20);
787 writer.sync().await.unwrap();
788 assert_eq!(writer.size().await, 20);
789
790 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
792 assert_eq!(size, 20);
793 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
794 let read = reader.read(20).await.unwrap().coalesce();
795 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
796 assert_eq!(read.as_ref(), expected.as_slice());
797 });
798 }
799
800 #[test_traced]
801 fn test_write_resize() {
802 let executor = deterministic::Runner::default();
803 executor.start(|context| async move {
804 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
806 let writer = Write::from_pooler(&context, blob, size, NZUsize!(10));
807
808 writer.write_at(0, b"hello world").await.unwrap();
810 assert_eq!(writer.size().await, 11);
811 writer.sync().await.unwrap();
812 assert_eq!(writer.size().await, 11);
813
814 let (blob_check, size_check) =
815 context.open("partition", b"resize_write").await.unwrap();
816 assert_eq!(size_check, 11);
817 drop(blob_check);
818
819 writer.resize(5).await.unwrap();
821 assert_eq!(writer.size().await, 5);
822 writer.sync().await.unwrap();
823
824 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
826 assert_eq!(size, 5);
827 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
828 let read = reader.read(5).await.unwrap().coalesce();
829 assert_eq!(read.as_ref(), b"hello");
830
831 writer.write_at(0, b"X").await.unwrap();
833 assert_eq!(writer.size().await, 5);
834 writer.sync().await.unwrap();
835
836 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
838 assert_eq!(size, 5);
839 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
840 let read = reader.read(5).await.unwrap().coalesce();
841 assert_eq!(read.as_ref(), b"Xello");
842
843 writer.resize(10).await.unwrap();
845 assert_eq!(writer.size().await, 10);
846 writer.sync().await.unwrap();
847
848 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
850 assert_eq!(size, 10);
851 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
852 let read = reader.read(10).await.unwrap().coalesce();
853 assert_eq!(&read.as_ref()[0..5], b"Xello");
854 assert_eq!(&read.as_ref()[5..10], [0u8; 5]);
855
856 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
858 let writer_zero = Write::from_pooler(&context, blob_zero.clone(), size, NZUsize!(10));
859 writer_zero.write_at(0, b"some data").await.unwrap();
860 assert_eq!(writer_zero.size().await, 9);
861 writer_zero.sync().await.unwrap();
862 assert_eq!(writer_zero.size().await, 9);
863 writer_zero.resize(0).await.unwrap();
864 assert_eq!(writer_zero.size().await, 0);
865 writer_zero.sync().await.unwrap();
866 assert_eq!(writer_zero.size().await, 0);
867
868 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
870 assert_eq!(size_z, 0);
871 });
872 }
873
874 #[test_traced]
875 fn test_write_read_at_on_writer() {
876 let executor = deterministic::Runner::default();
877 executor.start(|context| async move {
878 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
880 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
881
882 writer.write_at(0, b"buffered").await.unwrap();
884 assert_eq!(writer.size().await, 8);
885
886 let read_buf_vec = writer.read_at(0, 4).await.unwrap().coalesce();
888 assert_eq!(read_buf_vec, b"buff");
889
890 let read_buf_vec = writer.read_at(4, 4).await.unwrap().coalesce();
891 assert_eq!(read_buf_vec, b"ered");
892
893 assert!(writer.read_at(8, 1).await.is_err());
895
896 writer.write_at(8, b" and flushed").await.unwrap();
898 assert_eq!(writer.size().await, 20);
899 writer.sync().await.unwrap();
900 assert_eq!(writer.size().await, 20);
901
902 let read_buf_vec_2 = writer.read_at(0, 4).await.unwrap().coalesce();
904 assert_eq!(read_buf_vec_2, b"buff");
905
906 let read_buf_7_vec = writer.read_at(13, 7).await.unwrap().coalesce();
907 assert_eq!(read_buf_7_vec, b"flushed");
908
909 writer.write_at(20, b" more data").await.unwrap();
911 assert_eq!(writer.size().await, 30);
912
913 let read_buf_vec_3 = writer.read_at(20, 5).await.unwrap().coalesce();
915 assert_eq!(read_buf_vec_3, b" more");
916
917 let combo_read_buf_vec = writer.read_at(16, 12).await.unwrap();
919 assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
920
921 writer.sync().await.unwrap();
923 assert_eq!(writer.size().await, 30);
924 let (final_blob, final_size) =
925 context.open("partition", b"read_at_writer").await.unwrap();
926 assert_eq!(final_size, 30);
927 let mut final_reader =
928 Read::from_pooler(&context, final_blob, final_size, NZUsize!(30));
929 let read = final_reader.read(30).await.unwrap().coalesce();
930 assert_eq!(read.as_ref(), b"buffered and flushed more data");
931 });
932 }
933
934 #[test_traced]
935 fn test_write_zero_length_read_past_eof_errors() {
936 let executor = deterministic::Runner::default();
937 executor.start(|context| async move {
938 let (blob, size) = context.open("partition", b"zero_len_probe").await.unwrap();
939 let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
940 writer.write_at(0, b"abc").await.unwrap();
941
942 let empty = writer.read_at(3, 0).await.unwrap();
943 assert!(empty.is_empty());
944
945 let err = writer.read_at(4, 0).await.unwrap_err();
946 assert!(matches!(err, Error::BlobInsufficientLength));
947 });
948 }
949
950 #[test_traced]
951 fn test_write_read_at_blocks_concurrent_write_until_persisted_read_completes() {
952 let executor = deterministic::Runner::default();
953 executor.start(|context| async move {
954 let (blob, read_started_rx, release_read_tx) =
955 BlockingReadBlob::new(b"abcdefghij".to_vec());
956 let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
957 let reader = writer.clone();
958 let verifier = writer.clone();
959
960 let read_task = context
962 .clone()
963 .spawn(move |_| async move { reader.read_at(0, 4).await.expect("read failed") });
964
965 read_started_rx.await.expect("read start signal missing");
967
968 let write_task = context.clone().spawn(move |_| async move {
969 writer.write_at(0, b"WXYZ").await.expect("write failed");
970 });
971 pin_mut!(write_task);
972
973 context.sleep(Duration::from_secs(1)).await;
975 assert!(
976 write_task.as_mut().now_or_never().is_none(),
977 "write_at completed while read_at still held lock over blob I/O"
978 );
979
980 release_read_tx
982 .send(())
983 .expect("failed to release blocked read");
984 let read_result = read_task.await.expect("read task failed").coalesce();
985 assert_eq!(read_result.as_ref(), b"abcd");
986 write_task.await.expect("write task failed");
987
988 let updated = verifier.read_at(0, 4).await.unwrap().coalesce();
989 assert_eq!(updated.as_ref(), b"WXYZ");
990 });
991 }
992
993 #[test_traced]
994 fn test_write_read_at_overlap_blocks_concurrent_write_until_persisted_read_completes() {
995 let executor = deterministic::Runner::default();
996 executor.start(|context| async move {
997 let (blob, read_started_rx, release_read_tx) =
998 BlockingReadBlob::new(b"abcdefghij".to_vec());
999 let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
1000 let verifier = writer.clone();
1001
1002 writer.write_at(10, b"XYZ").await.unwrap();
1004
1005 let reader = writer.clone();
1007 let read_task = context
1008 .clone()
1009 .spawn(move |_| async move { reader.read_at(8, 5).await.expect("read failed") });
1010
1011 read_started_rx.await.expect("read start signal missing");
1013
1014 let write_task = context.clone().spawn(move |_| async move {
1015 writer.write_at(10, b"UVW").await.expect("write failed");
1016 });
1017 pin_mut!(write_task);
1018
1019 context.sleep(Duration::from_secs(1)).await;
1021 assert!(
1022 write_task.as_mut().now_or_never().is_none(),
1023 "write_at completed while overlap read_at still held lock over blob I/O"
1024 );
1025
1026 release_read_tx
1028 .send(())
1029 .expect("failed to release blocked read");
1030 let read_result = read_task.await.expect("read task failed").coalesce();
1031 assert_eq!(read_result.as_ref(), b"ijXYZ");
1032 write_task.await.expect("write task failed");
1033
1034 let updated = verifier.read_at(8, 5).await.unwrap().coalesce();
1035 assert_eq!(updated.as_ref(), b"ijUVW");
1036 });
1037 }
1038
1039 #[test_traced]
1040 fn test_write_straddling_non_mergeable() {
1041 let executor = deterministic::Runner::default();
1042 executor.start(|context| async move {
1043 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
1045 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1046
1047 writer.write_at(0, b"0123456789").await.unwrap();
1049 assert_eq!(writer.size().await, 10);
1050
1051 writer.write_at(15, b"abc").await.unwrap();
1053 assert_eq!(writer.size().await, 18);
1054 writer.sync().await.unwrap();
1055 assert_eq!(writer.size().await, 18);
1056
1057 let (blob_check, size_check) =
1059 context.open("partition", b"write_straddle").await.unwrap();
1060 assert_eq!(size_check, 18);
1061 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(20));
1062 let read = reader.read(18).await.unwrap().coalesce();
1063
1064 let mut expected = vec![0u8; 18];
1065 expected[0..10].copy_from_slice(b"0123456789");
1066 expected[15..18].copy_from_slice(b"abc");
1067 assert_eq!(read.as_ref(), expected.as_slice());
1068
1069 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
1071 let writer2 = Write::from_pooler(&context, blob2.clone(), size, NZUsize!(10));
1072 writer2.write_at(0, b"0123456789").await.unwrap();
1073 assert_eq!(writer2.size().await, 10);
1074
1075 writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
1077 assert_eq!(writer2.size().await, 17);
1078 writer2.sync().await.unwrap();
1079 assert_eq!(writer2.size().await, 17);
1080
1081 let (blob_check2, size_check2) =
1083 context.open("partition", b"write_straddle2").await.unwrap();
1084 assert_eq!(size_check2, 17);
1085 let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(20));
1086 let read = reader2.read(17).await.unwrap().coalesce();
1087 assert_eq!(read.as_ref(), b"01234ABCDEFGHIJKL");
1088 });
1089 }
1090
1091 #[test_traced]
1092 fn test_write_close() {
1093 let executor = deterministic::Runner::default();
1094 executor.start(|context| async move {
1095 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
1097 let writer = Write::from_pooler(&context, blob_orig.clone(), size, NZUsize!(8));
1098 writer.write_at(0, b"pending").await.unwrap();
1099 assert_eq!(writer.size().await, 7);
1100
1101 writer.sync().await.unwrap();
1103
1104 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
1106 assert_eq!(size_check, 7);
1107 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(8));
1108 let read = reader.read(7).await.unwrap().coalesce();
1109 assert_eq!(read.as_ref(), b"pending");
1110 });
1111 }
1112
1113 #[test_traced]
1114 fn test_write_direct_due_to_size() {
1115 let executor = deterministic::Runner::default();
1116 executor.start(|context| async move {
1117 let (blob, size) = context
1119 .open("partition", b"write_direct_size")
1120 .await
1121 .unwrap();
1122 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1123
1124 let data_large = b"0123456789";
1126 writer.write_at(0, data_large).await.unwrap();
1127 assert_eq!(writer.size().await, 10);
1128
1129 writer.sync().await.unwrap();
1131
1132 let (blob_check, size_check) = context
1134 .open("partition", b"write_direct_size")
1135 .await
1136 .unwrap();
1137 assert_eq!(size_check, 10);
1138 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1139 let read = reader.read(10).await.unwrap().coalesce();
1140 assert_eq!(read.as_ref(), data_large.as_slice());
1141
1142 writer.write_at(10, b"abc").await.unwrap();
1144 assert_eq!(writer.size().await, 13);
1145
1146 let read_small_buf_vec = writer.read_at(10, 3).await.unwrap().coalesce();
1148 assert_eq!(read_small_buf_vec, b"abc");
1149
1150 writer.sync().await.unwrap();
1151
1152 let (blob_check2, size_check2) = context
1154 .open("partition", b"write_direct_size")
1155 .await
1156 .unwrap();
1157 assert_eq!(size_check2, 13);
1158 let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(13));
1159 let read = reader2.read(13).await.unwrap().coalesce();
1160 assert_eq!(&read.as_ref()[10..], b"abc".as_slice());
1161 });
1162 }
1163
1164 #[test_traced]
1165 fn test_write_overwrite_and_extend_in_buffer() {
1166 let executor = deterministic::Runner::default();
1167 executor.start(|context| async move {
1168 let (blob, size) = context
1170 .open("partition", b"overwrite_extend_buf")
1171 .await
1172 .unwrap();
1173 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(15));
1174
1175 writer.write_at(0, b"0123456789").await.unwrap();
1177 assert_eq!(writer.size().await, 10);
1178
1179 writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
1181 assert_eq!(writer.size().await, 15);
1182
1183 let read_buf_vec = writer.read_at(0, 15).await.unwrap().coalesce();
1185 assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
1186
1187 writer.sync().await.unwrap();
1188
1189 let (blob_check, size_check) = context
1191 .open("partition", b"overwrite_extend_buf")
1192 .await
1193 .unwrap();
1194 assert_eq!(size_check, 15);
1195 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(15));
1196 let read = reader.read(15).await.unwrap().coalesce();
1197 assert_eq!(read.as_ref(), b"01234ABCDEFGHIJ".as_slice());
1198 });
1199 }
1200
1201 #[test_traced]
1202 fn test_write_at_size() {
1203 let executor = deterministic::Runner::default();
1204 executor.start(|context| async move {
1205 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
1207 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
1208
1209 writer.write_at(0, b"0123456789").await.unwrap();
1211 assert_eq!(writer.size().await, 10);
1212 writer.sync().await.unwrap();
1213
1214 writer.write_at(writer.size().await, b"abc").await.unwrap();
1216 assert_eq!(writer.size().await, 13);
1217 writer.sync().await.unwrap();
1218
1219 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
1221 assert_eq!(size_check, 13);
1222 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(13));
1223 let read = reader.read(13).await.unwrap().coalesce();
1224 assert_eq!(read.as_ref(), b"0123456789abc");
1225 });
1226 }
1227
1228 #[test_traced]
1229 fn test_write_at_size_multiple_appends() {
1230 let executor = deterministic::Runner::default();
1231 executor.start(|context| async move {
1232 let (blob, size) = context
1234 .open("partition", b"write_multiple_appends_at_size")
1235 .await
1236 .unwrap();
1237 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1238
1239 writer.write_at(0, b"AAA").await.unwrap();
1241 assert_eq!(writer.size().await, 3);
1242 writer.sync().await.unwrap();
1243 assert_eq!(writer.size().await, 3);
1244
1245 writer.write_at(writer.size().await, b"BBB").await.unwrap();
1247 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
1249 assert_eq!(writer.size().await, 6);
1250
1251 writer.write_at(writer.size().await, b"CCC").await.unwrap();
1253 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1255 assert_eq!(writer.size().await, 9);
1256
1257 let (blob_check, size_check) = context
1259 .open("partition", b"write_multiple_appends_at_size")
1260 .await
1261 .unwrap();
1262 assert_eq!(size_check, 9);
1263 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(9));
1264 let read = reader.read(9).await.unwrap().coalesce();
1265 assert_eq!(read.as_ref(), b"AAABBBCCC");
1266 });
1267 }
1268
1269 #[test_traced]
1270 fn test_write_non_contiguous_then_append_at_size() {
1271 let executor = deterministic::Runner::default();
1272 executor.start(|context| async move {
1273 let (blob, size) = context
1275 .open("partition", b"write_non_contiguous_then_append")
1276 .await
1277 .unwrap();
1278 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1279
1280 writer.write_at(0, b"INITIAL").await.unwrap(); assert_eq!(writer.size().await, 7);
1283 writer.write_at(20, b"NONCONTIG").await.unwrap();
1287 assert_eq!(writer.size().await, 29);
1288 writer.sync().await.unwrap();
1289 assert_eq!(writer.size().await, 29);
1290
1291 writer
1293 .write_at(writer.size().await, b"APPEND")
1294 .await
1295 .unwrap();
1296 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1298 assert_eq!(writer.size().await, 35);
1299
1300 let (blob_check, size_check) = context
1302 .open("partition", b"write_non_contiguous_then_append")
1303 .await
1304 .unwrap();
1305 assert_eq!(size_check, 35);
1306 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(35));
1307 let read = reader.read(35).await.unwrap().coalesce();
1308
1309 let mut expected = vec![0u8; 35];
1310 expected[0..7].copy_from_slice(b"INITIAL");
1311 expected[20..29].copy_from_slice(b"NONCONTIG");
1312 expected[29..35].copy_from_slice(b"APPEND");
1313 assert_eq!(read.as_ref(), expected.as_slice());
1314 });
1315 }
1316
1317 #[test_traced]
1318 fn test_resize_then_append_at_size() {
1319 let executor = deterministic::Runner::default();
1320 executor.start(|context| async move {
1321 let (blob, size) = context
1323 .open("partition", b"resize_then_append_at_size")
1324 .await
1325 .unwrap();
1326 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1327
1328 writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); assert_eq!(writer.size().await, 16);
1331 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1333
1334 let resize_to = 5;
1336 writer.resize(resize_to).await.unwrap();
1337 assert_eq!(writer.size().await, resize_to);
1340 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1342
1343 writer
1345 .write_at(writer.size().await, b"XXXXX")
1346 .await
1347 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1351 assert_eq!(writer.size().await, 10);
1352
1353 let (blob_check, size_check) = context
1355 .open("partition", b"resize_then_append_at_size")
1356 .await
1357 .unwrap();
1358 assert_eq!(size_check, 10);
1359 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1360 let read = reader.read(10).await.unwrap().coalesce();
1361 assert_eq!(read.as_ref(), b"01234XXXXX");
1362 });
1363 }
1364}