// commonware_runtime/utils/buffer/mod.rs
//! Buffers for reading and writing to [crate::Blob]s.

pub mod paged;
// Internal implementations; their primary types are re-exported below.
mod read;
mod tip;
mod write;

// Public entry points: buffered reader and writer over a blob.
pub use read::Read;
pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13    use super::*;
14    use crate::{
15        deterministic, Blob as _, BufMut, Clock, Error, IoBufMut, IoBufs, IoBufsMut, Runner,
16        Spawner, Storage,
17    };
18    use commonware_macros::test_traced;
19    use commonware_utils::{channel::oneshot, sync::Mutex, NZUsize};
20    use futures::{pin_mut, FutureExt};
21    use std::{sync::Arc, time::Duration};
22
    /// Channel ends driving the one-shot read gate.
    ///
    /// Both sides are `Option` so they can be consumed exactly once by the
    /// first gated read (see `BlockingReadBlob::block_once_on_read`).
    struct BlockingReadGate {
        // Fired when the gated read begins, so the test knows it is in flight.
        read_started: Option<oneshot::Sender<()>>,
        // Awaited by the gated read until the test explicitly releases it.
        release_read: Option<oneshot::Receiver<()>>,
    }
27
    /// Test-only blob wrapper that blocks exactly one read call until explicitly released.
    ///
    /// Used to assert lock ordering / contention behavior in writer read-path tests.
    #[derive(Clone)]
    struct BlockingReadBlob {
        // Backing bytes; grown with zero fill when a write extends past the end.
        data: Arc<Mutex<Vec<u8>>>,
        // One-shot gate consumed by the first `read_at_buf` call.
        gate: Arc<Mutex<BlockingReadGate>>,
    }
36
37    impl BlockingReadBlob {
38        fn new(data: Vec<u8>) -> (Self, oneshot::Receiver<()>, oneshot::Sender<()>) {
39            let (read_started_tx, read_started_rx) = oneshot::channel();
40            let (release_read_tx, release_read_rx) = oneshot::channel();
41            (
42                Self {
43                    data: Arc::new(Mutex::new(data)),
44                    gate: Arc::new(Mutex::new(BlockingReadGate {
45                        read_started: Some(read_started_tx),
46                        release_read: Some(release_read_rx),
47                    })),
48                },
49                read_started_rx,
50                release_read_tx,
51            )
52        }
53
54        async fn block_once_on_read(&self) {
55            let rx = {
56                let mut gate = self.gate.lock();
57                gate.read_started.take().map(|read_started| {
58                    let _ = read_started.send(());
59                    gate.release_read.take().expect("release signal missing")
60                })
61            };
62            if let Some(rx) = rx {
63                let _ = rx.await;
64            }
65        }
66    }
67
68    impl crate::Blob for BlockingReadBlob {
69        async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
70            self.read_at_buf(offset, len, IoBufMut::default()).await
71        }
72
73        async fn read_at_buf(
74            &self,
75            offset: u64,
76            len: usize,
77            buf: impl Into<IoBufsMut> + Send,
78        ) -> Result<IoBufsMut, Error> {
79            self.block_once_on_read().await;
80
81            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
82            let end = start.checked_add(len).ok_or(Error::OffsetOverflow)?;
83            let data = self.data.lock();
84            if end > data.len() {
85                return Err(Error::BlobInsufficientLength);
86            }
87
88            let mut out = buf.into();
89            out.put_slice(&data[start..end]);
90            Ok(out)
91        }
92
93        async fn write_at(&self, offset: u64, buf: impl Into<IoBufs> + Send) -> Result<(), Error> {
94            let buf = buf.into().coalesce();
95            let start = usize::try_from(offset).map_err(|_| Error::OffsetOverflow)?;
96            let end = start.checked_add(buf.len()).ok_or(Error::OffsetOverflow)?;
97
98            let mut data = self.data.lock();
99            if end > data.len() {
100                data.resize(end, 0);
101            }
102            data[start..end].copy_from_slice(buf.as_ref());
103            Ok(())
104        }
105
106        async fn resize(&self, len: u64) -> Result<(), Error> {
107            let len = usize::try_from(len).map_err(|_| Error::OffsetOverflow)?;
108            self.data.lock().resize(len, 0);
109            Ok(())
110        }
111
112        async fn sync(&self) -> Result<(), Error> {
113            Ok(())
114        }
115    }
116
    // Sequential buffered reads with a buffer smaller than the data,
    // exercising refills, position tracking, and the end-of-blob error.
    #[test_traced]
    fn test_read_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test basic buffered reading functionality with sequential reads
            let data = b"Hello, world! This is a test.";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let size = data.len() as u64;

            // Create a buffered reader with small buffer to test refilling
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Read some data
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"Hello");

            // Read more data that requires a buffer refill
            let read = reader.read(14).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b", world! This ");

            // Verify position tracking
            assert_eq!(reader.position(), 19);

            // Read the remaining data
            let read = reader.read(7).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"is a te");

            // Attempt to read beyond the end should fail
            let result = reader.read(5).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        });
    }
151
    // A single read call that spans multiple buffer refills must return the
    // bytes contiguously and leave the reader positioned correctly.
    #[test_traced]
    fn test_read_cross_boundary() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test reading data that spans multiple buffer refills
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let size = data.len() as u64;

            // Use a buffer smaller than the total data size
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Read data that crosses buffer boundaries
            let read = reader.read(15).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNO");

            // Verify position tracking
            assert_eq!(reader.position(), 15);

            // Read the remaining data
            let read = reader.read(11).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"PQRSTUVWXYZ");

            // Verify we're at the end
            assert_eq!(reader.position(), 26);
            assert_eq!(reader.blob_remaining(), 0);
        });
    }
