Skip to main content

commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3pub mod paged;
4mod read;
5mod tip;
6mod write;
7
8pub use read::Read;
9pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13    use super::*;
14    use crate::{
15        deterministic, Blob as _, Buf, BufMut, Clock, Error, IoBufMut, IoBufs, IoBufsMut, Runner,
16        Spawner, Storage, Supervisor as _,
17    };
18    use commonware_macros::test_traced;
19    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize};
20    use futures::{pin_mut, FutureExt};
21    use std::{sync::Arc, time::Duration};
22
23    struct BlockingReadGate {
24        read_started: Option<oneshot::Sender<()>>,
25        release_read: Option<oneshot::Receiver<()>>,
26    }
27
28    /// Test-only blob wrapper that blocks exactly one read call until explicitly released.
29    ///
30    /// Used to assert lock ordering / contention behavior in writer read-path tests.
31    #[derive(Clone)]
32    struct BlockingReadBlob {
33        data: Arc<Mutex<Vec<u8>>>,
34        gate: Arc<Mutex<BlockingReadGate>>,
35    }
36
37    impl BlockingReadBlob {
38        fn new(data: Vec<u8>) -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) {
39            let (read_started_tx, read_started_rx) = oneshot::channel();
40            let (release_read_tx, release_read_rx) = oneshot::channel();
41            (
42                Self {
43                    data: Arc::new(Mutex::new(data)),
44                    gate: Arc::new(Mutex::new(BlockingReadGate {
45                        read_started: Some(read_started_tx),
46                        release_read: Some(release_read_rx),
47                    })),
48                },
49                read_started_rx,
50                release_read_tx,
51            )
52        }
53
54        async fn block_once_on_read(&self) {
55            let rx = {
56                let mut gate = self.gate.lock();
57                gate.read_started.take().map(|read_started| {
58                    let _ = read_started.send(());
59                    gate.release_read.take().expect("release signal missing")
60                })
61            };
62            if let Some(rx) = rx {
63                let _ = rx.await;
64            }
65        }
66    }
67
68    impl crate::Blob for BlockingReadBlob {
69        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
70            self.read_at_buf(offset, len, IoBufMut::default()).await
71        }
72
73        async fn read_at_buf(
74            &self,
75            offset: u64,
76            len: usize,
77            buf: impl Into<IoBufsMut> + Send,
78        ) -> Result<IoBufsMut, Error> {
79            self.block_once_on_read().await;
80
81            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
82            let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
83            let data = self.data.lock();
84            if end > data.len() {
85                return Err(Error::BlobInsufficientLength);
86            }
87
88            let mut out = buf.into();
89            out.put_slice(&data[start..end]);
90            Ok(out)
91        }
92
93        async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
94            let buf = buf.into().coalesce();
95            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
96            let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
97
98            let mut data = self.data.lock();
99            if end > data.len() {
100                data.resize(end, 0);
101            }
102            data[start..end].copy_from_slice(buf.as_ref());
103            Ok(())
104        }
105
106        async fn write_at_sync(
107            &self,
108            offset: u64,
109            bufs: impl Into<IoBufs> + Send,
110        ) -> Result<(), Error> {
111            let bufs = bufs.into();
112            if !bufs.has_remaining() {
113                return Ok(());
114            }
115
116            self.write_at(offset, bufs).await?;
117            self.sync().await
118        }
119
120        async fn resize(&self, len: u64) -> Result<(), Error> {
121            let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
122            self.data.lock().resize(len, 0);
123            Ok(())
124        }
125
126        async fn sync(&self) -> Result<(), Error> {
127            Ok(())
128        }
129    }
130
131    #[derive(Default)]
132    struct RangeSyncState {
133        /// All data currently stored in the blob.
134        ///
135        /// This includes every durable byte plus any newer bytes that have not
136        /// been made durable yet.
137        data: Vec<u8>,
138
139        /// Prefix/ranges of `data` that would survive a crash.
140        durable: Vec<u8>,
141
142        /// Number of write operations.
143        writes: usize,
144
145        /// Number of full sync barriers.
146        full_syncs: usize,
147
148        /// Number of range-scoped write syncs.
149        range_syncs: usize,
150    }
151
152    /// Test blob with separate visible and durable state.
153    ///
154    /// Plain writes and resizes only update `data`. `write_at_sync` updates `data`
155    /// and then copies only that submitted range into `durable`. `sync` copies all
156    /// of `data` to `durable`. This lets tests assert that `Write::sync` uses range
157    /// sync only when no earlier unsynced mutation needs a full durability barrier.
158    #[derive(Clone)]
159    pub struct SyncTrackingBlob {
160        state: Arc<Mutex<RangeSyncState>>,
161    }
162
163    impl SyncTrackingBlob {
164        pub fn new() -> Self {
165            Self {
166                state: Arc::new(Mutex::new(RangeSyncState::default())),
167            }
168        }
169
170        pub fn snapshot(&self) -> (Vec<u8>, usize, usize, usize) {
171            let state = self.state.lock();
172            (
173                state.durable.clone(),
174                state.writes,
175                state.full_syncs,
176                state.range_syncs,
177            )
178        }
179
180        pub fn size(&self) -> u64 {
181            self.state.lock().data.len() as u64
182        }
183
184        fn write(data: &mut Vec<u8>, offset: u64, buf: &[u8]) -> Result<(), Error> {
185            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
186            let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
187            if end > data.len() {
188                data.resize(end, 0);
189            }
190            data[start..end].copy_from_slice(buf);
191            Ok(())
192        }
193    }
194
195    impl crate::Blob for SyncTrackingBlob {
196        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
197            self.read_at_buf(offset, len, IoBufMut::default()).await
198        }
199
200        async fn read_at_buf(
201            &self,
202            offset: u64,
203            len: usize,
204            buf: impl Into<IoBufsMut> + Send,
205        ) -> Result<IoBufsMut, Error> {
206            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
207            let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
208            let state = self.state.lock();
209            if end > state.data.len() {
210                return Err(Error::BlobInsufficientLength);
211            }
212
213            let mut out = buf.into();
214            out.put_slice(&state.data[start..end]);
215            Ok(out)
216        }
217
218        async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
219            let buf = buf.into().coalesce();
220            let mut state = self.state.lock();
221            Self::write(&mut state.data, offset, buf.as_ref())?;
222            state.writes += 1;
223            Ok(())
224        }
225
226        async fn write_at_sync(
227            &self,
228            offset: u64,
229            buf: impl Into<IoBufs> + Send,
230        ) -> Result<(), Error> {
231            let buf = buf.into().coalesce();
232            let mut state = self.state.lock();
233            Self::write(&mut state.data, offset, buf.as_ref())?;
234            Self::write(&mut state.durable, offset, buf.as_ref())?;
235            state.writes += 1;
236            state.range_syncs += 1;
237            Ok(())
238        }
239
240        async fn resize(&self, len: u64) -> Result<(), Error> {
241            let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
242            self.state.lock().data.resize(len, 0);
243            Ok(())
244        }
245
246        async fn sync(&self) -> Result<(), Error> {
247            let mut state = self.state.lock();
248            state.durable = state.data.clone();
249            state.full_syncs += 1;
250            Ok(())
251        }
252    }
253
254    #[test_traced]
255    fn test_read_basic() {
256        let executor = deterministic::Runner::default();
257        executor.start(|context| async move {
258            // Test basic buffered reading functionality with sequential reads
259            let data = b"Hello, world! This is a test.";
260            let (blob, size) = context.open("partition", b"test").await.unwrap();
261            assert_eq!(size, 0);
262            blob.write_at(0, data).await.unwrap();
263            let size = data.len() as u64;
264
265            // Create a buffered reader with small buffer to test refilling
266            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
267
268            // Read some data
269            let read = reader.read(5).await.unwrap().coalesce();
270            assert_eq!(read.as_ref(), b"Hello");
271
272            // Read more data that requires a buffer refill
273            let read = reader.read(14).await.unwrap().coalesce();
274            assert_eq!(read.as_ref(), b", world! This ");
275
276            // Verify position tracking
277            assert_eq!(reader.position(), 19);
278
279            // Read the remaining data
280            let read = reader.read(7).await.unwrap().coalesce();
281            assert_eq!(read.as_ref(), b"is a te");
282
283            // Attempt to read beyond the end should fail
284            let result = reader.read(5).await;
285            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
286        });
287    }
288
289    #[test_traced]
290    fn test_read_cross_boundary() {
291        let executor = deterministic::Runner::default();
292        executor.start(|context| async move {
293            // Test reading data that spans multiple buffer refills
294            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
295            let (blob, size) = context.open("partition", b"test").await.unwrap();
296            assert_eq!(size, 0);
297            blob.write_at(0, data).await.unwrap();
298            let size = data.len() as u64;
299
300            // Use a buffer smaller than the total data size
301            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
302
303            // Read data that crosses buffer boundaries
304            let read = reader.read(15).await.unwrap().coalesce();
305            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNO");
306
307            // Verify position tracking
308            assert_eq!(reader.position(), 15);
309
310            // Read the remaining data
311            let read = reader.read(11).await.unwrap().coalesce();
312            assert_eq!(read.as_ref(), b"PQRSTUVWXYZ");
313
314            // Verify we're at the end
315            assert_eq!(reader.position(), 26);
316            assert_eq!(reader.blob_remaining(), 0);
317        });
318    }
319
320    // Regression test for https://github.com/commonwarexyz/monorepo/issues/1348
321    #[test_traced]
322    fn test_read_to_end_then_rewind_and_read_again() {
323        let executor = deterministic::Runner::default();
324        executor.start(|context| async move {
325            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
326            let (blob, size) = context.open("partition", b"test").await.unwrap();
327            assert_eq!(size, 0);
328            blob.write_at(0, data).await.unwrap();
329            let size = data.len() as u64;
330
331            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
332
333            // Read data that crosses buffer boundaries
334            let read = reader.read(21).await.unwrap().coalesce();
335            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
336
337            // Verify position tracking
338            assert_eq!(reader.position(), 21);
339
340            // Read the remaining data
341            let read = reader.read(5).await.unwrap().coalesce();
342            assert_eq!(read.as_ref(), b"VWXYZ");
343
344            // Rewind and read again
345            reader.seek_to(0).unwrap();
346            let read = reader.read(21).await.unwrap().coalesce();
347            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
348        });
349    }
350
351    #[test_traced]
352    fn test_read_with_known_size() {
353        let executor = deterministic::Runner::default();
354        executor.start(|context| async move {
355            // Test reader behavior with known blob size limits
356            let data = b"This is a test with known size limitations.";
357            let (blob, size) = context.open("partition", b"test").await.unwrap();
358            assert_eq!(size, 0);
359            blob.write_at(0, data).await.unwrap();
360            let size = data.len() as u64;
361
362            // Create a buffered reader with buffer smaller than total data
363            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
364
365            // Check initial remaining bytes
366            assert_eq!(reader.blob_remaining(), size);
367
368            // Read partial data
369            let read = reader.read(5).await.unwrap().coalesce();
370            assert_eq!(read.as_ref(), b"This ");
371
372            // Check remaining bytes after partial read
373            assert_eq!(reader.blob_remaining(), size - 5);
374
375            // Read exactly up to the size limit
376            let read = reader.read((size - 5) as usize).await.unwrap().coalesce();
377            assert_eq!(read.as_ref(), b"is a test with known size limitations.");
378
379            // Verify we're at the end
380            assert_eq!(reader.blob_remaining(), 0);
381
382            // Reading beyond the end should fail
383            let result = reader.read(1).await;
384            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
385        });
386    }
387
388    #[test_traced]
389    fn test_read_oversized_request_does_not_consume_buffered_bytes() {
390        let executor = deterministic::Runner::default();
391        executor.start(|context| async move {
392            let data = b"abcdefghij";
393            let (blob, size) = context
394                .open("partition", b"double-count-regression")
395                .await
396                .unwrap();
397            assert_eq!(size, 0);
398            blob.write_at(0, data).await.unwrap();
399
400            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(8));
401
402            // Fill the internal buffer and consume most of it (2 bytes remain buffered).
403            let first = reader.read(6).await.unwrap().coalesce();
404            assert_eq!(first.as_ref(), b"abcdef");
405            assert_eq!(reader.position(), 6);
406
407            // Only 4 bytes remain total, so this must fail without consuming anything.
408            let err = reader.read(5).await.unwrap_err();
409            assert!(matches!(err, Error::BlobInsufficientLength));
410            assert_eq!(reader.position(), 6);
411
412            // Remaining bytes should still be readable in full.
413            let tail = reader.read(4).await.unwrap().coalesce();
414            assert_eq!(tail.as_ref(), b"ghij");
415            assert_eq!(reader.position(), 10);
416        });
417    }
418
419    #[test_traced]
420    fn test_read_large_data() {
421        let executor = deterministic::Runner::default();
422        executor.start(|context| async move {
423            // Test reading large amounts of data in chunks
424            let data_size = 1024 * 256; // 256KB of data
425            let data = vec![0x42; data_size];
426            let (blob, size) = context.open("partition", b"test").await.unwrap();
427            assert_eq!(size, 0);
428            blob.write_at(0, data.clone()).await.unwrap();
429            let size = data.len() as u64;
430
431            // Use a buffer much smaller than the total data
432            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(64 * 1024));
433
434            // Read all data in smaller chunks
435            let mut total_read = 0;
436            let chunk_size = 8 * 1024; // 8KB chunks
437
438            while total_read < data_size {
439                let to_read = std::cmp::min(chunk_size, data_size - total_read);
440                let read = reader.read(to_read).await.unwrap().coalesce();
441
442                // Verify data integrity
443                assert!(
444                    read.as_ref().iter().all(|&b| b == 0x42),
445                    "Data at position {total_read} is not correct"
446                );
447
448                total_read += to_read;
449            }
450
451            // Verify we read everything
452            assert_eq!(total_read, data_size);
453
454            // Reading beyond the end should fail
455            let result = reader.read(1).await;
456            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
457        });
458    }
459
460    #[test_traced]
461    fn test_read_exact_size_reads() {
462        let executor = deterministic::Runner::default();
463        executor.start(|context| async move {
464            // Create a blob with exactly 2.5 buffer sizes of data
465            let buffer_size = 1024;
466            let data_size = buffer_size * 5 / 2; // 2.5 buffers
467            let data = vec![0x37; data_size];
468
469            let (blob, size) = context.open("partition", b"test").await.unwrap();
470            assert_eq!(size, 0);
471            blob.write_at(0, data.clone()).await.unwrap();
472            let size = data.len() as u64;
473
474            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer_size));
475
476            // Read exactly one buffer size
477            let read = reader.read(buffer_size).await.unwrap().coalesce();
478            assert!(read.as_ref().iter().all(|&b| b == 0x37));
479
480            // Read exactly one buffer size more
481            let read = reader.read(buffer_size).await.unwrap().coalesce();
482            assert!(read.as_ref().iter().all(|&b| b == 0x37));
483
484            // Read the remaining half buffer
485            let half_buffer = buffer_size / 2;
486            let read = reader.read(half_buffer).await.unwrap().coalesce();
487            assert!(read.as_ref().iter().all(|&b| b == 0x37));
488
489            // Verify we're at the end
490            assert_eq!(reader.blob_remaining(), 0);
491            assert_eq!(reader.position(), size);
492        });
493    }
494
495    #[test_traced]
496    fn test_read_structure_single_vs_chunked() {
497        let executor = deterministic::Runner::default();
498        executor.start(|context| async move {
499            let data = b"ABCDEFGHIJKL";
500            let (blob, size) = context.open("partition", b"structural").await.unwrap();
501            assert_eq!(size, 0);
502            blob.write_at(0, data).await.unwrap();
503
504            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(5));
505
506            // First read fits in one fetched chunk.
507            let first = reader.read(3).await.unwrap();
508            assert!(first.is_single());
509            assert_eq!(first.coalesce().as_ref(), b"ABC");
510
511            // This read spans refill boundaries and should be represented as multiple chunks.
512            let second = reader.read(7).await.unwrap();
513            assert!(!second.is_single());
514            assert_eq!(second.coalesce().as_ref(), b"DEFGHIJ");
515        });
516    }
517
518    #[test_traced]
519    fn test_read_seek_to() {
520        let executor = deterministic::Runner::default();
521        executor.start(|context| async move {
522            // Create a memory blob with some test data
523            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
524            let (blob, size) = context.open("partition", b"test").await.unwrap();
525            assert_eq!(size, 0);
526            blob.write_at(0, data).await.unwrap();
527            let size = data.len() as u64;
528
529            // Create a buffer reader
530            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
531
532            // Read some data to advance the position
533            let read = reader.read(5).await.unwrap().coalesce();
534            assert_eq!(read.as_ref(), b"ABCDE");
535            assert_eq!(reader.position(), 5);
536
537            // Seek to a specific position
538            reader.seek_to(10).unwrap();
539            assert_eq!(reader.position(), 10);
540
541            // Read data from the new position
542            let read = reader.read(5).await.unwrap().coalesce();
543            assert_eq!(read.as_ref(), b"KLMNO");
544
545            // Seek to beginning
546            reader.seek_to(0).unwrap();
547            assert_eq!(reader.position(), 0);
548
549            let read = reader.read(5).await.unwrap().coalesce();
550            assert_eq!(read.as_ref(), b"ABCDE");
551
552            // Seek to end
553            reader.seek_to(size).unwrap();
554            assert_eq!(reader.position(), size);
555
556            // Trying to read should fail
557            let result = reader.read(1).await;
558            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
559
560            // Seek beyond end should fail
561            let result = reader.seek_to(size + 10);
562            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
563        });
564    }
565
566    #[test_traced]
567    fn test_read_seek_with_refill() {
568        let executor = deterministic::Runner::default();
569        executor.start(|context| async move {
570            // Create a memory blob with longer data
571            let data = vec![0x41; 1000]; // 1000 'A' characters
572            let (blob, size) = context.open("partition", b"test").await.unwrap();
573            assert_eq!(size, 0);
574            blob.write_at(0, data.clone()).await.unwrap();
575            let size = data.len() as u64;
576
577            // Create a buffer reader with small buffer
578            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
579
580            // Read some data
581            let _ = reader.read(5).await.unwrap().coalesce();
582
583            // Seek far ahead, past the current buffer
584            reader.seek_to(500).unwrap();
585
586            // Read data - should get data from position 500
587            let read = reader.read(5).await.unwrap().coalesce();
588            assert_eq!(read.as_ref(), b"AAAAA"); // Should still be 'A's);
589            assert_eq!(reader.position(), 505);
590
591            // Seek backwards
592            reader.seek_to(100).unwrap();
593
594            // Read again - should be at position 100
595            let _ = reader.read(5).await.unwrap().coalesce();
596            assert_eq!(reader.position(), 105);
597        });
598    }
599
600    #[test_traced]
601    fn test_read_seek_within_buffered_range() {
602        let executor = deterministic::Runner::default();
603        executor.start(|context| async move {
604            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
605            let (blob, size) = context.open("partition", b"test").await.unwrap();
606            assert_eq!(size, 0);
607            blob.write_at(0, data).await.unwrap();
608
609            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
610
611            // Reads 0..=5, while the internal fetch cursor advances to 10.
612            let read = reader.read(6).await.unwrap().coalesce();
613            assert_eq!(read.as_ref(), b"ABCDEF");
614            assert_eq!(reader.position(), 6);
615            assert_eq!(reader.buffer_remaining(), 4);
616
617            // Seek back within [buffer_start, fetch_position).
618            reader.seek_to(3).unwrap();
619            assert_eq!(reader.position(), 3);
620            assert_eq!(reader.buffer_remaining(), 7);
621
622            let read = reader.read(5).await.unwrap().coalesce();
623            assert_eq!(read.as_ref(), b"DEFGH");
624            assert_eq!(reader.position(), 8);
625            assert_eq!(reader.buffer_remaining(), 2);
626        });
627    }
628
629    #[test_traced]
630    fn test_read_seek_within_unread_buffer_does_not_refill() {
631        let executor = deterministic::Runner::default();
632        executor.start(|context| async move {
633            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
634            let (blob, size) = context
635                .open("partition", b"seek_unread_no_refill")
636                .await
637                .unwrap();
638            assert_eq!(size, 0);
639            blob.write_at(0, data).await.unwrap();
640
641            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));
642
643            // First read triggers a single refill of 10 bytes.
644            let first = reader.read(6).await.unwrap();
645            assert_eq!(first.coalesce().as_ref(), b"ABCDEF");
646            assert_eq!(reader.position(), 6);
647            assert_eq!(reader.buffer_remaining(), 4);
648
649            // Seek within the unread buffered window [6, 10).
650            reader.seek_to(7).unwrap();
651            assert_eq!(reader.position(), 7);
652            assert_eq!(reader.buffer_remaining(), 3);
653
654            // Consume only from the already buffered window.
655            let second = reader.read(3).await.unwrap();
656            assert_eq!(second.coalesce().as_ref(), b"HIJ");
657            assert_eq!(reader.position(), 10);
658            assert_eq!(reader.buffer_remaining(), 0);
659
660            // Refill should happen only now (at exhaustion), not at seek/read above.
661            let third = reader.read(1).await.unwrap();
662            assert_eq!(third.coalesce().as_ref(), b"K");
663            assert_eq!(reader.position(), 11);
664            assert_eq!(reader.buffer_remaining(), 9);
665        });
666    }
667
668    #[test_traced]
669    fn test_read_resize() {
670        let executor = deterministic::Runner::default();
671        executor.start(|context| async move {
672            // Create a memory blob with some test data
673            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
674            let (blob, size) = context.open("partition", b"test").await.unwrap();
675            assert_eq!(size, 0);
676            blob.write_at(0, data).await.unwrap();
677            let data_len = data.len() as u64;
678
679            // Create a buffer reader
680            let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
681
682            // Resize the blob to half its size
683            let resize_len = data_len / 2;
684            reader.resize(resize_len).await.unwrap();
685
686            // Reopen to check truncation
687            let (blob, size) = context.open("partition", b"test").await.unwrap();
688            assert_eq!(size, resize_len, "Blob should be resized to half size");
689
690            // Create a new buffer and read to verify truncation
691            let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
692
693            // Read the content
694            let read = new_reader.read(size as usize).await.unwrap().coalesce();
695            assert_eq!(
696                read.as_ref(),
697                b"ABCDEFGHIJKLM",
698                "Resized content should match"
699            );
700
701            // Reading beyond resized size should fail
702            let result = new_reader.read(1).await;
703            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
704
705            // Test resize to larger size
706            new_reader.resize(data_len * 2).await.unwrap();
707
708            // Reopen to check resize
709            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
710            assert_eq!(new_size, data_len * 2);
711
712            // Create a new buffer and read to verify resize
713            let mut new_reader = Read::from_pooler(&context, blob, new_size, NZUsize!(10));
714            let read = new_reader.read(new_size as usize).await.unwrap().coalesce();
715            assert_eq!(&read.as_ref()[..size as usize], b"ABCDEFGHIJKLM");
716            assert_eq!(
717                &read.as_ref()[size as usize..],
718                vec![0u8; new_size as usize - size as usize]
719            );
720        });
721    }
722
723    #[test_traced]
724    fn test_read_resize_to_zero() {
725        let executor = deterministic::Runner::default();
726        executor.start(|context| async move {
727            // Create a memory blob with some test data
728            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
729            let data_len = data.len() as u64;
730            let (blob, size) = context.open("partition", b"test").await.unwrap();
731            assert_eq!(size, 0);
732            blob.write_at(0, data).await.unwrap();
733
734            // Create a buffer reader
735            let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));
736
737            // Resize the blob to zero
738            reader.resize(0).await.unwrap();
739
740            // Reopen to check truncation
741            let (blob, size) = context.open("partition", b"test").await.unwrap();
742            assert_eq!(size, 0, "Blob should be resized to zero");
743
744            // Create a new buffer and try to read (should fail)
745            let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
746
747            // Reading from resized blob should fail
748            let result = new_reader.read(1).await;
749            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
750        });
751    }
752
753    #[test_traced]
754    fn test_write_basic() {
755        let executor = deterministic::Runner::default();
756        executor.start(|context| async move {
757            // Test basic buffered write and sync functionality
758            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
759            assert_eq!(size, 0);
760
761            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(8));
762            writer.write_at(0, b"hello").await.unwrap();
763            assert_eq!(writer.size().await, 5);
764            writer.sync().await.unwrap();
765            assert_eq!(writer.size().await, 5);
766
767            // Verify data was written correctly
768            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
769            assert_eq!(size, 5);
770            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
771            let read = reader.read(5).await.unwrap().coalesce();
772            assert_eq!(read.as_ref(), b"hello");
773        });
774    }
775
776    #[test_traced]
777    fn test_write_multiple_flushes() {
778        let executor = deterministic::Runner::default();
779        executor.start(|context| async move {
780            // Test writes that cause buffer flushes due to capacity limits
781            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
782            assert_eq!(size, 0);
783
784            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
785            writer.write_at(0, b"abc").await.unwrap();
786            assert_eq!(writer.size().await, 3);
787            writer.write_at(3, b"defg").await.unwrap();
788            assert_eq!(writer.size().await, 7);
789            writer.sync().await.unwrap();
790
791            // Verify the final result
792            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
793            assert_eq!(size, 7);
794            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
795            let read = reader.read(7).await.unwrap().coalesce();
796            assert_eq!(read.as_ref(), b"abcdefg");
797        });
798    }
799
800    #[test_traced]
801    fn test_write_large_data() {
802        let executor = deterministic::Runner::default();
803        executor.start(|context| async move {
804            // Test writing data larger than buffer capacity (direct write)
805            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
806            assert_eq!(size, 0);
807
808            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
809            writer.write_at(0, b"abc").await.unwrap();
810            assert_eq!(writer.size().await, 3);
811            writer
812                .write_at(3, b"defghijklmnopqrstuvwxyz")
813                .await
814                .unwrap();
815            assert_eq!(writer.size().await, 26);
816            writer.sync().await.unwrap();
817            assert_eq!(writer.size().await, 26);
818
819            // Verify the complete data
820            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
821            assert_eq!(size, 26);
822            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
823            let read = reader.read(26).await.unwrap().coalesce();
824            assert_eq!(read.as_ref(), b"abcdefghijklmnopqrstuvwxyz");
825        });
826    }
827
828    #[test_traced]
829    fn test_write_append_to_buffer() {
830        let executor = deterministic::Runner::default();
831        executor.start(|context| async move {
832            // Test sequential appends that exceed buffer capacity
833            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
834            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
835
836            // Write data that fits in buffer
837            writer.write_at(0, b"hello").await.unwrap();
838            assert_eq!(writer.size().await, 5);
839
840            // Append data that causes buffer flush
841            writer.write_at(5, b" world").await.unwrap();
842            writer.sync().await.unwrap();
843            assert_eq!(writer.size().await, 11);
844
845            // Verify the complete result
846            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
847            assert_eq!(size, 11);
848            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
849            let read = reader.read(11).await.unwrap().coalesce();
850            assert_eq!(read.as_ref(), b"hello world");
851        });
852    }
853
854    #[test_traced]
855    fn test_write_into_middle_of_buffer() {
856        let executor = deterministic::Runner::default();
857        executor.start(|context| async move {
858            // Test overwriting data within the buffer and extending it
859            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
860            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
861
862            // Initial write
863            writer.write_at(0, b"abcdefghij").await.unwrap();
864            assert_eq!(writer.size().await, 10);
865
866            // Overwrite middle section
867            writer.write_at(2, b"01234").await.unwrap();
868            assert_eq!(writer.size().await, 10);
869            writer.sync().await.unwrap();
870
871            // Verify overwrite result
872            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
873            assert_eq!(size, 10);
874            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
875            let read = reader.read(10).await.unwrap().coalesce();
876            assert_eq!(read.as_ref(), b"ab01234hij");
877
878            // Extend buffer and do partial overwrite
879            writer.write_at(10, b"klmnopqrst").await.unwrap();
880            assert_eq!(writer.size().await, 20);
881            writer.write_at(9, b"wxyz").await.unwrap();
882            assert_eq!(writer.size().await, 20);
883            writer.sync().await.unwrap();
884
885            // Verify final result
886            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
887            assert_eq!(size, 20);
888            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
889            let read = reader.read(20).await.unwrap().coalesce();
890            assert_eq!(read.as_ref(), b"ab01234hiwxyznopqrst");
891        });
892    }
893
894    #[test_traced]
895    fn test_write_before_buffer() {
896        let executor = deterministic::Runner::default();
897        executor.start(|context| async move {
898            // Test writing at offsets before the current buffer position
899            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
900            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
901
902            // Write data at a later offset first
903            writer.write_at(10, b"0123456789").await.unwrap();
904            assert_eq!(writer.size().await, 20);
905
906            // Write at an earlier offset (should flush buffer first)
907            writer.write_at(0, b"abcde").await.unwrap();
908            assert_eq!(writer.size().await, 20);
909            writer.sync().await.unwrap();
910
911            // Verify data placement with gap
912            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
913            assert_eq!(size, 20);
914            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
915            let read = reader.read(20).await.unwrap().coalesce();
916            let mut expected = vec![0u8; 20];
917            expected[0..5].copy_from_slice("abcde".as_bytes());
918            expected[10..20].copy_from_slice("0123456789".as_bytes());
919            assert_eq!(read.as_ref(), expected.as_slice());
920
921            // Fill the gap between existing data
922            writer.write_at(5, b"fghij").await.unwrap();
923            assert_eq!(writer.size().await, 20);
924            writer.sync().await.unwrap();
925            assert_eq!(writer.size().await, 20);
926
927            // Verify gap is filled
928            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
929            assert_eq!(size, 20);
930            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
931            let read = reader.read(20).await.unwrap().coalesce();
932            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
933            assert_eq!(read.as_ref(), expected.as_slice());
934        });
935    }
936
937    #[test_traced]
938    fn test_write_resize() {
939        let executor = deterministic::Runner::default();
940        executor.start(|context| async move {
941            // Test blob resize functionality and subsequent writes
942            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
943            let writer = Write::from_pooler(&context, blob, size, NZUsize!(10));
944
945            // Write initial data
946            writer.write_at(0, b"hello world").await.unwrap();
947            assert_eq!(writer.size().await, 11);
948            writer.sync().await.unwrap();
949            assert_eq!(writer.size().await, 11);
950
951            let (blob_check, size_check) =
952                context.open("partition", b"resize_write").await.unwrap();
953            assert_eq!(size_check, 11);
954            drop(blob_check);
955
956            // Resize to smaller size
957            writer.resize(5).await.unwrap();
958            assert_eq!(writer.size().await, 5);
959            writer.sync().await.unwrap();
960
961            // Verify resize
962            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
963            assert_eq!(size, 5);
964            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
965            let read = reader.read(5).await.unwrap().coalesce();
966            assert_eq!(read.as_ref(), b"hello");
967
968            // Write to resized blob
969            writer.write_at(0, b"X").await.unwrap();
970            assert_eq!(writer.size().await, 5);
971            writer.sync().await.unwrap();
972
973            // Verify overwrite
974            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
975            assert_eq!(size, 5);
976            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
977            let read = reader.read(5).await.unwrap().coalesce();
978            assert_eq!(read.as_ref(), b"Xello");
979
980            // Test resize to larger size
981            writer.resize(10).await.unwrap();
982            assert_eq!(writer.size().await, 10);
983            writer.sync().await.unwrap();
984
985            // Verify resize
986            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
987            assert_eq!(size, 10);
988            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
989            let read = reader.read(10).await.unwrap().coalesce();
990            assert_eq!(&read.as_ref()[0..5], b"Xello");
991            assert_eq!(&read.as_ref()[5..10], [0u8; 5]);
992
993            // Test resize to zero
994            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
995            let writer_zero = Write::from_pooler(&context, blob_zero.clone(), size, NZUsize!(10));
996            writer_zero.write_at(0, b"some data").await.unwrap();
997            assert_eq!(writer_zero.size().await, 9);
998            writer_zero.sync().await.unwrap();
999            assert_eq!(writer_zero.size().await, 9);
1000            writer_zero.resize(0).await.unwrap();
1001            assert_eq!(writer_zero.size().await, 0);
1002            writer_zero.sync().await.unwrap();
1003            assert_eq!(writer_zero.size().await, 0);
1004
1005            // Ensure the blob is empty
1006            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
1007            assert_eq!(size_z, 0);
1008        });
1009    }
1010
1011    #[test_traced]
1012    fn test_write_read_at_on_writer() {
1013        let executor = deterministic::Runner::default();
1014        executor.start(|context| async move {
1015            // Test reading through writer's read_at method (buffer + blob reads)
1016            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
1017            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1018
1019            // Write data that stays in buffer
1020            writer.write_at(0, b"buffered").await.unwrap();
1021            assert_eq!(writer.size().await, 8);
1022
1023            // Read from buffer via writer
1024            let read_buf_vec = writer.read_at(0, 4).await.unwrap().coalesce();
1025            assert_eq!(read_buf_vec, b"buff");
1026
1027            let read_buf_vec = writer.read_at(4, 4).await.unwrap().coalesce();
1028            assert_eq!(read_buf_vec, b"ered");
1029
1030            // Reading past buffer end should fail
1031            assert!(writer.read_at(8, 1).await.is_err());
1032
1033            // Write large data that flushes buffer
1034            writer.write_at(8, b" and flushed").await.unwrap();
1035            assert_eq!(writer.size().await, 20);
1036            writer.sync().await.unwrap();
1037            assert_eq!(writer.size().await, 20);
1038
1039            // Read from underlying blob through writer
1040            let read_buf_vec_2 = writer.read_at(0, 4).await.unwrap().coalesce();
1041            assert_eq!(read_buf_vec_2, b"buff");
1042
1043            let read_buf_7_vec = writer.read_at(13, 7).await.unwrap().coalesce();
1044            assert_eq!(read_buf_7_vec, b"flushed");
1045
1046            // Buffer new data at the end
1047            writer.write_at(20, b" more data").await.unwrap();
1048            assert_eq!(writer.size().await, 30);
1049
1050            // Read newly buffered data
1051            let read_buf_vec_3 = writer.read_at(20, 5).await.unwrap().coalesce();
1052            assert_eq!(read_buf_vec_3, b" more");
1053
1054            // Read spanning both blob and buffer
1055            let combo_read_buf_vec = writer.read_at(16, 12).await.unwrap();
1056            assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
1057
1058            // Verify complete content by reopening
1059            writer.sync().await.unwrap();
1060            assert_eq!(writer.size().await, 30);
1061            let (final_blob, final_size) =
1062                context.open("partition", b"read_at_writer").await.unwrap();
1063            assert_eq!(final_size, 30);
1064            let mut final_reader =
1065                Read::from_pooler(&context, final_blob, final_size, NZUsize!(30));
1066            let read = final_reader.read(30).await.unwrap().coalesce();
1067            assert_eq!(read.as_ref(), b"buffered and flushed more data");
1068        });
1069    }
1070
1071    #[test_traced]
1072    fn test_write_zero_length_read_past_eof_errors() {
1073        let executor = deterministic::Runner::default();
1074        executor.start(|context| async move {
1075            let (blob, size) = context.open("partition", b"zero_len_probe").await.unwrap();
1076            let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1077            writer.write_at(0, b"abc").await.unwrap();
1078
1079            let empty = writer.read_at(3, 0).await.unwrap();
1080            assert!(empty.is_empty());
1081
1082            let err = writer.read_at(4, 0).await.unwrap_err();
1083            assert!(matches!(err, Error::BlobInsufficientLength));
1084        });
1085    }
1086
    // Checks that a `read_at` served entirely from persisted blob bytes keeps a
    // concurrent `write_at` to the same range from completing while the blob read
    // is still in flight. Both operations must then finish once the read is released.
    #[test_traced]
    fn test_write_read_at_blocks_concurrent_write_until_persisted_read_completes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Seed a 10-byte blob whose first read call blocks until `release_read_tx`
            // fires; `read_started_rx` is signalled when that read begins.
            let (blob, read_started_rx, release_read_tx) =
                BlockingReadBlob::new(b"abcdefghij".to_vec());
            // Initial size 10 matches the seeded bytes; buffer capacity is 8.
            let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
            let reader = writer.clone();
            let verifier = writer.clone();

            // This read is entirely in persisted blob bytes (no buffered tip overlap).
            let read_task = context
                .child("read")
                .spawn(move |_| async move { reader.read_at(0, 4).await.expect("read failed") });

            // Wait until read_at has reached the underlying blob I/O while holding the tip lock.
            read_started_rx.await.expect("read start signal missing");

            let write_task = context.child("write").spawn(move |_| async move {
                writer.write_at(0, b"WXYZ").await.expect("write failed");
            });
            // Pin the handle so it can be polled once below without being consumed.
            pin_mut!(write_task);

            // Let the scheduler poll the write task; it should be blocked on the tip write lock.
            context.sleep(Duration::from_secs(1)).await;
            // `now_or_never` polls exactly once; `None` means the write is still pending.
            assert!(
                write_task.as_mut().now_or_never().is_none(),
                "write_at completed while read_at still held lock over blob I/O"
            );

            // Unblock the persisted read and ensure both operations complete.
            release_read_tx
                .send(())
                .expect("failed to release blocked read");
            // The read observes the pre-write bytes.
            let read_result = read_task.await.expect("read task failed").coalesce();
            assert_eq!(read_result.as_ref(), b"abcd");
            write_task.await.expect("write task failed");

            // After both finish, a fresh read through another clone sees the new bytes.
            let updated = verifier.read_at(0, 4).await.unwrap().coalesce();
            assert_eq!(updated.as_ref(), b"WXYZ");
        });
    }
