1mod append;
4pub mod pool;
5mod read;
6mod tip;
7mod write;
8
9pub use append::Append;
10pub use pool::{Pool, PoolRef};
11pub use read::Read;
12pub use write::Write;
13
14#[cfg(test)]
15mod tests {
16 use super::*;
17 use crate::{deterministic, Blob as _, Error, Runner, Storage};
18 use commonware_macros::test_traced;
19
20 #[test_traced]
21 fn test_read_basic() {
22 let executor = deterministic::Runner::default();
23 executor.start(|context| async move {
24 let data = b"Hello, world! This is a test.";
26 let (blob, size) = context.open("partition", b"test").await.unwrap();
27 assert_eq!(size, 0);
28 blob.write_at(data.to_vec(), 0).await.unwrap();
29 let size = data.len() as u64;
30
31 let buffer_size = 10;
33 let mut reader = Read::new(blob, size, buffer_size);
34
35 let mut buf = [0u8; 5];
37 reader.read_exact(&mut buf, 5).await.unwrap();
38 assert_eq!(&buf, b"Hello");
39
40 let mut buf = [0u8; 14];
42 reader.read_exact(&mut buf, 14).await.unwrap();
43 assert_eq!(&buf, b", world! This ");
44
45 assert_eq!(reader.position(), 19);
47
48 let mut buf = [0u8; 10];
50 reader.read_exact(&mut buf, 7).await.unwrap();
51 assert_eq!(&buf[..7], b"is a te");
52
53 let mut buf = [0u8; 5];
55 let result = reader.read_exact(&mut buf, 5).await;
56 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
57 });
58 }
59
60 #[test_traced]
61 #[should_panic(expected = "buffer size must be greater than zero")]
62 fn test_read_empty() {
63 let executor = deterministic::Runner::default();
64 executor.start(|context| async move {
65 let data = b"Hello, world! This is a test.";
67 let (blob, size) = context.open("partition", b"test").await.unwrap();
68 assert_eq!(size, 0);
69 blob.write_at(data.to_vec(), 0).await.unwrap();
70 let size = data.len() as u64;
71
72 let buffer_size = 0;
74 Read::new(blob, size, buffer_size);
75 });
76 }
77
78 #[test_traced]
79 fn test_read_cross_boundary() {
80 let executor = deterministic::Runner::default();
81 executor.start(|context| async move {
82 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
84 let (blob, size) = context.open("partition", b"test").await.unwrap();
85 assert_eq!(size, 0);
86 blob.write_at(data.to_vec(), 0).await.unwrap();
87 let size = data.len() as u64;
88
89 let buffer_size = 10;
91 let mut reader = Read::new(blob, size, buffer_size);
92
93 let mut buf = [0u8; 15];
95 reader.read_exact(&mut buf, 15).await.unwrap();
96 assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
97
98 assert_eq!(reader.position(), 15);
100
101 let mut buf = [0u8; 11];
103 reader.read_exact(&mut buf, 11).await.unwrap();
104 assert_eq!(&buf, b"PQRSTUVWXYZ");
105
106 assert_eq!(reader.position(), 26);
108 assert_eq!(reader.blob_remaining(), 0);
109 });
110 }
111
112 #[test_traced]
113 fn test_read_with_known_size() {
114 let executor = deterministic::Runner::default();
115 executor.start(|context| async move {
116 let data = b"This is a test with known size limitations.";
118 let (blob, size) = context.open("partition", b"test").await.unwrap();
119 assert_eq!(size, 0);
120 blob.write_at(data.to_vec(), 0).await.unwrap();
121 let size = data.len() as u64;
122
123 let buffer_size = 10;
125 let mut reader = Read::new(blob, size, buffer_size);
126
127 assert_eq!(reader.blob_remaining(), size);
129
130 let mut buf = [0u8; 5];
132 reader.read_exact(&mut buf, 5).await.unwrap();
133 assert_eq!(&buf, b"This ");
134
135 assert_eq!(reader.blob_remaining(), size - 5);
137
138 let mut buf = vec![0u8; (size - 5) as usize];
140 reader
141 .read_exact(&mut buf, (size - 5) as usize)
142 .await
143 .unwrap();
144 assert_eq!(&buf, b"is a test with known size limitations.");
145
146 assert_eq!(reader.blob_remaining(), 0);
148
149 let mut buf = [0u8; 1];
151 let result = reader.read_exact(&mut buf, 1).await;
152 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
153 });
154 }
155
156 #[test_traced]
157 fn test_read_large_data() {
158 let executor = deterministic::Runner::default();
159 executor.start(|context| async move {
160 let data_size = 1024 * 256; let data = vec![0x42; data_size];
163 let (blob, size) = context.open("partition", b"test").await.unwrap();
164 assert_eq!(size, 0);
165 blob.write_at(data.clone(), 0).await.unwrap();
166 let size = data.len() as u64;
167
168 let buffer_size = 64 * 1024; let mut reader = Read::new(blob, size, buffer_size);
171
172 let mut total_read = 0;
174 let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
176
177 while total_read < data_size {
178 let to_read = std::cmp::min(chunk_size, data_size - total_read);
179 reader
180 .read_exact(&mut buf[..to_read], to_read)
181 .await
182 .unwrap();
183
184 assert!(
186 buf[..to_read].iter().all(|&b| b == 0x42),
187 "Data at position {total_read} is not correct"
188 );
189
190 total_read += to_read;
191 }
192
193 assert_eq!(total_read, data_size);
195
196 let mut extra_buf = [0u8; 1];
198 let result = reader.read_exact(&mut extra_buf, 1).await;
199 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
200 });
201 }
202
203 #[test_traced]
204 fn test_read_exact_size_reads() {
205 let executor = deterministic::Runner::default();
206 executor.start(|context| async move {
207 let buffer_size = 1024;
209 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
211
212 let (blob, size) = context.open("partition", b"test").await.unwrap();
213 assert_eq!(size, 0);
214 blob.write_at(data.clone(), 0).await.unwrap();
215 let size = data.len() as u64;
216
217 let mut reader = Read::new(blob, size, buffer_size);
218
219 let mut buf1 = vec![0u8; buffer_size];
221 reader.read_exact(&mut buf1, buffer_size).await.unwrap();
222 assert!(buf1.iter().all(|&b| b == 0x37));
223
224 let mut buf2 = vec![0u8; buffer_size];
226 reader.read_exact(&mut buf2, buffer_size).await.unwrap();
227 assert!(buf2.iter().all(|&b| b == 0x37));
228
229 let half_buffer = buffer_size / 2;
231 let mut buf3 = vec![0u8; half_buffer];
232 reader.read_exact(&mut buf3, half_buffer).await.unwrap();
233 assert!(buf3.iter().all(|&b| b == 0x37));
234
235 assert_eq!(reader.blob_remaining(), 0);
237 assert_eq!(reader.position(), size);
238 });
239 }
240
241 #[test_traced]
242 fn test_read_seek_to() {
243 let executor = deterministic::Runner::default();
244 executor.start(|context| async move {
245 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
247 let (blob, size) = context.open("partition", b"test").await.unwrap();
248 assert_eq!(size, 0);
249 blob.write_at(data.to_vec(), 0).await.unwrap();
250 let size = data.len() as u64;
251
252 let buffer_size = 10;
254 let mut reader = Read::new(blob, size, buffer_size);
255
256 let mut buf = [0u8; 5];
258 reader.read_exact(&mut buf, 5).await.unwrap();
259 assert_eq!(&buf, b"ABCDE");
260 assert_eq!(reader.position(), 5);
261
262 reader.seek_to(10).unwrap();
264 assert_eq!(reader.position(), 10);
265
266 let mut buf = [0u8; 5];
268 reader.read_exact(&mut buf, 5).await.unwrap();
269 assert_eq!(&buf, b"KLMNO");
270
271 reader.seek_to(0).unwrap();
273 assert_eq!(reader.position(), 0);
274
275 let mut buf = [0u8; 5];
276 reader.read_exact(&mut buf, 5).await.unwrap();
277 assert_eq!(&buf, b"ABCDE");
278
279 reader.seek_to(size).unwrap();
281 assert_eq!(reader.position(), size);
282
283 let mut buf = [0u8; 1];
285 let result = reader.read_exact(&mut buf, 1).await;
286 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287
288 let result = reader.seek_to(size + 10);
290 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
291 });
292 }
293
294 #[test_traced]
295 fn test_read_seek_with_refill() {
296 let executor = deterministic::Runner::default();
297 executor.start(|context| async move {
298 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
301 assert_eq!(size, 0);
302 blob.write_at(data.clone(), 0).await.unwrap();
303 let size = data.len() as u64;
304
305 let buffer_size = 10;
307 let mut reader = Read::new(blob, size, buffer_size);
308
309 let mut buf = [0u8; 5];
311 reader.read_exact(&mut buf, 5).await.unwrap();
312
313 reader.seek_to(500).unwrap();
315
316 let mut buf = [0u8; 5];
318 reader.read_exact(&mut buf, 5).await.unwrap();
319 assert_eq!(&buf, b"AAAAA"); assert_eq!(reader.position(), 505);
321
322 reader.seek_to(100).unwrap();
324
325 let mut buf = [0u8; 5];
327 reader.read_exact(&mut buf, 5).await.unwrap();
328 assert_eq!(reader.position(), 105);
329 });
330 }
331
332 #[test_traced]
333 fn test_read_resize() {
334 let executor = deterministic::Runner::default();
335 executor.start(|context| async move {
336 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
338 let (blob, size) = context.open("partition", b"test").await.unwrap();
339 assert_eq!(size, 0);
340 blob.write_at(data.to_vec(), 0).await.unwrap();
341 let data_len = data.len() as u64;
342
343 let buffer_size = 10;
345 let reader = Read::new(blob.clone(), data_len, buffer_size);
346
347 let resize_len = data_len / 2;
349 reader.resize(resize_len).await.unwrap();
350
351 let (blob, size) = context.open("partition", b"test").await.unwrap();
353 assert_eq!(size, resize_len, "Blob should be resized to half size");
354
355 let mut new_reader = Read::new(blob, size, buffer_size);
357
358 let mut buf = vec![0u8; size as usize];
360 new_reader
361 .read_exact(&mut buf, size as usize)
362 .await
363 .unwrap();
364 assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
365
366 let mut extra_buf = [0u8; 1];
368 let result = new_reader.read_exact(&mut extra_buf, 1).await;
369 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
370
371 new_reader.resize(data_len * 2).await.unwrap();
373
374 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
376 assert_eq!(new_size, data_len * 2);
377
378 let mut new_reader = Read::new(blob, new_size, buffer_size);
380 let mut buf = vec![0u8; new_size as usize];
381 new_reader
382 .read_exact(&mut buf, new_size as usize)
383 .await
384 .unwrap();
385 assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
386 assert_eq!(
387 &buf[size as usize..],
388 vec![0u8; new_size as usize - size as usize]
389 );
390 });
391 }
392
393 #[test_traced]
394 fn test_read_resize_to_zero() {
395 let executor = deterministic::Runner::default();
396 executor.start(|context| async move {
397 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
399 let data_len = data.len() as u64;
400 let (blob, size) = context.open("partition", b"test").await.unwrap();
401 assert_eq!(size, 0);
402 blob.write_at(data.to_vec(), 0).await.unwrap();
403
404 let buffer_size = 10;
406 let reader = Read::new(blob.clone(), data_len, buffer_size);
407
408 reader.resize(0).await.unwrap();
410
411 let (blob, size) = context.open("partition", b"test").await.unwrap();
413 assert_eq!(size, 0, "Blob should be resized to zero");
414
415 let mut new_reader = Read::new(blob, size, buffer_size);
417
418 let mut buf = [0u8; 1];
420 let result = new_reader.read_exact(&mut buf, 1).await;
421 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
422 });
423 }
424
425 #[test_traced]
426 fn test_write_basic() {
427 let executor = deterministic::Runner::default();
428 executor.start(|context| async move {
429 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
431 assert_eq!(size, 0);
432
433 let writer = Write::new(blob.clone(), size, 8);
434 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
435 assert_eq!(writer.size().await, 5);
436 writer.sync().await.unwrap();
437 assert_eq!(writer.size().await, 5);
438
439 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
441 assert_eq!(size, 5);
442 let mut reader = Read::new(blob, size, 8);
443 let mut buf = [0u8; 5];
444 reader.read_exact(&mut buf, 5).await.unwrap();
445 assert_eq!(&buf, b"hello");
446 });
447 }
448
449 #[test_traced]
450 fn test_write_multiple_flushes() {
451 let executor = deterministic::Runner::default();
452 executor.start(|context| async move {
453 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
455 assert_eq!(size, 0);
456
457 let writer = Write::new(blob.clone(), size, 4);
458 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
459 assert_eq!(writer.size().await, 3);
460 writer.write_at(b"defg".to_vec(), 3).await.unwrap();
461 assert_eq!(writer.size().await, 7);
462 writer.sync().await.unwrap();
463
464 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
466 assert_eq!(size, 7);
467 let mut reader = Read::new(blob, size, 4);
468 let mut buf = [0u8; 7];
469 reader.read_exact(&mut buf, 7).await.unwrap();
470 assert_eq!(&buf, b"abcdefg");
471 });
472 }
473
474 #[test_traced]
475 fn test_write_large_data() {
476 let executor = deterministic::Runner::default();
477 executor.start(|context| async move {
478 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
480 assert_eq!(size, 0);
481
482 let writer = Write::new(blob.clone(), size, 4);
483 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
484 assert_eq!(writer.size().await, 3);
485 writer
486 .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
487 .await
488 .unwrap();
489 assert_eq!(writer.size().await, 26);
490 writer.sync().await.unwrap();
491 assert_eq!(writer.size().await, 26);
492
493 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
495 assert_eq!(size, 26);
496 let mut reader = Read::new(blob, size, 4);
497 let mut buf = [0u8; 26];
498 reader.read_exact(&mut buf, 26).await.unwrap();
499 assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
500 });
501 }
502
503 #[test_traced]
504 #[should_panic(expected = "buffer capacity must be greater than zero")]
505 fn test_write_empty() {
506 let executor = deterministic::Runner::default();
507 executor.start(|context| async move {
508 let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
510 assert_eq!(size, 0);
511 Write::new(blob, size, 0);
512 });
513 }
514
515 #[test_traced]
516 fn test_write_append_to_buffer() {
517 let executor = deterministic::Runner::default();
518 executor.start(|context| async move {
519 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
521 let writer = Write::new(blob.clone(), size, 10);
522
523 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
525 assert_eq!(writer.size().await, 5);
526
527 writer.write_at(b" world".to_vec(), 5).await.unwrap();
529 writer.sync().await.unwrap();
530 assert_eq!(writer.size().await, 11);
531
532 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
534 assert_eq!(size, 11);
535 let mut reader = Read::new(blob, size, 10);
536 let mut buf = vec![0u8; 11];
537 reader.read_exact(&mut buf, 11).await.unwrap();
538 assert_eq!(&buf, b"hello world");
539 });
540 }
541
542 #[test_traced]
543 fn test_write_into_middle_of_buffer() {
544 let executor = deterministic::Runner::default();
545 executor.start(|context| async move {
546 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
548 let writer = Write::new(blob.clone(), size, 20);
549
550 writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
552 assert_eq!(writer.size().await, 10);
553
554 writer.write_at(b"01234".to_vec(), 2).await.unwrap();
556 assert_eq!(writer.size().await, 10);
557 writer.sync().await.unwrap();
558
559 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
561 assert_eq!(size, 10);
562 let mut reader = Read::new(blob, size, 10);
563 let mut buf = vec![0u8; 10];
564 reader.read_exact(&mut buf, 10).await.unwrap();
565 assert_eq!(&buf, b"ab01234hij");
566
567 writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
569 assert_eq!(writer.size().await, 20);
570 writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
571 assert_eq!(writer.size().await, 20);
572 writer.sync().await.unwrap();
573
574 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
576 assert_eq!(size, 20);
577 let mut reader = Read::new(blob, size, 20);
578 let mut buf = vec![0u8; 20];
579 reader.read_exact(&mut buf, 20).await.unwrap();
580 assert_eq!(&buf, b"ab01234hiwxyznopqrst");
581 });
582 }
583
584 #[test_traced]
585 fn test_write_before_buffer() {
586 let executor = deterministic::Runner::default();
587 executor.start(|context| async move {
588 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
590 let writer = Write::new(blob.clone(), size, 10);
591
592 writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
594 assert_eq!(writer.size().await, 20);
595
596 writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
598 assert_eq!(writer.size().await, 20);
599 writer.sync().await.unwrap();
600
601 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
603 assert_eq!(size, 20);
604 let mut reader = Read::new(blob, size, 20);
605 let mut buf = vec![0u8; 20];
606 reader.read_exact(&mut buf, 20).await.unwrap();
607 let mut expected = vec![0u8; 20];
608 expected[0..5].copy_from_slice("abcde".as_bytes());
609 expected[10..20].copy_from_slice("0123456789".as_bytes());
610 assert_eq!(buf, expected);
611
612 writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
614 assert_eq!(writer.size().await, 20);
615 writer.sync().await.unwrap();
616 assert_eq!(writer.size().await, 20);
617
618 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
620 assert_eq!(size, 20);
621 let mut reader = Read::new(blob, size, 20);
622 let mut buf = vec![0u8; 20];
623 reader.read_exact(&mut buf, 20).await.unwrap();
624 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
625 assert_eq!(buf, expected);
626 });
627 }
628
629 #[test_traced]
630 fn test_write_resize() {
631 let executor = deterministic::Runner::default();
632 executor.start(|context| async move {
633 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
635 let writer = Write::new(blob, size, 10);
636
637 writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
639 assert_eq!(writer.size().await, 11);
640 writer.sync().await.unwrap();
641 assert_eq!(writer.size().await, 11);
642
643 let (blob_check, size_check) =
644 context.open("partition", b"resize_write").await.unwrap();
645 assert_eq!(size_check, 11);
646 drop(blob_check);
647
648 writer.resize(5).await.unwrap();
650 assert_eq!(writer.size().await, 5);
651 writer.sync().await.unwrap();
652
653 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
655 assert_eq!(size, 5);
656 let mut reader = Read::new(blob, size, 5);
657 let mut buf = vec![0u8; 5];
658 reader.read_exact(&mut buf, 5).await.unwrap();
659 assert_eq!(&buf, b"hello");
660
661 writer.write_at(b"X".to_vec(), 0).await.unwrap();
663 assert_eq!(writer.size().await, 5);
664 writer.sync().await.unwrap();
665
666 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
668 assert_eq!(size, 5);
669 let mut reader = Read::new(blob, size, 5);
670 let mut buf = vec![0u8; 5];
671 reader.read_exact(&mut buf, 5).await.unwrap();
672 assert_eq!(&buf, b"Xello");
673
674 writer.resize(10).await.unwrap();
676 assert_eq!(writer.size().await, 10);
677 writer.sync().await.unwrap();
678
679 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
681 assert_eq!(size, 10);
682 let mut reader = Read::new(blob, size, 10);
683 let mut buf = vec![0u8; 10];
684 reader.read_exact(&mut buf, 10).await.unwrap();
685 assert_eq!(&buf[0..5], b"Xello");
686 assert_eq!(&buf[5..10], [0u8; 5]);
687
688 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
690 let writer_zero = Write::new(blob_zero.clone(), size, 10);
691 writer_zero
692 .write_at(b"some data".to_vec(), 0)
693 .await
694 .unwrap();
695 assert_eq!(writer_zero.size().await, 9);
696 writer_zero.sync().await.unwrap();
697 assert_eq!(writer_zero.size().await, 9);
698 writer_zero.resize(0).await.unwrap();
699 assert_eq!(writer_zero.size().await, 0);
700 writer_zero.sync().await.unwrap();
701 assert_eq!(writer_zero.size().await, 0);
702
703 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
705 assert_eq!(size_z, 0);
706 });
707 }
708
709 #[test_traced]
710 fn test_write_read_at_on_writer() {
711 let executor = deterministic::Runner::default();
712 executor.start(|context| async move {
713 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
715 let writer = Write::new(blob.clone(), size, 10);
716
717 writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
719 assert_eq!(writer.size().await, 8);
720
721 let mut read_buf_vec = vec![0u8; 4].into();
723 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
724 assert_eq!(read_buf_vec.as_ref(), b"buff");
725
726 read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
727 assert_eq!(read_buf_vec.as_ref(), b"ered");
728
729 let small_buf_vec = vec![0u8; 1];
731 assert!(writer.read_at(small_buf_vec, 8).await.is_err());
732
733 writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
735 assert_eq!(writer.size().await, 20);
736 writer.sync().await.unwrap();
737 assert_eq!(writer.size().await, 20);
738
739 let mut read_buf_vec_2 = vec![0u8; 4].into();
741 read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
742 assert_eq!(read_buf_vec_2.as_ref(), b"buff");
743
744 let mut read_buf_7_vec = vec![0u8; 7].into();
745 read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
746 assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
747
748 writer.write_at(b" more data".to_vec(), 20).await.unwrap();
750 assert_eq!(writer.size().await, 30);
751
752 let mut read_buf_vec_3 = vec![0u8; 5].into();
754 read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
755 assert_eq!(read_buf_vec_3.as_ref(), b" more");
756
757 let mut combo_read_buf_vec = vec![0u8; 12].into();
759 combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
760 assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
761
762 writer.sync().await.unwrap();
764 assert_eq!(writer.size().await, 30);
765 let (final_blob, final_size) =
766 context.open("partition", b"read_at_writer").await.unwrap();
767 assert_eq!(final_size, 30);
768 let mut final_reader = Read::new(final_blob, final_size, 30);
769 let mut full_content = vec![0u8; 30];
770 final_reader
771 .read_exact(&mut full_content, 30)
772 .await
773 .unwrap();
774 assert_eq!(&full_content, b"buffered and flushed more data");
775 });
776 }
777
778 #[test_traced]
779 fn test_write_straddling_non_mergeable() {
780 let executor = deterministic::Runner::default();
781 executor.start(|context| async move {
782 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
784 let writer = Write::new(blob.clone(), size, 10);
785
786 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
788 assert_eq!(writer.size().await, 10);
789
790 writer.write_at(b"abc".to_vec(), 15).await.unwrap();
792 assert_eq!(writer.size().await, 18);
793 writer.sync().await.unwrap();
794 assert_eq!(writer.size().await, 18);
795
796 let (blob_check, size_check) =
798 context.open("partition", b"write_straddle").await.unwrap();
799 assert_eq!(size_check, 18);
800 let mut reader = Read::new(blob_check, size_check, 20);
801 let mut buf = vec![0u8; 18];
802 reader.read_exact(&mut buf, 18).await.unwrap();
803
804 let mut expected = vec![0u8; 18];
805 expected[0..10].copy_from_slice(b"0123456789");
806 expected[15..18].copy_from_slice(b"abc");
807 assert_eq!(buf, expected);
808
809 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
811 let writer2 = Write::new(blob2.clone(), size, 10);
812 writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
813 assert_eq!(writer2.size().await, 10);
814
815 writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
817 assert_eq!(writer2.size().await, 17);
818 writer2.sync().await.unwrap();
819 assert_eq!(writer2.size().await, 17);
820
821 let (blob_check2, size_check2) =
823 context.open("partition", b"write_straddle2").await.unwrap();
824 assert_eq!(size_check2, 17);
825 let mut reader2 = Read::new(blob_check2, size_check2, 20);
826 let mut buf2 = vec![0u8; 17];
827 reader2.read_exact(&mut buf2, 17).await.unwrap();
828 assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
829 });
830 }
831
832 #[test_traced]
833 fn test_write_close() {
834 let executor = deterministic::Runner::default();
835 executor.start(|context| async move {
836 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
838 let writer = Write::new(blob_orig.clone(), size, 8);
839 writer.write_at(b"pending".to_vec(), 0).await.unwrap();
840 assert_eq!(writer.size().await, 7);
841
842 writer.close().await.unwrap();
844
845 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
847 assert_eq!(size_check, 7);
848 let mut reader = Read::new(blob_check, size_check, 8);
849 let mut buf = [0u8; 7];
850 reader.read_exact(&mut buf, 7).await.unwrap();
851 assert_eq!(&buf, b"pending");
852 });
853 }
854
855 #[test_traced]
856 fn test_write_direct_due_to_size() {
857 let executor = deterministic::Runner::default();
858 executor.start(|context| async move {
859 let (blob, size) = context
861 .open("partition", b"write_direct_size")
862 .await
863 .unwrap();
864 let writer = Write::new(blob.clone(), size, 5);
865
866 let data_large = b"0123456789";
868 writer.write_at(data_large.to_vec(), 0).await.unwrap();
869 assert_eq!(writer.size().await, 10);
870
871 writer.sync().await.unwrap();
873
874 let (blob_check, size_check) = context
876 .open("partition", b"write_direct_size")
877 .await
878 .unwrap();
879 assert_eq!(size_check, 10);
880 let mut reader = Read::new(blob_check, size_check, 10);
881 let mut buf = vec![0u8; 10];
882 reader.read_exact(&mut buf, 10).await.unwrap();
883 assert_eq!(&buf, data_large.as_slice());
884
885 writer.write_at(b"abc".to_vec(), 10).await.unwrap();
887 assert_eq!(writer.size().await, 13);
888
889 let mut read_small_buf_vec = vec![0u8; 3].into();
891 read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
892 assert_eq!(read_small_buf_vec.as_ref(), b"abc");
893
894 writer.sync().await.unwrap();
895
896 let (blob_check2, size_check2) = context
898 .open("partition", b"write_direct_size")
899 .await
900 .unwrap();
901 assert_eq!(size_check2, 13);
902 let mut reader2 = Read::new(blob_check2, size_check2, 13);
903 let mut buf2 = vec![0u8; 13];
904 reader2.read_exact(&mut buf2, 13).await.unwrap();
905 assert_eq!(&buf2[10..], b"abc".as_slice());
906 });
907 }
908
909 #[test_traced]
910 fn test_write_overwrite_and_extend_in_buffer() {
911 let executor = deterministic::Runner::default();
912 executor.start(|context| async move {
913 let (blob, size) = context
915 .open("partition", b"overwrite_extend_buf")
916 .await
917 .unwrap();
918 let writer = Write::new(blob.clone(), size, 15);
919
920 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
922 assert_eq!(writer.size().await, 10);
923
924 writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
926 assert_eq!(writer.size().await, 15);
927
928 let mut read_buf_vec = vec![0u8; 15].into();
930 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
931 assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
932
933 writer.sync().await.unwrap();
934
935 let (blob_check, size_check) = context
937 .open("partition", b"overwrite_extend_buf")
938 .await
939 .unwrap();
940 assert_eq!(size_check, 15);
941 let mut reader = Read::new(blob_check, size_check, 15);
942 let mut final_buf = vec![0u8; 15];
943 reader.read_exact(&mut final_buf, 15).await.unwrap();
944 assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
945 });
946 }
947
948 #[test_traced]
949 fn test_write_at_size() {
950 let executor = deterministic::Runner::default();
951 executor.start(|context| async move {
952 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
954 let writer = Write::new(blob.clone(), size, 20);
955
956 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
958 assert_eq!(writer.size().await, 10);
959 writer.sync().await.unwrap();
960
961 writer
963 .write_at(b"abc".to_vec(), writer.size().await)
964 .await
965 .unwrap();
966 assert_eq!(writer.size().await, 13);
967 writer.sync().await.unwrap();
968
969 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
971 assert_eq!(size_check, 13);
972 let mut reader = Read::new(blob_check, size_check, 13);
973 let mut buf = vec![0u8; 13];
974 reader.read_exact(&mut buf, 13).await.unwrap();
975 assert_eq!(&buf, b"0123456789abc");
976 });
977 }
978
979 #[test_traced]
980 fn test_write_at_size_multiple_appends() {
981 let executor = deterministic::Runner::default();
982 executor.start(|context| async move {
983 let (blob, size) = context
985 .open("partition", b"write_multiple_appends_at_size")
986 .await
987 .unwrap();
988 let writer = Write::new(blob.clone(), size, 5); writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
992 assert_eq!(writer.size().await, 3);
993 writer.sync().await.unwrap();
994 assert_eq!(writer.size().await, 3);
995
996 writer
998 .write_at(b"BBB".to_vec(), writer.size().await)
999 .await
1000 .unwrap();
1001 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
1003 assert_eq!(writer.size().await, 6);
1004
1005 writer
1007 .write_at(b"CCC".to_vec(), writer.size().await)
1008 .await
1009 .unwrap();
1010 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1012 assert_eq!(writer.size().await, 9);
1013
1014 let (blob_check, size_check) = context
1016 .open("partition", b"write_multiple_appends_at_size")
1017 .await
1018 .unwrap();
1019 assert_eq!(size_check, 9);
1020 let mut reader = Read::new(blob_check, size_check, 9);
1021 let mut buf = vec![0u8; 9];
1022 reader.read_exact(&mut buf, 9).await.unwrap();
1023 assert_eq!(&buf, b"AAABBBCCC");
1024 });
1025 }
1026
1027 #[test_traced]
1028 fn test_write_non_contiguous_then_append_at_size() {
1029 let executor = deterministic::Runner::default();
1030 executor.start(|context| async move {
1031 let (blob, size) = context
1033 .open("partition", b"write_non_contiguous_then_append")
1034 .await
1035 .unwrap();
1036 let writer = Write::new(blob.clone(), size, 10);
1037
1038 writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); assert_eq!(writer.size().await, 7);
1041 writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1045 assert_eq!(writer.size().await, 29);
1046 writer.sync().await.unwrap();
1047 assert_eq!(writer.size().await, 29);
1048
1049 writer
1051 .write_at(b"APPEND".to_vec(), writer.size().await)
1052 .await
1053 .unwrap();
1054 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1056 assert_eq!(writer.size().await, 35);
1057
1058 let (blob_check, size_check) = context
1060 .open("partition", b"write_non_contiguous_then_append")
1061 .await
1062 .unwrap();
1063 assert_eq!(size_check, 35);
1064 let mut reader = Read::new(blob_check, size_check, 35);
1065 let mut buf = vec![0u8; 35];
1066 reader.read_exact(&mut buf, 35).await.unwrap();
1067
1068 let mut expected = vec![0u8; 35];
1069 expected[0..7].copy_from_slice(b"INITIAL");
1070 expected[20..29].copy_from_slice(b"NONCONTIG");
1071 expected[29..35].copy_from_slice(b"APPEND");
1072 assert_eq!(buf, expected);
1073 });
1074 }
1075
1076 #[test_traced]
1077 fn test_resize_then_append_at_size() {
1078 let executor = deterministic::Runner::default();
1079 executor.start(|context| async move {
1080 let (blob, size) = context
1082 .open("partition", b"resize_then_append_at_size")
1083 .await
1084 .unwrap();
1085 let writer = Write::new(blob.clone(), size, 10);
1086
1087 writer
1089 .write_at(b"0123456789ABCDEF".to_vec(), 0)
1090 .await
1091 .unwrap(); assert_eq!(writer.size().await, 16);
1093 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1095
1096 let resize_to = 5;
1098 writer.resize(resize_to).await.unwrap();
1099 assert_eq!(writer.size().await, resize_to);
1102 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1104
1105 writer
1107 .write_at(b"XXXXX".to_vec(), writer.size().await)
1108 .await
1109 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1113 assert_eq!(writer.size().await, 10);
1114
1115 let (blob_check, size_check) = context
1117 .open("partition", b"resize_then_append_at_size")
1118 .await
1119 .unwrap();
1120 assert_eq!(size_check, 10);
1121 let mut reader = Read::new(blob_check, size_check, 10);
1122 let mut buf = vec![0u8; 10];
1123 reader.read_exact(&mut buf, 10).await.unwrap();
1124 assert_eq!(&buf, b"01234XXXXX");
1125 });
1126 }
1127}