182
    // Regression test for https://github.com/commonwarexyz/monorepo/issues/1348
    // Reading to the end of the blob and then seeking back to 0 must allow a
    // full re-read rather than failing on stale end-of-blob state.
    #[test_traced]
    fn test_read_to_end_then_rewind_and_read_again() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let size = data.len() as u64;

            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));

            // Read data that crosses buffer boundaries
            let read = reader.read(21).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");

            // Verify position tracking
            assert_eq!(reader.position(), 21);

            // Read the remaining data
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"VWXYZ");

            // Rewind and read again
            reader.seek_to(0).unwrap();
            let read = reader.read(21).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDEFGHIJKLMNOPQRSTU");
        });
    }
213
    // `blob_remaining` must track the known blob size as bytes are consumed,
    // and reads past that size must fail.
    #[test_traced]
    fn test_read_with_known_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test reader behavior with known blob size limits
            let data = b"This is a test with known size limitations.";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let size = data.len() as u64;

            // Create a buffered reader with buffer smaller than total data
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Check initial remaining bytes
            assert_eq!(reader.blob_remaining(), size);

            // Read partial data
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"This ");

            // Check remaining bytes after partial read
            assert_eq!(reader.blob_remaining(), size - 5);

            // Read exactly up to the size limit
            let read = reader.read((size - 5) as usize).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"is a test with known size limitations.");

            // Verify we're at the end
            assert_eq!(reader.blob_remaining(), 0);

            // Reading beyond the end should fail
            let result = reader.read(1).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        });
    }
250
    // Regression: an over-sized read request must fail atomically — it may not
    // consume (double-count) bytes already sitting in the internal buffer.
    #[test_traced]
    fn test_read_oversized_request_does_not_consume_buffered_bytes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let data = b"abcdefghij";
            let (blob, size) = context
                .open("partition", b"double-count-regression")
                .await
                .unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();

            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(8));

            // Fill the internal buffer and consume most of it (2 bytes remain buffered).
            let first = reader.read(6).await.unwrap().coalesce();
            assert_eq!(first.as_ref(), b"abcdef");
            assert_eq!(reader.position(), 6);

            // Only 4 bytes remain total, so this must fail without consuming anything.
            let err = reader.read(5).await.unwrap_err();
            assert!(matches!(err, Error::BlobInsufficientLength));
            assert_eq!(reader.position(), 6);

            // Remaining bytes should still be readable in full.
            let tail = reader.read(4).await.unwrap().coalesce();
            assert_eq!(tail.as_ref(), b"ghij");
            assert_eq!(reader.position(), 10);
        });
    }
281
    // Bulk read test: 256KB streamed through a 64KB buffer in 8KB chunks,
    // verifying every byte and the final end-of-blob error.
    #[test_traced]
    fn test_read_large_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test reading large amounts of data in chunks
            let data_size = 1024 * 256; // 256KB of data
            let data = vec![0x42; data_size];
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data.clone()).await.unwrap();
            let size = data.len() as u64;

            // Use a buffer much smaller than the total data
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(64 * 1024));

            // Read all data in smaller chunks
            let mut total_read = 0;
            let chunk_size = 8 * 1024; // 8KB chunks

            while total_read < data_size {
                let to_read = std::cmp::min(chunk_size, data_size - total_read);
                let read = reader.read(to_read).await.unwrap().coalesce();

                // Verify data integrity
                assert!(
                    read.as_ref().iter().all(|&b| b == 0x42),
                    "Data at position {total_read} is not correct"
                );

                total_read += to_read;
            }

            // Verify we read everything
            assert_eq!(total_read, data_size);

            // Reading beyond the end should fail
            let result = reader.read(1).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        });
    }
322
    // Reads that exactly match the buffer size must land precisely on refill
    // boundaries without losing or duplicating bytes.
    #[test_traced]
    fn test_read_exact_size_reads() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create a blob with exactly 2.5 buffer sizes of data
            let buffer_size = 1024;
            let data_size = buffer_size * 5 / 2; // 2.5 buffers
            let data = vec![0x37; data_size];

            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data.clone()).await.unwrap();
            let size = data.len() as u64;

            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(buffer_size));

            // Read exactly one buffer size
            let read = reader.read(buffer_size).await.unwrap().coalesce();
            assert!(read.as_ref().iter().all(|&b| b == 0x37));

            // Read exactly one buffer size more
            let read = reader.read(buffer_size).await.unwrap().coalesce();
            assert!(read.as_ref().iter().all(|&b| b == 0x37));

            // Read the remaining half buffer
            let half_buffer = buffer_size / 2;
            let read = reader.read(half_buffer).await.unwrap().coalesce();
            assert!(read.as_ref().iter().all(|&b| b == 0x37));

            // Verify we're at the end
            assert_eq!(reader.blob_remaining(), 0);
            assert_eq!(reader.position(), size);
        });
    }
357
    // Structural check on the returned IoBufs: a read served from one fetched
    // chunk is single, while a read spanning a refill is multi-chunk.
    #[test_traced]
    fn test_read_structure_single_vs_chunked() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let data = b"ABCDEFGHIJKL";
            let (blob, size) = context.open("partition", b"structural").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();

            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(5));

            // First read fits in one fetched chunk.
            let first = reader.read(3).await.unwrap();
            assert!(first.is_single());
            assert_eq!(first.coalesce().as_ref(), b"ABC");

            // This read spans refill boundaries and should be represented as multiple chunks.
            let second = reader.read(7).await.unwrap();
            assert!(!second.is_single());
            assert_eq!(second.coalesce().as_ref(), b"DEFGHIJ");
        });
    }