1129
    // Checks the case where a `read_at` range overlaps both persisted blob bytes
    // (offsets 8..10) and the buffered tip (offsets 10..13). A concurrent
    // `write_at` into the tip must not complete while the blob part of that read
    // is in flight, and the read must return the pre-write tip bytes.
    #[test_traced]
    fn test_write_read_at_overlap_blocks_concurrent_write_until_persisted_read_completes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Seed a 10-byte blob whose first read call blocks until `release_read_tx`
            // fires; `read_started_rx` is signalled when that read begins.
            let (blob, read_started_rx, release_read_tx) =
                BlockingReadBlob::new(b"abcdefghij".to_vec());
            // Initial size 10 matches the seeded bytes; buffer capacity is 8.
            let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
            let verifier = writer.clone();

            // This creates a tip buffer with "XYZ" at offset 10.
            writer.write_at(10, b"XYZ").await.unwrap();

            // This read overlaps both the blob and the tip buffer.
            let reader = writer.clone();
            let read_task = context
                .child("read")
                .spawn(move |_| async move { reader.read_at(8, 5).await.expect("read failed") });

            // Wait until the overlap read reaches persisted blob I/O while holding the tip lock.
            read_started_rx.await.expect("read start signal missing");

            let write_task = context.child("write").spawn(move |_| async move {
                writer.write_at(10, b"UVW").await.expect("write failed");
            });
            // Pin the handle so it can be polled once below without being consumed.
            pin_mut!(write_task);

            // Write should remain blocked on the tip write lock until read releases it.
            context.sleep(Duration::from_secs(1)).await;
            // `now_or_never` polls exactly once; `None` means the write is still pending.
            assert!(
                write_task.as_mut().now_or_never().is_none(),
                "write_at completed while overlap read_at still held lock over blob I/O"
            );

            // Unblock the persisted read and ensure both operations complete.
            release_read_tx
                .send(())
                .expect("failed to release blocked read");
            // The read returns blob bytes "ij" plus the tip as it was before the write.
            let read_result = read_task.await.expect("read task failed").coalesce();
            assert_eq!(read_result.as_ref(), b"ijXYZ");
            write_task.await.expect("write task failed");

            // After both finish, the tip reflects the concurrent write.
            let updated = verifier.read_at(8, 5).await.unwrap().coalesce();
            assert_eq!(updated.as_ref(), b"ijUVW");
        });
    }
