1pub mod pool;
4mod read;
5mod tip;
6mod write;
7
8pub use pool::PoolRef;
9pub use read::Read;
10pub use write::Write;
11
12#[cfg(test)]
13mod tests {
14 use super::*;
15 use crate::{deterministic, Blob as _, Error, Runner, Storage};
16 use commonware_macros::test_traced;
17 use commonware_utils::NZUsize;
18
19 #[test_traced]
20 fn test_read_basic() {
21 let executor = deterministic::Runner::default();
22 executor.start(|context| async move {
23 let data = b"Hello, world! This is a test.";
25 let (blob, size) = context.open("partition", b"test").await.unwrap();
26 assert_eq!(size, 0);
27 blob.write_at(data.to_vec(), 0).await.unwrap();
28 let size = data.len() as u64;
29
30 let mut reader = Read::new(blob, size, NZUsize!(10));
32
33 let mut buf = [0u8; 5];
35 reader.read_exact(&mut buf, 5).await.unwrap();
36 assert_eq!(&buf, b"Hello");
37
38 let mut buf = [0u8; 14];
40 reader.read_exact(&mut buf, 14).await.unwrap();
41 assert_eq!(&buf, b", world! This ");
42
43 assert_eq!(reader.position(), 19);
45
46 let mut buf = [0u8; 10];
48 reader.read_exact(&mut buf, 7).await.unwrap();
49 assert_eq!(&buf[..7], b"is a te");
50
51 let mut buf = [0u8; 5];
53 let result = reader.read_exact(&mut buf, 5).await;
54 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
55 });
56 }
57
58 #[test_traced]
59 fn test_read_cross_boundary() {
60 let executor = deterministic::Runner::default();
61 executor.start(|context| async move {
62 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
64 let (blob, size) = context.open("partition", b"test").await.unwrap();
65 assert_eq!(size, 0);
66 blob.write_at(data.to_vec(), 0).await.unwrap();
67 let size = data.len() as u64;
68
69 let mut reader = Read::new(blob, size, NZUsize!(10));
71
72 let mut buf = [0u8; 15];
74 reader.read_exact(&mut buf, 15).await.unwrap();
75 assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
76
77 assert_eq!(reader.position(), 15);
79
80 let mut buf = [0u8; 11];
82 reader.read_exact(&mut buf, 11).await.unwrap();
83 assert_eq!(&buf, b"PQRSTUVWXYZ");
84
85 assert_eq!(reader.position(), 26);
87 assert_eq!(reader.blob_remaining(), 0);
88 });
89 }
90
91 #[test_traced]
93 fn test_read_to_end_then_rewind_and_read_again() {
94 let executor = deterministic::Runner::default();
95 executor.start(|context| async move {
96 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
97 let (blob, size) = context.open("partition", b"test").await.unwrap();
98 assert_eq!(size, 0);
99 blob.write_at(data.to_vec(), 0).await.unwrap();
100 let size = data.len() as u64;
101
102 let mut reader = Read::new(blob, size, NZUsize!(20));
103
104 let mut buf = [0u8; 21];
106 reader.read_exact(&mut buf, 21).await.unwrap();
107 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
108
109 assert_eq!(reader.position(), 21);
111
112 let mut buf = [0u8; 5];
114 reader.read_exact(&mut buf, 5).await.unwrap();
115 assert_eq!(&buf, b"VWXYZ");
116
117 reader.seek_to(0).unwrap();
119 let mut buf = [0u8; 21];
120 reader.read_exact(&mut buf, 21).await.unwrap();
121 assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
122 });
123 }
124
125 #[test_traced]
126 fn test_read_with_known_size() {
127 let executor = deterministic::Runner::default();
128 executor.start(|context| async move {
129 let data = b"This is a test with known size limitations.";
131 let (blob, size) = context.open("partition", b"test").await.unwrap();
132 assert_eq!(size, 0);
133 blob.write_at(data.to_vec(), 0).await.unwrap();
134 let size = data.len() as u64;
135
136 let mut reader = Read::new(blob, size, NZUsize!(10));
138
139 assert_eq!(reader.blob_remaining(), size);
141
142 let mut buf = [0u8; 5];
144 reader.read_exact(&mut buf, 5).await.unwrap();
145 assert_eq!(&buf, b"This ");
146
147 assert_eq!(reader.blob_remaining(), size - 5);
149
150 let mut buf = vec![0u8; (size - 5) as usize];
152 reader
153 .read_exact(&mut buf, (size - 5) as usize)
154 .await
155 .unwrap();
156 assert_eq!(&buf, b"is a test with known size limitations.");
157
158 assert_eq!(reader.blob_remaining(), 0);
160
161 let mut buf = [0u8; 1];
163 let result = reader.read_exact(&mut buf, 1).await;
164 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
165 });
166 }
167
168 #[test_traced]
169 fn test_read_large_data() {
170 let executor = deterministic::Runner::default();
171 executor.start(|context| async move {
172 let data_size = 1024 * 256; let data = vec![0x42; data_size];
175 let (blob, size) = context.open("partition", b"test").await.unwrap();
176 assert_eq!(size, 0);
177 blob.write_at(data.clone(), 0).await.unwrap();
178 let size = data.len() as u64;
179
180 let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); let mut total_read = 0;
185 let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
187
188 while total_read < data_size {
189 let to_read = std::cmp::min(chunk_size, data_size - total_read);
190 reader
191 .read_exact(&mut buf[..to_read], to_read)
192 .await
193 .unwrap();
194
195 assert!(
197 buf[..to_read].iter().all(|&b| b == 0x42),
198 "Data at position {total_read} is not correct"
199 );
200
201 total_read += to_read;
202 }
203
204 assert_eq!(total_read, data_size);
206
207 let mut extra_buf = [0u8; 1];
209 let result = reader.read_exact(&mut extra_buf, 1).await;
210 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
211 });
212 }
213
214 #[test_traced]
215 fn test_read_exact_size_reads() {
216 let executor = deterministic::Runner::default();
217 executor.start(|context| async move {
218 let buffer_size = 1024;
220 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
222
223 let (blob, size) = context.open("partition", b"test").await.unwrap();
224 assert_eq!(size, 0);
225 blob.write_at(data.clone(), 0).await.unwrap();
226 let size = data.len() as u64;
227
228 let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
229
230 let mut buf1 = vec![0u8; buffer_size];
232 reader.read_exact(&mut buf1, buffer_size).await.unwrap();
233 assert!(buf1.iter().all(|&b| b == 0x37));
234
235 let mut buf2 = vec![0u8; buffer_size];
237 reader.read_exact(&mut buf2, buffer_size).await.unwrap();
238 assert!(buf2.iter().all(|&b| b == 0x37));
239
240 let half_buffer = buffer_size / 2;
242 let mut buf3 = vec![0u8; half_buffer];
243 reader.read_exact(&mut buf3, half_buffer).await.unwrap();
244 assert!(buf3.iter().all(|&b| b == 0x37));
245
246 assert_eq!(reader.blob_remaining(), 0);
248 assert_eq!(reader.position(), size);
249 });
250 }
251
252 #[test_traced]
253 fn test_read_seek_to() {
254 let executor = deterministic::Runner::default();
255 executor.start(|context| async move {
256 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
258 let (blob, size) = context.open("partition", b"test").await.unwrap();
259 assert_eq!(size, 0);
260 blob.write_at(data.to_vec(), 0).await.unwrap();
261 let size = data.len() as u64;
262
263 let mut reader = Read::new(blob, size, NZUsize!(10));
265
266 let mut buf = [0u8; 5];
268 reader.read_exact(&mut buf, 5).await.unwrap();
269 assert_eq!(&buf, b"ABCDE");
270 assert_eq!(reader.position(), 5);
271
272 reader.seek_to(10).unwrap();
274 assert_eq!(reader.position(), 10);
275
276 let mut buf = [0u8; 5];
278 reader.read_exact(&mut buf, 5).await.unwrap();
279 assert_eq!(&buf, b"KLMNO");
280
281 reader.seek_to(0).unwrap();
283 assert_eq!(reader.position(), 0);
284
285 let mut buf = [0u8; 5];
286 reader.read_exact(&mut buf, 5).await.unwrap();
287 assert_eq!(&buf, b"ABCDE");
288
289 reader.seek_to(size).unwrap();
291 assert_eq!(reader.position(), size);
292
293 let mut buf = [0u8; 1];
295 let result = reader.read_exact(&mut buf, 1).await;
296 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
297
298 let result = reader.seek_to(size + 10);
300 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
301 });
302 }
303
304 #[test_traced]
305 fn test_read_seek_with_refill() {
306 let executor = deterministic::Runner::default();
307 executor.start(|context| async move {
308 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
311 assert_eq!(size, 0);
312 blob.write_at(data.clone(), 0).await.unwrap();
313 let size = data.len() as u64;
314
315 let mut reader = Read::new(blob, size, NZUsize!(10));
317
318 let mut buf = [0u8; 5];
320 reader.read_exact(&mut buf, 5).await.unwrap();
321
322 reader.seek_to(500).unwrap();
324
325 let mut buf = [0u8; 5];
327 reader.read_exact(&mut buf, 5).await.unwrap();
328 assert_eq!(&buf, b"AAAAA"); assert_eq!(reader.position(), 505);
330
331 reader.seek_to(100).unwrap();
333
334 let mut buf = [0u8; 5];
336 reader.read_exact(&mut buf, 5).await.unwrap();
337 assert_eq!(reader.position(), 105);
338 });
339 }
340
341 #[test_traced]
342 fn test_read_resize() {
343 let executor = deterministic::Runner::default();
344 executor.start(|context| async move {
345 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
347 let (blob, size) = context.open("partition", b"test").await.unwrap();
348 assert_eq!(size, 0);
349 blob.write_at(data.to_vec(), 0).await.unwrap();
350 let data_len = data.len() as u64;
351
352 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
354
355 let resize_len = data_len / 2;
357 reader.resize(resize_len).await.unwrap();
358
359 let (blob, size) = context.open("partition", b"test").await.unwrap();
361 assert_eq!(size, resize_len, "Blob should be resized to half size");
362
363 let mut new_reader = Read::new(blob, size, NZUsize!(10));
365
366 let mut buf = vec![0u8; size as usize];
368 new_reader
369 .read_exact(&mut buf, size as usize)
370 .await
371 .unwrap();
372 assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
373
374 let mut extra_buf = [0u8; 1];
376 let result = new_reader.read_exact(&mut extra_buf, 1).await;
377 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
378
379 new_reader.resize(data_len * 2).await.unwrap();
381
382 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
384 assert_eq!(new_size, data_len * 2);
385
386 let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
388 let mut buf = vec![0u8; new_size as usize];
389 new_reader
390 .read_exact(&mut buf, new_size as usize)
391 .await
392 .unwrap();
393 assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
394 assert_eq!(
395 &buf[size as usize..],
396 vec![0u8; new_size as usize - size as usize]
397 );
398 });
399 }
400
401 #[test_traced]
402 fn test_read_resize_to_zero() {
403 let executor = deterministic::Runner::default();
404 executor.start(|context| async move {
405 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
407 let data_len = data.len() as u64;
408 let (blob, size) = context.open("partition", b"test").await.unwrap();
409 assert_eq!(size, 0);
410 blob.write_at(data.to_vec(), 0).await.unwrap();
411
412 let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
414
415 reader.resize(0).await.unwrap();
417
418 let (blob, size) = context.open("partition", b"test").await.unwrap();
420 assert_eq!(size, 0, "Blob should be resized to zero");
421
422 let mut new_reader = Read::new(blob, size, NZUsize!(10));
424
425 let mut buf = [0u8; 1];
427 let result = new_reader.read_exact(&mut buf, 1).await;
428 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
429 });
430 }
431
432 #[test_traced]
433 fn test_write_basic() {
434 let executor = deterministic::Runner::default();
435 executor.start(|context| async move {
436 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
438 assert_eq!(size, 0);
439
440 let writer = Write::new(blob.clone(), size, NZUsize!(8));
441 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
442 assert_eq!(writer.size().await, 5);
443 writer.sync().await.unwrap();
444 assert_eq!(writer.size().await, 5);
445
446 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
448 assert_eq!(size, 5);
449 let mut reader = Read::new(blob, size, NZUsize!(8));
450 let mut buf = [0u8; 5];
451 reader.read_exact(&mut buf, 5).await.unwrap();
452 assert_eq!(&buf, b"hello");
453 });
454 }
455
456 #[test_traced]
457 fn test_write_multiple_flushes() {
458 let executor = deterministic::Runner::default();
459 executor.start(|context| async move {
460 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
462 assert_eq!(size, 0);
463
464 let writer = Write::new(blob.clone(), size, NZUsize!(4));
465 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
466 assert_eq!(writer.size().await, 3);
467 writer.write_at(b"defg".to_vec(), 3).await.unwrap();
468 assert_eq!(writer.size().await, 7);
469 writer.sync().await.unwrap();
470
471 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
473 assert_eq!(size, 7);
474 let mut reader = Read::new(blob, size, NZUsize!(4));
475 let mut buf = [0u8; 7];
476 reader.read_exact(&mut buf, 7).await.unwrap();
477 assert_eq!(&buf, b"abcdefg");
478 });
479 }
480
481 #[test_traced]
482 fn test_write_large_data() {
483 let executor = deterministic::Runner::default();
484 executor.start(|context| async move {
485 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
487 assert_eq!(size, 0);
488
489 let writer = Write::new(blob.clone(), size, NZUsize!(4));
490 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
491 assert_eq!(writer.size().await, 3);
492 writer
493 .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
494 .await
495 .unwrap();
496 assert_eq!(writer.size().await, 26);
497 writer.sync().await.unwrap();
498 assert_eq!(writer.size().await, 26);
499
500 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
502 assert_eq!(size, 26);
503 let mut reader = Read::new(blob, size, NZUsize!(4));
504 let mut buf = [0u8; 26];
505 reader.read_exact(&mut buf, 26).await.unwrap();
506 assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
507 });
508 }
509
510 #[test_traced]
511 fn test_write_append_to_buffer() {
512 let executor = deterministic::Runner::default();
513 executor.start(|context| async move {
514 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
516 let writer = Write::new(blob.clone(), size, NZUsize!(10));
517
518 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
520 assert_eq!(writer.size().await, 5);
521
522 writer.write_at(b" world".to_vec(), 5).await.unwrap();
524 writer.sync().await.unwrap();
525 assert_eq!(writer.size().await, 11);
526
527 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
529 assert_eq!(size, 11);
530 let mut reader = Read::new(blob, size, NZUsize!(10));
531 let mut buf = vec![0u8; 11];
532 reader.read_exact(&mut buf, 11).await.unwrap();
533 assert_eq!(&buf, b"hello world");
534 });
535 }
536
537 #[test_traced]
538 fn test_write_into_middle_of_buffer() {
539 let executor = deterministic::Runner::default();
540 executor.start(|context| async move {
541 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
543 let writer = Write::new(blob.clone(), size, NZUsize!(20));
544
545 writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
547 assert_eq!(writer.size().await, 10);
548
549 writer.write_at(b"01234".to_vec(), 2).await.unwrap();
551 assert_eq!(writer.size().await, 10);
552 writer.sync().await.unwrap();
553
554 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
556 assert_eq!(size, 10);
557 let mut reader = Read::new(blob, size, NZUsize!(10));
558 let mut buf = vec![0u8; 10];
559 reader.read_exact(&mut buf, 10).await.unwrap();
560 assert_eq!(&buf, b"ab01234hij");
561
562 writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
564 assert_eq!(writer.size().await, 20);
565 writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
566 assert_eq!(writer.size().await, 20);
567 writer.sync().await.unwrap();
568
569 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
571 assert_eq!(size, 20);
572 let mut reader = Read::new(blob, size, NZUsize!(20));
573 let mut buf = vec![0u8; 20];
574 reader.read_exact(&mut buf, 20).await.unwrap();
575 assert_eq!(&buf, b"ab01234hiwxyznopqrst");
576 });
577 }
578
579 #[test_traced]
580 fn test_write_before_buffer() {
581 let executor = deterministic::Runner::default();
582 executor.start(|context| async move {
583 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
585 let writer = Write::new(blob.clone(), size, NZUsize!(10));
586
587 writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
589 assert_eq!(writer.size().await, 20);
590
591 writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
593 assert_eq!(writer.size().await, 20);
594 writer.sync().await.unwrap();
595
596 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
598 assert_eq!(size, 20);
599 let mut reader = Read::new(blob, size, NZUsize!(20));
600 let mut buf = vec![0u8; 20];
601 reader.read_exact(&mut buf, 20).await.unwrap();
602 let mut expected = vec![0u8; 20];
603 expected[0..5].copy_from_slice("abcde".as_bytes());
604 expected[10..20].copy_from_slice("0123456789".as_bytes());
605 assert_eq!(buf, expected);
606
607 writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
609 assert_eq!(writer.size().await, 20);
610 writer.sync().await.unwrap();
611 assert_eq!(writer.size().await, 20);
612
613 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
615 assert_eq!(size, 20);
616 let mut reader = Read::new(blob, size, NZUsize!(20));
617 let mut buf = vec![0u8; 20];
618 reader.read_exact(&mut buf, 20).await.unwrap();
619 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
620 assert_eq!(buf, expected);
621 });
622 }
623
624 #[test_traced]
625 fn test_write_resize() {
626 let executor = deterministic::Runner::default();
627 executor.start(|context| async move {
628 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
630 let writer = Write::new(blob, size, NZUsize!(10));
631
632 writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
634 assert_eq!(writer.size().await, 11);
635 writer.sync().await.unwrap();
636 assert_eq!(writer.size().await, 11);
637
638 let (blob_check, size_check) =
639 context.open("partition", b"resize_write").await.unwrap();
640 assert_eq!(size_check, 11);
641 drop(blob_check);
642
643 writer.resize(5).await.unwrap();
645 assert_eq!(writer.size().await, 5);
646 writer.sync().await.unwrap();
647
648 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
650 assert_eq!(size, 5);
651 let mut reader = Read::new(blob, size, NZUsize!(5));
652 let mut buf = vec![0u8; 5];
653 reader.read_exact(&mut buf, 5).await.unwrap();
654 assert_eq!(&buf, b"hello");
655
656 writer.write_at(b"X".to_vec(), 0).await.unwrap();
658 assert_eq!(writer.size().await, 5);
659 writer.sync().await.unwrap();
660
661 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
663 assert_eq!(size, 5);
664 let mut reader = Read::new(blob, size, NZUsize!(5));
665 let mut buf = vec![0u8; 5];
666 reader.read_exact(&mut buf, 5).await.unwrap();
667 assert_eq!(&buf, b"Xello");
668
669 writer.resize(10).await.unwrap();
671 assert_eq!(writer.size().await, 10);
672 writer.sync().await.unwrap();
673
674 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
676 assert_eq!(size, 10);
677 let mut reader = Read::new(blob, size, NZUsize!(10));
678 let mut buf = vec![0u8; 10];
679 reader.read_exact(&mut buf, 10).await.unwrap();
680 assert_eq!(&buf[0..5], b"Xello");
681 assert_eq!(&buf[5..10], [0u8; 5]);
682
683 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
685 let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
686 writer_zero
687 .write_at(b"some data".to_vec(), 0)
688 .await
689 .unwrap();
690 assert_eq!(writer_zero.size().await, 9);
691 writer_zero.sync().await.unwrap();
692 assert_eq!(writer_zero.size().await, 9);
693 writer_zero.resize(0).await.unwrap();
694 assert_eq!(writer_zero.size().await, 0);
695 writer_zero.sync().await.unwrap();
696 assert_eq!(writer_zero.size().await, 0);
697
698 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
700 assert_eq!(size_z, 0);
701 });
702 }
703
704 #[test_traced]
705 fn test_write_read_at_on_writer() {
706 let executor = deterministic::Runner::default();
707 executor.start(|context| async move {
708 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
710 let writer = Write::new(blob.clone(), size, NZUsize!(10));
711
712 writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
714 assert_eq!(writer.size().await, 8);
715
716 let mut read_buf_vec = vec![0u8; 4].into();
718 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
719 assert_eq!(read_buf_vec.as_ref(), b"buff");
720
721 read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
722 assert_eq!(read_buf_vec.as_ref(), b"ered");
723
724 let small_buf_vec = vec![0u8; 1];
726 assert!(writer.read_at(small_buf_vec, 8).await.is_err());
727
728 writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
730 assert_eq!(writer.size().await, 20);
731 writer.sync().await.unwrap();
732 assert_eq!(writer.size().await, 20);
733
734 let mut read_buf_vec_2 = vec![0u8; 4].into();
736 read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
737 assert_eq!(read_buf_vec_2.as_ref(), b"buff");
738
739 let mut read_buf_7_vec = vec![0u8; 7].into();
740 read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
741 assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
742
743 writer.write_at(b" more data".to_vec(), 20).await.unwrap();
745 assert_eq!(writer.size().await, 30);
746
747 let mut read_buf_vec_3 = vec![0u8; 5].into();
749 read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
750 assert_eq!(read_buf_vec_3.as_ref(), b" more");
751
752 let mut combo_read_buf_vec = vec![0u8; 12].into();
754 combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
755 assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
756
757 writer.sync().await.unwrap();
759 assert_eq!(writer.size().await, 30);
760 let (final_blob, final_size) =
761 context.open("partition", b"read_at_writer").await.unwrap();
762 assert_eq!(final_size, 30);
763 let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
764 let mut full_content = vec![0u8; 30];
765 final_reader
766 .read_exact(&mut full_content, 30)
767 .await
768 .unwrap();
769 assert_eq!(&full_content, b"buffered and flushed more data");
770 });
771 }
772
773 #[test_traced]
774 fn test_write_straddling_non_mergeable() {
775 let executor = deterministic::Runner::default();
776 executor.start(|context| async move {
777 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
779 let writer = Write::new(blob.clone(), size, NZUsize!(10));
780
781 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
783 assert_eq!(writer.size().await, 10);
784
785 writer.write_at(b"abc".to_vec(), 15).await.unwrap();
787 assert_eq!(writer.size().await, 18);
788 writer.sync().await.unwrap();
789 assert_eq!(writer.size().await, 18);
790
791 let (blob_check, size_check) =
793 context.open("partition", b"write_straddle").await.unwrap();
794 assert_eq!(size_check, 18);
795 let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
796 let mut buf = vec![0u8; 18];
797 reader.read_exact(&mut buf, 18).await.unwrap();
798
799 let mut expected = vec![0u8; 18];
800 expected[0..10].copy_from_slice(b"0123456789");
801 expected[15..18].copy_from_slice(b"abc");
802 assert_eq!(buf, expected);
803
804 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
806 let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
807 writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
808 assert_eq!(writer2.size().await, 10);
809
810 writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
812 assert_eq!(writer2.size().await, 17);
813 writer2.sync().await.unwrap();
814 assert_eq!(writer2.size().await, 17);
815
816 let (blob_check2, size_check2) =
818 context.open("partition", b"write_straddle2").await.unwrap();
819 assert_eq!(size_check2, 17);
820 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
821 let mut buf2 = vec![0u8; 17];
822 reader2.read_exact(&mut buf2, 17).await.unwrap();
823 assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
824 });
825 }
826
827 #[test_traced]
828 fn test_write_close() {
829 let executor = deterministic::Runner::default();
830 executor.start(|context| async move {
831 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
833 let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
834 writer.write_at(b"pending".to_vec(), 0).await.unwrap();
835 assert_eq!(writer.size().await, 7);
836
837 writer.sync().await.unwrap();
839
840 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
842 assert_eq!(size_check, 7);
843 let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
844 let mut buf = [0u8; 7];
845 reader.read_exact(&mut buf, 7).await.unwrap();
846 assert_eq!(&buf, b"pending");
847 });
848 }
849
850 #[test_traced]
851 fn test_write_direct_due_to_size() {
852 let executor = deterministic::Runner::default();
853 executor.start(|context| async move {
854 let (blob, size) = context
856 .open("partition", b"write_direct_size")
857 .await
858 .unwrap();
859 let writer = Write::new(blob.clone(), size, NZUsize!(5));
860
861 let data_large = b"0123456789";
863 writer.write_at(data_large.to_vec(), 0).await.unwrap();
864 assert_eq!(writer.size().await, 10);
865
866 writer.sync().await.unwrap();
868
869 let (blob_check, size_check) = context
871 .open("partition", b"write_direct_size")
872 .await
873 .unwrap();
874 assert_eq!(size_check, 10);
875 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
876 let mut buf = vec![0u8; 10];
877 reader.read_exact(&mut buf, 10).await.unwrap();
878 assert_eq!(&buf, data_large.as_slice());
879
880 writer.write_at(b"abc".to_vec(), 10).await.unwrap();
882 assert_eq!(writer.size().await, 13);
883
884 let mut read_small_buf_vec = vec![0u8; 3].into();
886 read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
887 assert_eq!(read_small_buf_vec.as_ref(), b"abc");
888
889 writer.sync().await.unwrap();
890
891 let (blob_check2, size_check2) = context
893 .open("partition", b"write_direct_size")
894 .await
895 .unwrap();
896 assert_eq!(size_check2, 13);
897 let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
898 let mut buf2 = vec![0u8; 13];
899 reader2.read_exact(&mut buf2, 13).await.unwrap();
900 assert_eq!(&buf2[10..], b"abc".as_slice());
901 });
902 }
903
904 #[test_traced]
905 fn test_write_overwrite_and_extend_in_buffer() {
906 let executor = deterministic::Runner::default();
907 executor.start(|context| async move {
908 let (blob, size) = context
910 .open("partition", b"overwrite_extend_buf")
911 .await
912 .unwrap();
913 let writer = Write::new(blob.clone(), size, NZUsize!(15));
914
915 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
917 assert_eq!(writer.size().await, 10);
918
919 writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
921 assert_eq!(writer.size().await, 15);
922
923 let mut read_buf_vec = vec![0u8; 15].into();
925 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
926 assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
927
928 writer.sync().await.unwrap();
929
930 let (blob_check, size_check) = context
932 .open("partition", b"overwrite_extend_buf")
933 .await
934 .unwrap();
935 assert_eq!(size_check, 15);
936 let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
937 let mut final_buf = vec![0u8; 15];
938 reader.read_exact(&mut final_buf, 15).await.unwrap();
939 assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
940 });
941 }
942
943 #[test_traced]
944 fn test_write_at_size() {
945 let executor = deterministic::Runner::default();
946 executor.start(|context| async move {
947 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
949 let writer = Write::new(blob.clone(), size, NZUsize!(20));
950
951 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
953 assert_eq!(writer.size().await, 10);
954 writer.sync().await.unwrap();
955
956 writer
958 .write_at(b"abc".to_vec(), writer.size().await)
959 .await
960 .unwrap();
961 assert_eq!(writer.size().await, 13);
962 writer.sync().await.unwrap();
963
964 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
966 assert_eq!(size_check, 13);
967 let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
968 let mut buf = vec![0u8; 13];
969 reader.read_exact(&mut buf, 13).await.unwrap();
970 assert_eq!(&buf, b"0123456789abc");
971 });
972 }
973
974 #[test_traced]
975 fn test_write_at_size_multiple_appends() {
976 let executor = deterministic::Runner::default();
977 executor.start(|context| async move {
978 let (blob, size) = context
980 .open("partition", b"write_multiple_appends_at_size")
981 .await
982 .unwrap();
983 let writer = Write::new(blob.clone(), size, NZUsize!(5)); writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
987 assert_eq!(writer.size().await, 3);
988 writer.sync().await.unwrap();
989 assert_eq!(writer.size().await, 3);
990
991 writer
993 .write_at(b"BBB".to_vec(), writer.size().await)
994 .await
995 .unwrap();
996 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
998 assert_eq!(writer.size().await, 6);
999
1000 writer
1002 .write_at(b"CCC".to_vec(), writer.size().await)
1003 .await
1004 .unwrap();
1005 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1007 assert_eq!(writer.size().await, 9);
1008
1009 let (blob_check, size_check) = context
1011 .open("partition", b"write_multiple_appends_at_size")
1012 .await
1013 .unwrap();
1014 assert_eq!(size_check, 9);
1015 let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1016 let mut buf = vec![0u8; 9];
1017 reader.read_exact(&mut buf, 9).await.unwrap();
1018 assert_eq!(&buf, b"AAABBBCCC");
1019 });
1020 }
1021
1022 #[test_traced]
1023 fn test_write_non_contiguous_then_append_at_size() {
1024 let executor = deterministic::Runner::default();
1025 executor.start(|context| async move {
1026 let (blob, size) = context
1028 .open("partition", b"write_non_contiguous_then_append")
1029 .await
1030 .unwrap();
1031 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1032
1033 writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); assert_eq!(writer.size().await, 7);
1036 writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1040 assert_eq!(writer.size().await, 29);
1041 writer.sync().await.unwrap();
1042 assert_eq!(writer.size().await, 29);
1043
1044 writer
1046 .write_at(b"APPEND".to_vec(), writer.size().await)
1047 .await
1048 .unwrap();
1049 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1051 assert_eq!(writer.size().await, 35);
1052
1053 let (blob_check, size_check) = context
1055 .open("partition", b"write_non_contiguous_then_append")
1056 .await
1057 .unwrap();
1058 assert_eq!(size_check, 35);
1059 let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1060 let mut buf = vec![0u8; 35];
1061 reader.read_exact(&mut buf, 35).await.unwrap();
1062
1063 let mut expected = vec![0u8; 35];
1064 expected[0..7].copy_from_slice(b"INITIAL");
1065 expected[20..29].copy_from_slice(b"NONCONTIG");
1066 expected[29..35].copy_from_slice(b"APPEND");
1067 assert_eq!(buf, expected);
1068 });
1069 }
1070
1071 #[test_traced]
1072 fn test_resize_then_append_at_size() {
1073 let executor = deterministic::Runner::default();
1074 executor.start(|context| async move {
1075 let (blob, size) = context
1077 .open("partition", b"resize_then_append_at_size")
1078 .await
1079 .unwrap();
1080 let writer = Write::new(blob.clone(), size, NZUsize!(10));
1081
1082 writer
1084 .write_at(b"0123456789ABCDEF".to_vec(), 0)
1085 .await
1086 .unwrap(); assert_eq!(writer.size().await, 16);
1088 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1090
1091 let resize_to = 5;
1093 writer.resize(resize_to).await.unwrap();
1094 assert_eq!(writer.size().await, resize_to);
1097 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1099
1100 writer
1102 .write_at(b"XXXXX".to_vec(), writer.size().await)
1103 .await
1104 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1108 assert_eq!(writer.size().await, 10);
1109
1110 let (blob_check, size_check) = context
1112 .open("partition", b"resize_then_append_at_size")
1113 .await
1114 .unwrap();
1115 assert_eq!(size_check, 10);
1116 let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1117 let mut buf = vec![0u8; 10];
1118 reader.read_exact(&mut buf, 10).await.unwrap();
1119 assert_eq!(&buf, b"01234XXXXX");
1120 });
1121 }
1122}