380
    // `seek_to` semantics: forward, backward, to start, to exact end (valid but
    // unreadable), and past the end (rejected).
    #[test_traced]
    fn test_read_seek_to() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create a memory blob with some test data
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let size = data.len() as u64;

            // Create a buffer reader
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Read some data to advance the position
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDE");
            assert_eq!(reader.position(), 5);

            // Seek to a specific position
            reader.seek_to(10).unwrap();
            assert_eq!(reader.position(), 10);

            // Read data from the new position
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"KLMNO");

            // Seek to beginning
            reader.seek_to(0).unwrap();
            assert_eq!(reader.position(), 0);

            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDE");

            // Seek to end
            reader.seek_to(size).unwrap();
            assert_eq!(reader.position(), size);

            // Trying to read should fail
            let result = reader.read(1).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));

            // Seek beyond end should fail
            let result = reader.seek_to(size + 10);
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        });
    }
428
    // Seeking far outside the buffered window (both forward and backward) must
    // invalidate the buffer and refill from the new position.
    #[test_traced]
    fn test_read_seek_with_refill() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create a memory blob with longer data
            let data = vec![0x41; 1000]; // 1000 'A' characters
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data.clone()).await.unwrap();
            let size = data.len() as u64;

            // Create a buffer reader with small buffer
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Read some data
            let _ = reader.read(5).await.unwrap().coalesce();

            // Seek far ahead, past the current buffer
            reader.seek_to(500).unwrap();

            // Read data - should get data from position 500
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"AAAAA"); // Should still be 'A's
            assert_eq!(reader.position(), 505);

            // Seek backwards
            reader.seek_to(100).unwrap();

            // Read again - should be at position 100
            let _ = reader.read(5).await.unwrap().coalesce();
            assert_eq!(reader.position(), 105);
        });
    }
462
    // Seeking backward to an offset still covered by the buffered window must
    // reuse the buffer (observed via `buffer_remaining`) instead of refetching.
    #[test_traced]
    fn test_read_seek_within_buffered_range() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();

            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));

            // Reads 0..=5, while the internal fetch cursor advances to 10.
            let read = reader.read(6).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ABCDEF");
            assert_eq!(reader.position(), 6);
            assert_eq!(reader.buffer_remaining(), 4);

            // Seek back within [buffer_start, fetch_position).
            reader.seek_to(3).unwrap();
            assert_eq!(reader.position(), 3);
            assert_eq!(reader.buffer_remaining(), 7);

            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"DEFGH");
            assert_eq!(reader.position(), 8);
            assert_eq!(reader.buffer_remaining(), 2);
        });
    }
491
    // Seeking forward within the already-fetched-but-unread window must not
    // trigger a refill; the refill happens only once the buffer is exhausted.
    #[test_traced]
    fn test_read_seek_within_unread_buffer_does_not_refill() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context
                .open("partition", b"seek_unread_no_refill")
                .await
                .unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();

            let mut reader = Read::from_pooler(&context, blob, data.len() as u64, NZUsize!(10));

            // First read triggers a single refill of 10 bytes.
            let first = reader.read(6).await.unwrap();
            assert_eq!(first.coalesce().as_ref(), b"ABCDEF");
            assert_eq!(reader.position(), 6);
            assert_eq!(reader.buffer_remaining(), 4);

            // Seek within the unread buffered window [6, 10).
            reader.seek_to(7).unwrap();
            assert_eq!(reader.position(), 7);
            assert_eq!(reader.buffer_remaining(), 3);

            // Consume only from the already buffered window.
            let second = reader.read(3).await.unwrap();
            assert_eq!(second.coalesce().as_ref(), b"HIJ");
            assert_eq!(reader.position(), 10);
            assert_eq!(reader.buffer_remaining(), 0);

            // Refill should happen only now (at exhaustion), not at seek/read above.
            let third = reader.read(1).await.unwrap();
            assert_eq!(third.coalesce().as_ref(), b"K");
            assert_eq!(reader.position(), 11);
            assert_eq!(reader.buffer_remaining(), 9);
        });
    }
530
    // `resize` through the reader must persist to the underlying blob: first
    // truncate to half (13 bytes of the alphabet), then grow to double with
    // zero fill, verifying both via fresh readers on reopened blobs.
    #[test_traced]
    fn test_read_resize() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create a memory blob with some test data
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();
            let data_len = data.len() as u64;

            // Create a buffer reader
            let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));

            // Resize the blob to half its size
            let resize_len = data_len / 2;
            reader.resize(resize_len).await.unwrap();

            // Reopen to check truncation
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, resize_len, "Blob should be resized to half size");

            // Create a new buffer and read to verify truncation
            let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Read the content
            let read = new_reader.read(size as usize).await.unwrap().coalesce();
            assert_eq!(
                read.as_ref(),
                b"ABCDEFGHIJKLM",
                "Resized content should match"
            );

            // Reading beyond resized size should fail
            let result = new_reader.read(1).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));

            // Test resize to larger size
            new_reader.resize(data_len * 2).await.unwrap();

            // Reopen to check resize
            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(new_size, data_len * 2);

            // Create a new buffer and read to verify resize
            let mut new_reader = Read::from_pooler(&context, blob, new_size, NZUsize!(10));
            let read = new_reader.read(new_size as usize).await.unwrap().coalesce();
            assert_eq!(&read.as_ref()[..size as usize], b"ABCDEFGHIJKLM");
            // The grown region must be zero-filled.
            assert_eq!(
                &read.as_ref()[size as usize..],
                vec![0u8; new_size as usize - size as usize]
            );
        });
    }
