1pub mod paged;
4mod read;
5mod tip;
6mod write;
7
8pub use read::Read;
9pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13 use super::*;
14 use crate::{deterministic, Blob as _, Error, IoBufMut, Runner, Storage};
15 use commonware_macros::test_traced;
16 use commonware_utils::NZUsize;
17
18 #[test_traced]
19 fn test_read_basic() {
20 let executor = deterministic::Runner::default();
21 executor.start(|context| async move {
22 let data = b"Hello, world! This is a test.";
24 let (blob, size) = context.open("partition", b"test").await.unwrap();
25 assert_eq!(size, 0);
26 blob.write_at(0, data).await.unwrap();
27 let size = data.len() as u64;
28
29 let mut reader = Read::new(blob, size, NZUsize!(10));
31
32 let mut buf = [0u8; 5];
34 reader.read_exact(&mut buf, 5).await.unwrap();
35 assert_eq!(&buf, b"Hello");
36
37 let mut buf = [0u8; 14];
39 reader.read_exact(&mut buf, 14).await.unwrap();
40 assert_eq!(&buf, b", world! This ");
41
42 assert_eq!(reader.position(), 19);
44
45 let mut buf = [0u8; 10];
47 reader.read_exact(&mut buf, 7).await.unwrap();
48 assert_eq!(&buf[..7], b"is a te");
49
50 let mut buf = [0u8; 5];
52 let result = reader.read_exact(&mut buf, 5).await;
53 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
54 });
55 }
56
57 #[test_traced]
58 fn test_read_cross_boundary() {
59 let executor = deterministic::Runner::default();
60 executor.start(|context| async move {
61 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
63 let (blob, size) = context.open("partition", b"test").await.unwrap();
64 assert_eq!(size, 0);
65 blob.write_at(0, data).await.unwrap();
66 let size = data.len() as u64;
67
68 let mut reader = Read::new(blob, size, NZUsize!(10));
70
71 let mut buf = [0u8; 15];
73 reader.read_exact(&mut buf, 15).await.unwrap();
74 assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
75
76 assert_eq!(reader.position(), 15);
78
79 let mut buf = [0u8; 11];
81 reader.read_exact(&mut buf, 11).await.unwrap();
82 assert_eq!(&buf, b"PQRSTUVWXYZ");
83
84 assert_eq!(reader.position(), 26);
86 assert_eq!(reader.blob_remaining(), 0);
87 });
88 }
89
90 #[test_traced]
92 fn test_read_to_end_then_rewind_and_read_again() {
93 let executor = deterministic::Runner::default();
94 executor.start(|context| async move {
95 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
96 let (blob, size) = context.open("partition", b"test").await.unwrap();
97 assert_eq!(size, 0);
98 blob.write_at(0, data).await.unwrap();
99 let size = data.len() as u64;
100
101 let mut reader = Read::new(blob, size, NZUsize!(20));
102
103 let mut buf = [0u8; 21];
105 reader.read_exact(&mut buf, 21).await.unwrap();
106 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
107
108 assert_eq!(reader.position(), 21);
110
111 let mut buf = [0u8; 5];
113 reader.read_exact(&mut buf, 5).await.unwrap();
114 assert_eq!(&buf, b"VWXYZ");
115
116 reader.seek_to(0).unwrap();
118 let mut buf = [0u8; 21];
119 reader.read_exact(&mut buf, 21).await.unwrap();
120 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
121 });
122 }
123
124 #[test_traced]
125 fn test_read_with_known_size() {
126 let executor = deterministic::Runner::default();
127 executor.start(|context| async move {
128 let data = b"This is a test with known size limitations.";
130 let (blob, size) = context.open("partition", b"test").await.unwrap();
131 assert_eq!(size, 0);
132 blob.write_at(0, data).await.unwrap();
133 let size = data.len() as u64;
134
135 let mut reader = Read::new(blob, size, NZUsize!(10));
137
138 assert_eq!(reader.blob_remaining(), size);
140
141 let mut buf = [0u8; 5];
143 reader.read_exact(&mut buf, 5).await.unwrap();
144 assert_eq!(&buf, b"This ");
145
146 assert_eq!(reader.blob_remaining(), size - 5);
148
149 let mut buf = vec![0u8; (size - 5) as usize];
151 reader
152 .read_exact(&mut buf, (size - 5) as usize)
153 .await
154 .unwrap();
155 assert_eq!(&buf, b"is a test with known size limitations.");
156
157 assert_eq!(reader.blob_remaining(), 0);
159
160 let mut buf = [0u8; 1];
162 let result = reader.read_exact(&mut buf, 1).await;
163 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
164 });
165 }
166
167 #[test_traced]
168 fn test_read_large_data() {
169 let executor = deterministic::Runner::default();
170 executor.start(|context| async move {
171 let data_size = 1024 * 256; let data = vec![0x42; data_size];
174 let (blob, size) = context.open("partition", b"test").await.unwrap();
175 assert_eq!(size, 0);
176 blob.write_at(0, data.clone()).await.unwrap();
177 let size = data.len() as u64;
178
179 let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); let mut total_read = 0;
184 let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
186
187 while total_read < data_size {
188 let to_read = std::cmp::min(chunk_size, data_size - total_read);
189 reader
190 .read_exact(&mut buf[..to_read], to_read)
191 .await
192 .unwrap();
193
194 assert!(
196 buf[..to_read].iter().all(|&b| b == 0x42),
197 "Data at position {total_read} is not correct"
198 );
199
200 total_read += to_read;
201 }
202
203 assert_eq!(total_read, data_size);
205
206 let mut extra_buf = [0u8; 1];
208 let result = reader.read_exact(&mut extra_buf, 1).await;
209 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
210 });
211 }
212
213 #[test_traced]
214 fn test_read_exact_size_reads() {
215 let executor = deterministic::Runner::default();
216 executor.start(|context| async move {
217 let buffer_size = 1024;
219 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
221
222 let (blob, size) = context.open("partition", b"test").await.unwrap();
223 assert_eq!(size, 0);
224 blob.write_at(0, data.clone()).await.unwrap();
225 let size = data.len() as u64;
226
227 let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
228
229 let mut buf1 = vec![0u8; buffer_size];
231 reader.read_exact(&mut buf1, buffer_size).await.unwrap();
232 assert!(buf1.iter().all(|&b| b == 0x37));
233
234 let mut buf2 = vec![0u8; buffer_size];
236 reader.read_exact(&mut buf2, buffer_size).await.unwrap();
237 assert!(buf2.iter().all(|&b| b == 0x37));
238
239 let half_buffer = buffer_size / 2;
241 let mut buf3 = vec![0u8; half_buffer];
242 reader.read_exact(&mut buf3, half_buffer).await.unwrap();
243 assert!(buf3.iter().all(|&b| b == 0x37));
244
245 assert_eq!(reader.blob_remaining(), 0);
247 assert_eq!(reader.position(), size);
248 });
249 }
250
251 #[test_traced]
252 fn test_read_seek_to() {
253 let executor = deterministic::Runner::default();
254 executor.start(|context| async move {
255 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
257 let (blob, size) = context.open("partition", b"test").await.unwrap();
258 assert_eq!(size, 0);
259 blob.write_at(0, data).await.unwrap();
260 let size = data.len() as u64;
261
262 let mut reader = Read::new(blob, size, NZUsize!(10));
264
265 let mut buf = [0u8; 5];
267 reader.read_exact(&mut buf, 5).await.unwrap();
268 assert_eq!(&buf, b"ABCDE");
269 assert_eq!(reader.position(), 5);
270
271 reader.seek_to(10).unwrap();
273 assert_eq!(reader.position(), 10);
274
275 let mut buf = [0u8; 5];
277 reader.read_exact(&mut buf, 5).await.unwrap();
278 assert_eq!(&buf, b"KLMNO");
279
280 reader.seek_to(0).unwrap();
282 assert_eq!(reader.position(), 0);
283
284 let mut buf = [0u8; 5];
285 reader.read_exact(&mut buf, 5).await.unwrap();
286 assert_eq!(&buf, b"ABCDE");
287
288 reader.seek_to(size).unwrap();
290 assert_eq!(reader.position(), size);
291
292 let mut buf = [0u8; 1];
294 let result = reader.read_exact(&mut buf, 1).await;
295 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
296
297 let result = reader.seek_to(size + 10);
299 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
300 });
301 }
302
303 #[test_traced]
304 fn test_read_seek_with_refill() {
305 let executor = deterministic::Runner::default();
306 executor.start(|context| async move {
307 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
310 assert_eq!(size, 0);
311 blob.write_at(0, data.clone()).await.unwrap();
312 let size = data.len() as u64;
313
314 let mut reader = Read::new(blob, size, NZUsize!(10));
316
317 let mut buf = [0u8; 5];
319 reader.read_exact(&mut buf, 5).await.unwrap();
320
321 reader.seek_to(500).unwrap();
323
324 let mut buf = [0u8; 5];
326 reader.read_exact(&mut buf, 5).await.unwrap();
327 assert_eq!(&buf, b"AAAAA"); assert_eq!(reader.position(), 505);
329
330 reader.seek_to(100).unwrap();
332
333 let mut buf = [0u8; 5];
335 reader.read_exact(&mut buf, 5).await.unwrap();
336 assert_eq!(reader.position(), 105);
337 });
338 }
339
340 #[test_traced]
341 fn test_read_resize() {
342 let executor = deterministic::Runner::default();
343 executor.start(|context| async move {
344 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
346 let (blob, size) = context.open("partition", b"test").await.unwrap();
347 assert_eq!(size, 0);
348 blob.write_at(0, data).await.unwrap();
349 let data_len = data.len() as u64;
350
351 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
353
354 let resize_len = data_len / 2;
356 reader.resize(resize_len).await.unwrap();
357
358 let (blob, size) = context.open("partition", b"test").await.unwrap();
360 assert_eq!(size, resize_len, "Blob should be resized to half size");
361
362 let mut new_reader = Read::new(blob, size, NZUsize!(10));
364
365 let mut buf = vec![0u8; size as usize];
367 new_reader
368 .read_exact(&mut buf, size as usize)
369 .await
370 .unwrap();
371 assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
372
373 let mut extra_buf = [0u8; 1];
375 let result = new_reader.read_exact(&mut extra_buf, 1).await;
376 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
377
378 new_reader.resize(data_len * 2).await.unwrap();
380
381 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
383 assert_eq!(new_size, data_len * 2);
384
385 let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
387 let mut buf = vec![0u8; new_size as usize];
388 new_reader
389 .read_exact(&mut buf, new_size as usize)
390 .await
391 .unwrap();
392 assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
393 assert_eq!(
394 &buf[size as usize..],
395 vec![0u8; new_size as usize - size as usize]
396 );
397 });
398 }
399
400 #[test_traced]
401 fn test_read_resize_to_zero() {
402 let executor = deterministic::Runner::default();
403 executor.start(|context| async move {
404 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
406 let data_len = data.len() as u64;
407 let (blob, size) = context.open("partition", b"test").await.unwrap();
408 assert_eq!(size, 0);
409 blob.write_at(0, data).await.unwrap();
410
411 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
413
414 reader.resize(0).await.unwrap();
416
417 let (blob, size) = context.open("partition", b"test").await.unwrap();
419 assert_eq!(size, 0, "Blob should be resized to zero");
420
421 let mut new_reader = Read::new(blob, size, NZUsize!(10));
423
424 let mut buf = [0u8; 1];
426 let result = new_reader.read_exact(&mut buf, 1).await;
427 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
428 });
429 }
430
431 #[test_traced]
432 fn test_write_basic() {
433 let executor = deterministic::Runner::default();
434 executor.start(|context| async move {
435 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
437 assert_eq!(size, 0);
438
439 let writer = Write::new(blob.clone(), size, NZUsize!(8));
440 writer.write_at(0, b"hello").await.unwrap();
441 assert_eq!(writer.size().await, 5);
442 writer.sync().await.unwrap();
443 assert_eq!(writer.size().await, 5);
444
445 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
447 assert_eq!(size, 5);
448 let mut reader = Read::new(blob, size, NZUsize!(8));
449 let mut buf = [0u8; 5];
450 reader.read_exact(&mut buf, 5).await.unwrap();
451 assert_eq!(&buf, b"hello");
452 });
453 }
454
455 #[test_traced]
456 fn test_write_multiple_flushes() {
457 let executor = deterministic::Runner::default();
458 executor.start(|context| async move {
459 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
461 assert_eq!(size, 0);
462
463 let writer = Write::new(blob.clone(), size, NZUsize!(4));
464 writer.write_at(0, b"abc").await.unwrap();
465 assert_eq!(writer.size().await, 3);
466 writer.write_at(3, b"defg").await.unwrap();
467 assert_eq!(writer.size().await, 7);
468 writer.sync().await.unwrap();
469
470 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
472 assert_eq!(size, 7);
473 let mut reader = Read::new(blob, size, NZUsize!(4));
474 let mut buf = [0u8; 7];
475 reader.read_exact(&mut buf, 7).await.unwrap();
476 assert_eq!(&buf, b"abcdefg");
477 });
478 }
479
480 #[test_traced]
481 fn test_write_large_data() {
482 let executor = deterministic::Runner::default();
483 executor.start(|context| async move {
484 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
486 assert_eq!(size, 0);
487
488 let writer = Write::new(blob.clone(), size, NZUsize!(4));
489 writer.write_at(0, b"abc").await.unwrap();
490 assert_eq!(writer.size().await, 3);
491 writer
492 .write_at(3, b"defghijklmnopqrstuvwxyz")
493 .await
494 .unwrap();
495 assert_eq!(writer.size().await, 26);
496 writer.sync().await.unwrap();
497 assert_eq!(writer.size().await, 26);
498
499 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
501 assert_eq!(size, 26);
502 let mut reader = Read::new(blob, size, NZUsize!(4));
503 let mut buf = [0u8; 26];
504 reader.read_exact(&mut buf, 26).await.unwrap();
505 assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
506 });
507 }
508
509 #[test_traced]
510 fn test_write_append_to_buffer() {
511 let executor = deterministic::Runner::default();
512 executor.start(|context| async move {
513 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
515 let writer = Write::new(blob.clone(), size, NZUsize!(10));
516
517 writer.write_at(0, b"hello").await.unwrap();
519 assert_eq!(writer.size().await, 5);
520
521 writer.write_at(5, b" world").await.unwrap();
523 writer.sync().await.unwrap();
524 assert_eq!(writer.size().await, 11);
525
526 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
528 assert_eq!(size, 11);
529 let mut reader = Read::new(blob, size, NZUsize!(10));
530 let mut buf = vec![0u8; 11];
531 reader.read_exact(&mut buf, 11).await.unwrap();
532 assert_eq!(&buf, b"hello world");
533 });
534 }
535
536 #[test_traced]
537 fn test_write_into_middle_of_buffer() {
538 let executor = deterministic::Runner::default();
539 executor.start(|context| async move {
540 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
542 let writer = Write::new(blob.clone(), size, NZUsize!(20));
543
544 writer.write_at(0, b"abcdefghij").await.unwrap();
546 assert_eq!(writer.size().await, 10);
547
548 writer.write_at(2, b"01234").await.unwrap();
550 assert_eq!(writer.size().await, 10);
551 writer.sync().await.unwrap();
552
553 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
555 assert_eq!(size, 10);
556 let mut reader = Read::new(blob, size, NZUsize!(10));
557 let mut buf = vec![0u8; 10];
558 reader.read_exact(&mut buf, 10).await.unwrap();
559 assert_eq!(&buf, b"ab01234hij");
560
561 writer.write_at(10, b"klmnopqrst").await.unwrap();
563 assert_eq!(writer.size().await, 20);
564 writer.write_at(9, b"wxyz").await.unwrap();
565 assert_eq!(writer.size().await, 20);
566 writer.sync().await.unwrap();
567
568 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
570 assert_eq!(size, 20);
571 let mut reader = Read::new(blob, size, NZUsize!(20));
572 let mut buf = vec![0u8; 20];
573 reader.read_exact(&mut buf, 20).await.unwrap();
574 assert_eq!(&buf, b"ab01234hiwxyznopqrst");
575 });
576 }
577
578 #[test_traced]
579 fn test_write_before_buffer() {
580 let executor = deterministic::Runner::default();
581 executor.start(|context| async move {
582 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
584 let writer = Write::new(blob.clone(), size, NZUsize!(10));
585
586 writer.write_at(10, b"0123456789").await.unwrap();
588 assert_eq!(writer.size().await, 20);
589
590 writer.write_at(0, b"abcde").await.unwrap();
592 assert_eq!(writer.size().await, 20);
593 writer.sync().await.unwrap();
594
595 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
597 assert_eq!(size, 20);
598 let mut reader = Read::new(blob, size, NZUsize!(20));
599 let mut buf = vec![0u8; 20];
600 reader.read_exact(&mut buf, 20).await.unwrap();
601 let mut expected = vec![0u8; 20];
602 expected[0..5].copy_from_slice("abcde".as_bytes());
603 expected[10..20].copy_from_slice("0123456789".as_bytes());
604 assert_eq!(buf, expected);
605
606 writer.write_at(5, b"fghij").await.unwrap();
608 assert_eq!(writer.size().await, 20);
609 writer.sync().await.unwrap();
610 assert_eq!(writer.size().await, 20);
611
612 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
614 assert_eq!(size, 20);
615 let mut reader = Read::new(blob, size, NZUsize!(20));
616 let mut buf = vec![0u8; 20];
617 reader.read_exact(&mut buf, 20).await.unwrap();
618 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
619 assert_eq!(buf, expected);
620 });
621 }
622
623 #[test_traced]
624 fn test_write_resize() {
625 let executor = deterministic::Runner::default();
626 executor.start(|context| async move {
627 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
629 let writer = Write::new(blob, size, NZUsize!(10));
630
631 writer.write_at(0, b"hello world").await.unwrap();
633 assert_eq!(writer.size().await, 11);
634 writer.sync().await.unwrap();
635 assert_eq!(writer.size().await, 11);
636
637 let (blob_check, size_check) =
638 context.open("partition", b"resize_write").await.unwrap();
639 assert_eq!(size_check, 11);
640 drop(blob_check);
641
642 writer.resize(5).await.unwrap();
644 assert_eq!(writer.size().await, 5);
645 writer.sync().await.unwrap();
646
647 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
649 assert_eq!(size, 5);
650 let mut reader = Read::new(blob, size, NZUsize!(5));
651 let mut buf = vec![0u8; 5];
652 reader.read_exact(&mut buf, 5).await.unwrap();
653 assert_eq!(&buf, b"hello");
654
655 writer.write_at(0, b"X").await.unwrap();
657 assert_eq!(writer.size().await, 5);
658 writer.sync().await.unwrap();
659
660 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
662 assert_eq!(size, 5);
663 let mut reader = Read::new(blob, size, NZUsize!(5));
664 let mut buf = vec![0u8; 5];
665 reader.read_exact(&mut buf, 5).await.unwrap();
666 assert_eq!(&buf, b"Xello");
667
668 writer.resize(10).await.unwrap();
670 assert_eq!(writer.size().await, 10);
671 writer.sync().await.unwrap();
672
673 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
675 assert_eq!(size, 10);
676 let mut reader = Read::new(blob, size, NZUsize!(10));
677 let mut buf = vec![0u8; 10];
678 reader.read_exact(&mut buf, 10).await.unwrap();
679 assert_eq!(&buf[0..5], b"Xello");
680 assert_eq!(&buf[5..10], [0u8; 5]);
681
682 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
684 let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
685 writer_zero.write_at(0, b"some data").await.unwrap();
686 assert_eq!(writer_zero.size().await, 9);
687 writer_zero.sync().await.unwrap();
688 assert_eq!(writer_zero.size().await, 9);
689 writer_zero.resize(0).await.unwrap();
690 assert_eq!(writer_zero.size().await, 0);
691 writer_zero.sync().await.unwrap();
692 assert_eq!(writer_zero.size().await, 0);
693
694 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
696 assert_eq!(size_z, 0);
697 });
698 }
699
700 #[test_traced]
701 fn test_write_read_at_on_writer() {
702 let executor = deterministic::Runner::default();
703 executor.start(|context| async move {
704 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
706 let writer = Write::new(blob.clone(), size, NZUsize!(10));
707
708 writer.write_at(0, b"buffered").await.unwrap();
710 assert_eq!(writer.size().await, 8);
711
712 let read_buf_vec = writer
714 .read_at(0, IoBufMut::zeroed(4))
715 .await
716 .unwrap()
717 .coalesce();
718 assert_eq!(read_buf_vec, b"buff");
719
720 let read_buf_vec = writer
721 .read_at(4, IoBufMut::zeroed(4))
722 .await
723 .unwrap()
724 .coalesce();
725 assert_eq!(read_buf_vec, b"ered");
726
727 assert!(writer.read_at(8, IoBufMut::zeroed(1)).await.is_err());
729
730 writer.write_at(8, b" and flushed").await.unwrap();
732 assert_eq!(writer.size().await, 20);
733 writer.sync().await.unwrap();
734 assert_eq!(writer.size().await, 20);
735
736 let read_buf_vec_2 = writer
738 .read_at(0, IoBufMut::zeroed(4))
739 .await
740 .unwrap()
741 .coalesce();
742 assert_eq!(read_buf_vec_2, b"buff");
743
744 let read_buf_7_vec = writer
745 .read_at(13, IoBufMut::zeroed(7))
746 .await
747 .unwrap()
748 .coalesce();
749 assert_eq!(read_buf_7_vec, b"flushed");
750
751 writer.write_at(20, b" more data").await.unwrap();
753 assert_eq!(writer.size().await, 30);
754
755 let read_buf_vec_3 = writer
757 .read_at(20, IoBufMut::zeroed(5))
758 .await
759 .unwrap()
760 .coalesce();
761 assert_eq!(read_buf_vec_3, b" more");
762
763 let combo_read_buf_vec = writer.read_at(16, IoBufMut::zeroed(12)).await.unwrap();
765 assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
766
767 writer.sync().await.unwrap();
769 assert_eq!(writer.size().await, 30);
770 let (final_blob, final_size) =
771 context.open("partition", b"read_at_writer").await.unwrap();
772 assert_eq!(final_size, 30);
773 let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
774 let mut full_content = vec![0u8; 30];
775 final_reader
776 .read_exact(&mut full_content, 30)
777 .await
778 .unwrap();
779 assert_eq!(&full_content, b"buffered and flushed more data");
780 });
781 }
782
783 #[test_traced]
784 fn test_write_straddling_non_mergeable() {
785 let executor = deterministic::Runner::default();
786 executor.start(|context| async move {
787 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
789 let writer = Write::new(blob.clone(), size, NZUsize!(10));
790
791 writer.write_at(0, b"0123456789").await.unwrap();
793 assert_eq!(writer.size().await, 10);
794
795 writer.write_at(15, b"abc").await.unwrap();
797 assert_eq!(writer.size().await, 18);
798 writer.sync().await.unwrap();
799 assert_eq!(writer.size().await, 18);
800
801 let (blob_check, size_check) =
803 context.open("partition", b"write_straddle").await.unwrap();
804 assert_eq!(size_check, 18);
805 let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
806 let mut buf = vec![0u8; 18];
807 reader.read_exact(&mut buf, 18).await.unwrap();
808
809 let mut expected = vec![0u8; 18];
810 expected[0..10].copy_from_slice(b"0123456789");
811 expected[15..18].copy_from_slice(b"abc");
812 assert_eq!(buf, expected);
813
814 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
816 let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
817 writer2.write_at(0, b"0123456789").await.unwrap();
818 assert_eq!(writer2.size().await, 10);
819
820 writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
822 assert_eq!(writer2.size().await, 17);
823 writer2.sync().await.unwrap();
824 assert_eq!(writer2.size().await, 17);
825
826 let (blob_check2, size_check2) =
828 context.open("partition", b"write_straddle2").await.unwrap();
829 assert_eq!(size_check2, 17);
830 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
831 let mut buf2 = vec![0u8; 17];
832 reader2.read_exact(&mut buf2, 17).await.unwrap();
833 assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
834 });
835 }
836
837 #[test_traced]
838 fn test_write_close() {
839 let executor = deterministic::Runner::default();
840 executor.start(|context| async move {
841 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
843 let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
844 writer.write_at(0, b"pending").await.unwrap();
845 assert_eq!(writer.size().await, 7);
846
847 writer.sync().await.unwrap();
849
850 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
852 assert_eq!(size_check, 7);
853 let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
854 let mut buf = [0u8; 7];
855 reader.read_exact(&mut buf, 7).await.unwrap();
856 assert_eq!(&buf, b"pending");
857 });
858 }
859
860 #[test_traced]
861 fn test_write_direct_due_to_size() {
862 let executor = deterministic::Runner::default();
863 executor.start(|context| async move {
864 let (blob, size) = context
866 .open("partition", b"write_direct_size")
867 .await
868 .unwrap();
869 let writer = Write::new(blob.clone(), size, NZUsize!(5));
870
871 let data_large = b"0123456789";
873 writer.write_at(0, data_large).await.unwrap();
874 assert_eq!(writer.size().await, 10);
875
876 writer.sync().await.unwrap();
878
879 let (blob_check, size_check) = context
881 .open("partition", b"write_direct_size")
882 .await
883 .unwrap();
884 assert_eq!(size_check, 10);
885 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
886 let mut buf = vec![0u8; 10];
887 reader.read_exact(&mut buf, 10).await.unwrap();
888 assert_eq!(&buf, data_large.as_slice());
889
890 writer.write_at(10, b"abc").await.unwrap();
892 assert_eq!(writer.size().await, 13);
893
894 let read_small_buf_vec = writer
896 .read_at(10, IoBufMut::zeroed(3))
897 .await
898 .unwrap()
899 .coalesce();
900 assert_eq!(read_small_buf_vec, b"abc");
901
902 writer.sync().await.unwrap();
903
904 let (blob_check2, size_check2) = context
906 .open("partition", b"write_direct_size")
907 .await
908 .unwrap();
909 assert_eq!(size_check2, 13);
910 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
911 let mut buf2 = vec![0u8; 13];
912 reader2.read_exact(&mut buf2, 13).await.unwrap();
913 assert_eq!(&buf2[10..], b"abc".as_slice());
914 });
915 }
916
917 #[test_traced]
918 fn test_write_overwrite_and_extend_in_buffer() {
919 let executor = deterministic::Runner::default();
920 executor.start(|context| async move {
921 let (blob, size) = context
923 .open("partition", b"overwrite_extend_buf")
924 .await
925 .unwrap();
926 let writer = Write::new(blob.clone(), size, NZUsize!(15));
927
928 writer.write_at(0, b"0123456789").await.unwrap();
930 assert_eq!(writer.size().await, 10);
931
932 writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
934 assert_eq!(writer.size().await, 15);
935
936 let read_buf_vec = writer
938 .read_at(0, IoBufMut::zeroed(15))
939 .await
940 .unwrap()
941 .coalesce();
942 assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
943
944 writer.sync().await.unwrap();
945
946 let (blob_check, size_check) = context
948 .open("partition", b"overwrite_extend_buf")
949 .await
950 .unwrap();
951 assert_eq!(size_check, 15);
952 let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
953 let mut final_buf = vec![0u8; 15];
954 reader.read_exact(&mut final_buf, 15).await.unwrap();
955 assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
956 });
957 }
958
959 #[test_traced]
960 fn test_write_at_size() {
961 let executor = deterministic::Runner::default();
962 executor.start(|context| async move {
963 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
965 let writer = Write::new(blob.clone(), size, NZUsize!(20));
966
967 writer.write_at(0, b"0123456789").await.unwrap();
969 assert_eq!(writer.size().await, 10);
970 writer.sync().await.unwrap();
971
972 writer.write_at(writer.size().await, b"abc").await.unwrap();
974 assert_eq!(writer.size().await, 13);
975 writer.sync().await.unwrap();
976
977 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
979 assert_eq!(size_check, 13);
980 let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
981 let mut buf = vec![0u8; 13];
982 reader.read_exact(&mut buf, 13).await.unwrap();
983 assert_eq!(&buf, b"0123456789abc");
984 });
985 }
986
987 #[test_traced]
988 fn test_write_at_size_multiple_appends() {
989 let executor = deterministic::Runner::default();
990 executor.start(|context| async move {
991 let (blob, size) = context
993 .open("partition", b"write_multiple_appends_at_size")
994 .await
995 .unwrap();
996 let writer = Write::new(blob.clone(), size, NZUsize!(5)); writer.write_at(0, b"AAA").await.unwrap();
1000 assert_eq!(writer.size().await, 3);
1001 writer.sync().await.unwrap();
1002 assert_eq!(writer.size().await, 3);
1003
1004 writer.write_at(writer.size().await, b"BBB").await.unwrap();
1006 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
1008 assert_eq!(writer.size().await, 6);
1009
1010 writer.write_at(writer.size().await, b"CCC").await.unwrap();
1012 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1014 assert_eq!(writer.size().await, 9);
1015
1016 let (blob_check, size_check) = context
1018 .open("partition", b"write_multiple_appends_at_size")
1019 .await
1020 .unwrap();
1021 assert_eq!(size_check, 9);
1022 let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1023 let mut buf = vec![0u8; 9];
1024 reader.read_exact(&mut buf, 9).await.unwrap();
1025 assert_eq!(&buf, b"AAABBBCCC");
1026 });
1027 }
1028
1029 #[test_traced]
1030 fn test_write_non_contiguous_then_append_at_size() {
1031 let executor = deterministic::Runner::default();
1032 executor.start(|context| async move {
1033 let (blob, size) = context
1035 .open("partition", b"write_non_contiguous_then_append")
1036 .await
1037 .unwrap();
1038 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1039
1040 writer.write_at(0, b"INITIAL").await.unwrap(); assert_eq!(writer.size().await, 7);
1043 writer.write_at(20, b"NONCONTIG").await.unwrap();
1047 assert_eq!(writer.size().await, 29);
1048 writer.sync().await.unwrap();
1049 assert_eq!(writer.size().await, 29);
1050
1051 writer
1053 .write_at(writer.size().await, b"APPEND")
1054 .await
1055 .unwrap();
1056 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1058 assert_eq!(writer.size().await, 35);
1059
1060 let (blob_check, size_check) = context
1062 .open("partition", b"write_non_contiguous_then_append")
1063 .await
1064 .unwrap();
1065 assert_eq!(size_check, 35);
1066 let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1067 let mut buf = vec![0u8; 35];
1068 reader.read_exact(&mut buf, 35).await.unwrap();
1069
1070 let mut expected = vec![0u8; 35];
1071 expected[0..7].copy_from_slice(b"INITIAL");
1072 expected[20..29].copy_from_slice(b"NONCONTIG");
1073 expected[29..35].copy_from_slice(b"APPEND");
1074 assert_eq!(buf, expected);
1075 });
1076 }
1077
1078 #[test_traced]
1079 fn test_resize_then_append_at_size() {
1080 let executor = deterministic::Runner::default();
1081 executor.start(|context| async move {
1082 let (blob, size) = context
1084 .open("partition", b"resize_then_append_at_size")
1085 .await
1086 .unwrap();
1087 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1088
1089 writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); assert_eq!(writer.size().await, 16);
1092 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1094
1095 let resize_to = 5;
1097 writer.resize(resize_to).await.unwrap();
1098 assert_eq!(writer.size().await, resize_to);
1101 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1103
1104 writer
1106 .write_at(writer.size().await, b"XXXXX")
1107 .await
1108 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1112 assert_eq!(writer.size().await, 10);
1113
1114 let (blob_check, size_check) = context
1116 .open("partition", b"resize_then_append_at_size")
1117 .await
1118 .unwrap();
1119 assert_eq!(size_check, 10);
1120 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1121 let mut buf = vec![0u8; 10];
1122 reader.read_exact(&mut buf, 10).await.unwrap();
1123 assert_eq!(&buf, b"01234XXXXX");
1124 });
1125 }
1126}