1175
1176    #[test_traced]
1177    fn test_write_straddling_non_mergeable() {
1178        let executor = deterministic::Runner::default();
1179        executor.start(|context| async move {
1180            // Test writes that cannot be merged into buffer (non-contiguous/too large)
1181            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
1182            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1183
1184            // Fill buffer completely
1185            writer.write_at(0, b"0123456789").await.unwrap();
1186            assert_eq!(writer.size().await, 10);
1187
1188            // Write at non-contiguous offset (should flush then write directly)
1189            writer.write_at(15, b"abc").await.unwrap();
1190            assert_eq!(writer.size().await, 18);
1191            writer.sync().await.unwrap();
1192            assert_eq!(writer.size().await, 18);
1193
1194            // Verify data with gap
1195            let (blob_check, size_check) =
1196                context.open("partition", b"write_straddle").await.unwrap();
1197            assert_eq!(size_check, 18);
1198            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(20));
1199            let read = reader.read(18).await.unwrap().coalesce();
1200
1201            let mut expected = vec![0u8; 18];
1202            expected[0..10].copy_from_slice(b"0123456789");
1203            expected[15..18].copy_from_slice(b"abc");
1204            assert_eq!(read.as_ref(), expected.as_slice());
1205
1206            // Test write that exceeds buffer capacity
1207            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
1208            let writer2 = Write::from_pooler(&context, blob2.clone(), size, NZUsize!(10));
1209            writer2.write_at(0, b"0123456789").await.unwrap();
1210            assert_eq!(writer2.size().await, 10);
1211
1212            // Write large data that exceeds capacity
1213            writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
1214            assert_eq!(writer2.size().await, 17);
1215            writer2.sync().await.unwrap();
1216            assert_eq!(writer2.size().await, 17);
1217
1218            // Verify overwrite result
1219            let (blob_check2, size_check2) =
1220                context.open("partition", b"write_straddle2").await.unwrap();
1221            assert_eq!(size_check2, 17);
1222            let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(20));
1223            let read = reader2.read(17).await.unwrap().coalesce();
1224            assert_eq!(read.as_ref(), b"01234ABCDEFGHIJKL");
1225        });
1226    }
1227
1228    #[test_traced]
1229    fn test_write_close() {
1230        let executor = deterministic::Runner::default();
1231        executor.start(|context| async move {
1232            // Test that closing writer flushes and persists buffered data
1233            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
1234            let writer = Write::from_pooler(&context, blob_orig.clone(), size, NZUsize!(8));
1235            writer.write_at(0, b"pending").await.unwrap();
1236            assert_eq!(writer.size().await, 7);
1237
1238            // Sync writer to persist data
1239            writer.sync().await.unwrap();
1240
1241            // Verify data persistence
1242            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
1243            assert_eq!(size_check, 7);
1244            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(8));
1245            let read = reader.read(7).await.unwrap().coalesce();
1246            assert_eq!(read.as_ref(), b"pending");
1247        });
1248    }
1249
1250    #[test_traced]
1251    fn test_write_direct_due_to_size() {
1252        let executor = deterministic::Runner::default();
1253        executor.start(|context| async move {
1254            // Test direct writes when data exceeds buffer capacity
1255            let (blob, size) = context
1256                .open("partition", b"write_direct_size")
1257                .await
1258                .unwrap();
1259            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1260
1261            // Write data larger than buffer capacity (should write directly)
1262            let data_large = b"0123456789";
1263            writer.write_at(0, data_large).await.unwrap();
1264            assert_eq!(writer.size().await, 10);
1265
1266            // Sync to ensure data is persisted
1267            writer.sync().await.unwrap();
1268
1269            // Verify direct write worked
1270            let (blob_check, size_check) = context
1271                .open("partition", b"write_direct_size")
1272                .await
1273                .unwrap();
1274            assert_eq!(size_check, 10);
1275            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1276            let read = reader.read(10).await.unwrap().coalesce();
1277            assert_eq!(read.as_ref(), data_large.as_slice());
1278
1279            // Now write small data that should be buffered
1280            writer.write_at(10, b"abc").await.unwrap();
1281            assert_eq!(writer.size().await, 13);
1282
1283            // Verify it's in buffer by reading through writer
1284            let read_small_buf_vec = writer.read_at(10, 3).await.unwrap().coalesce();
1285            assert_eq!(read_small_buf_vec, b"abc");
1286
1287            writer.sync().await.unwrap();
1288
1289            // Verify final state
1290            let (blob_check2, size_check2) = context
1291                .open("partition", b"write_direct_size")
1292                .await
1293                .unwrap();
1294            assert_eq!(size_check2, 13);
1295            let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(13));
1296            let read = reader2.read(13).await.unwrap().coalesce();
1297            assert_eq!(&read.as_ref()[10..], b"abc".as_slice());
1298        });
1299    }
1300
1301    #[test_traced]
1302    fn test_write_overwrite_and_extend_in_buffer() {
1303        let executor = deterministic::Runner::default();
1304        executor.start(|context| async move {
1305            // Test complex buffer operations: overwrite and extend within capacity
1306            let (blob, size) = context
1307                .open("partition", b"overwrite_extend_buf")
1308                .await
1309                .unwrap();
1310            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(15));
1311
1312            // Write initial data
1313            writer.write_at(0, b"0123456789").await.unwrap();
1314            assert_eq!(writer.size().await, 10);
1315
1316            // Overwrite and extend within buffer capacity
1317            writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
1318            assert_eq!(writer.size().await, 15);
1319
1320            // Verify buffer content through writer
1321            let read_buf_vec = writer.read_at(0, 15).await.unwrap().coalesce();
1322            assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
1323
1324            writer.sync().await.unwrap();
1325
1326            // Verify persisted result
1327            let (blob_check, size_check) = context
1328                .open("partition", b"overwrite_extend_buf")
1329                .await
1330                .unwrap();
1331            assert_eq!(size_check, 15);
1332            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(15));
1333            let read = reader.read(15).await.unwrap().coalesce();
1334            assert_eq!(read.as_ref(), b"01234ABCDEFGHIJ".as_slice());
1335        });
1336    }
1337
1338    #[test_traced]
1339    fn test_write_at_size() {
1340        let executor = deterministic::Runner::default();
1341        executor.start(|context| async move {
1342            // Test writing at the current logical end of the blob
1343            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
1344            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
1345
1346            // Write initial data
1347            writer.write_at(0, b"0123456789").await.unwrap();
1348            assert_eq!(writer.size().await, 10);
1349            writer.sync().await.unwrap();
1350
1351            // Append at the current size (logical end)
1352            writer.write_at(writer.size().await, b"abc").await.unwrap();
1353            assert_eq!(writer.size().await, 13);
1354            writer.sync().await.unwrap();
1355
1356            // Verify complete result
1357            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
1358            assert_eq!(size_check, 13);
1359            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(13));
1360            let read = reader.read(13).await.unwrap().coalesce();
1361            assert_eq!(read.as_ref(), b"0123456789abc");
1362        });
1363    }
1364
1365    #[test_traced]
1366    fn test_write_at_size_multiple_appends() {
1367        let executor = deterministic::Runner::default();
1368        executor.start(|context| async move {
1369            // Test multiple appends using writer.size()
1370            let (blob, size) = context
1371                .open("partition", b"write_multiple_appends_at_size")
1372                .await
1373                .unwrap();
1374            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1375
1376            // First write
1377            writer.write_at(0, b"AAA").await.unwrap();
1378            assert_eq!(writer.size().await, 3);
1379            writer.sync().await.unwrap();
1380            assert_eq!(writer.size().await, 3);
1381
1382            // Append using size()
1383            writer.write_at(writer.size().await, b"BBB").await.unwrap();
1384            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
1385            writer.sync().await.unwrap();
1386            assert_eq!(writer.size().await, 6);
1387
1388            // Append again using size()
1389            writer.write_at(writer.size().await, b"CCC").await.unwrap();
1390            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1391            writer.sync().await.unwrap();
1392            assert_eq!(writer.size().await, 9);
1393
1394            // Verify final content
1395            let (blob_check, size_check) = context
1396                .open("partition", b"write_multiple_appends_at_size")
1397                .await
1398                .unwrap();
1399            assert_eq!(size_check, 9);
1400            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(9));
1401            let read = reader.read(9).await.unwrap().coalesce();
1402            assert_eq!(read.as_ref(), b"AAABBBCCC");
1403        });
1404    }
1405
1406    #[test_traced]
1407    fn test_write_non_contiguous_then_append_at_size() {
1408        let executor = deterministic::Runner::default();
1409        executor.start(|context| async move {
1410            // Test writing non-contiguously, then appending at the new size
1411            let (blob, size) = context
1412                .open("partition", b"write_non_contiguous_then_append")
1413                .await
1414                .unwrap();
1415            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1416
1417            // Initial buffered write
1418            writer.write_at(0, b"INITIAL").await.unwrap(); // 7 bytes
1419            assert_eq!(writer.size().await, 7);
1420            // Buffer contains "INITIAL", inner.position = 0
1421
1422            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1423            writer.write_at(20, b"NONCONTIG").await.unwrap();
1424            assert_eq!(writer.size().await, 29);
1425            writer.sync().await.unwrap();
1426            assert_eq!(writer.size().await, 29);
1427
1428            // Append at the new size
1429            writer
1430                .write_at(writer.size().await, b"APPEND")
1431                .await
1432                .unwrap();
1433            assert_eq!(writer.size().await, 35); // 29 + 6
1434            writer.sync().await.unwrap();
1435            assert_eq!(writer.size().await, 35);
1436
1437            // Verify final content
1438            let (blob_check, size_check) = context
1439                .open("partition", b"write_non_contiguous_then_append")
1440                .await
1441                .unwrap();
1442            assert_eq!(size_check, 35);
1443            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(35));
1444            let read = reader.read(35).await.unwrap().coalesce();
1445
1446            let mut expected = vec![0u8; 35];
1447            expected[0..7].copy_from_slice(b"INITIAL");
1448            expected[20..29].copy_from_slice(b"NONCONTIG");
1449            expected[29..35].copy_from_slice(b"APPEND");
1450            assert_eq!(read.as_ref(), expected.as_slice());
1451        });
1452    }
1453
1454    #[test_traced]
1455    fn test_write_resize_then_append_at_size() {
1456        let executor = deterministic::Runner::default();
1457        executor.start(|context| async move {
1458            // Test truncating, then appending at the new size
1459            let (blob, size) = context
1460                .open("partition", b"resize_then_append_at_size")
1461                .await
1462                .unwrap();
1463            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1464
1465            // Write initial data and sync
1466            writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); // 16 bytes
1467            assert_eq!(writer.size().await, 16);
1468            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1469            assert_eq!(writer.size().await, 16);
1470
1471            // Resize
1472            let resize_to = 5;
1473            writer.resize(resize_to).await.unwrap();
1474            // after resize, inner.position should be `resize_to` (5)
1475            // buffer should be empty
1476            assert_eq!(writer.size().await, resize_to);
1477            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1478            assert_eq!(writer.size().await, resize_to);
1479
1480            // Append at the new (resized) size
1481            writer
1482                .write_at(writer.size().await, b"XXXXX")
1483                .await
1484                .unwrap(); // 5 bytes
1485                           // inner.buffer = "XXXXX", inner.position = 5
1486            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
1487            writer.sync().await.unwrap();
1488            assert_eq!(writer.size().await, 10);
1489
1490            // Verify final content
1491            let (blob_check, size_check) = context
1492                .open("partition", b"resize_then_append_at_size")
1493                .await
1494                .unwrap();
1495            assert_eq!(size_check, 10);
1496            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1497            let read = reader.read(10).await.unwrap().coalesce();
1498            assert_eq!(read.as_ref(), b"01234XXXXX");
1499        });
1500    }
1501
1502    #[test_traced]
1503    fn test_write_sync_uses_range_sync_for_buffer_only_write() {
1504        let executor = deterministic::Runner::default();
1505        executor.start(|context| async move {
1506            let blob = SyncTrackingBlob::new();
1507            let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1508
1509            // A fresh writer preserves one sync barrier for mutations that predate wrapping.
1510            writer.sync().await.unwrap();
1511            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1512            assert!(durable.is_empty());
1513            assert_eq!(writes, 0);
1514            assert_eq!(full_syncs, 1);
1515            assert_eq!(range_syncs, 0);
1516
1517            // The write remains entirely buffered, so sync can make just this range durable.
1518            writer.write_at(0, b"abc").await.unwrap();
1519            writer.sync().await.unwrap();
1520
1521            // No prior plain blob mutation required another full sync barrier.
1522            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1523            assert_eq!(durable.as_slice(), b"abc");
1524            assert_eq!(writes, 1);
1525            assert_eq!(full_syncs, 1);
1526            assert_eq!(range_syncs, 1);
1527
1528            // The prior sync used write_at_sync, so there is still no pending full-sync barrier.
1529            writer.sync().await.unwrap();
1530            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1531            assert_eq!(durable.as_slice(), b"abc");
1532            assert_eq!(writes, 1);
1533            assert_eq!(full_syncs, 1);
1534            assert_eq!(range_syncs, 1);
1535        });
1536    }
1537
1538    #[test_traced]
1539    fn test_write_sync_persists_pre_wrapped_blob_mutation() {
1540        let executor = deterministic::Runner::default();
1541        executor.start(|context| async move {
1542            let blob = SyncTrackingBlob::new();
1543
1544            // Simulate a plain blob mutation before the writer wraps it.
1545            blob.write_at(0, b"abc").await.unwrap();
1546
1547            let writer = Write::from_pooler(&context, blob.clone(), 3, NZUsize!(8));
1548            writer.sync().await.unwrap();
1549
1550            // The first sync must use a full barrier to make the pre-wrapped write durable.
1551            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1552            assert_eq!(durable.as_slice(), b"abc");
1553            assert_eq!(writes, 1);
1554            assert_eq!(full_syncs, 1);
1555            assert_eq!(range_syncs, 0);
1556
1557            // After the barrier is clear, a buffered tip-only write can use range sync again.
1558            writer.write_at(3, b"d").await.unwrap();
1559            writer.sync().await.unwrap();
1560
1561            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1562            assert_eq!(durable.as_slice(), b"abcd");
1563            assert_eq!(writes, 2);
1564            assert_eq!(full_syncs, 1);
1565            assert_eq!(range_syncs, 1);
1566        });
1567    }
1568
1569    #[test_traced]
1570    fn test_write_sync_failed_range_sync_does_not_mark_clean() {
1571        let executor = deterministic::Runner::default();
1572        executor.start(|context| async move {
1573            let name = b"failed_range_sync";
1574            let (blob, size) = context.open("partition", name).await.unwrap();
1575            let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
1576            writer.sync().await.unwrap();
1577
1578            // Keep the write buffered so sync attempts the clean `write_at_sync` path.
1579            writer.write_at(0, b"abc").await.unwrap();
1580
1581            // Removing the blob makes the range-sync flush fail.
1582            context.remove("partition", Some(name)).await.unwrap();
1583            assert!(writer.sync().await.is_err());
1584
1585            // The failed `write_at_sync` must leave a pending full-sync barrier, so a
1586            // later sync cannot report success.
1587            assert!(writer.sync().await.is_err());
1588        });
1589    }
1590
1591    #[test_traced]
1592    fn test_write_sync_persists_prior_direct_flushes_with_buffered_tip() {
1593        let executor = deterministic::Runner::default();
1594        executor.start(|context| async move {
1595            let blob = SyncTrackingBlob::new();
1596            let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(4));
1597
1598            // This exceeds the buffer and forces a plain write before the final buffered tip.
1599            writer.write_at(0, b"abcdef").await.unwrap();
1600            writer.write_at(6, b"g").await.unwrap();
1601            writer.sync().await.unwrap();
1602
1603            // The final sync must cover both the prior plain write and the buffered tip.
1604            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1605            assert_eq!(durable.as_slice(), b"abcdefg");
1606            assert_eq!(writes, 2);
1607            assert_eq!(full_syncs, 1);
1608            assert_eq!(range_syncs, 0);
1609
1610            // With no new writes, sync has no work left.
1611            writer.sync().await.unwrap();
1612            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1613            assert_eq!(durable.as_slice(), b"abcdefg");
1614            assert_eq!(writes, 2);
1615            assert_eq!(full_syncs, 1);
1616            assert_eq!(range_syncs, 0);
1617
1618            // After the full sync, the next buffer-only write can use range sync again.
1619            writer.write_at(7, b"h").await.unwrap();
1620            writer.sync().await.unwrap();
1621
1622            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1623            assert_eq!(durable.as_slice(), b"abcdefgh");
1624            assert_eq!(writes, 3);
1625            assert_eq!(full_syncs, 1);
1626            assert_eq!(range_syncs, 1);
1627        });
1628    }
1629
1630    #[test_traced]
1631    fn test_write_sync_uses_full_sync_after_resize() {
1632        let executor = deterministic::Runner::default();
1633        executor.start(|context| async move {
1634            let blob = SyncTrackingBlob::new();
1635            let writer = Write::from_pooler(&context, blob.clone(), 0, NZUsize!(8));
1636            writer.sync().await.unwrap();
1637
1638            // Establish already-durable data with a range sync.
1639            writer.write_at(0, b"abcdef").await.unwrap();
1640            writer.sync().await.unwrap();
1641
1642            // Resize alone is an unsynced blob mutation.
1643            writer.resize(4).await.unwrap();
1644            writer.sync().await.unwrap();
1645
1646            // The resized contents require a full sync barrier to become durable.
1647            let (durable, writes, full_syncs, range_syncs) = blob.snapshot();
1648            assert_eq!(durable.as_slice(), b"abcd");
1649            assert_eq!(writes, 1);
1650            assert_eq!(full_syncs, 2);
1651            assert_eq!(range_syncs, 1);
1652        });
1653    }
1654}