585
    // Resizing to zero must truncate the blob entirely; any subsequent read
    // from a fresh reader fails with BlobInsufficientLength.
    #[test_traced]
    fn test_read_resize_to_zero() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Create a memory blob with some test data
            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
            let data_len = data.len() as u64;
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0);
            blob.write_at(0, data).await.unwrap();

            // Create a buffer reader
            let reader = Read::from_pooler(&context, blob.clone(), data_len, NZUsize!(10));

            // Resize the blob to zero
            reader.resize(0).await.unwrap();

            // Reopen to check truncation
            let (blob, size) = context.open("partition", b"test").await.unwrap();
            assert_eq!(size, 0, "Blob should be resized to zero");

            // Create a new buffer and try to read (should fail)
            let mut new_reader = Read::from_pooler(&context, blob, size, NZUsize!(10));

            // Reading from resized blob should fail
            let result = new_reader.read(1).await;
            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
        });
    }
615
    // Basic buffered write: data is sized correctly before and after `sync`,
    // and is readable from a reopened blob.
    #[test_traced]
    fn test_write_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test basic buffered write and sync functionality
            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
            assert_eq!(size, 0);

            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(8));
            writer.write_at(0, b"hello").await.unwrap();
            assert_eq!(writer.size().await, 5);
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 5);

            // Verify data was written correctly
            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
            assert_eq!(size, 5);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(8));
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"hello");
        });
    }
638
    // Writes whose combined length exceeds the 4-byte buffer force internal
    // flushes; the persisted result must still be the full concatenation.
    #[test_traced]
    fn test_write_multiple_flushes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test writes that cause buffer flushes due to capacity limits
            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
            assert_eq!(size, 0);

            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
            writer.write_at(0, b"abc").await.unwrap();
            assert_eq!(writer.size().await, 3);
            writer.write_at(3, b"defg").await.unwrap();
            assert_eq!(writer.size().await, 7);
            writer.sync().await.unwrap();

            // Verify the final result
            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
            assert_eq!(size, 7);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
            let read = reader.read(7).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"abcdefg");
        });
    }
662
    // A write larger than the buffer capacity must bypass (or drain) the
    // buffer and land fully on the blob alongside earlier buffered bytes.
    #[test_traced]
    fn test_write_large_data() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test writing data larger than buffer capacity (direct write)
            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
            assert_eq!(size, 0);

            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(4));
            writer.write_at(0, b"abc").await.unwrap();
            assert_eq!(writer.size().await, 3);
            writer
                .write_at(3, b"defghijklmnopqrstuvwxyz")
                .await
                .unwrap();
            assert_eq!(writer.size().await, 26);
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 26);

            // Verify the complete data
            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
            assert_eq!(size, 26);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(4));
            let read = reader.read(26).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"abcdefghijklmnopqrstuvwxyz");
        });
    }
690
    // Sequential appends: the second write overflows the 10-byte buffer and
    // must flush the first before the combined result is synced.
    #[test_traced]
    fn test_write_append_to_buffer() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test sequential appends that exceed buffer capacity
            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));

            // Write data that fits in buffer
            writer.write_at(0, b"hello").await.unwrap();
            assert_eq!(writer.size().await, 5);

            // Append data that causes buffer flush
            writer.write_at(5, b" world").await.unwrap();
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 11);

            // Verify the complete result
            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
            assert_eq!(size, 11);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
            let read = reader.read(11).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"hello world");
        });
    }
716
    // Overwrites inside the buffered region: in-place overwrite of the middle,
    // then an extension followed by a partial overwrite straddling the old and
    // new regions; the synced blob must reflect all of it.
    #[test_traced]
    fn test_write_into_middle_of_buffer() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test overwriting data within the buffer and extending it
            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));

            // Initial write
            writer.write_at(0, b"abcdefghij").await.unwrap();
            assert_eq!(writer.size().await, 10);

            // Overwrite middle section
            writer.write_at(2, b"01234").await.unwrap();
            assert_eq!(writer.size().await, 10);
            writer.sync().await.unwrap();

            // Verify overwrite result
            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
            assert_eq!(size, 10);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
            let read = reader.read(10).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ab01234hij");

            // Extend buffer and do partial overwrite
            writer.write_at(10, b"klmnopqrst").await.unwrap();
            assert_eq!(writer.size().await, 20);
            // Overlaps the last pre-existing byte and the first three appended bytes.
            writer.write_at(9, b"wxyz").await.unwrap();
            assert_eq!(writer.size().await, 20);
            writer.sync().await.unwrap();

            // Verify final result
            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
            assert_eq!(size, 20);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
            let read = reader.read(20).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"ab01234hiwxyznopqrst");
        });
    }
