1mod append;
4pub mod pool;
5mod read;
6mod tip;
7mod write;
8
9pub use append::Append;
10pub use pool::{Pool, PoolRef};
11pub use read::Read;
12pub use write::Write;
13
14#[cfg(test)]
15mod tests {
16 use super::*;
17 use crate::{deterministic, Blob as _, Error, Runner, Storage};
18 use commonware_macros::test_traced;
19 use commonware_utils::NZUsize;
20
21 #[test_traced]
22 fn test_read_basic() {
23 let executor = deterministic::Runner::default();
24 executor.start(|context| async move {
25 let data = b"Hello, world! This is a test.";
27 let (blob, size) = context.open("partition", b"test").await.unwrap();
28 assert_eq!(size, 0);
29 blob.write_at(data.to_vec(), 0).await.unwrap();
30 let size = data.len() as u64;
31
32 let mut reader = Read::new(blob, size, NZUsize!(10));
34
35 let mut buf = [0u8; 5];
37 reader.read_exact(&mut buf, 5).await.unwrap();
38 assert_eq!(&buf, b"Hello");
39
40 let mut buf = [0u8; 14];
42 reader.read_exact(&mut buf, 14).await.unwrap();
43 assert_eq!(&buf, b", world! This ");
44
45 assert_eq!(reader.position(), 19);
47
48 let mut buf = [0u8; 10];
50 reader.read_exact(&mut buf, 7).await.unwrap();
51 assert_eq!(&buf[..7], b"is a te");
52
53 let mut buf = [0u8; 5];
55 let result = reader.read_exact(&mut buf, 5).await;
56 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
57 });
58 }
59
60 #[test_traced]
61 fn test_read_cross_boundary() {
62 let executor = deterministic::Runner::default();
63 executor.start(|context| async move {
64 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
66 let (blob, size) = context.open("partition", b"test").await.unwrap();
67 assert_eq!(size, 0);
68 blob.write_at(data.to_vec(), 0).await.unwrap();
69 let size = data.len() as u64;
70
71 let mut reader = Read::new(blob, size, NZUsize!(10));
73
74 let mut buf = [0u8; 15];
76 reader.read_exact(&mut buf, 15).await.unwrap();
77 assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
78
79 assert_eq!(reader.position(), 15);
81
82 let mut buf = [0u8; 11];
84 reader.read_exact(&mut buf, 11).await.unwrap();
85 assert_eq!(&buf, b"PQRSTUVWXYZ");
86
87 assert_eq!(reader.position(), 26);
89 assert_eq!(reader.blob_remaining(), 0);
90 });
91 }
92
93 #[test_traced]
95 fn test_read_to_end_then_rewind_and_read_again() {
96 let executor = deterministic::Runner::default();
97 executor.start(|context| async move {
98 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
99 let (blob, size) = context.open("partition", b"test").await.unwrap();
100 assert_eq!(size, 0);
101 blob.write_at(data.to_vec(), 0).await.unwrap();
102 let size = data.len() as u64;
103
104 let mut reader = Read::new(blob, size, NZUsize!(20));
105
106 let mut buf = [0u8; 21];
108 reader.read_exact(&mut buf, 21).await.unwrap();
109 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
110
111 assert_eq!(reader.position(), 21);
113
114 let mut buf = [0u8; 5];
116 reader.read_exact(&mut buf, 5).await.unwrap();
117 assert_eq!(&buf, b"VWXYZ");
118
119 reader.seek_to(0).unwrap();
121 let mut buf = [0u8; 21];
122 reader.read_exact(&mut buf, 21).await.unwrap();
123 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
124 });
125 }
126
127 #[test_traced]
128 fn test_read_with_known_size() {
129 let executor = deterministic::Runner::default();
130 executor.start(|context| async move {
131 let data = b"This is a test with known size limitations.";
133 let (blob, size) = context.open("partition", b"test").await.unwrap();
134 assert_eq!(size, 0);
135 blob.write_at(data.to_vec(), 0).await.unwrap();
136 let size = data.len() as u64;
137
138 let mut reader = Read::new(blob, size, NZUsize!(10));
140
141 assert_eq!(reader.blob_remaining(), size);
143
144 let mut buf = [0u8; 5];
146 reader.read_exact(&mut buf, 5).await.unwrap();
147 assert_eq!(&buf, b"This ");
148
149 assert_eq!(reader.blob_remaining(), size - 5);
151
152 let mut buf = vec![0u8; (size - 5) as usize];
154 reader
155 .read_exact(&mut buf, (size - 5) as usize)
156 .await
157 .unwrap();
158 assert_eq!(&buf, b"is a test with known size limitations.");
159
160 assert_eq!(reader.blob_remaining(), 0);
162
163 let mut buf = [0u8; 1];
165 let result = reader.read_exact(&mut buf, 1).await;
166 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
167 });
168 }
169
170 #[test_traced]
171 fn test_read_large_data() {
172 let executor = deterministic::Runner::default();
173 executor.start(|context| async move {
174 let data_size = 1024 * 256; let data = vec![0x42; data_size];
177 let (blob, size) = context.open("partition", b"test").await.unwrap();
178 assert_eq!(size, 0);
179 blob.write_at(data.clone(), 0).await.unwrap();
180 let size = data.len() as u64;
181
182 let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); let mut total_read = 0;
187 let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
189
190 while total_read < data_size {
191 let to_read = std::cmp::min(chunk_size, data_size - total_read);
192 reader
193 .read_exact(&mut buf[..to_read], to_read)
194 .await
195 .unwrap();
196
197 assert!(
199 buf[..to_read].iter().all(|&b| b == 0x42),
200 "Data at position {total_read} is not correct"
201 );
202
203 total_read += to_read;
204 }
205
206 assert_eq!(total_read, data_size);
208
209 let mut extra_buf = [0u8; 1];
211 let result = reader.read_exact(&mut extra_buf, 1).await;
212 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
213 });
214 }
215
216 #[test_traced]
217 fn test_read_exact_size_reads() {
218 let executor = deterministic::Runner::default();
219 executor.start(|context| async move {
220 let buffer_size = 1024;
222 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
224
225 let (blob, size) = context.open("partition", b"test").await.unwrap();
226 assert_eq!(size, 0);
227 blob.write_at(data.clone(), 0).await.unwrap();
228 let size = data.len() as u64;
229
230 let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
231
232 let mut buf1 = vec![0u8; buffer_size];
234 reader.read_exact(&mut buf1, buffer_size).await.unwrap();
235 assert!(buf1.iter().all(|&b| b == 0x37));
236
237 let mut buf2 = vec![0u8; buffer_size];
239 reader.read_exact(&mut buf2, buffer_size).await.unwrap();
240 assert!(buf2.iter().all(|&b| b == 0x37));
241
242 let half_buffer = buffer_size / 2;
244 let mut buf3 = vec![0u8; half_buffer];
245 reader.read_exact(&mut buf3, half_buffer).await.unwrap();
246 assert!(buf3.iter().all(|&b| b == 0x37));
247
248 assert_eq!(reader.blob_remaining(), 0);
250 assert_eq!(reader.position(), size);
251 });
252 }
253
254 #[test_traced]
255 fn test_read_seek_to() {
256 let executor = deterministic::Runner::default();
257 executor.start(|context| async move {
258 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
260 let (blob, size) = context.open("partition", b"test").await.unwrap();
261 assert_eq!(size, 0);
262 blob.write_at(data.to_vec(), 0).await.unwrap();
263 let size = data.len() as u64;
264
265 let mut reader = Read::new(blob, size, NZUsize!(10));
267
268 let mut buf = [0u8; 5];
270 reader.read_exact(&mut buf, 5).await.unwrap();
271 assert_eq!(&buf, b"ABCDE");
272 assert_eq!(reader.position(), 5);
273
274 reader.seek_to(10).unwrap();
276 assert_eq!(reader.position(), 10);
277
278 let mut buf = [0u8; 5];
280 reader.read_exact(&mut buf, 5).await.unwrap();
281 assert_eq!(&buf, b"KLMNO");
282
283 reader.seek_to(0).unwrap();
285 assert_eq!(reader.position(), 0);
286
287 let mut buf = [0u8; 5];
288 reader.read_exact(&mut buf, 5).await.unwrap();
289 assert_eq!(&buf, b"ABCDE");
290
291 reader.seek_to(size).unwrap();
293 assert_eq!(reader.position(), size);
294
295 let mut buf = [0u8; 1];
297 let result = reader.read_exact(&mut buf, 1).await;
298 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
299
300 let result = reader.seek_to(size + 10);
302 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
303 });
304 }
305
306 #[test_traced]
307 fn test_read_seek_with_refill() {
308 let executor = deterministic::Runner::default();
309 executor.start(|context| async move {
310 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
313 assert_eq!(size, 0);
314 blob.write_at(data.clone(), 0).await.unwrap();
315 let size = data.len() as u64;
316
317 let mut reader = Read::new(blob, size, NZUsize!(10));
319
320 let mut buf = [0u8; 5];
322 reader.read_exact(&mut buf, 5).await.unwrap();
323
324 reader.seek_to(500).unwrap();
326
327 let mut buf = [0u8; 5];
329 reader.read_exact(&mut buf, 5).await.unwrap();
330 assert_eq!(&buf, b"AAAAA"); assert_eq!(reader.position(), 505);
332
333 reader.seek_to(100).unwrap();
335
336 let mut buf = [0u8; 5];
338 reader.read_exact(&mut buf, 5).await.unwrap();
339 assert_eq!(reader.position(), 105);
340 });
341 }
342
343 #[test_traced]
344 fn test_read_resize() {
345 let executor = deterministic::Runner::default();
346 executor.start(|context| async move {
347 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
349 let (blob, size) = context.open("partition", b"test").await.unwrap();
350 assert_eq!(size, 0);
351 blob.write_at(data.to_vec(), 0).await.unwrap();
352 let data_len = data.len() as u64;
353
354 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
356
357 let resize_len = data_len / 2;
359 reader.resize(resize_len).await.unwrap();
360
361 let (blob, size) = context.open("partition", b"test").await.unwrap();
363 assert_eq!(size, resize_len, "Blob should be resized to half size");
364
365 let mut new_reader = Read::new(blob, size, NZUsize!(10));
367
368 let mut buf = vec![0u8; size as usize];
370 new_reader
371 .read_exact(&mut buf, size as usize)
372 .await
373 .unwrap();
374 assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
375
376 let mut extra_buf = [0u8; 1];
378 let result = new_reader.read_exact(&mut extra_buf, 1).await;
379 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
380
381 new_reader.resize(data_len * 2).await.unwrap();
383
384 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
386 assert_eq!(new_size, data_len * 2);
387
388 let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
390 let mut buf = vec![0u8; new_size as usize];
391 new_reader
392 .read_exact(&mut buf, new_size as usize)
393 .await
394 .unwrap();
395 assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
396 assert_eq!(
397 &buf[size as usize..],
398 vec![0u8; new_size as usize - size as usize]
399 );
400 });
401 }
402
403 #[test_traced]
404 fn test_read_resize_to_zero() {
405 let executor = deterministic::Runner::default();
406 executor.start(|context| async move {
407 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
409 let data_len = data.len() as u64;
410 let (blob, size) = context.open("partition", b"test").await.unwrap();
411 assert_eq!(size, 0);
412 blob.write_at(data.to_vec(), 0).await.unwrap();
413
414 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
416
417 reader.resize(0).await.unwrap();
419
420 let (blob, size) = context.open("partition", b"test").await.unwrap();
422 assert_eq!(size, 0, "Blob should be resized to zero");
423
424 let mut new_reader = Read::new(blob, size, NZUsize!(10));
426
427 let mut buf = [0u8; 1];
429 let result = new_reader.read_exact(&mut buf, 1).await;
430 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
431 });
432 }
433
434 #[test_traced]
435 fn test_write_basic() {
436 let executor = deterministic::Runner::default();
437 executor.start(|context| async move {
438 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
440 assert_eq!(size, 0);
441
442 let writer = Write::new(blob.clone(), size, NZUsize!(8));
443 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
444 assert_eq!(writer.size().await, 5);
445 writer.sync().await.unwrap();
446 assert_eq!(writer.size().await, 5);
447
448 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
450 assert_eq!(size, 5);
451 let mut reader = Read::new(blob, size, NZUsize!(8));
452 let mut buf = [0u8; 5];
453 reader.read_exact(&mut buf, 5).await.unwrap();
454 assert_eq!(&buf, b"hello");
455 });
456 }
457
458 #[test_traced]
459 fn test_write_multiple_flushes() {
460 let executor = deterministic::Runner::default();
461 executor.start(|context| async move {
462 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
464 assert_eq!(size, 0);
465
466 let writer = Write::new(blob.clone(), size, NZUsize!(4));
467 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
468 assert_eq!(writer.size().await, 3);
469 writer.write_at(b"defg".to_vec(), 3).await.unwrap();
470 assert_eq!(writer.size().await, 7);
471 writer.sync().await.unwrap();
472
473 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
475 assert_eq!(size, 7);
476 let mut reader = Read::new(blob, size, NZUsize!(4));
477 let mut buf = [0u8; 7];
478 reader.read_exact(&mut buf, 7).await.unwrap();
479 assert_eq!(&buf, b"abcdefg");
480 });
481 }
482
483 #[test_traced]
484 fn test_write_large_data() {
485 let executor = deterministic::Runner::default();
486 executor.start(|context| async move {
487 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
489 assert_eq!(size, 0);
490
491 let writer = Write::new(blob.clone(), size, NZUsize!(4));
492 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
493 assert_eq!(writer.size().await, 3);
494 writer
495 .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
496 .await
497 .unwrap();
498 assert_eq!(writer.size().await, 26);
499 writer.sync().await.unwrap();
500 assert_eq!(writer.size().await, 26);
501
502 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
504 assert_eq!(size, 26);
505 let mut reader = Read::new(blob, size, NZUsize!(4));
506 let mut buf = [0u8; 26];
507 reader.read_exact(&mut buf, 26).await.unwrap();
508 assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
509 });
510 }
511
512 #[test_traced]
513 fn test_write_append_to_buffer() {
514 let executor = deterministic::Runner::default();
515 executor.start(|context| async move {
516 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
518 let writer = Write::new(blob.clone(), size, NZUsize!(10));
519
520 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
522 assert_eq!(writer.size().await, 5);
523
524 writer.write_at(b" world".to_vec(), 5).await.unwrap();
526 writer.sync().await.unwrap();
527 assert_eq!(writer.size().await, 11);
528
529 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
531 assert_eq!(size, 11);
532 let mut reader = Read::new(blob, size, NZUsize!(10));
533 let mut buf = vec![0u8; 11];
534 reader.read_exact(&mut buf, 11).await.unwrap();
535 assert_eq!(&buf, b"hello world");
536 });
537 }
538
539 #[test_traced]
540 fn test_write_into_middle_of_buffer() {
541 let executor = deterministic::Runner::default();
542 executor.start(|context| async move {
543 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
545 let writer = Write::new(blob.clone(), size, NZUsize!(20));
546
547 writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
549 assert_eq!(writer.size().await, 10);
550
551 writer.write_at(b"01234".to_vec(), 2).await.unwrap();
553 assert_eq!(writer.size().await, 10);
554 writer.sync().await.unwrap();
555
556 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
558 assert_eq!(size, 10);
559 let mut reader = Read::new(blob, size, NZUsize!(10));
560 let mut buf = vec![0u8; 10];
561 reader.read_exact(&mut buf, 10).await.unwrap();
562 assert_eq!(&buf, b"ab01234hij");
563
564 writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
566 assert_eq!(writer.size().await, 20);
567 writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
568 assert_eq!(writer.size().await, 20);
569 writer.sync().await.unwrap();
570
571 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
573 assert_eq!(size, 20);
574 let mut reader = Read::new(blob, size, NZUsize!(20));
575 let mut buf = vec![0u8; 20];
576 reader.read_exact(&mut buf, 20).await.unwrap();
577 assert_eq!(&buf, b"ab01234hiwxyznopqrst");
578 });
579 }
580
581 #[test_traced]
582 fn test_write_before_buffer() {
583 let executor = deterministic::Runner::default();
584 executor.start(|context| async move {
585 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
587 let writer = Write::new(blob.clone(), size, NZUsize!(10));
588
589 writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
591 assert_eq!(writer.size().await, 20);
592
593 writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
595 assert_eq!(writer.size().await, 20);
596 writer.sync().await.unwrap();
597
598 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
600 assert_eq!(size, 20);
601 let mut reader = Read::new(blob, size, NZUsize!(20));
602 let mut buf = vec![0u8; 20];
603 reader.read_exact(&mut buf, 20).await.unwrap();
604 let mut expected = vec![0u8; 20];
605 expected[0..5].copy_from_slice("abcde".as_bytes());
606 expected[10..20].copy_from_slice("0123456789".as_bytes());
607 assert_eq!(buf, expected);
608
609 writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
611 assert_eq!(writer.size().await, 20);
612 writer.sync().await.unwrap();
613 assert_eq!(writer.size().await, 20);
614
615 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
617 assert_eq!(size, 20);
618 let mut reader = Read::new(blob, size, NZUsize!(20));
619 let mut buf = vec![0u8; 20];
620 reader.read_exact(&mut buf, 20).await.unwrap();
621 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
622 assert_eq!(buf, expected);
623 });
624 }
625
626 #[test_traced]
627 fn test_write_resize() {
628 let executor = deterministic::Runner::default();
629 executor.start(|context| async move {
630 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
632 let writer = Write::new(blob, size, NZUsize!(10));
633
634 writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
636 assert_eq!(writer.size().await, 11);
637 writer.sync().await.unwrap();
638 assert_eq!(writer.size().await, 11);
639
640 let (blob_check, size_check) =
641 context.open("partition", b"resize_write").await.unwrap();
642 assert_eq!(size_check, 11);
643 drop(blob_check);
644
645 writer.resize(5).await.unwrap();
647 assert_eq!(writer.size().await, 5);
648 writer.sync().await.unwrap();
649
650 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
652 assert_eq!(size, 5);
653 let mut reader = Read::new(blob, size, NZUsize!(5));
654 let mut buf = vec![0u8; 5];
655 reader.read_exact(&mut buf, 5).await.unwrap();
656 assert_eq!(&buf, b"hello");
657
658 writer.write_at(b"X".to_vec(), 0).await.unwrap();
660 assert_eq!(writer.size().await, 5);
661 writer.sync().await.unwrap();
662
663 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
665 assert_eq!(size, 5);
666 let mut reader = Read::new(blob, size, NZUsize!(5));
667 let mut buf = vec![0u8; 5];
668 reader.read_exact(&mut buf, 5).await.unwrap();
669 assert_eq!(&buf, b"Xello");
670
671 writer.resize(10).await.unwrap();
673 assert_eq!(writer.size().await, 10);
674 writer.sync().await.unwrap();
675
676 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
678 assert_eq!(size, 10);
679 let mut reader = Read::new(blob, size, NZUsize!(10));
680 let mut buf = vec![0u8; 10];
681 reader.read_exact(&mut buf, 10).await.unwrap();
682 assert_eq!(&buf[0..5], b"Xello");
683 assert_eq!(&buf[5..10], [0u8; 5]);
684
685 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
687 let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
688 writer_zero
689 .write_at(b"some data".to_vec(), 0)
690 .await
691 .unwrap();
692 assert_eq!(writer_zero.size().await, 9);
693 writer_zero.sync().await.unwrap();
694 assert_eq!(writer_zero.size().await, 9);
695 writer_zero.resize(0).await.unwrap();
696 assert_eq!(writer_zero.size().await, 0);
697 writer_zero.sync().await.unwrap();
698 assert_eq!(writer_zero.size().await, 0);
699
700 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
702 assert_eq!(size_z, 0);
703 });
704 }
705
706 #[test_traced]
707 fn test_write_read_at_on_writer() {
708 let executor = deterministic::Runner::default();
709 executor.start(|context| async move {
710 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
712 let writer = Write::new(blob.clone(), size, NZUsize!(10));
713
714 writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
716 assert_eq!(writer.size().await, 8);
717
718 let mut read_buf_vec = vec![0u8; 4].into();
720 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
721 assert_eq!(read_buf_vec.as_ref(), b"buff");
722
723 read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
724 assert_eq!(read_buf_vec.as_ref(), b"ered");
725
726 let small_buf_vec = vec![0u8; 1];
728 assert!(writer.read_at(small_buf_vec, 8).await.is_err());
729
730 writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
732 assert_eq!(writer.size().await, 20);
733 writer.sync().await.unwrap();
734 assert_eq!(writer.size().await, 20);
735
736 let mut read_buf_vec_2 = vec![0u8; 4].into();
738 read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
739 assert_eq!(read_buf_vec_2.as_ref(), b"buff");
740
741 let mut read_buf_7_vec = vec![0u8; 7].into();
742 read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
743 assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
744
745 writer.write_at(b" more data".to_vec(), 20).await.unwrap();
747 assert_eq!(writer.size().await, 30);
748
749 let mut read_buf_vec_3 = vec![0u8; 5].into();
751 read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
752 assert_eq!(read_buf_vec_3.as_ref(), b" more");
753
754 let mut combo_read_buf_vec = vec![0u8; 12].into();
756 combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
757 assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
758
759 writer.sync().await.unwrap();
761 assert_eq!(writer.size().await, 30);
762 let (final_blob, final_size) =
763 context.open("partition", b"read_at_writer").await.unwrap();
764 assert_eq!(final_size, 30);
765 let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
766 let mut full_content = vec![0u8; 30];
767 final_reader
768 .read_exact(&mut full_content, 30)
769 .await
770 .unwrap();
771 assert_eq!(&full_content, b"buffered and flushed more data");
772 });
773 }
774
775 #[test_traced]
776 fn test_write_straddling_non_mergeable() {
777 let executor = deterministic::Runner::default();
778 executor.start(|context| async move {
779 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
781 let writer = Write::new(blob.clone(), size, NZUsize!(10));
782
783 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
785 assert_eq!(writer.size().await, 10);
786
787 writer.write_at(b"abc".to_vec(), 15).await.unwrap();
789 assert_eq!(writer.size().await, 18);
790 writer.sync().await.unwrap();
791 assert_eq!(writer.size().await, 18);
792
793 let (blob_check, size_check) =
795 context.open("partition", b"write_straddle").await.unwrap();
796 assert_eq!(size_check, 18);
797 let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
798 let mut buf = vec![0u8; 18];
799 reader.read_exact(&mut buf, 18).await.unwrap();
800
801 let mut expected = vec![0u8; 18];
802 expected[0..10].copy_from_slice(b"0123456789");
803 expected[15..18].copy_from_slice(b"abc");
804 assert_eq!(buf, expected);
805
806 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
808 let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
809 writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
810 assert_eq!(writer2.size().await, 10);
811
812 writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
814 assert_eq!(writer2.size().await, 17);
815 writer2.sync().await.unwrap();
816 assert_eq!(writer2.size().await, 17);
817
818 let (blob_check2, size_check2) =
820 context.open("partition", b"write_straddle2").await.unwrap();
821 assert_eq!(size_check2, 17);
822 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
823 let mut buf2 = vec![0u8; 17];
824 reader2.read_exact(&mut buf2, 17).await.unwrap();
825 assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
826 });
827 }
828
829 #[test_traced]
830 fn test_write_close() {
831 let executor = deterministic::Runner::default();
832 executor.start(|context| async move {
833 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
835 let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
836 writer.write_at(b"pending".to_vec(), 0).await.unwrap();
837 assert_eq!(writer.size().await, 7);
838
839 writer.sync().await.unwrap();
841
842 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
844 assert_eq!(size_check, 7);
845 let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
846 let mut buf = [0u8; 7];
847 reader.read_exact(&mut buf, 7).await.unwrap();
848 assert_eq!(&buf, b"pending");
849 });
850 }
851
852 #[test_traced]
853 fn test_write_direct_due_to_size() {
854 let executor = deterministic::Runner::default();
855 executor.start(|context| async move {
856 let (blob, size) = context
858 .open("partition", b"write_direct_size")
859 .await
860 .unwrap();
861 let writer = Write::new(blob.clone(), size, NZUsize!(5));
862
863 let data_large = b"0123456789";
865 writer.write_at(data_large.to_vec(), 0).await.unwrap();
866 assert_eq!(writer.size().await, 10);
867
868 writer.sync().await.unwrap();
870
871 let (blob_check, size_check) = context
873 .open("partition", b"write_direct_size")
874 .await
875 .unwrap();
876 assert_eq!(size_check, 10);
877 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
878 let mut buf = vec![0u8; 10];
879 reader.read_exact(&mut buf, 10).await.unwrap();
880 assert_eq!(&buf, data_large.as_slice());
881
882 writer.write_at(b"abc".to_vec(), 10).await.unwrap();
884 assert_eq!(writer.size().await, 13);
885
886 let mut read_small_buf_vec = vec![0u8; 3].into();
888 read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
889 assert_eq!(read_small_buf_vec.as_ref(), b"abc");
890
891 writer.sync().await.unwrap();
892
893 let (blob_check2, size_check2) = context
895 .open("partition", b"write_direct_size")
896 .await
897 .unwrap();
898 assert_eq!(size_check2, 13);
899 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
900 let mut buf2 = vec![0u8; 13];
901 reader2.read_exact(&mut buf2, 13).await.unwrap();
902 assert_eq!(&buf2[10..], b"abc".as_slice());
903 });
904 }
905
906 #[test_traced]
907 fn test_write_overwrite_and_extend_in_buffer() {
908 let executor = deterministic::Runner::default();
909 executor.start(|context| async move {
910 let (blob, size) = context
912 .open("partition", b"overwrite_extend_buf")
913 .await
914 .unwrap();
915 let writer = Write::new(blob.clone(), size, NZUsize!(15));
916
917 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
919 assert_eq!(writer.size().await, 10);
920
921 writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
923 assert_eq!(writer.size().await, 15);
924
925 let mut read_buf_vec = vec![0u8; 15].into();
927 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
928 assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
929
930 writer.sync().await.unwrap();
931
932 let (blob_check, size_check) = context
934 .open("partition", b"overwrite_extend_buf")
935 .await
936 .unwrap();
937 assert_eq!(size_check, 15);
938 let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
939 let mut final_buf = vec![0u8; 15];
940 reader.read_exact(&mut final_buf, 15).await.unwrap();
941 assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
942 });
943 }
944
945 #[test_traced]
946 fn test_write_at_size() {
947 let executor = deterministic::Runner::default();
948 executor.start(|context| async move {
949 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
951 let writer = Write::new(blob.clone(), size, NZUsize!(20));
952
953 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
955 assert_eq!(writer.size().await, 10);
956 writer.sync().await.unwrap();
957
958 writer
960 .write_at(b"abc".to_vec(), writer.size().await)
961 .await
962 .unwrap();
963 assert_eq!(writer.size().await, 13);
964 writer.sync().await.unwrap();
965
966 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
968 assert_eq!(size_check, 13);
969 let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
970 let mut buf = vec![0u8; 13];
971 reader.read_exact(&mut buf, 13).await.unwrap();
972 assert_eq!(&buf, b"0123456789abc");
973 });
974 }
975
976 #[test_traced]
977 fn test_write_at_size_multiple_appends() {
978 let executor = deterministic::Runner::default();
979 executor.start(|context| async move {
980 let (blob, size) = context
982 .open("partition", b"write_multiple_appends_at_size")
983 .await
984 .unwrap();
985 let writer = Write::new(blob.clone(), size, NZUsize!(5)); writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
989 assert_eq!(writer.size().await, 3);
990 writer.sync().await.unwrap();
991 assert_eq!(writer.size().await, 3);
992
993 writer
995 .write_at(b"BBB".to_vec(), writer.size().await)
996 .await
997 .unwrap();
998 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
1000 assert_eq!(writer.size().await, 6);
1001
1002 writer
1004 .write_at(b"CCC".to_vec(), writer.size().await)
1005 .await
1006 .unwrap();
1007 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1009 assert_eq!(writer.size().await, 9);
1010
1011 let (blob_check, size_check) = context
1013 .open("partition", b"write_multiple_appends_at_size")
1014 .await
1015 .unwrap();
1016 assert_eq!(size_check, 9);
1017 let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1018 let mut buf = vec![0u8; 9];
1019 reader.read_exact(&mut buf, 9).await.unwrap();
1020 assert_eq!(&buf, b"AAABBBCCC");
1021 });
1022 }
1023
1024 #[test_traced]
1025 fn test_write_non_contiguous_then_append_at_size() {
1026 let executor = deterministic::Runner::default();
1027 executor.start(|context| async move {
1028 let (blob, size) = context
1030 .open("partition", b"write_non_contiguous_then_append")
1031 .await
1032 .unwrap();
1033 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1034
1035 writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); assert_eq!(writer.size().await, 7);
1038 writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1042 assert_eq!(writer.size().await, 29);
1043 writer.sync().await.unwrap();
1044 assert_eq!(writer.size().await, 29);
1045
1046 writer
1048 .write_at(b"APPEND".to_vec(), writer.size().await)
1049 .await
1050 .unwrap();
1051 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1053 assert_eq!(writer.size().await, 35);
1054
1055 let (blob_check, size_check) = context
1057 .open("partition", b"write_non_contiguous_then_append")
1058 .await
1059 .unwrap();
1060 assert_eq!(size_check, 35);
1061 let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1062 let mut buf = vec![0u8; 35];
1063 reader.read_exact(&mut buf, 35).await.unwrap();
1064
1065 let mut expected = vec![0u8; 35];
1066 expected[0..7].copy_from_slice(b"INITIAL");
1067 expected[20..29].copy_from_slice(b"NONCONTIG");
1068 expected[29..35].copy_from_slice(b"APPEND");
1069 assert_eq!(buf, expected);
1070 });
1071 }
1072
1073 #[test_traced]
1074 fn test_resize_then_append_at_size() {
1075 let executor = deterministic::Runner::default();
1076 executor.start(|context| async move {
1077 let (blob, size) = context
1079 .open("partition", b"resize_then_append_at_size")
1080 .await
1081 .unwrap();
1082 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1083
1084 writer
1086 .write_at(b"0123456789ABCDEF".to_vec(), 0)
1087 .await
1088 .unwrap(); assert_eq!(writer.size().await, 16);
1090 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1092
1093 let resize_to = 5;
1095 writer.resize(resize_to).await.unwrap();
1096 assert_eq!(writer.size().await, resize_to);
1099 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1101
1102 writer
1104 .write_at(b"XXXXX".to_vec(), writer.size().await)
1105 .await
1106 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1110 assert_eq!(writer.size().await, 10);
1111
1112 let (blob_check, size_check) = context
1114 .open("partition", b"resize_then_append_at_size")
1115 .await
1116 .unwrap();
1117 assert_eq!(size_check, 10);
1118 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1119 let mut buf = vec![0u8; 10];
1120 reader.read_exact(&mut buf, 10).await.unwrap();
1121 assert_eq!(&buf, b"01234XXXXX");
1122 });
1123 }
1124}