1pub mod paged;
4mod read;
5mod tip;
6mod write;
7
8pub use read::Read;
9pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13 use super::*;
14 use crate::{
15 deterministic, Blob as _, Buf, BufMut, Clock, Error, IoBufMut, IoBufs, IoBufsMut, Runner,
16 Spawner, Storage, Supervisor as _,
17 };
18 use commonware_macros::test_traced;
19 use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize};
20 use futures::{pin_mut, FutureExt};
21 use std::{sync::Arc, time::Duration};
22
23 struct BlockingReadGate {
24 read_started: Option<oneshot::Sender<()>>,
25 release_read: Option<oneshot::Receiver<()>>,
26 }
27
28 #[derive(Clone)]
32 struct BlockingReadBlob {
33 data: Arc<Mutex<Vec<u8>>>,
34 gate: Arc<Mutex<BlockingReadGate>>,
35 }
36
37 impl BlockingReadBlob {
38 fn new(data: Vec<u8>) -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) {
39 let (read_started_tx, read_started_rx) = oneshot::channel();
40 let (release_read_tx, release_read_rx) = oneshot::channel();
41 (
42 Self {
43 data: Arc::new(Mutex::new(data)),
44 gate: Arc::new(Mutex::new(BlockingReadGate {
45 read_started: Some(read_started_tx),
46 release_read: Some(release_read_rx),
47 })),
48 },
49 read_started_rx,
50 release_read_tx,
51 )
52 }
53
54 async fn block_once_on_read(&self) {
55 let rx = {
56 let mut gate = self.gate.lock();
57 gate.read_started.take().map(|read_started| {
58 let _ = read_started.send(());
59 gate.release_read.take().expect("release signal missing")
60 })
61 };
62 if let Some(rx) = rx {
63 let _ = rx.await;
64 }
65 }
66 }
67
68 impl crate::Blob for BlockingReadBlob {
69 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
70 self.read_at_buf(offset, len, IoBufMut::default()).await
71 }
72
73 async fn read_at_buf(
74 &self,
75 offset: u64,
76 len: usize,
77 buf: impl Into<IoBufsMut> + Send,
78 ) -> Result<IoBufsMut, Error> {
79 self.block_once_on_read().await;
80
81 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
82 let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
83 let data = self.data.lock();
84 if end > data.len() {
85 return Err(Error::BlobInsufficientLength);
86 }
87
88 let mut out = buf.into();
89 out.put_slice(&data[start..end]);
90 Ok(out)
91 }
92
93 async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
94 let buf = buf.into().coalesce();
95 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
96 let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
97
98 let mut data = self.data.lock();
99 if end > data.len() {
100 data.resize(end, 0);
101 }
102 data[start..end].copy_from_slice(buf.as_ref());
103 Ok(())
104 }
105
106 async fn write_at_sync(
107 &self,
108 offset: u64,
109 bufs: impl Into<IoBufs> + Send,
110 ) -> Result<(), Error> {
111 let bufs = bufs.into();
112 if !bufs.has_remaining() {
113 return Ok(());
114 }
115
116 self.write_at(offset, bufs).await?;
117 self.sync().await
118 }
119
120 async fn resize(&self, len: u64) -> Result<(), Error> {
121 let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
122 self.data.lock().resize(len, 0);
123 Ok(())
124 }
125
126 async fn sync(&self) -> Result<(), Error> {
127 Ok(())
128 }
129 }
130
131 #[derive(Default)]
132 struct RangeSyncState {
133 data: Vec<u8>,
138
139 durable: Vec<u8>,
141
142 writes: usize,
144
145 full_syncs: usize,
147
148 range_syncs: usize,
150 }
151
152 #[derive(Clone)]
159 pub struct SyncTrackingBlob {
160 state: Arc<Mutex<RangeSyncState>>,
161 }
162
163 impl SyncTrackingBlob {
164 pub fn new() -> Self {
165 Self {
166 state: Arc::new(Mutex::new(RangeSyncState::default())),
167 }
168 }
169
170 pub fn snapshot(&self) -> (Vec<u8>, usize, usize, usize) {
171 let state = self.state.lock();
172 (
173 state.durable.clone(),
174 state.writes,
175 state.full_syncs,
176 state.range_syncs,
177 )
178 }
179
180 pub fn size(&self) -> u64 {
181 self.state.lock().data.len() as u64
182 }
183
184 fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
185 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
186 let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
187 if end > data.len() {
188 data.resize(end, 0);
189 }
190 data[start..end].copy_from_slice(buf);
191 Ok(())
192 }
193 }
194
195 impl crate::Blob for SyncTrackingBlob {
196 async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
197 self.read_at_buf(offset, len, IoBufMut::default()).await
198 }
199
200 async fn read_at_buf(
201 &self,
202 offset: u64,
203 len: usize,
204 buf: impl Into<IoBufsMut> + Send,
205 ) -> Result<IoBufsMut, Error> {
206 let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
207 let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
208 let state = self.state.lock();
209 if end > state.data.len() {
210 return Err(Error::BlobInsufficientLength);
211 }
212
213 let mut out = buf.into();
214 out.put_slice(&state.data[start..end]);
215 Ok(out)
216 }
217
218 async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
219 let buf = buf.into().coalesce();
220 let mut state = self.state.lock();
221 Self::write(&mut state.data, offset, buf.as_ref())?;
222 state.writes += 1;
223 Ok(())
224 }
225
226 async fn write_at_sync(
227 &self,
228 offset: u64,
229 buf: impl Into<IoBufs> + Send,
230 ) -> Result<(), Error> {
231 let buf = buf.into().coalesce();
232 let mut state = self.state.lock();
233 Self::write(&mut state.data, offset, buf.as_ref())?;
234 Self::write(&mut state.durable, offset, buf.as_ref())?;
235 state.writes += 1;
236 state.range_syncs += 1;
237 Ok(())
238 }
239
240 async fn resize(&self, len: u64) -> Result<(), Error> {
241 let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
242 self.state.lock().data.resize(len, 0);
243 Ok(())
244 }
245
246 async fn sync(&self) -> Result<(), Error> {
247 let mut state = self.state.lock();
248 state.durable = state.data.clone();
249 state.full_syncs += 1;
250 Ok(())
251 }
252 }
253
254 #[test_traced]
255 fn test_read_basic() {
256 let executor = deterministic::Runner::default();
257 executor.start(|context| async move {
258 let data = b"Hello, world! This is a test.";
260 let (blob, size) = context.open("partition", b"test").await.unwrap();
261 assert_eq!(size, 0);
262 blob.write_at(0, data).await.unwrap();
263 let size = data.len() as u64;
264
265 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
267
268 let read = reader.read(5).await.unwrap().coalesce();
270 assert_eq!(read.as_ref(), b"Hello");
271
272 let read = reader.read(14).await.unwrap().coalesce();
274 assert_eq!(read.as_ref(), b", world! This ");
275
276 assert_eq!(reader.position(), 19);
278
279 let read = reader.read(7).await.unwrap().coalesce();
281 assert_eq!(read.as_ref(), b"is a te");
282
283 let result = reader.read(5).await;
285 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
286 });
287 }
288
289 #[test_traced]
290 fn test_read_cross_boundary() {
291 let executor = deterministic::Runner::default();
292 executor.start(|context| async move {
293 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
295 let (blob, size) = context.open("partition", b"test").await.unwrap();
296 assert_eq!(size, 0);
297 blob.write_at(0, data).await.unwrap();
298 let size = data.len() as u64;
299
300 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
302
303 let read = reader.read(15).await.unwrap().coalesce();
305 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNO");
306
307 assert_eq!(reader.position(), 15);
309
310 let read = reader.read(11).await.unwrap().coalesce();
312 assert_eq!(read.as_ref(), b"PQRSTUVWXYZ");
313
314 assert_eq!(reader.position(), 26);
316 assert_eq!(reader.blob_remaining(), 0);
317 });
318 }
319
320 #[test_traced]
322 fn test_read_to_end_then_rewind_and_read_again() {
323 let executor = deterministic::Runner::default();
324 executor.start(|context| async move {
325 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
326 let (blob, size) = context.open("partition", b"test").await.unwrap();
327 assert_eq!(size, 0);
328 blob.write_at(0, data).await.unwrap();
329 let size = data.len() as u64;
330
331 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
332
333 let read = reader.read(21).await.unwrap().coalesce();
335 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
336
337 assert_eq!(reader.position(), 21);
339
340 let read = reader.read(5).await.unwrap().coalesce();
342 assert_eq!(read.as_ref(), b"VWXYZ");
343
344 reader.seek_to(0).unwrap();
346 let read = reader.read(21).await.unwrap().coalesce();
347 assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
348 });
349 }
350
351 #[test_traced]
352 fn test_read_with_known_size() {
353 let executor = deterministic::Runner::default();
354 executor.start(|context| async move {
355 let data = b"This is a test with known size limitations.";
357 let (blob, size) = context.open("partition", b"test").await.unwrap();
358 assert_eq!(size, 0);
359 blob.write_at(0, data).await.unwrap();
360 let size = data.len() as u64;
361
362 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
364
365 assert_eq!(reader.blob_remaining(), size);
367
368 let read = reader.read(5).await.unwrap().coalesce();
370 assert_eq!(read.as_ref(), b"This ");
371
372 assert_eq!(reader.blob_remaining(), size - 5);
374
375 let read = reader.read((size - 5) as usize).await.unwrap().coalesce();
377 assert_eq!(read.as_ref(), b"is a test with known size limitations.");
378
379 assert_eq!(reader.blob_remaining(), 0);
381
382 let result = reader.read(1).await;
384 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
385 });
386 }
387
388 #[test_traced]
389 fn test_read_oversized_request_does_not_consume_buffered_bytes() {
390 let executor = deterministic::Runner::default();
391 executor.start(|context| async move {
392 let data = b"abcdefghij";
393 let (blob, size) = context
394 .open("partition", b"double-count-regression")
395 .await
396 .unwrap();
397 assert_eq!(size, 0);
398 blob.write_at(0, data).await.unwrap();
399
400 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(8));
401
402 let first = reader.read(6).await.unwrap().coalesce();
404 assert_eq!(first.as_ref(), b"abcdef");
405 assert_eq!(reader.position(), 6);
406
407 let err = reader.read(5).await.unwrap_err();
409 assert!(matches!(err, Error::BlobInsufficientLength));
410 assert_eq!(reader.position(), 6);
411
412 let tail = reader.read(4).await.unwrap().coalesce();
414 assert_eq!(tail.as_ref(), b"ghij");
415 assert_eq!(reader.position(), 10);
416 });
417 }
418
419 #[test_traced]
420 fn test_read_large_data() {
421 let executor = deterministic::Runner::default();
422 executor.start(|context| async move {
423 let data_size = 1024 * 256; let data = vec![0x42; data_size];
426 let (blob, size) = context.open("partition", b"test").await.unwrap();
427 assert_eq!(size, 0);
428 blob.write_at(0, data.clone()).await.unwrap();
429 let size = data.len() as u64;
430
431 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(64 * 1024));
433
434 let mut total_read = 0;
436 let chunk_size = 8 * 1024; while total_read < data_size {
439 let to_read = std::cmp::min(chunk_size, data_size - total_read);
440 let read = reader.read(to_read).await.unwrap().coalesce();
441
442 assert!(
444 read.as_ref().iter().all(|&b| b == 0x42),
445 "Data at position {total_read} is not correct"
446 );
447
448 total_read += to_read;
449 }
450
451 assert_eq!(total_read, data_size);
453
454 let result = reader.read(1).await;
456 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
457 });
458 }
459
460 #[test_traced]
461 fn test_read_exact_size_reads() {
462 let executor = deterministic::Runner::default();
463 executor.start(|context| async move {
464 let buffer_size = 1024;
466 let data_size = buffer_size * 5 / 2; let data = vec![0x37; data_size];
468
469 let (blob, size) = context.open("partition", b"test").await.unwrap();
470 assert_eq!(size, 0);
471 blob.write_at(0, data.clone()).await.unwrap();
472 let size = data.len() as u64;
473
474 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer_size));
475
476 let read = reader.read(buffer_size).await.unwrap().coalesce();
478 assert!(read.as_ref().iter().all(|&b| b == 0x37));
479
480 let read = reader.read(buffer_size).await.unwrap().coalesce();
482 assert!(read.as_ref().iter().all(|&b| b == 0x37));
483
484 let half_buffer = buffer_size / 2;
486 let read = reader.read(half_buffer).await.unwrap().coalesce();
487 assert!(read.as_ref().iter().all(|&b| b == 0x37));
488
489 assert_eq!(reader.blob_remaining(), 0);
491 assert_eq!(reader.position(), size);
492 });
493 }
494
495 #[test_traced]
496 fn test_read_structure_single_vs_chunked() {
497 let executor = deterministic::Runner::default();
498 executor.start(|context| async move {
499 let data = b"ABCDEFGHIJKL";
500 let (blob, size) = context.open("partition", b"structural").await.unwrap();
501 assert_eq!(size, 0);
502 blob.write_at(0, data).await.unwrap();
503
504 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(5));
505
506 let first = reader.read(3).await.unwrap();
508 assert!(first.is_single());
509 assert_eq!(first.coalesce().as_ref(), b"ABC");
510
511 let second = reader.read(7).await.unwrap();
513 assert!(!second.is_single());
514 assert_eq!(second.coalesce().as_ref(), b"DEFGHIJ");
515 });
516 }
517
518 #[test_traced]
519 fn test_read_seek_to() {
520 let executor = deterministic::Runner::default();
521 executor.start(|context| async move {
522 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
524 let (blob, size) = context.open("partition", b"test").await.unwrap();
525 assert_eq!(size, 0);
526 blob.write_at(0, data).await.unwrap();
527 let size = data.len() as u64;
528
529 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
531
532 let read = reader.read(5).await.unwrap().coalesce();
534 assert_eq!(read.as_ref(), b"ABCDE");
535 assert_eq!(reader.position(), 5);
536
537 reader.seek_to(10).unwrap();
539 assert_eq!(reader.position(), 10);
540
541 let read = reader.read(5).await.unwrap().coalesce();
543 assert_eq!(read.as_ref(), b"KLMNO");
544
545 reader.seek_to(0).unwrap();
547 assert_eq!(reader.position(), 0);
548
549 let read = reader.read(5).await.unwrap().coalesce();
550 assert_eq!(read.as_ref(), b"ABCDE");
551
552 reader.seek_to(size).unwrap();
554 assert_eq!(reader.position(), size);
555
556 let result = reader.read(1).await;
558 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
559
560 let result = reader.seek_to(size + 10);
562 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
563 });
564 }
565
566 #[test_traced]
567 fn test_read_seek_with_refill() {
568 let executor = deterministic::Runner::default();
569 executor.start(|context| async move {
570 let data = vec![0x41; 1000]; let (blob, size) = context.open("partition", b"test").await.unwrap();
573 assert_eq!(size, 0);
574 blob.write_at(0, data.clone()).await.unwrap();
575 let size = data.len() as u64;
576
577 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
579
580 let _ = reader.read(5).await.unwrap().coalesce();
582
583 reader.seek_to(500).unwrap();
585
586 let read = reader.read(5).await.unwrap().coalesce();
588 assert_eq!(read.as_ref(), b"AAAAA"); assert_eq!(reader.position(), 505);
590
591 reader.seek_to(100).unwrap();
593
594 let _ = reader.read(5).await.unwrap().coalesce();
596 assert_eq!(reader.position(), 105);
597 });
598 }
599
600 #[test_traced]
601 fn test_read_seek_within_buffered_range() {
602 let executor = deterministic::Runner::default();
603 executor.start(|context| async move {
604 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
605 let (blob, size) = context.open("partition", b"test").await.unwrap();
606 assert_eq!(size, 0);
607 blob.write_at(0, data).await.unwrap();
608
609 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
610
611 let read = reader.read(6).await.unwrap().coalesce();
613 assert_eq!(read.as_ref(), b"ABCDEF");
614 assert_eq!(reader.position(), 6);
615 assert_eq!(reader.buffer_remaining(), 4);
616
617 reader.seek_to(3).unwrap();
619 assert_eq!(reader.position(), 3);
620 assert_eq!(reader.buffer_remaining(), 7);
621
622 let read = reader.read(5).await.unwrap().coalesce();
623 assert_eq!(read.as_ref(), b"DEFGH");
624 assert_eq!(reader.position(), 8);
625 assert_eq!(reader.buffer_remaining(), 2);
626 });
627 }
628
629 #[test_traced]
630 fn test_read_seek_within_unread_buffer_does_not_refill() {
631 let executor = deterministic::Runner::default();
632 executor.start(|context| async move {
633 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
634 let (blob, size) = context
635 .open("partition", b"seek_unread_no_refill")
636 .await
637 .unwrap();
638 assert_eq!(size, 0);
639 blob.write_at(0, data).await.unwrap();
640
641 let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
642
643 let first = reader.read(6).await.unwrap();
645 assert_eq!(first.coalesce().as_ref(), b"ABCDEF");
646 assert_eq!(reader.position(), 6);
647 assert_eq!(reader.buffer_remaining(), 4);
648
649 reader.seek_to(7).unwrap();
651 assert_eq!(reader.position(), 7);
652 assert_eq!(reader.buffer_remaining(), 3);
653
654 let second = reader.read(3).await.unwrap();
656 assert_eq!(second.coalesce().as_ref(), b"HIJ");
657 assert_eq!(reader.position(), 10);
658 assert_eq!(reader.buffer_remaining(), 0);
659
660 let third = reader.read(1).await.unwrap();
662 assert_eq!(third.coalesce().as_ref(), b"K");
663 assert_eq!(reader.position(), 11);
664 assert_eq!(reader.buffer_remaining(), 9);
665 });
666 }
667
668 #[test_traced]
669 fn test_read_resize() {
670 let executor = deterministic::Runner::default();
671 executor.start(|context| async move {
672 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
674 let (blob, size) = context.open("partition", b"test").await.unwrap();
675 assert_eq!(size, 0);
676 blob.write_at(0, data).await.unwrap();
677 let data_len = data.len() as u64;
678
679 let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
681
682 let resize_len = data_len / 2;
684 reader.resize(resize_len).await.unwrap();
685
686 let (blob, size) = context.open("partition", b"test").await.unwrap();
688 assert_eq!(size, resize_len, "Blob should be resized to half size");
689
690 let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
692
693 let read = new_reader.read(size as usize).await.unwrap().coalesce();
695 assert_eq!(
696 read.as_ref(),
697 b"ABCDEFGHIJKLM",
698 "Resized content should match"
699 );
700
701 let result = new_reader.read(1).await;
703 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
704
705 new_reader.resize(data_len * 2).await.unwrap();
707
708 let (blob, new_size) = context.open("partition", b"test").await.unwrap();
710 assert_eq!(new_size, data_len * 2);
711
712 let mut new_reader = Read::from_pooler(&context, blob, new_size, NZUsize!(10));
714 let read = new_reader.read(new_size as usize).await.unwrap().coalesce();
715 assert_eq!(&read.as_ref()[..size as usize], b"ABCDEFGHIJKLM");
716 assert_eq!(
717 &read.as_ref()[size as usize..],
718 vec![0u8; new_size as usize - size as usize]
719 );
720 });
721 }
722
723 #[test_traced]
724 fn test_read_resize_to_zero() {
725 let executor = deterministic::Runner::default();
726 executor.start(|context| async move {
727 let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
729 let data_len = data.len() as u64;
730 let (blob, size) = context.open("partition", b"test").await.unwrap();
731 assert_eq!(size, 0);
732 blob.write_at(0, data).await.unwrap();
733
734 let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
736
737 reader.resize(0).await.unwrap();
739
740 let (blob, size) = context.open("partition", b"test").await.unwrap();
742 assert_eq!(size, 0, "Blob should be resized to zero");
743
744 let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
746
747 let result = new_reader.read(1).await;
749 assert!(matches!(result, Err(Error::BlobInsufficientLength)));
750 });
751 }
752
753 #[test_traced]
754 fn test_write_basic() {
755 let executor = deterministic::Runner::default();
756 executor.start(|context| async move {
757 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
759 assert_eq!(size, 0);
760
761 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(8));
762 writer.write_at(0, b"hello").await.unwrap();
763 assert_eq!(writer.size().await, 5);
764 writer.sync().await.unwrap();
765 assert_eq!(writer.size().await, 5);
766
767 let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
769 assert_eq!(size, 5);
770 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
771 let read = reader.read(5).await.unwrap().coalesce();
772 assert_eq!(read.as_ref(), b"hello");
773 });
774 }
775
776 #[test_traced]
777 fn test_write_multiple_flushes() {
778 let executor = deterministic::Runner::default();
779 executor.start(|context| async move {
780 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
782 assert_eq!(size, 0);
783
784 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
785 writer.write_at(0, b"abc").await.unwrap();
786 assert_eq!(writer.size().await, 3);
787 writer.write_at(3, b"defg").await.unwrap();
788 assert_eq!(writer.size().await, 7);
789 writer.sync().await.unwrap();
790
791 let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
793 assert_eq!(size, 7);
794 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
795 let read = reader.read(7).await.unwrap().coalesce();
796 assert_eq!(read.as_ref(), b"abcdefg");
797 });
798 }
799
800 #[test_traced]
801 fn test_write_large_data() {
802 let executor = deterministic::Runner::default();
803 executor.start(|context| async move {
804 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
806 assert_eq!(size, 0);
807
808 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
809 writer.write_at(0, b"abc").await.unwrap();
810 assert_eq!(writer.size().await, 3);
811 writer
812 .write_at(3, b"defghijklmnopqrstuvwxyz")
813 .await
814 .unwrap();
815 assert_eq!(writer.size().await, 26);
816 writer.sync().await.unwrap();
817 assert_eq!(writer.size().await, 26);
818
819 let (blob, size) = context.open("partition", b"write_large").await.unwrap();
821 assert_eq!(size, 26);
822 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
823 let read = reader.read(26).await.unwrap().coalesce();
824 assert_eq!(read.as_ref(), b"abcdefghijklmnopqrstuvwxyz");
825 });
826 }
827
828 #[test_traced]
829 fn test_write_append_to_buffer() {
830 let executor = deterministic::Runner::default();
831 executor.start(|context| async move {
832 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
834 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
835
836 writer.write_at(0, b"hello").await.unwrap();
838 assert_eq!(writer.size().await, 5);
839
840 writer.write_at(5, b" world").await.unwrap();
842 writer.sync().await.unwrap();
843 assert_eq!(writer.size().await, 11);
844
845 let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
847 assert_eq!(size, 11);
848 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
849 let read = reader.read(11).await.unwrap().coalesce();
850 assert_eq!(read.as_ref(), b"hello world");
851 });
852 }
853
854 #[test_traced]
855 fn test_write_into_middle_of_buffer() {
856 let executor = deterministic::Runner::default();
857 executor.start(|context| async move {
858 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
860 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
861
862 writer.write_at(0, b"abcdefghij").await.unwrap();
864 assert_eq!(writer.size().await, 10);
865
866 writer.write_at(2, b"01234").await.unwrap();
868 assert_eq!(writer.size().await, 10);
869 writer.sync().await.unwrap();
870
871 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
873 assert_eq!(size, 10);
874 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
875 let read = reader.read(10).await.unwrap().coalesce();
876 assert_eq!(read.as_ref(), b"ab01234hij");
877
878 writer.write_at(10, b"klmnopqrst").await.unwrap();
880 assert_eq!(writer.size().await, 20);
881 writer.write_at(9, b"wxyz").await.unwrap();
882 assert_eq!(writer.size().await, 20);
883 writer.sync().await.unwrap();
884
885 let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
887 assert_eq!(size, 20);
888 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
889 let read = reader.read(20).await.unwrap().coalesce();
890 assert_eq!(read.as_ref(), b"ab01234hiwxyznopqrst");
891 });
892 }
893
894 #[test_traced]
895 fn test_write_before_buffer() {
896 let executor = deterministic::Runner::default();
897 executor.start(|context| async move {
898 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
900 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
901
902 writer.write_at(10, b"0123456789").await.unwrap();
904 assert_eq!(writer.size().await, 20);
905
906 writer.write_at(0, b"abcde").await.unwrap();
908 assert_eq!(writer.size().await, 20);
909 writer.sync().await.unwrap();
910
911 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
913 assert_eq!(size, 20);
914 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
915 let read = reader.read(20).await.unwrap().coalesce();
916 let mut expected = vec![0u8; 20];
917 expected[0..5].copy_from_slice("abcde".as_bytes());
918 expected[10..20].copy_from_slice("0123456789".as_bytes());
919 assert_eq!(read.as_ref(), expected.as_slice());
920
921 writer.write_at(5, b"fghij").await.unwrap();
923 assert_eq!(writer.size().await, 20);
924 writer.sync().await.unwrap();
925 assert_eq!(writer.size().await, 20);
926
927 let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
929 assert_eq!(size, 20);
930 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
931 let read = reader.read(20).await.unwrap().coalesce();
932 expected[0..10].copy_from_slice("abcdefghij".as_bytes());
933 assert_eq!(read.as_ref(), expected.as_slice());
934 });
935 }
936
937 #[test_traced]
938 fn test_write_resize() {
939 let executor = deterministic::Runner::default();
940 executor.start(|context| async move {
941 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
943 let writer = Write::from_pooler(&context, blob, size, NZUsize!(10));
944
945 writer.write_at(0, b"hello world").await.unwrap();
947 assert_eq!(writer.size().await, 11);
948 writer.sync().await.unwrap();
949 assert_eq!(writer.size().await, 11);
950
951 let (blob_check, size_check) =
952 context.open("partition", b"resize_write").await.unwrap();
953 assert_eq!(size_check, 11);
954 drop(blob_check);
955
956 writer.resize(5).await.unwrap();
958 assert_eq!(writer.size().await, 5);
959 writer.sync().await.unwrap();
960
961 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
963 assert_eq!(size, 5);
964 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
965 let read = reader.read(5).await.unwrap().coalesce();
966 assert_eq!(read.as_ref(), b"hello");
967
968 writer.write_at(0, b"X").await.unwrap();
970 assert_eq!(writer.size().await, 5);
971 writer.sync().await.unwrap();
972
973 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
975 assert_eq!(size, 5);
976 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
977 let read = reader.read(5).await.unwrap().coalesce();
978 assert_eq!(read.as_ref(), b"Xello");
979
980 writer.resize(10).await.unwrap();
982 assert_eq!(writer.size().await, 10);
983 writer.sync().await.unwrap();
984
985 let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
987 assert_eq!(size, 10);
988 let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
989 let read = reader.read(10).await.unwrap().coalesce();
990 assert_eq!(&read.as_ref()[0..5], b"Xello");
991 assert_eq!(&read.as_ref()[5..10], [0u8; 5]);
992
993 let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
995 let writer_zero = Write::from_pooler(&context, blob_zero.clone(), size, NZUsize!(10));
996 writer_zero.write_at(0, b"some data").await.unwrap();
997 assert_eq!(writer_zero.size().await, 9);
998 writer_zero.sync().await.unwrap();
999 assert_eq!(writer_zero.size().await, 9);
1000 writer_zero.resize(0).await.unwrap();
1001 assert_eq!(writer_zero.size().await, 0);
1002 writer_zero.sync().await.unwrap();
1003 assert_eq!(writer_zero.size().await, 0);
1004
1005 let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
1007 assert_eq!(size_z, 0);
1008 });
1009 }
1010
1011 #[test_traced]
1012 fn test_write_read_at_on_writer() {
1013 let executor = deterministic::Runner::default();
1014 executor.start(|context| async move {
1015 let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
1017 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1018
1019 writer.write_at(0, b"buffered").await.unwrap();
1021 assert_eq!(writer.size().await, 8);
1022
1023 let read_buf_vec = writer.read_at(0, 4).await.unwrap().coalesce();
1025 assert_eq!(read_buf_vec, b"buff");
1026
1027 let read_buf_vec = writer.read_at(4, 4).await.unwrap().coalesce();
1028 assert_eq!(read_buf_vec, b"ered");
1029
1030 assert!(writer.read_at(8, 1).await.is_err());
1032
1033 writer.write_at(8, b" and flushed").await.unwrap();
1035 assert_eq!(writer.size().await, 20);
1036 writer.sync().await.unwrap();
1037 assert_eq!(writer.size().await, 20);
1038
1039 let read_buf_vec_2 = writer.read_at(0, 4).await.unwrap().coalesce();
1041 assert_eq!(read_buf_vec_2, b"buff");
1042
1043 let read_buf_7_vec = writer.read_at(13, 7).await.unwrap().coalesce();
1044 assert_eq!(read_buf_7_vec, b"flushed");
1045
1046 writer.write_at(20, b" more data").await.unwrap();
1048 assert_eq!(writer.size().await, 30);
1049
1050 let read_buf_vec_3 = writer.read_at(20, 5).await.unwrap().coalesce();
1052 assert_eq!(read_buf_vec_3, b" more");
1053
1054 let combo_read_buf_vec = writer.read_at(16, 12).await.unwrap();
1056 assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
1057
1058 writer.sync().await.unwrap();
1060 assert_eq!(writer.size().await, 30);
1061 let (final_blob, final_size) =
1062 context.open("partition", b"read_at_writer").await.unwrap();
1063 assert_eq!(final_size, 30);
1064 let mut final_reader =
1065 Read::from_pooler(&context, final_blob, final_size, NZUsize!(30));
1066 let read = final_reader.read(30).await.unwrap().coalesce();
1067 assert_eq!(read.as_ref(), b"buffered and flushed more data");
1068 });
1069 }
1070
1071 #[test_traced]
1072 fn test_write_zero_length_read_past_eof_errors() {
1073 let executor = deterministic::Runner::default();
1074 executor.start(|context| async move {
1075 let (blob, size) = context.open("partition", b"zero_len_probe").await.unwrap();
1076 let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1077 writer.write_at(0, b"abc").await.unwrap();
1078
1079 let empty = writer.read_at(3, 0).await.unwrap();
1080 assert!(empty.is_empty());
1081
1082 let err = writer.read_at(4, 0).await.unwrap_err();
1083 assert!(matches!(err, Error::BlobInsufficientLength));
1084 });
1085 }
1086
1087 #[test_traced]
1088 fn test_write_read_at_blocks_concurrent_write_until_persisted_read_completes() {
1089 let executor = deterministic::Runner::default();
1090 executor.start(|context| async move {
1091 let (blob, read_started_rx, release_read_tx) =
1092 BlockingReadBlob::new(b"abcdefghij".to_vec());
1093 let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
1094 let reader = writer.clone();
1095 let verifier = writer.clone();
1096
1097 let read_task = context
1099 .child("read")
1100 .spawn(move |_| async move { reader.read_at(0, 4).await.expect("read failed") });
1101
1102 read_started_rx.await.expect("read start signal missing");
1104
1105 let write_task = context.child("write").spawn(move |_| async move {
1106 writer.write_at(0, b"WXYZ").await.expect("write failed");
1107 });
1108 pin_mut!(write_task);
1109
1110 context.sleep(Duration::from_secs(1)).await;
1112 assert!(
1113 write_task.as_mut().now_or_never().is_none(),
1114 "write_at completed while read_at still held lock over blob I/O"
1115 );
1116
1117 release_read_tx
1119 .send(())
1120 .expect("failed to release blocked read");
1121 let read_result = read_task.await.expect("read task failed").coalesce();
1122 assert_eq!(read_result.as_ref(), b"abcd");
1123 write_task.await.expect("write task failed");
1124
1125 let updated = verifier.read_at(0, 4).await.unwrap().coalesce();
1126 assert_eq!(updated.as_ref(), b"WXYZ");
1127 });
1128 }
1129
1130 #[test_traced]
1131 fn test_write_read_at_overlap_blocks_concurrent_write_until_persisted_read_completes() {
1132 let executor = deterministic::Runner::default();
1133 executor.start(|context| async move {
1134 let (blob, read_started_rx, release_read_tx) =
1135 BlockingReadBlob::new(b"abcdefghij".to_vec());
1136 let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
1137 let verifier = writer.clone();
1138
1139 writer.write_at(10, b"XYZ").await.unwrap();
1141
1142 let reader = writer.clone();
1144 let read_task = context
1145 .child("read")
1146 .spawn(move |_| async move { reader.read_at(8, 5).await.expect("read failed") });
1147
1148 read_started_rx.await.expect("read start signal missing");
1150
1151 let write_task = context.child("write").spawn(move |_| async move {
1152 writer.write_at(10, b"UVW").await.expect("write failed");
1153 });
1154 pin_mut!(write_task);
1155
1156 context.sleep(Duration::from_secs(1)).await;
1158 assert!(
1159 write_task.as_mut().now_or_never().is_none(),
1160 "write_at completed while overlap read_at still held lock over blob I/O"
1161 );
1162
1163 release_read_tx
1165 .send(())
1166 .expect("failed to release blocked read");
1167 let read_result = read_task.await.expect("read task failed").coalesce();
1168 assert_eq!(read_result.as_ref(), b"ijXYZ");
1169 write_task.await.expect("write task failed");
1170
1171 let updated = verifier.read_at(8, 5).await.unwrap().coalesce();
1172 assert_eq!(updated.as_ref(), b"ijUVW");
1173 });
1174 }
1175
1176 #[test_traced]
1177 fn test_write_straddling_non_mergeable() {
1178 let executor = deterministic::Runner::default();
1179 executor.start(|context| async move {
1180 let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
1182 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1183
1184 writer.write_at(0, b"0123456789").await.unwrap();
1186 assert_eq!(writer.size().await, 10);
1187
1188 writer.write_at(15, b"abc").await.unwrap();
1190 assert_eq!(writer.size().await, 18);
1191 writer.sync().await.unwrap();
1192 assert_eq!(writer.size().await, 18);
1193
1194 let (blob_check, size_check) =
1196 context.open("partition", b"write_straddle").await.unwrap();
1197 assert_eq!(size_check, 18);
1198 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(20));
1199 let read = reader.read(18).await.unwrap().coalesce();
1200
1201 let mut expected = vec![0u8; 18];
1202 expected[0..10].copy_from_slice(b"0123456789");
1203 expected[15..18].copy_from_slice(b"abc");
1204 assert_eq!(read.as_ref(), expected.as_slice());
1205
1206 let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
1208 let writer2 = Write::from_pooler(&context, blob2.clone(), size, NZUsize!(10));
1209 writer2.write_at(0, b"0123456789").await.unwrap();
1210 assert_eq!(writer2.size().await, 10);
1211
1212 writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
1214 assert_eq!(writer2.size().await, 17);
1215 writer2.sync().await.unwrap();
1216 assert_eq!(writer2.size().await, 17);
1217
1218 let (blob_check2, size_check2) =
1220 context.open("partition", b"write_straddle2").await.unwrap();
1221 assert_eq!(size_check2, 17);
1222 let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(20));
1223 let read = reader2.read(17).await.unwrap().coalesce();
1224 assert_eq!(read.as_ref(), b"01234ABCDEFGHIJKL");
1225 });
1226 }
1227
1228 #[test_traced]
1229 fn test_write_close() {
1230 let executor = deterministic::Runner::default();
1231 executor.start(|context| async move {
1232 let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
1234 let writer = Write::from_pooler(&context, blob_orig.clone(), size, NZUsize!(8));
1235 writer.write_at(0, b"pending").await.unwrap();
1236 assert_eq!(writer.size().await, 7);
1237
1238 writer.sync().await.unwrap();
1240
1241 let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
1243 assert_eq!(size_check, 7);
1244 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(8));
1245 let read = reader.read(7).await.unwrap().coalesce();
1246 assert_eq!(read.as_ref(), b"pending");
1247 });
1248 }
1249
1250 #[test_traced]
1251 fn test_write_direct_due_to_size() {
1252 let executor = deterministic::Runner::default();
1253 executor.start(|context| async move {
1254 let (blob, size) = context
1256 .open("partition", b"write_direct_size")
1257 .await
1258 .unwrap();
1259 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1260
1261 let data_large = b"0123456789";
1263 writer.write_at(0, data_large).await.unwrap();
1264 assert_eq!(writer.size().await, 10);
1265
1266 writer.sync().await.unwrap();
1268
1269 let (blob_check, size_check) = context
1271 .open("partition", b"write_direct_size")
1272 .await
1273 .unwrap();
1274 assert_eq!(size_check, 10);
1275 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1276 let read = reader.read(10).await.unwrap().coalesce();
1277 assert_eq!(read.as_ref(), data_large.as_slice());
1278
1279 writer.write_at(10, b"abc").await.unwrap();
1281 assert_eq!(writer.size().await, 13);
1282
1283 let read_small_buf_vec = writer.read_at(10, 3).await.unwrap().coalesce();
1285 assert_eq!(read_small_buf_vec, b"abc");
1286
1287 writer.sync().await.unwrap();
1288
1289 let (blob_check2, size_check2) = context
1291 .open("partition", b"write_direct_size")
1292 .await
1293 .unwrap();
1294 assert_eq!(size_check2, 13);
1295 let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(13));
1296 let read = reader2.read(13).await.unwrap().coalesce();
1297 assert_eq!(&read.as_ref()[10..], b"abc".as_slice());
1298 });
1299 }
1300
1301 #[test_traced]
1302 fn test_write_overwrite_and_extend_in_buffer() {
1303 let executor = deterministic::Runner::default();
1304 executor.start(|context| async move {
1305 let (blob, size) = context
1307 .open("partition", b"overwrite_extend_buf")
1308 .await
1309 .unwrap();
1310 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(15));
1311
1312 writer.write_at(0, b"0123456789").await.unwrap();
1314 assert_eq!(writer.size().await, 10);
1315
1316 writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
1318 assert_eq!(writer.size().await, 15);
1319
1320 let read_buf_vec = writer.read_at(0, 15).await.unwrap().coalesce();
1322 assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
1323
1324 writer.sync().await.unwrap();
1325
1326 let (blob_check, size_check) = context
1328 .open("partition", b"overwrite_extend_buf")
1329 .await
1330 .unwrap();
1331 assert_eq!(size_check, 15);
1332 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(15));
1333 let read = reader.read(15).await.unwrap().coalesce();
1334 assert_eq!(read.as_ref(), b"01234ABCDEFGHIJ".as_slice());
1335 });
1336 }
1337
1338 #[test_traced]
1339 fn test_write_at_size() {
1340 let executor = deterministic::Runner::default();
1341 executor.start(|context| async move {
1342 let (blob, size) = context.open("partition", b"write_end").await.unwrap();
1344 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
1345
1346 writer.write_at(0, b"0123456789").await.unwrap();
1348 assert_eq!(writer.size().await, 10);
1349 writer.sync().await.unwrap();
1350
1351 writer.write_at(writer.size().await, b"abc").await.unwrap();
1353 assert_eq!(writer.size().await, 13);
1354 writer.sync().await.unwrap();
1355
1356 let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
1358 assert_eq!(size_check, 13);
1359 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(13));
1360 let read = reader.read(13).await.unwrap().coalesce();
1361 assert_eq!(read.as_ref(), b"0123456789abc");
1362 });
1363 }
1364
1365 #[test_traced]
1366 fn test_write_at_size_multiple_appends() {
1367 let executor = deterministic::Runner::default();
1368 executor.start(|context| async move {
1369 let (blob, size) = context
1371 .open("partition", b"write_multiple_appends_at_size")
1372 .await
1373 .unwrap();
1374 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1375
1376 writer.write_at(0, b"AAA").await.unwrap();
1378 assert_eq!(writer.size().await, 3);
1379 writer.sync().await.unwrap();
1380 assert_eq!(writer.size().await, 3);
1381
1382 writer.write_at(writer.size().await, b"BBB").await.unwrap();
1384 assert_eq!(writer.size().await, 6); writer.sync().await.unwrap();
1386 assert_eq!(writer.size().await, 6);
1387
1388 writer.write_at(writer.size().await, b"CCC").await.unwrap();
1390 assert_eq!(writer.size().await, 9); writer.sync().await.unwrap();
1392 assert_eq!(writer.size().await, 9);
1393
1394 let (blob_check, size_check) = context
1396 .open("partition", b"write_multiple_appends_at_size")
1397 .await
1398 .unwrap();
1399 assert_eq!(size_check, 9);
1400 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(9));
1401 let read = reader.read(9).await.unwrap().coalesce();
1402 assert_eq!(read.as_ref(), b"AAABBBCCC");
1403 });
1404 }
1405
1406 #[test_traced]
1407 fn test_write_non_contiguous_then_append_at_size() {
1408 let executor = deterministic::Runner::default();
1409 executor.start(|context| async move {
1410 let (blob, size) = context
1412 .open("partition", b"write_non_contiguous_then_append")
1413 .await
1414 .unwrap();
1415 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1416
1417 writer.write_at(0, b"INITIAL").await.unwrap(); assert_eq!(writer.size().await, 7);
1420 writer.write_at(20, b"NONCONTIG").await.unwrap();
1424 assert_eq!(writer.size().await, 29);
1425 writer.sync().await.unwrap();
1426 assert_eq!(writer.size().await, 29);
1427
1428 writer
1430 .write_at(writer.size().await, b"APPEND")
1431 .await
1432 .unwrap();
1433 assert_eq!(writer.size().await, 35); writer.sync().await.unwrap();
1435 assert_eq!(writer.size().await, 35);
1436
1437 let (blob_check, size_check) = context
1439 .open("partition", b"write_non_contiguous_then_append")
1440 .await
1441 .unwrap();
1442 assert_eq!(size_check, 35);
1443 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(35));
1444 let read = reader.read(35).await.unwrap().coalesce();
1445
1446 let mut expected = vec![0u8; 35];
1447 expected[0..7].copy_from_slice(b"INITIAL");
1448 expected[20..29].copy_from_slice(b"NONCONTIG");
1449 expected[29..35].copy_from_slice(b"APPEND");
1450 assert_eq!(read.as_ref(), expected.as_slice());
1451 });
1452 }
1453
1454 #[test_traced]
1455 fn test_write_resize_then_append_at_size() {
1456 let executor = deterministic::Runner::default();
1457 executor.start(|context| async move {
1458 let (blob, size) = context
1460 .open("partition", b"resize_then_append_at_size")
1461 .await
1462 .unwrap();
1463 let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1464
1465 writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); assert_eq!(writer.size().await, 16);
1468 writer.sync().await.unwrap(); assert_eq!(writer.size().await, 16);
1470
1471 let resize_to = 5;
1473 writer.resize(resize_to).await.unwrap();
1474 assert_eq!(writer.size().await, resize_to);
1477 writer.sync().await.unwrap(); assert_eq!(writer.size().await, resize_to);
1479
1480 writer
1482 .write_at(writer.size().await, b"XXXXX")
1483 .await
1484 .unwrap(); assert_eq!(writer.size().await, 10); writer.sync().await.unwrap();
1488 assert_eq!(writer.size().await, 10);
1489
1490 let (blob_check, size_check) = context
1492 .open("partition", b"resize_then_append_at_size")
1493 .await
1494 .unwrap();
1495 assert_eq!(size_check, 10);
1496 let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1497 let read = reader.read(10).await.unwrap().coalesce();
1498 assert_eq!(read.as_ref(), b"01234XXXXX");
1499 });
1500 }
1501
1502 #[test_traced]
1503 fn test_write_sync_uses_range_sync_for_buffer_only_write() {
1504 let executor = deterministic::Runner::default();
1505 executor.start(|context| async move {
1506 let blob = SyncTrackingBlob::new();
1507 let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1508
1509 writer.sync().await.unwrap();
1511 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1512 assert!(durable.is_empty());
1513 assert_eq!(writes, 0);
1514 assert_eq!(full_syncs, 1);
1515 assert_eq!(range_syncs, 0);
1516
1517 writer.write_at(0, b"abc").await.unwrap();
1519 writer.sync().await.unwrap();
1520
1521 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1523 assert_eq!(durable.as_slice(), b"abc");
1524 assert_eq!(writes, 1);
1525 assert_eq!(full_syncs, 1);
1526 assert_eq!(range_syncs, 1);
1527
1528 writer.sync().await.unwrap();
1530 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1531 assert_eq!(durable.as_slice(), b"abc");
1532 assert_eq!(writes, 1);
1533 assert_eq!(full_syncs, 1);
1534 assert_eq!(range_syncs, 1);
1535 });
1536 }
1537
1538 #[test_traced]
1539 fn test_write_sync_persists_pre_wrapped_blob_mutation() {
1540 let executor = deterministic::Runner::default();
1541 executor.start(|context| async move {
1542 let blob = SyncTrackingBlob::new();
1543
1544 blob.write_at(0, b"abc").await.unwrap();
1546
1547 let writer = Write::from_pooler(&context, blob.clone(), 3, NZUsize!(8));
1548 writer.sync().await.unwrap();
1549
1550 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1552 assert_eq!(durable.as_slice(), b"abc");
1553 assert_eq!(writes, 1);
1554 assert_eq!(full_syncs, 1);
1555 assert_eq!(range_syncs, 0);
1556
1557 writer.write_at(3, b"d").await.unwrap();
1559 writer.sync().await.unwrap();
1560
1561 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1562 assert_eq!(durable.as_slice(), b"abcd");
1563 assert_eq!(writes, 2);
1564 assert_eq!(full_syncs, 1);
1565 assert_eq!(range_syncs, 1);
1566 });
1567 }
1568
1569 #[test_traced]
1570 fn test_write_sync_failed_range_sync_does_not_mark_clean() {
1571 let executor = deterministic::Runner::default();
1572 executor.start(|context| async move {
1573 let name = b"failed_range_sync";
1574 let (blob, size) = context.open("partition", name).await.unwrap();
1575 let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1576 writer.sync().await.unwrap();
1577
1578 writer.write_at(0, b"abc").await.unwrap();
1580
1581 context.remove("partition", Some(name)).await.unwrap();
1583 assert!(writer.sync().await.is_err());
1584
1585 assert!(writer.sync().await.is_err());
1588 });
1589 }
1590
1591 #[test_traced]
1592 fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
1593 let executor = deterministic::Runner::default();
1594 executor.start(|context| async move {
1595 let blob = SyncTrackingBlob::new();
1596 let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));
1597
1598 writer.write_at(0, b"abcdef").await.unwrap();
1600 writer.write_at(6, b"g").await.unwrap();
1601 writer.sync().await.unwrap();
1602
1603 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1605 assert_eq!(durable.as_slice(), b"abcdefg");
1606 assert_eq!(writes, 2);
1607 assert_eq!(full_syncs, 1);
1608 assert_eq!(range_syncs, 0);
1609
1610 writer.sync().await.unwrap();
1612 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1613 assert_eq!(durable.as_slice(), b"abcdefg");
1614 assert_eq!(writes, 2);
1615 assert_eq!(full_syncs, 1);
1616 assert_eq!(range_syncs, 0);
1617
1618 writer.write_at(7, b"h").await.unwrap();
1620 writer.sync().await.unwrap();
1621
1622 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1623 assert_eq!(durable.as_slice(), b"abcdefgh");
1624 assert_eq!(writes, 3);
1625 assert_eq!(full_syncs, 1);
1626 assert_eq!(range_syncs, 1);
1627 });
1628 }
1629
1630 #[test_traced]
1631 fn test_write_sync_uses_full_sync_after_resize() {
1632 let executor = deterministic::Runner::default();
1633 executor.start(|context| async move {
1634 let blob = SyncTrackingBlob::new();
1635 let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1636 writer.sync().await.unwrap();
1637
1638 writer.write_at(0, b"abcdef").await.unwrap();
1640 writer.sync().await.unwrap();
1641
1642 writer.resize(4).await.unwrap();
1644 writer.sync().await.unwrap();
1645
1646 let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1648 assert_eq!(durable.as_slice(), b"abcd");
1649 assert_eq!(writes, 1);
1650 assert_eq!(full_syncs, 2);
1651 assert_eq!(range_syncs, 1);
1652 });
1653 }
1654}