756
757    #[test_traced]
758    fn test_write_before_buffer() {
759        let executor = deterministic::Runner::default();
760        executor.start(|context| async move {
761            // Test writing at offsets before the current buffer position
762            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
763            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
764
765            // Write data at a later offset first
766            writer.write_at(10, b"0123456789").await.unwrap();
767            assert_eq!(writer.size().await, 20);
768
769            // Write at an earlier offset (should flush buffer first)
770            writer.write_at(0, b"abcde").await.unwrap();
771            assert_eq!(writer.size().await, 20);
772            writer.sync().await.unwrap();
773
774            // Verify data placement with gap
775            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
776            assert_eq!(size, 20);
777            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
778            let read = reader.read(20).await.unwrap().coalesce();
779            let mut expected = vec![0u8; 20];
780            expected[0..5].copy_from_slice("abcde".as_bytes());
781            expected[10..20].copy_from_slice("0123456789".as_bytes());
782            assert_eq!(read.as_ref(), expected.as_slice());
783
784            // Fill the gap between existing data
785            writer.write_at(5, b"fghij").await.unwrap();
786            assert_eq!(writer.size().await, 20);
787            writer.sync().await.unwrap();
788            assert_eq!(writer.size().await, 20);
789
790            // Verify gap is filled
791            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
792            assert_eq!(size, 20);
793            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(20));
794            let read = reader.read(20).await.unwrap().coalesce();
795            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
796            assert_eq!(read.as_ref(), expected.as_slice());
797        });
798    }
799
    #[test_traced]
    fn test_write_resize() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test blob resize (shrink, grow, and to zero) and subsequent writes.
            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
            let writer = Write::from_pooler(&context, blob, size, NZUsize!(10));

            // Write initial data (11 bytes, larger than the 10-byte buffer).
            writer.write_at(0, b"hello world").await.unwrap();
            assert_eq!(writer.size().await, 11);
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 11);

            // Confirm the synced size is visible through an independent handle.
            let (blob_check, size_check) =
                context.open("partition", b"resize_write").await.unwrap();
            assert_eq!(size_check, 11);
            drop(blob_check);

            // Resize to smaller size (truncates to the first 5 bytes).
            writer.resize(5).await.unwrap();
            assert_eq!(writer.size().await, 5);
            writer.sync().await.unwrap();

            // Verify the truncation persisted.
            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
            assert_eq!(size, 5);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"hello");

            // Write to the resized blob; an in-place overwrite must not grow it.
            writer.write_at(0, b"X").await.unwrap();
            assert_eq!(writer.size().await, 5);
            writer.sync().await.unwrap();

            // Verify the overwrite.
            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
            assert_eq!(size, 5);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(5));
            let read = reader.read(5).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"Xello");

            // Test resize to a larger size (extends the blob).
            writer.resize(10).await.unwrap();
            assert_eq!(writer.size().await, 10);
            writer.sync().await.unwrap();

            // Verify the extension: old data intact, new tail zero-filled.
            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
            assert_eq!(size, 10);
            let mut reader = Read::from_pooler(&context, blob, size, NZUsize!(10));
            let read = reader.read(10).await.unwrap().coalesce();
            assert_eq!(&read.as_ref()[0..5], b"Xello");
            assert_eq!(&read.as_ref()[5..10], [0u8; 5]);

            // Test resize to zero on a separate blob with buffered-then-synced data.
            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
            let writer_zero = Write::from_pooler(&context, blob_zero.clone(), size, NZUsize!(10));
            writer_zero.write_at(0, b"some data").await.unwrap();
            assert_eq!(writer_zero.size().await, 9);
            writer_zero.sync().await.unwrap();
            assert_eq!(writer_zero.size().await, 9);
            writer_zero.resize(0).await.unwrap();
            assert_eq!(writer_zero.size().await, 0);
            writer_zero.sync().await.unwrap();
            assert_eq!(writer_zero.size().await, 0);

            // Ensure the blob is empty after the zero-resize is persisted.
            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
            assert_eq!(size_z, 0);
        });
    }
873
    #[test_traced]
    fn test_write_read_at_on_writer() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test reading through the writer's read_at method, covering reads
            // served from the buffer, from the underlying blob, and spanning both.
            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));

            // Write data that stays in the buffer (8 bytes, capacity is 10).
            writer.write_at(0, b"buffered").await.unwrap();
            assert_eq!(writer.size().await, 8);

            // Reads within the buffered range are served from the buffer.
            let read_buf_vec = writer.read_at(0, 4).await.unwrap().coalesce();
            assert_eq!(read_buf_vec, b"buff");

            let read_buf_vec = writer.read_at(4, 4).await.unwrap().coalesce();
            assert_eq!(read_buf_vec, b"ered");

            // Reading past the logical end should fail.
            assert!(writer.read_at(8, 1).await.is_err());

            // Write data growing the blob past buffer capacity, flushing the buffer.
            writer.write_at(8, b" and flushed").await.unwrap();
            assert_eq!(writer.size().await, 20);
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 20);

            // These reads now come from the underlying blob through the writer.
            let read_buf_vec_2 = writer.read_at(0, 4).await.unwrap().coalesce();
            assert_eq!(read_buf_vec_2, b"buff");

            let read_buf_7_vec = writer.read_at(13, 7).await.unwrap().coalesce();
            assert_eq!(read_buf_7_vec, b"flushed");

            // Buffer new, not-yet-synced data at the end.
            writer.write_at(20, b" more data").await.unwrap();
            assert_eq!(writer.size().await, 30);

            // Read the newly buffered data back through the writer.
            let read_buf_vec_3 = writer.read_at(20, 5).await.unwrap().coalesce();
            assert_eq!(read_buf_vec_3, b" more");

            // A single read spanning persisted blob bytes and the buffered tail.
            let combo_read_buf_vec = writer.read_at(16, 12).await.unwrap();
            assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");

            // Verify the complete content by reopening after a final sync.
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 30);
            let (final_blob, final_size) =
                context.open("partition", b"read_at_writer").await.unwrap();
            assert_eq!(final_size, 30);
            let mut final_reader =
                Read::from_pooler(&context, final_blob, final_size, NZUsize!(30));
            let read = final_reader.read(30).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"buffered and flushed more data");
        });
    }
