1mod read;
4mod write;
5
6pub use read::Read;
7pub use write::Write;
8
9#[cfg(test)]
10mod tests {
11 use super::*;
12 use crate::{deterministic, Blob as _, Error, Runner, Storage};
13 use commonware_macros::test_traced;
14
15 #[test_traced]
16 fn test_read_basic() {
17 let executor = deterministic::Runner::default();
18 executor.start(|context| async move {
19 let data = b"Hello, world! This is a test.";
21 let (blob, size) = context.open("partition", b"test").await.unwrap();
22 assert_eq!(size, 0);
23 blob.write_at(data.to_vec(), 0).await.unwrap();
24 let size = data.len() as u64;
25
26 let buffer_size = 10;
28 let mut reader = Read::new(blob, size, buffer_size);
29
30 let mut buf = [0u8; 5];
32 reader.read_exact(&mut buf, 5).await.unwrap();
33 assert_eq!(&buf, b"Hello");
34
35 let mut buf = [0u8; 14];
37 reader.read_exact(&mut buf, 14).await.unwrap();
38 assert_eq!(&buf, b", world! This ");
39
40 assert_eq!(reader.position(), 19);
42
43 let mut buf = [0u8; 10];
45 reader.read_exact(&mut buf, 7).await.unwrap();
46 assert_eq!(&buf[..7], b"is a te");
47
48 let mut buf = [0u8; 5];
50 let result = reader.read_exact(&mut buf, 5).await;
51 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
52 });
53 }
54
55 #[test_traced]
56 #[should_panic(expected = "buffer size must be greater than zero")]
57 fn test_read_empty() {
58 let executor = deterministic::Runner::default();
59 executor.start(|context| async move {
60 let data = b"Hello, world! This is a test.";
62 let (blob, size) = context.open("partition", b"test").await.unwrap();
63 assert_eq!(size, 0);
64 blob.write_at(data.to_vec(), 0).await.unwrap();
65 let size = data.len() as u64;
66
67 let buffer_size = 0;
69 Read::new(blob, size, buffer_size);
70 });
71 }
72
73 #[test_traced]
74 fn test_read_cross_boundary() {
75 let executor = deterministic::Runner::default();
76 executor.start(|context| async move {
77 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
79 let (blob, size) = context.open("partition", b"test").await.unwrap();
80 assert_eq!(size, 0);
81 blob.write_at(data.to_vec(), 0).await.unwrap();
82 let size = data.len() as u64;
83
84 let buffer_size = 10;
86 let mut reader = Read::new(blob, size, buffer_size);
87
88 let mut buf = [0u8; 15];
90 reader.read_exact(&mut buf, 15).await.unwrap();
91 assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
92
93 assert_eq!(reader.position(), 15);
95
96 let mut buf = [0u8; 11];
98 reader.read_exact(&mut buf, 11).await.unwrap();
99 assert_eq!(&buf, b"PQRSTUVWXYZ");
100
101 assert_eq!(reader.position(), 26);
103 assert_eq!(reader.blob_remaining(), 0);
104 });
105 }
106
107 #[test_traced]
108 fn test_read_with_known_size() {
109 let executor = deterministic::Runner::default();
110 executor.start(|context| async move {
111 let data = b"This is a test with known size limitations.";
113 let (blob, size) = context.open("partition", b"test").await.unwrap();
114 assert_eq!(size, 0);
115 blob.write_at(data.to_vec(), 0).await.unwrap();
116 let size = data.len() as u64;
117
118 let buffer_size = 10;
120 let mut reader = Read::new(blob, size, buffer_size);
121
122 assert_eq!(reader.blob_remaining(), size);
124
125 let mut buf = [0u8; 5];
127 reader.read_exact(&mut buf, 5).await.unwrap();
128 assert_eq!(&buf, b"This ");
129
130 assert_eq!(reader.blob_remaining(), size - 5);
132
133 let mut buf = vec![0u8; (size - 5) as usize];
135 reader
136 .read_exact(&mut buf, (size - 5) as usize)
137 .await
138 .unwrap();
139 assert_eq!(&buf, b"is a test with known size limitations.");
140
141 assert_eq!(reader.blob_remaining(), 0);
143
144 let mut buf = [0u8; 1];
146 let result = reader.read_exact(&mut buf, 1).await;
147 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
148 });
149 }
150
151 #[test_traced]
152 fn test_read_large_data() {
153 let executor = deterministic::Runner::default();
154 executor.start(|context| async move {
155 let data_size = 1024 * 256; let data = vec![0x42; data_size];
158 let (blob, size) = context.open("partition", b"test").await.unwrap();
159 assert_eq!(size, 0);
160 blob.write_at(data.clone(), 0).await.unwrap();
161 let size = data.len() as u64;
162
163 let buffer_size = 64 * 1024; let mut reader = Read::new(blob, size, buffer_size);
166
167 let mut total_read = 0;
169 let chunk_size = 8 * 1024; let mut buf = vec![0u8; chunk_size];
171
172 while total_read < data_size {
173 let to_read = std::cmp::min(chunk_size, data_size - total_read);
174 reader
175 .read_exact(&mut buf[..to_read], to_read)
176 .await
177 .unwrap();
178
179 assert!(
181 buf[..to_read].iter().all(|&b| b == 0x42),
182 "Data at position {} is not correct",
183 total_read
184 );
185
186 total_read += to_read;
187 }
188
189 assert_eq!(total_read, data_size);
191
192 let mut extra_buf = [0u8; 1];
194 let result = reader.read_exact(&mut extra_buf, 1).await;
195 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
196 });
197 }
198
199 #[test_traced]
200 fn test_read_exact_size_reads() {
201 let executor = deterministic::Runner::default();
202 executor.start(|context| async move {
203 let buffer_size = 1024;
205 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
207
208 let (blob, size) = context.open("partition", b"test").await.unwrap();
209 assert_eq!(size, 0);
210 blob.write_at(data.clone(), 0).await.unwrap();
211 let size = data.len() as u64;
212
213 let mut reader = Read::new(blob, size, buffer_size);
214
215 let mut buf1 = vec![0u8; buffer_size];
217 reader.read_exact(&mut buf1, buffer_size).await.unwrap();
218 assert!(buf1.iter().all(|&b| b == 0x37));
219
220 let mut buf2 = vec![0u8; buffer_size];
222 reader.read_exact(&mut buf2, buffer_size).await.unwrap();
223 assert!(buf2.iter().all(|&b| b == 0x37));
224
225 let half_buffer = buffer_size / 2;
227 let mut buf3 = vec![0u8; half_buffer];
228 reader.read_exact(&mut buf3, half_buffer).await.unwrap();
229 assert!(buf3.iter().all(|&b| b == 0x37));
230
231 assert_eq!(reader.blob_remaining(), 0);
233 assert_eq!(reader.position(), size);
234 });
235 }
236
237 #[test_traced]
238 fn test_read_seek_to() {
239 let executor = deterministic::Runner::default();
240 executor.start(|context| async move {
241 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
243 let (blob, size) = context.open("partition", b"test").await.unwrap();
244 assert_eq!(size, 0);
245 blob.write_at(data.to_vec(), 0).await.unwrap();
246 let size = data.len() as u64;
247
248 let buffer_size = 10;
250 let mut reader = Read::new(blob, size, buffer_size);
251
252 let mut buf = [0u8; 5];
254 reader.read_exact(&mut buf, 5).await.unwrap();
255 assert_eq!(&buf, b"ABCDE");
256 assert_eq!(reader.position(), 5);
257
258 reader.seek_to(10).unwrap();
260 assert_eq!(reader.position(), 10);
261
262 let mut buf = [0u8; 5];
264 reader.read_exact(&mut buf, 5).await.unwrap();
265 assert_eq!(&buf, b"KLMNO");
266
267 reader.seek_to(0).unwrap();
269 assert_eq!(reader.position(), 0);
270
271 let mut buf = [0u8; 5];
272 reader.read_exact(&mut buf, 5).await.unwrap();
273 assert_eq!(&buf, b"ABCDE");
274
275 reader.seek_to(size).unwrap();
277 assert_eq!(reader.position(), size);
278
279 let mut buf = [0u8; 1];
281 let result = reader.read_exact(&mut buf, 1).await;
282 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
283
284 let result = reader.seek_to(size + 10);
286 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287 });
288 }
289
290 #[test_traced]
291 fn test_read_seek_with_refill() {
292 let executor = deterministic::Runner::default();
293 executor.start(|context| async move {
294 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
297 assert_eq!(size, 0);
298 blob.write_at(data.clone(), 0).await.unwrap();
299 let size = data.len() as u64;
300
301 let buffer_size = 10;
303 let mut reader = Read::new(blob, size, buffer_size);
304
305 let mut buf = [0u8; 5];
307 reader.read_exact(&mut buf, 5).await.unwrap();
308
309 reader.seek_to(500).unwrap();
311
312 let mut buf = [0u8; 5];
314 reader.read_exact(&mut buf, 5).await.unwrap();
315 assert_eq!(&buf, b"AAAAA"); assert_eq!(reader.position(), 505);
317
318 reader.seek_to(100).unwrap();
320
321 let mut buf = [0u8; 5];
323 reader.read_exact(&mut buf, 5).await.unwrap();
324 assert_eq!(reader.position(), 105);
325 });
326 }
327
328 #[test_traced]
329 fn test_read_truncate() {
330 let executor = deterministic::Runner::default();
331 executor.start(|context| async move {
332 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
334 let (blob, size) = context.open("partition", b"test").await.unwrap();
335 assert_eq!(size, 0);
336 blob.write_at(data.to_vec(), 0).await.unwrap();
337 let data_len = data.len() as u64;
338
339 let buffer_size = 10;
341 let reader = Read::new(blob.clone(), data_len, buffer_size);
342
343 let truncate_len = data_len / 2;
345 reader.truncate(truncate_len).await.unwrap();
346
347 let (blob, size) = context.open("partition", b"test").await.unwrap();
349 assert_eq!(size, truncate_len, "Blob should be truncated to half size");
350
351 let mut new_reader = Read::new(blob, size, buffer_size);
353
354 let mut buf = vec![0u8; size as usize];
356 new_reader
357 .read_exact(&mut buf, size as usize)
358 .await
359 .unwrap();
360 assert_eq!(&buf, b"ABCDEFGHIJKLM", "Truncated content should match");
361
362 let mut extra_buf = [0u8; 1];
364 let result = new_reader.read_exact(&mut extra_buf, 1).await;
365 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
366 });
367 }
368
369 #[test_traced]
370 fn test_read_truncate_to_zero() {
371 let executor = deterministic::Runner::default();
372 executor.start(|context| async move {
373 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
375 let data_len = data.len() as u64;
376 let (blob, size) = context.open("partition", b"test").await.unwrap();
377 assert_eq!(size, 0);
378 blob.write_at(data.to_vec(), 0).await.unwrap();
379
380 let buffer_size = 10;
382 let reader = Read::new(blob.clone(), data_len, buffer_size);
383
384 reader.truncate(0).await.unwrap();
386
387 let (blob, size) = context.open("partition", b"test").await.unwrap();
389 assert_eq!(size, 0, "Blob should be truncated to zero");
390
391 let mut new_reader = Read::new(blob, size, buffer_size);
393
394 let mut buf = [0u8; 1];
396 let result = new_reader.read_exact(&mut buf, 1).await;
397 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
398 });
399 }
400
401 #[test_traced]
402 fn test_write_basic() {
403 let executor = deterministic::Runner::default();
404 executor.start(|context| async move {
405 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
407 assert_eq!(size, 0);
408
409 let writer = Write::new(blob.clone(), size, 8);
410 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
411 assert_eq!(writer.size().await, 5);
412 writer.sync().await.unwrap();
413 assert_eq!(writer.size().await, 5);
414
415 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
417 assert_eq!(size, 5);
418 let mut reader = Read::new(blob, size, 8);
419 let mut buf = [0u8; 5];
420 reader.read_exact(&mut buf, 5).await.unwrap();
421 assert_eq!(&buf, b"hello");
422 });
423 }
424
425 #[test_traced]
426 fn test_write_multiple_flushes() {
427 let executor = deterministic::Runner::default();
428 executor.start(|context| async move {
429 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
431 assert_eq!(size, 0);
432
433 let writer = Write::new(blob.clone(), size, 4);
434 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
435 assert_eq!(writer.size().await, 3);
436 writer.write_at(b"defg".to_vec(), 3).await.unwrap();
437 assert_eq!(writer.size().await, 7);
438 writer.sync().await.unwrap();
439
440 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
442 assert_eq!(size, 7);
443 let mut reader = Read::new(blob, size, 4);
444 let mut buf = [0u8; 7];
445 reader.read_exact(&mut buf, 7).await.unwrap();
446 assert_eq!(&buf, b"abcdefg");
447 });
448 }
449
450 #[test_traced]
451 fn test_write_large_data() {
452 let executor = deterministic::Runner::default();
453 executor.start(|context| async move {
454 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
456 assert_eq!(size, 0);
457
458 let writer = Write::new(blob.clone(), size, 4);
459 writer.write_at(b"abc".to_vec(), 0).await.unwrap();
460 assert_eq!(writer.size().await, 3);
461 writer
462 .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
463 .await
464 .unwrap();
465 assert_eq!(writer.size().await, 26);
466 writer.sync().await.unwrap();
467 assert_eq!(writer.size().await, 26);
468
469 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
471 assert_eq!(size, 26);
472 let mut reader = Read::new(blob, size, 4);
473 let mut buf = [0u8; 26];
474 reader.read_exact(&mut buf, 26).await.unwrap();
475 assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
476 });
477 }
478
479 #[test_traced]
480 #[should_panic(expected = "buffer capacity must be greater than zero")]
481 fn test_write_empty() {
482 let executor = deterministic::Runner::default();
483 executor.start(|context| async move {
484 let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
486 assert_eq!(size, 0);
487 Write::new(blob, size, 0);
488 });
489 }
490
491 #[test_traced]
492 fn test_write_append_to_buffer() {
493 let executor = deterministic::Runner::default();
494 executor.start(|context| async move {
495 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
497 let writer = Write::new(blob.clone(), size, 10);
498
499 writer.write_at(b"hello".to_vec(), 0).await.unwrap();
501 assert_eq!(writer.size().await, 5);
502
503 writer.write_at(b" world".to_vec(), 5).await.unwrap();
505 writer.sync().await.unwrap();
506 assert_eq!(writer.size().await, 11);
507
508 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
510 assert_eq!(size, 11);
511 let mut reader = Read::new(blob, size, 10);
512 let mut buf = vec![0u8; 11];
513 reader.read_exact(&mut buf, 11).await.unwrap();
514 assert_eq!(&buf, b"hello world");
515 });
516 }
517
518 #[test_traced]
519 fn test_write_into_middle_of_buffer() {
520 let executor = deterministic::Runner::default();
521 executor.start(|context| async move {
522 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
524 let writer = Write::new(blob.clone(), size, 20);
525
526 writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
528 assert_eq!(writer.size().await, 10);
529
530 writer.write_at(b"01234".to_vec(), 2).await.unwrap();
532 assert_eq!(writer.size().await, 10);
533 writer.sync().await.unwrap();
534
535 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
537 assert_eq!(size, 10);
538 let mut reader = Read::new(blob, size, 10);
539 let mut buf = vec![0u8; 10];
540 reader.read_exact(&mut buf, 10).await.unwrap();
541 assert_eq!(&buf, b"ab01234hij");
542
543 writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
545 assert_eq!(writer.size().await, 20);
546 writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
547 assert_eq!(writer.size().await, 20);
548 writer.sync().await.unwrap();
549
550 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
552 assert_eq!(size, 20);
553 let mut reader = Read::new(blob, size, 20);
554 let mut buf = vec![0u8; 20];
555 reader.read_exact(&mut buf, 20).await.unwrap();
556 assert_eq!(&buf, b"ab01234hiwxyznopqrst");
557 });
558 }
559
560 #[test_traced]
561 fn test_write_before_buffer() {
562 let executor = deterministic::Runner::default();
563 executor.start(|context| async move {
564 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
566 let writer = Write::new(blob.clone(), size, 10);
567
568 writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
570 assert_eq!(writer.size().await, 20);
571
572 writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
574 assert_eq!(writer.size().await, 20);
575 writer.sync().await.unwrap();
576
577 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
579 assert_eq!(size, 20);
580 let mut reader = Read::new(blob, size, 20);
581 let mut buf = vec![0u8; 20];
582 reader.read_exact(&mut buf, 20).await.unwrap();
583 let mut expected = vec![0u8; 20];
584 expected[0..5].copy_from_slice("abcde".as_bytes());
585 expected[10..20].copy_from_slice("0123456789".as_bytes());
586 assert_eq!(buf, expected);
587
588 writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
590 assert_eq!(writer.size().await, 20);
591 writer.sync().await.unwrap();
592 assert_eq!(writer.size().await, 20);
593
594 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
596 assert_eq!(size, 20);
597 let mut reader = Read::new(blob, size, 20);
598 let mut buf = vec![0u8; 20];
599 reader.read_exact(&mut buf, 20).await.unwrap();
600 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
601 assert_eq!(buf, expected);
602 });
603 }
604
605 #[test_traced]
606 fn test_write_truncate() {
607 let executor = deterministic::Runner::default();
608 executor.start(|context| async move {
609 let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
611 let writer = Write::new(blob, size, 10);
612
613 writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
615 assert_eq!(writer.size().await, 11);
616 writer.sync().await.unwrap();
617 assert_eq!(writer.size().await, 11);
618
619 let (blob_check, size_check) =
620 context.open("partition", b"truncate_write").await.unwrap();
621 assert_eq!(size_check, 11);
622 drop(blob_check);
623
624 writer.truncate(5).await.unwrap();
626 assert_eq!(writer.size().await, 5);
627 writer.sync().await.unwrap();
628
629 let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
631 assert_eq!(size, 5);
632 let mut reader = Read::new(blob, size, 5);
633 let mut buf = vec![0u8; 5];
634 reader.read_exact(&mut buf, 5).await.unwrap();
635 assert_eq!(&buf, b"hello");
636
637 writer.write_at(b"X".to_vec(), 0).await.unwrap();
639 assert_eq!(writer.size().await, 5);
640 writer.sync().await.unwrap();
641
642 let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
644 assert_eq!(size, 5);
645 let mut reader = Read::new(blob, size, 5);
646 let mut buf = vec![0u8; 5];
647 reader.read_exact(&mut buf, 5).await.unwrap();
648 assert_eq!(&buf, b"Xello");
649
650 let (blob_zero, size) = context.open("partition", b"truncate_zero").await.unwrap();
652 let writer_zero = Write::new(blob_zero.clone(), size, 10);
653 writer_zero
654 .write_at(b"some data".to_vec(), 0)
655 .await
656 .unwrap();
657 assert_eq!(writer_zero.size().await, 9);
658 writer_zero.sync().await.unwrap();
659 assert_eq!(writer_zero.size().await, 9);
660 writer_zero.truncate(0).await.unwrap();
661 assert_eq!(writer_zero.size().await, 0);
662 writer_zero.sync().await.unwrap();
663 assert_eq!(writer_zero.size().await, 0);
664
665 let (_, size_z) = context.open("partition", b"truncate_zero").await.unwrap();
667 assert_eq!(size_z, 0);
668 });
669 }
670
671 #[test_traced]
672 fn test_write_read_at_on_writer() {
673 let executor = deterministic::Runner::default();
674 executor.start(|context| async move {
675 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
677 let writer = Write::new(blob.clone(), size, 10);
678
679 writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
681 assert_eq!(writer.size().await, 8);
682
683 let mut read_buf_vec = vec![0u8; 4].into();
685 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
686 assert_eq!(read_buf_vec.as_ref(), b"buff");
687
688 read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
689 assert_eq!(read_buf_vec.as_ref(), b"ered");
690
691 let small_buf_vec = vec![0u8; 1];
693 assert!(writer.read_at(small_buf_vec, 8).await.is_err());
694
695 writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
697 assert_eq!(writer.size().await, 20);
698 writer.sync().await.unwrap();
699 assert_eq!(writer.size().await, 20);
700
701 let mut read_buf_vec_2 = vec![0u8; 4].into();
703 read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
704 assert_eq!(read_buf_vec_2.as_ref(), b"buff");
705
706 let mut read_buf_7_vec = vec![0u8; 7].into();
707 read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
708 assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
709
710 writer.write_at(b" more data".to_vec(), 20).await.unwrap();
712 assert_eq!(writer.size().await, 30);
713
714 let mut read_buf_vec_3 = vec![0u8; 5].into();
716 read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
717 assert_eq!(read_buf_vec_3.as_ref(), b" more");
718
719 let mut combo_read_buf_vec = vec![0u8; 12].into();
721 combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
722 assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
723
724 writer.sync().await.unwrap();
726 assert_eq!(writer.size().await, 30);
727 let (final_blob, final_size) =
728 context.open("partition", b"read_at_writer").await.unwrap();
729 assert_eq!(final_size, 30);
730 let mut final_reader = Read::new(final_blob, final_size, 30);
731 let mut full_content = vec![0u8; 30];
732 final_reader
733 .read_exact(&mut full_content, 30)
734 .await
735 .unwrap();
736 assert_eq!(&full_content, b"buffered and flushed more data");
737 });
738 }
739
740 #[test_traced]
741 fn test_write_straddling_non_mergeable() {
742 let executor = deterministic::Runner::default();
743 executor.start(|context| async move {
744 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
746 let writer = Write::new(blob.clone(), size, 10);
747
748 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
750 assert_eq!(writer.size().await, 10);
751
752 writer.write_at(b"abc".to_vec(), 15).await.unwrap();
754 assert_eq!(writer.size().await, 18);
755 writer.sync().await.unwrap();
756 assert_eq!(writer.size().await, 18);
757
758 let (blob_check, size_check) =
760 context.open("partition", b"write_straddle").await.unwrap();
761 assert_eq!(size_check, 18);
762 let mut reader = Read::new(blob_check, size_check, 20);
763 let mut buf = vec![0u8; 18];
764 reader.read_exact(&mut buf, 18).await.unwrap();
765
766 let mut expected = vec![0u8; 18];
767 expected[0..10].copy_from_slice(b"0123456789");
768 expected[15..18].copy_from_slice(b"abc");
769 assert_eq!(buf, expected);
770
771 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
773 let writer2 = Write::new(blob2.clone(), size, 10);
774 writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
775 assert_eq!(writer2.size().await, 10);
776
777 writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
779 assert_eq!(writer2.size().await, 17);
780 writer2.sync().await.unwrap();
781 assert_eq!(writer2.size().await, 17);
782
783 let (blob_check2, size_check2) =
785 context.open("partition", b"write_straddle2").await.unwrap();
786 assert_eq!(size_check2, 17);
787 let mut reader2 = Read::new(blob_check2, size_check2, 20);
788 let mut buf2 = vec![0u8; 17];
789 reader2.read_exact(&mut buf2, 17).await.unwrap();
790 assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
791 });
792 }
793
794 #[test_traced]
795 fn test_write_close() {
796 let executor = deterministic::Runner::default();
797 executor.start(|context| async move {
798 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
800 let writer = Write::new(blob_orig.clone(), size, 8);
801 writer.write_at(b"pending".to_vec(), 0).await.unwrap();
802 assert_eq!(writer.size().await, 7);
803
804 writer.close().await.unwrap();
806
807 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
809 assert_eq!(size_check, 7);
810 let mut reader = Read::new(blob_check, size_check, 8);
811 let mut buf = [0u8; 7];
812 reader.read_exact(&mut buf, 7).await.unwrap();
813 assert_eq!(&buf, b"pending");
814 });
815 }
816
817 #[test_traced]
818 fn test_write_direct_due_to_size() {
819 let executor = deterministic::Runner::default();
820 executor.start(|context| async move {
821 let (blob, size) = context
823 .open("partition", b"write_direct_size")
824 .await
825 .unwrap();
826 let writer = Write::new(blob.clone(), size, 5);
827
828 let data_large = b"0123456789";
830 writer.write_at(data_large.to_vec(), 0).await.unwrap();
831 assert_eq!(writer.size().await, 10);
832
833 writer.sync().await.unwrap();
835
836 let (blob_check, size_check) = context
838 .open("partition", b"write_direct_size")
839 .await
840 .unwrap();
841 assert_eq!(size_check, 10);
842 let mut reader = Read::new(blob_check, size_check, 10);
843 let mut buf = vec![0u8; 10];
844 reader.read_exact(&mut buf, 10).await.unwrap();
845 assert_eq!(&buf, data_large.as_slice());
846
847 writer.write_at(b"abc".to_vec(), 10).await.unwrap();
849 assert_eq!(writer.size().await, 13);
850
851 let mut read_small_buf_vec = vec![0u8; 3].into();
853 read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
854 assert_eq!(read_small_buf_vec.as_ref(), b"abc");
855
856 writer.sync().await.unwrap();
857
858 let (blob_check2, size_check2) = context
860 .open("partition", b"write_direct_size")
861 .await
862 .unwrap();
863 assert_eq!(size_check2, 13);
864 let mut reader2 = Read::new(blob_check2, size_check2, 13);
865 let mut buf2 = vec![0u8; 13];
866 reader2.read_exact(&mut buf2, 13).await.unwrap();
867 assert_eq!(&buf2[10..], b"abc".as_slice());
868 });
869 }
870
871 #[test_traced]
872 fn test_write_overwrite_and_extend_in_buffer() {
873 let executor = deterministic::Runner::default();
874 executor.start(|context| async move {
875 let (blob, size) = context
877 .open("partition", b"overwrite_extend_buf")
878 .await
879 .unwrap();
880 let writer = Write::new(blob.clone(), size, 15);
881
882 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
884 assert_eq!(writer.size().await, 10);
885
886 writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
888 assert_eq!(writer.size().await, 15);
889
890 let mut read_buf_vec = vec![0u8; 15].into();
892 read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
893 assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
894
895 writer.sync().await.unwrap();
896
897 let (blob_check, size_check) = context
899 .open("partition", b"overwrite_extend_buf")
900 .await
901 .unwrap();
902 assert_eq!(size_check, 15);
903 let mut reader = Read::new(blob_check, size_check, 15);
904 let mut final_buf = vec![0u8; 15];
905 reader.read_exact(&mut final_buf, 15).await.unwrap();
906 assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
907 });
908 }
909
910 #[test_traced]
911 fn test_write_at_size() {
912 let executor = deterministic::Runner::default();
913 executor.start(|context| async move {
914 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
916 let writer = Write::new(blob.clone(), size, 20);
917
918 writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
920 assert_eq!(writer.size().await, 10);
921 writer.sync().await.unwrap();
922
923 writer
925 .write_at(b"abc".to_vec(), writer.size().await)
926 .await
927 .unwrap();
928 assert_eq!(writer.size().await, 13);
929 writer.sync().await.unwrap();
930
931 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
933 assert_eq!(size_check, 13);
934 let mut reader = Read::new(blob_check, size_check, 13);
935 let mut buf = vec![0u8; 13];
936 reader.read_exact(&mut buf, 13).await.unwrap();
937 assert_eq!(&buf, b"0123456789abc");
938 });
939 }
940
941 #[test_traced]
942 fn test_write_at_size_multiple_appends() {
943 let executor = deterministic::Runner::default();
944 executor.start(|context| async move {
945 let (blob, size) = context
947 .open("partition", b"write_multiple_appends_at_size")
948 .await
949 .unwrap();
950 let writer = Write::new(blob.clone(), size, 5); writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
954 assert_eq!(writer.size().await, 3);
955 writer.sync().await.unwrap();
956 assert_eq!(writer.size().await, 3);
957
958 writer
960 .write_at(b"BBB".to_vec(), writer.size().await)
961 .await
962 .unwrap();
963 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
965 assert_eq!(writer.size().await, 6);
966
967 writer
969 .write_at(b"CCC".to_vec(), writer.size().await)
970 .await
971 .unwrap();
972 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
974 assert_eq!(writer.size().await, 9);
975
976 let (blob_check, size_check) = context
978 .open("partition", b"write_multiple_appends_at_size")
979 .await
980 .unwrap();
981 assert_eq!(size_check, 9);
982 let mut reader = Read::new(blob_check, size_check, 9);
983 let mut buf = vec![0u8; 9];
984 reader.read_exact(&mut buf, 9).await.unwrap();
985 assert_eq!(&buf, b"AAABBBCCC");
986 });
987 }
988
989 #[test_traced]
990 fn test_write_non_contiguous_then_append_at_size() {
991 let executor = deterministic::Runner::default();
992 executor.start(|context| async move {
993 let (blob, size) = context
995 .open("partition", b"write_non_contiguous_then_append")
996 .await
997 .unwrap();
998 let writer = Write::new(blob.clone(), size, 10);
999
1000 writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); assert_eq!(writer.size().await, 7);
1003 writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1007 assert_eq!(writer.size().await, 29);
1008 writer.sync().await.unwrap();
1009 assert_eq!(writer.size().await, 29);
1010
1011 writer
1013 .write_at(b"APPEND".to_vec(), writer.size().await)
1014 .await
1015 .unwrap();
1016 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1018 assert_eq!(writer.size().await, 35);
1019
1020 let (blob_check, size_check) = context
1022 .open("partition", b"write_non_contiguous_then_append")
1023 .await
1024 .unwrap();
1025 assert_eq!(size_check, 35);
1026 let mut reader = Read::new(blob_check, size_check, 35);
1027 let mut buf = vec![0u8; 35];
1028 reader.read_exact(&mut buf, 35).await.unwrap();
1029
1030 let mut expected = vec![0u8; 35];
1031 expected[0..7].copy_from_slice(b"INITIAL");
1032 expected[20..29].copy_from_slice(b"NONCONTIG");
1033 expected[29..35].copy_from_slice(b"APPEND");
1034 assert_eq!(buf, expected);
1035 });
1036 }
1037
1038 #[test_traced]
1039 fn test_truncate_then_append_at_size() {
1040 let executor = deterministic::Runner::default();
1041 executor.start(|context| async move {
1042 let (blob, size) = context
1044 .open("partition", b"truncate_then_append_at_size")
1045 .await
1046 .unwrap();
1047 let writer = Write::new(blob.clone(), size, 10);
1048
1049 writer
1051 .write_at(b"0123456789ABCDEF".to_vec(), 0)
1052 .await
1053 .unwrap(); assert_eq!(writer.size().await, 16);
1055 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1057
1058 let truncate_to = 5;
1060 writer.truncate(truncate_to).await.unwrap();
1061 assert_eq!(writer.size().await, truncate_to);
1064 writer.sync().await.unwrap(); assert_eq!(writer.size().await, truncate_to);
1066
1067 writer
1069 .write_at(b"XXXXX".to_vec(), writer.size().await)
1070 .await
1071 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1075 assert_eq!(writer.size().await, 10);
1076
1077 let (blob_check, size_check) = context
1079 .open("partition", b"truncate_then_append_at_size")
1080 .await
1081 .unwrap();
1082 assert_eq!(size_check, 10);
1083 let mut reader = Read::new(blob_check, size_check, 10);
1084 let mut buf = vec![0u8; 10];
1085 reader.read_exact(&mut buf, 10).await.unwrap();
1086 assert_eq!(&buf, b"01234XXXXX");
1087 });
1088 }
1089}