933
934    #[test_traced]
935    fn test_write_zero_length_read_past_eof_errors() {
936        let executor = deterministic::Runner::default();
937        executor.start(|context| async move {
938            let (blob, size) = context.open("partition", b"zero_len_probe").await.unwrap();
939            let writer = Write::from_pooler(&context, blob, size, NZUsize!(8));
940            writer.write_at(0, b"abc").await.unwrap();
941
942            let empty = writer.read_at(3, 0).await.unwrap();
943            assert!(empty.is_empty());
944
945            let err = writer.read_at(4, 0).await.unwrap_err();
946            assert!(matches!(err, Error::BlobInsufficientLength));
947        });
948    }
949
    #[test_traced]
    fn test_write_read_at_blocks_concurrent_write_until_persisted_read_completes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A read served entirely from persisted blob bytes must still hold the
            // tip lock across the underlying blob I/O, blocking concurrent writes.
            let (blob, read_started_rx, release_read_tx) =
                BlockingReadBlob::new(b"abcdefghij".to_vec());
            let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
            let reader = writer.clone();
            let verifier = writer.clone();

            // This read is entirely in persisted blob bytes (no buffered tip overlap).
            let read_task = context
                .clone()
                .spawn(move |_| async move { reader.read_at(0, 4).await.expect("read failed") });

            // Wait until read_at reached underlying blob I/O while holding the tip lock.
            read_started_rx.await.expect("read start signal missing");

            // Spawn a write that targets the same region as the in-flight read.
            let write_task = context.clone().spawn(move |_| async move {
                writer.write_at(0, b"WXYZ").await.expect("write failed");
            });
            pin_mut!(write_task);

            // Let the scheduler poll the write task; it should be blocked on the tip write lock.
            context.sleep(Duration::from_secs(1)).await;
            assert!(
                write_task.as_mut().now_or_never().is_none(),
                "write_at completed while read_at still held lock over blob I/O"
            );

            // Unblock the persisted read and ensure both operations complete.
            release_read_tx
                .send(())
                .expect("failed to release blocked read");
            // The read must observe the pre-write contents, proving it finished first.
            let read_result = read_task.await.expect("read task failed").coalesce();
            assert_eq!(read_result.as_ref(), b"abcd");
            write_task.await.expect("write task failed");

            // After both complete, the write is visible.
            let updated = verifier.read_at(0, 4).await.unwrap().coalesce();
            assert_eq!(updated.as_ref(), b"WXYZ");
        });
    }
992
    #[test_traced]
    fn test_write_read_at_overlap_blocks_concurrent_write_until_persisted_read_completes() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Same lock-ordering check as above, but for a read that straddles the
            // persisted blob bytes and the buffered tip.
            let (blob, read_started_rx, release_read_tx) =
                BlockingReadBlob::new(b"abcdefghij".to_vec());
            let writer = Write::from_pooler(&context, blob, 10, NZUsize!(8));
            let verifier = writer.clone();

            // This creates a tip buffer with "XYZ" at offset 10.
            writer.write_at(10, b"XYZ").await.unwrap();

            // This read overlaps both the persisted blob and the tip buffer.
            let reader = writer.clone();
            let read_task = context
                .clone()
                .spawn(move |_| async move { reader.read_at(8, 5).await.expect("read failed") });

            // Wait until the overlap read reaches persisted blob I/O while holding the tip lock.
            read_started_rx.await.expect("read start signal missing");

            // Spawn a write to the buffered tip region the read is also covering.
            let write_task = context.clone().spawn(move |_| async move {
                writer.write_at(10, b"UVW").await.expect("write failed");
            });
            pin_mut!(write_task);

            // The write should remain blocked on the tip write lock until the read releases it.
            context.sleep(Duration::from_secs(1)).await;
            assert!(
                write_task.as_mut().now_or_never().is_none(),
                "write_at completed while overlap read_at still held lock over blob I/O"
            );

            // Unblock the persisted read and ensure both operations complete.
            release_read_tx
                .send(())
                .expect("failed to release blocked read");
            // The read must see the pre-write tip ("XYZ"), proving it finished first.
            let read_result = read_task.await.expect("read task failed").coalesce();
            assert_eq!(read_result.as_ref(), b"ijXYZ");
            write_task.await.expect("write task failed");

            // After both complete, the tip reflects the later write.
            let updated = verifier.read_at(8, 5).await.unwrap().coalesce();
            assert_eq!(updated.as_ref(), b"ijUVW");
        });
    }
1038
1039    #[test_traced]
1040    fn test_write_straddling_non_mergeable() {
1041        let executor = deterministic::Runner::default();
1042        executor.start(|context| async move {
1043            // Test writes that cannot be merged into buffer (non-contiguous/too large)
1044            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
1045            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1046
1047            // Fill buffer completely
1048            writer.write_at(0, b"0123456789").await.unwrap();
1049            assert_eq!(writer.size().await, 10);
1050
1051            // Write at non-contiguous offset (should flush then write directly)
1052            writer.write_at(15, b"abc").await.unwrap();
1053            assert_eq!(writer.size().await, 18);
1054            writer.sync().await.unwrap();
1055            assert_eq!(writer.size().await, 18);
1056
1057            // Verify data with gap
1058            let (blob_check, size_check) =
1059                context.open("partition", b"write_straddle").await.unwrap();
1060            assert_eq!(size_check, 18);
1061            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(20));
1062            let read = reader.read(18).await.unwrap().coalesce();
1063
1064            let mut expected = vec![0u8; 18];
1065            expected[0..10].copy_from_slice(b"0123456789");
1066            expected[15..18].copy_from_slice(b"abc");
1067            assert_eq!(read.as_ref(), expected.as_slice());
1068
1069            // Test write that exceeds buffer capacity
1070            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
1071            let writer2 = Write::from_pooler(&context, blob2.clone(), size, NZUsize!(10));
1072            writer2.write_at(0, b"0123456789").await.unwrap();
1073            assert_eq!(writer2.size().await, 10);
1074
1075            // Write large data that exceeds capacity
1076            writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
1077            assert_eq!(writer2.size().await, 17);
1078            writer2.sync().await.unwrap();
1079            assert_eq!(writer2.size().await, 17);
1080
1081            // Verify overwrite result
1082            let (blob_check2, size_check2) =
1083                context.open("partition", b"write_straddle2").await.unwrap();
1084            assert_eq!(size_check2, 17);
1085            let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(20));
1086            let read = reader2.read(17).await.unwrap().coalesce();
1087            assert_eq!(read.as_ref(), b"01234ABCDEFGHIJKL");
1088        });
1089    }
1090
    #[test_traced]
    fn test_write_close() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // NOTE(review): despite the test name, this exercises sync() rather than
            // any explicit close/drop path — confirm whether a close path should be
            // covered here, or rename the test to match the behavior it pins.
            // Buffered data must be persisted once the writer is synced.
            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
            let writer = Write::from_pooler(&context, blob_orig.clone(), size, NZUsize!(8));
            writer.write_at(0, b"pending").await.unwrap();
            assert_eq!(writer.size().await, 7);

            // Sync writer to persist the buffered data.
            writer.sync().await.unwrap();

            // Verify data persistence through a fresh handle.
            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
            assert_eq!(size_check, 7);
            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(8));
            let read = reader.read(7).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"pending");
        });
    }
1112
1113    #[test_traced]
1114    fn test_write_direct_due_to_size() {
1115        let executor = deterministic::Runner::default();
1116        executor.start(|context| async move {
1117            // Test direct writes when data exceeds buffer capacity
1118            let (blob, size) = context
1119                .open("partition", b"write_direct_size")
1120                .await
1121                .unwrap();
1122            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1123
1124            // Write data larger than buffer capacity (should write directly)
1125            let data_large = b"0123456789";
1126            writer.write_at(0, data_large).await.unwrap();
1127            assert_eq!(writer.size().await, 10);
1128
1129            // Sync to ensure data is persisted
1130            writer.sync().await.unwrap();
1131
1132            // Verify direct write worked
1133            let (blob_check, size_check) = context
1134                .open("partition", b"write_direct_size")
1135                .await
1136                .unwrap();
1137            assert_eq!(size_check, 10);
1138            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
1139            let read = reader.read(10).await.unwrap().coalesce();
1140            assert_eq!(read.as_ref(), data_large.as_slice());
1141
1142            // Now write small data that should be buffered
1143            writer.write_at(10, b"abc").await.unwrap();
1144            assert_eq!(writer.size().await, 13);
1145
1146            // Verify it's in buffer by reading through writer
1147            let read_small_buf_vec = writer.read_at(10, 3).await.unwrap().coalesce();
1148            assert_eq!(read_small_buf_vec, b"abc");
1149
1150            writer.sync().await.unwrap();
1151
1152            // Verify final state
1153            let (blob_check2, size_check2) = context
1154                .open("partition", b"write_direct_size")
1155                .await
1156                .unwrap();
1157            assert_eq!(size_check2, 13);
1158            let mut reader2 = Read::from_pooler(&context, blob_check2, size_check2, NZUsize!(13));
1159            let read = reader2.read(13).await.unwrap().coalesce();
1160            assert_eq!(&read.as_ref()[10..], b"abc".as_slice());
1161        });
1162    }
1163
1164    #[test_traced]
1165    fn test_write_overwrite_and_extend_in_buffer() {
1166        let executor = deterministic::Runner::default();
1167        executor.start(|context| async move {
1168            // Test complex buffer operations: overwrite and extend within capacity
1169            let (blob, size) = context
1170                .open("partition", b"overwrite_extend_buf")
1171                .await
1172                .unwrap();
1173            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(15));
1174
1175            // Write initial data
1176            writer.write_at(0, b"0123456789").await.unwrap();
1177            assert_eq!(writer.size().await, 10);
1178
1179            // Overwrite and extend within buffer capacity
1180            writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
1181            assert_eq!(writer.size().await, 15);
1182
1183            // Verify buffer content through writer
1184            let read_buf_vec = writer.read_at(0, 15).await.unwrap().coalesce();
1185            assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
1186
1187            writer.sync().await.unwrap();
1188
1189            // Verify persisted result
1190            let (blob_check, size_check) = context
1191                .open("partition", b"overwrite_extend_buf")
1192                .await
1193                .unwrap();
1194            assert_eq!(size_check, 15);
1195            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(15));
1196            let read = reader.read(15).await.unwrap().coalesce();
1197            assert_eq!(read.as_ref(), b"01234ABCDEFGHIJ".as_slice());
1198        });
1199    }
1200
1201    #[test_traced]
1202    fn test_write_at_size() {
1203        let executor = deterministic::Runner::default();
1204        executor.start(|context| async move {
1205            // Test writing at the current logical end of the blob
1206            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
1207            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(20));
1208
1209            // Write initial data
1210            writer.write_at(0, b"0123456789").await.unwrap();
1211            assert_eq!(writer.size().await, 10);
1212            writer.sync().await.unwrap();
1213
1214            // Append at the current size (logical end)
1215            writer.write_at(writer.size().await, b"abc").await.unwrap();
1216            assert_eq!(writer.size().await, 13);
1217            writer.sync().await.unwrap();
1218
1219            // Verify complete result
1220            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
1221            assert_eq!(size_check, 13);
1222            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(13));
1223            let read = reader.read(13).await.unwrap().coalesce();
1224            assert_eq!(read.as_ref(), b"0123456789abc");
1225        });
1226    }
1227
1228    #[test_traced]
1229    fn test_write_at_size_multiple_appends() {
1230        let executor = deterministic::Runner::default();
1231        executor.start(|context| async move {
1232            // Test multiple appends using writer.size()
1233            let (blob, size) = context
1234                .open("partition", b"write_multiple_appends_at_size")
1235                .await
1236                .unwrap();
1237            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(5));
1238
1239            // First write
1240            writer.write_at(0, b"AAA").await.unwrap();
1241            assert_eq!(writer.size().await, 3);
1242            writer.sync().await.unwrap();
1243            assert_eq!(writer.size().await, 3);
1244
1245            // Append using size()
1246            writer.write_at(writer.size().await, b"BBB").await.unwrap();
1247            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
1248            writer.sync().await.unwrap();
1249            assert_eq!(writer.size().await, 6);
1250
1251            // Append again using size()
1252            writer.write_at(writer.size().await, b"CCC").await.unwrap();
1253            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1254            writer.sync().await.unwrap();
1255            assert_eq!(writer.size().await, 9);
1256
1257            // Verify final content
1258            let (blob_check, size_check) = context
1259                .open("partition", b"write_multiple_appends_at_size")
1260                .await
1261                .unwrap();
1262            assert_eq!(size_check, 9);
1263            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(9));
1264            let read = reader.read(9).await.unwrap().coalesce();
1265            assert_eq!(read.as_ref(), b"AAABBBCCC");
1266        });
1267    }
1268
1269    #[test_traced]
1270    fn test_write_non_contiguous_then_append_at_size() {
1271        let executor = deterministic::Runner::default();
1272        executor.start(|context| async move {
1273            // Test writing non-contiguously, then appending at the new size
1274            let (blob, size) = context
1275                .open("partition", b"write_non_contiguous_then_append")
1276                .await
1277                .unwrap();
1278            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));
1279
1280            // Initial buffered write
1281            writer.write_at(0, b"INITIAL").await.unwrap(); // 7 bytes
1282            assert_eq!(writer.size().await, 7);
1283            // Buffer contains "INITIAL", inner.position = 0
1284
1285            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1286            writer.write_at(20, b"NONCONTIG").await.unwrap();
1287            assert_eq!(writer.size().await, 29);
1288            writer.sync().await.unwrap();
1289            assert_eq!(writer.size().await, 29);
1290
1291            // Append at the new size
1292            writer
1293                .write_at(writer.size().await, b"APPEND")
1294                .await
1295                .unwrap();
1296            assert_eq!(writer.size().await, 35); // 29 + 6
1297            writer.sync().await.unwrap();
1298            assert_eq!(writer.size().await, 35);
1299
1300            // Verify final content
1301            let (blob_check, size_check) = context
1302                .open("partition", b"write_non_contiguous_then_append")
1303                .await
1304                .unwrap();
1305            assert_eq!(size_check, 35);
1306            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(35));
1307            let read = reader.read(35).await.unwrap().coalesce();
1308
1309            let mut expected = vec![0u8; 35];
1310            expected[0..7].copy_from_slice(b"INITIAL");
1311            expected[20..29].copy_from_slice(b"NONCONTIG");
1312            expected[29..35].copy_from_slice(b"APPEND");
1313            assert_eq!(read.as_ref(), expected.as_slice());
1314        });
1315    }
1316
    #[test_traced]
    fn test_resize_then_append_at_size() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Test truncating via resize(), then appending at the new (smaller) size.
            let (blob, size) = context
                .open("partition", b"resize_then_append_at_size")
                .await
                .unwrap();
            let writer = Write::from_pooler(&context, blob.clone(), size, NZUsize!(10));

            // Write initial data (16 bytes, exceeds the 10-byte buffer) and sync.
            writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); // 16 bytes
            assert_eq!(writer.size().await, 16);
            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
            assert_eq!(writer.size().await, 16);

            // Truncate down to 5 bytes.
            let resize_to = 5;
            writer.resize(resize_to).await.unwrap();
            // After resize, inner.position should be `resize_to` (5) and the
            // buffer should be empty.
            assert_eq!(writer.size().await, resize_to);
            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
            assert_eq!(writer.size().await, resize_to);

            // Append at the new (resized) size.
            writer
                .write_at(writer.size().await, b"XXXXX")
                .await
                .unwrap(); // 5 bytes
                           // inner.buffer = "XXXXX", inner.position = 5
            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
            writer.sync().await.unwrap();
            assert_eq!(writer.size().await, 10);

            // Verify final content: the truncated prefix plus the appended bytes.
            let (blob_check, size_check) = context
                .open("partition", b"resize_then_append_at_size")
                .await
                .unwrap();
            assert_eq!(size_check, 10);
            let mut reader = Read::from_pooler(&context, blob_check, size_check, NZUsize!(10));
            let read = reader.read(10).await.unwrap().coalesce();
            assert_eq!(read.as_ref(), b"01234XXXXX");
        });
    }
1364}