commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3pub mod pool;
4mod read;
5mod tip;
6mod write;
7
8pub use pool::PoolRef;
9pub use read::Read;
10pub use write::Write;
11
12#[cfg(test)]
13mod tests {
14    use super::*;
15    use crate::{deterministic, Blob as _, Error, Runner, Storage};
16    use commonware_macros::test_traced;
17    use commonware_utils::NZUsize;
18
19    #[test_traced]
20    fn test_read_basic() {
21        let executor = deterministic::Runner::default();
22        executor.start(|context| async move {
23            // Test basic buffered reading functionality with sequential reads
24            let data = b"Hello, world! This is a test.";
25            let (blob, size) = context.open("partition", b"test").await.unwrap();
26            assert_eq!(size, 0);
27            blob.write_at(data.to_vec(), 0).await.unwrap();
28            let size = data.len() as u64;
29
30            // Create a buffered reader with small buffer to test refilling
31            let mut reader = Read::new(blob, size, NZUsize!(10));
32
33            // Read some data
34            let mut buf = [0u8; 5];
35            reader.read_exact(&mut buf, 5).await.unwrap();
36            assert_eq!(&buf, b"Hello");
37
38            // Read more data that requires a buffer refill
39            let mut buf = [0u8; 14];
40            reader.read_exact(&mut buf, 14).await.unwrap();
41            assert_eq!(&buf, b", world! This ");
42
43            // Verify position tracking
44            assert_eq!(reader.position(), 19);
45
46            // Read the remaining data
47            let mut buf = [0u8; 10];
48            reader.read_exact(&mut buf, 7).await.unwrap();
49            assert_eq!(&buf[..7], b"is a te");
50
51            // Attempt to read beyond the end should fail
52            let mut buf = [0u8; 5];
53            let result = reader.read_exact(&mut buf, 5).await;
54            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
55        });
56    }
57
58    #[test_traced]
59    fn test_read_cross_boundary() {
60        let executor = deterministic::Runner::default();
61        executor.start(|context| async move {
62            // Test reading data that spans multiple buffer refills
63            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
64            let (blob, size) = context.open("partition", b"test").await.unwrap();
65            assert_eq!(size, 0);
66            blob.write_at(data.to_vec(), 0).await.unwrap();
67            let size = data.len() as u64;
68
69            // Use a buffer smaller than the total data size
70            let mut reader = Read::new(blob, size, NZUsize!(10));
71
72            // Read data that crosses buffer boundaries
73            let mut buf = [0u8; 15];
74            reader.read_exact(&mut buf, 15).await.unwrap();
75            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
76
77            // Verify position tracking
78            assert_eq!(reader.position(), 15);
79
80            // Read the remaining data
81            let mut buf = [0u8; 11];
82            reader.read_exact(&mut buf, 11).await.unwrap();
83            assert_eq!(&buf, b"PQRSTUVWXYZ");
84
85            // Verify we're at the end
86            assert_eq!(reader.position(), 26);
87            assert_eq!(reader.blob_remaining(), 0);
88        });
89    }
90
91    // Regression test for https://github.com/commonwarexyz/monorepo/issues/1348
92    #[test_traced]
93    fn test_read_to_end_then_rewind_and_read_again() {
94        let executor = deterministic::Runner::default();
95        executor.start(|context| async move {
96            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
97            let (blob, size) = context.open("partition", b"test").await.unwrap();
98            assert_eq!(size, 0);
99            blob.write_at(data.to_vec(), 0).await.unwrap();
100            let size = data.len() as u64;
101
102            let mut reader = Read::new(blob, size, NZUsize!(20));
103
104            // Read data that crosses buffer boundaries
105            let mut buf = [0u8; 21];
106            reader.read_exact(&mut buf, 21).await.unwrap();
107            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
108
109            // Verify position tracking
110            assert_eq!(reader.position(), 21);
111
112            // Read the remaining data
113            let mut buf = [0u8; 5];
114            reader.read_exact(&mut buf, 5).await.unwrap();
115            assert_eq!(&buf, b"VWXYZ");
116
117            // Rewind and read again
118            reader.seek_to(0).unwrap();
119            let mut buf = [0u8; 21];
120            reader.read_exact(&mut buf, 21).await.unwrap();
121            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
122        });
123    }
124
125    #[test_traced]
126    fn test_read_with_known_size() {
127        let executor = deterministic::Runner::default();
128        executor.start(|context| async move {
129            // Test reader behavior with known blob size limits
130            let data = b"This is a test with known size limitations.";
131            let (blob, size) = context.open("partition", b"test").await.unwrap();
132            assert_eq!(size, 0);
133            blob.write_at(data.to_vec(), 0).await.unwrap();
134            let size = data.len() as u64;
135
136            // Create a buffered reader with buffer smaller than total data
137            let mut reader = Read::new(blob, size, NZUsize!(10));
138
139            // Check initial remaining bytes
140            assert_eq!(reader.blob_remaining(), size);
141
142            // Read partial data
143            let mut buf = [0u8; 5];
144            reader.read_exact(&mut buf, 5).await.unwrap();
145            assert_eq!(&buf, b"This ");
146
147            // Check remaining bytes after partial read
148            assert_eq!(reader.blob_remaining(), size - 5);
149
150            // Read exactly up to the size limit
151            let mut buf = vec![0u8; (size - 5) as usize];
152            reader
153                .read_exact(&mut buf, (size - 5) as usize)
154                .await
155                .unwrap();
156            assert_eq!(&buf, b"is a test with known size limitations.");
157
158            // Verify we're at the end
159            assert_eq!(reader.blob_remaining(), 0);
160
161            // Reading beyond the end should fail
162            let mut buf = [0u8; 1];
163            let result = reader.read_exact(&mut buf, 1).await;
164            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
165        });
166    }
167
168    #[test_traced]
169    fn test_read_large_data() {
170        let executor = deterministic::Runner::default();
171        executor.start(|context| async move {
172            // Test reading large amounts of data in chunks
173            let data_size = 1024 * 256; // 256KB of data
174            let data = vec![0x42; data_size];
175            let (blob, size) = context.open("partition", b"test").await.unwrap();
176            assert_eq!(size, 0);
177            blob.write_at(data.clone(), 0).await.unwrap();
178            let size = data.len() as u64;
179
180            // Use a buffer much smaller than the total data
181            let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); // 64KB buffer
182
183            // Read all data in smaller chunks
184            let mut total_read = 0;
185            let chunk_size = 8 * 1024; // 8KB chunks
186            let mut buf = vec![0u8; chunk_size];
187
188            while total_read < data_size {
189                let to_read = std::cmp::min(chunk_size, data_size - total_read);
190                reader
191                    .read_exact(&mut buf[..to_read], to_read)
192                    .await
193                    .unwrap();
194
195                // Verify data integrity
196                assert!(
197                    buf[..to_read].iter().all(|&b| b == 0x42),
198                    "Data at position {total_read} is not correct"
199                );
200
201                total_read += to_read;
202            }
203
204            // Verify we read everything
205            assert_eq!(total_read, data_size);
206
207            // Reading beyond the end should fail
208            let mut extra_buf = [0u8; 1];
209            let result = reader.read_exact(&mut extra_buf, 1).await;
210            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
211        });
212    }
213
214    #[test_traced]
215    fn test_read_exact_size_reads() {
216        let executor = deterministic::Runner::default();
217        executor.start(|context| async move {
218            // Create a blob with exactly 2.5 buffer sizes of data
219            let buffer_size = 1024;
220            let data_size = buffer_size * 5 / 2; // 2.5 buffers
221            let data = vec![0x37; data_size];
222
223            let (blob, size) = context.open("partition", b"test").await.unwrap();
224            assert_eq!(size, 0);
225            blob.write_at(data.clone(), 0).await.unwrap();
226            let size = data.len() as u64;
227
228            let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
229
230            // Read exactly one buffer size
231            let mut buf1 = vec![0u8; buffer_size];
232            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
233            assert!(buf1.iter().all(|&b| b == 0x37));
234
235            // Read exactly one buffer size more
236            let mut buf2 = vec![0u8; buffer_size];
237            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
238            assert!(buf2.iter().all(|&b| b == 0x37));
239
240            // Read the remaining half buffer
241            let half_buffer = buffer_size / 2;
242            let mut buf3 = vec![0u8; half_buffer];
243            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
244            assert!(buf3.iter().all(|&b| b == 0x37));
245
246            // Verify we're at the end
247            assert_eq!(reader.blob_remaining(), 0);
248            assert_eq!(reader.position(), size);
249        });
250    }
251
252    #[test_traced]
253    fn test_read_seek_to() {
254        let executor = deterministic::Runner::default();
255        executor.start(|context| async move {
256            // Create a memory blob with some test data
257            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
258            let (blob, size) = context.open("partition", b"test").await.unwrap();
259            assert_eq!(size, 0);
260            blob.write_at(data.to_vec(), 0).await.unwrap();
261            let size = data.len() as u64;
262
263            // Create a buffer reader
264            let mut reader = Read::new(blob, size, NZUsize!(10));
265
266            // Read some data to advance the position
267            let mut buf = [0u8; 5];
268            reader.read_exact(&mut buf, 5).await.unwrap();
269            assert_eq!(&buf, b"ABCDE");
270            assert_eq!(reader.position(), 5);
271
272            // Seek to a specific position
273            reader.seek_to(10).unwrap();
274            assert_eq!(reader.position(), 10);
275
276            // Read data from the new position
277            let mut buf = [0u8; 5];
278            reader.read_exact(&mut buf, 5).await.unwrap();
279            assert_eq!(&buf, b"KLMNO");
280
281            // Seek to beginning
282            reader.seek_to(0).unwrap();
283            assert_eq!(reader.position(), 0);
284
285            let mut buf = [0u8; 5];
286            reader.read_exact(&mut buf, 5).await.unwrap();
287            assert_eq!(&buf, b"ABCDE");
288
289            // Seek to end
290            reader.seek_to(size).unwrap();
291            assert_eq!(reader.position(), size);
292
293            // Trying to read should fail
294            let mut buf = [0u8; 1];
295            let result = reader.read_exact(&mut buf, 1).await;
296            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
297
298            // Seek beyond end should fail
299            let result = reader.seek_to(size + 10);
300            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
301        });
302    }
303
304    #[test_traced]
305    fn test_read_seek_with_refill() {
306        let executor = deterministic::Runner::default();
307        executor.start(|context| async move {
308            // Create a memory blob with longer data
309            let data = vec![0x41; 1000]; // 1000 'A' characters
310            let (blob, size) = context.open("partition", b"test").await.unwrap();
311            assert_eq!(size, 0);
312            blob.write_at(data.clone(), 0).await.unwrap();
313            let size = data.len() as u64;
314
315            // Create a buffer reader with small buffer
316            let mut reader = Read::new(blob, size, NZUsize!(10));
317
318            // Read some data
319            let mut buf = [0u8; 5];
320            reader.read_exact(&mut buf, 5).await.unwrap();
321
322            // Seek far ahead, past the current buffer
323            reader.seek_to(500).unwrap();
324
325            // Read data - should get data from position 500
326            let mut buf = [0u8; 5];
327            reader.read_exact(&mut buf, 5).await.unwrap();
328            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
329            assert_eq!(reader.position(), 505);
330
331            // Seek backwards
332            reader.seek_to(100).unwrap();
333
334            // Read again - should be at position 100
335            let mut buf = [0u8; 5];
336            reader.read_exact(&mut buf, 5).await.unwrap();
337            assert_eq!(reader.position(), 105);
338        });
339    }
340
341    #[test_traced]
342    fn test_read_resize() {
343        let executor = deterministic::Runner::default();
344        executor.start(|context| async move {
345            // Create a memory blob with some test data
346            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
347            let (blob, size) = context.open("partition", b"test").await.unwrap();
348            assert_eq!(size, 0);
349            blob.write_at(data.to_vec(), 0).await.unwrap();
350            let data_len = data.len() as u64;
351
352            // Create a buffer reader
353            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
354
355            // Resize the blob to half its size
356            let resize_len = data_len / 2;
357            reader.resize(resize_len).await.unwrap();
358
359            // Reopen to check truncation
360            let (blob, size) = context.open("partition", b"test").await.unwrap();
361            assert_eq!(size, resize_len, "Blob should be resized to half size");
362
363            // Create a new buffer and read to verify truncation
364            let mut new_reader = Read::new(blob, size, NZUsize!(10));
365
366            // Read the content
367            let mut buf = vec![0u8; size as usize];
368            new_reader
369                .read_exact(&mut buf, size as usize)
370                .await
371                .unwrap();
372            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
373
374            // Reading beyond resized size should fail
375            let mut extra_buf = [0u8; 1];
376            let result = new_reader.read_exact(&mut extra_buf, 1).await;
377            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
378
379            // Test resize to larger size
380            new_reader.resize(data_len * 2).await.unwrap();
381
382            // Reopen to check resize
383            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
384            assert_eq!(new_size, data_len * 2);
385
386            // Create a new buffer and read to verify resize
387            let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
388            let mut buf = vec![0u8; new_size as usize];
389            new_reader
390                .read_exact(&mut buf, new_size as usize)
391                .await
392                .unwrap();
393            assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
394            assert_eq!(
395                &buf[size as usize..],
396                vec![0u8; new_size as usize - size as usize]
397            );
398        });
399    }
400
401    #[test_traced]
402    fn test_read_resize_to_zero() {
403        let executor = deterministic::Runner::default();
404        executor.start(|context| async move {
405            // Create a memory blob with some test data
406            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
407            let data_len = data.len() as u64;
408            let (blob, size) = context.open("partition", b"test").await.unwrap();
409            assert_eq!(size, 0);
410            blob.write_at(data.to_vec(), 0).await.unwrap();
411
412            // Create a buffer reader
413            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
414
415            // Resize the blob to zero
416            reader.resize(0).await.unwrap();
417
418            // Reopen to check truncation
419            let (blob, size) = context.open("partition", b"test").await.unwrap();
420            assert_eq!(size, 0, "Blob should be resized to zero");
421
422            // Create a new buffer and try to read (should fail)
423            let mut new_reader = Read::new(blob, size, NZUsize!(10));
424
425            // Reading from resized blob should fail
426            let mut buf = [0u8; 1];
427            let result = new_reader.read_exact(&mut buf, 1).await;
428            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
429        });
430    }
431
432    #[test_traced]
433    fn test_write_basic() {
434        let executor = deterministic::Runner::default();
435        executor.start(|context| async move {
436            // Test basic buffered write and sync functionality
437            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
438            assert_eq!(size, 0);
439
440            let writer = Write::new(blob.clone(), size, NZUsize!(8));
441            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
442            assert_eq!(writer.size().await, 5);
443            writer.sync().await.unwrap();
444            assert_eq!(writer.size().await, 5);
445
446            // Verify data was written correctly
447            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
448            assert_eq!(size, 5);
449            let mut reader = Read::new(blob, size, NZUsize!(8));
450            let mut buf = [0u8; 5];
451            reader.read_exact(&mut buf, 5).await.unwrap();
452            assert_eq!(&buf, b"hello");
453        });
454    }
455
456    #[test_traced]
457    fn test_write_multiple_flushes() {
458        let executor = deterministic::Runner::default();
459        executor.start(|context| async move {
460            // Test writes that cause buffer flushes due to capacity limits
461            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
462            assert_eq!(size, 0);
463
464            let writer = Write::new(blob.clone(), size, NZUsize!(4));
465            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
466            assert_eq!(writer.size().await, 3);
467            writer.write_at(b"defg".to_vec(), 3).await.unwrap();
468            assert_eq!(writer.size().await, 7);
469            writer.sync().await.unwrap();
470
471            // Verify the final result
472            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
473            assert_eq!(size, 7);
474            let mut reader = Read::new(blob, size, NZUsize!(4));
475            let mut buf = [0u8; 7];
476            reader.read_exact(&mut buf, 7).await.unwrap();
477            assert_eq!(&buf, b"abcdefg");
478        });
479    }
480
481    #[test_traced]
482    fn test_write_large_data() {
483        let executor = deterministic::Runner::default();
484        executor.start(|context| async move {
485            // Test writing data larger than buffer capacity (direct write)
486            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
487            assert_eq!(size, 0);
488
489            let writer = Write::new(blob.clone(), size, NZUsize!(4));
490            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
491            assert_eq!(writer.size().await, 3);
492            writer
493                .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
494                .await
495                .unwrap();
496            assert_eq!(writer.size().await, 26);
497            writer.sync().await.unwrap();
498            assert_eq!(writer.size().await, 26);
499
500            // Verify the complete data
501            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
502            assert_eq!(size, 26);
503            let mut reader = Read::new(blob, size, NZUsize!(4));
504            let mut buf = [0u8; 26];
505            reader.read_exact(&mut buf, 26).await.unwrap();
506            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
507        });
508    }
509
510    #[test_traced]
511    fn test_write_append_to_buffer() {
512        let executor = deterministic::Runner::default();
513        executor.start(|context| async move {
514            // Test sequential appends that exceed buffer capacity
515            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
516            let writer = Write::new(blob.clone(), size, NZUsize!(10));
517
518            // Write data that fits in buffer
519            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
520            assert_eq!(writer.size().await, 5);
521
522            // Append data that causes buffer flush
523            writer.write_at(b" world".to_vec(), 5).await.unwrap();
524            writer.sync().await.unwrap();
525            assert_eq!(writer.size().await, 11);
526
527            // Verify the complete result
528            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
529            assert_eq!(size, 11);
530            let mut reader = Read::new(blob, size, NZUsize!(10));
531            let mut buf = vec![0u8; 11];
532            reader.read_exact(&mut buf, 11).await.unwrap();
533            assert_eq!(&buf, b"hello world");
534        });
535    }
536
537    #[test_traced]
538    fn test_write_into_middle_of_buffer() {
539        let executor = deterministic::Runner::default();
540        executor.start(|context| async move {
541            // Test overwriting data within the buffer and extending it
542            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
543            let writer = Write::new(blob.clone(), size, NZUsize!(20));
544
545            // Initial write
546            writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
547            assert_eq!(writer.size().await, 10);
548
549            // Overwrite middle section
550            writer.write_at(b"01234".to_vec(), 2).await.unwrap();
551            assert_eq!(writer.size().await, 10);
552            writer.sync().await.unwrap();
553
554            // Verify overwrite result
555            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
556            assert_eq!(size, 10);
557            let mut reader = Read::new(blob, size, NZUsize!(10));
558            let mut buf = vec![0u8; 10];
559            reader.read_exact(&mut buf, 10).await.unwrap();
560            assert_eq!(&buf, b"ab01234hij");
561
562            // Extend buffer and do partial overwrite
563            writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
564            assert_eq!(writer.size().await, 20);
565            writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
566            assert_eq!(writer.size().await, 20);
567            writer.sync().await.unwrap();
568
569            // Verify final result
570            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
571            assert_eq!(size, 20);
572            let mut reader = Read::new(blob, size, NZUsize!(20));
573            let mut buf = vec![0u8; 20];
574            reader.read_exact(&mut buf, 20).await.unwrap();
575            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
576        });
577    }
578
579    #[test_traced]
580    fn test_write_before_buffer() {
581        let executor = deterministic::Runner::default();
582        executor.start(|context| async move {
583            // Test writing at offsets before the current buffer position
584            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
585            let writer = Write::new(blob.clone(), size, NZUsize!(10));
586
587            // Write data at a later offset first
588            writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
589            assert_eq!(writer.size().await, 20);
590
591            // Write at an earlier offset (should flush buffer first)
592            writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
593            assert_eq!(writer.size().await, 20);
594            writer.sync().await.unwrap();
595
596            // Verify data placement with gap
597            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
598            assert_eq!(size, 20);
599            let mut reader = Read::new(blob, size, NZUsize!(20));
600            let mut buf = vec![0u8; 20];
601            reader.read_exact(&mut buf, 20).await.unwrap();
602            let mut expected = vec![0u8; 20];
603            expected[0..5].copy_from_slice("abcde".as_bytes());
604            expected[10..20].copy_from_slice("0123456789".as_bytes());
605            assert_eq!(buf, expected);
606
607            // Fill the gap between existing data
608            writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
609            assert_eq!(writer.size().await, 20);
610            writer.sync().await.unwrap();
611            assert_eq!(writer.size().await, 20);
612
613            // Verify gap is filled
614            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
615            assert_eq!(size, 20);
616            let mut reader = Read::new(blob, size, NZUsize!(20));
617            let mut buf = vec![0u8; 20];
618            reader.read_exact(&mut buf, 20).await.unwrap();
619            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
620            assert_eq!(buf, expected);
621        });
622    }
623
624    #[test_traced]
625    fn test_write_resize() {
626        let executor = deterministic::Runner::default();
627        executor.start(|context| async move {
628            // Test blob resize functionality and subsequent writes
629            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
630            let writer = Write::new(blob, size, NZUsize!(10));
631
632            // Write initial data
633            writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
634            assert_eq!(writer.size().await, 11);
635            writer.sync().await.unwrap();
636            assert_eq!(writer.size().await, 11);
637
638            let (blob_check, size_check) =
639                context.open("partition", b"resize_write").await.unwrap();
640            assert_eq!(size_check, 11);
641            drop(blob_check);
642
643            // Resize to smaller size
644            writer.resize(5).await.unwrap();
645            assert_eq!(writer.size().await, 5);
646            writer.sync().await.unwrap();
647
648            // Verify resize
649            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
650            assert_eq!(size, 5);
651            let mut reader = Read::new(blob, size, NZUsize!(5));
652            let mut buf = vec![0u8; 5];
653            reader.read_exact(&mut buf, 5).await.unwrap();
654            assert_eq!(&buf, b"hello");
655
656            // Write to resized blob
657            writer.write_at(b"X".to_vec(), 0).await.unwrap();
658            assert_eq!(writer.size().await, 5);
659            writer.sync().await.unwrap();
660
661            // Verify overwrite
662            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
663            assert_eq!(size, 5);
664            let mut reader = Read::new(blob, size, NZUsize!(5));
665            let mut buf = vec![0u8; 5];
666            reader.read_exact(&mut buf, 5).await.unwrap();
667            assert_eq!(&buf, b"Xello");
668
669            // Test resize to larger size
670            writer.resize(10).await.unwrap();
671            assert_eq!(writer.size().await, 10);
672            writer.sync().await.unwrap();
673
674            // Verify resize
675            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
676            assert_eq!(size, 10);
677            let mut reader = Read::new(blob, size, NZUsize!(10));
678            let mut buf = vec![0u8; 10];
679            reader.read_exact(&mut buf, 10).await.unwrap();
680            assert_eq!(&buf[0..5], b"Xello");
681            assert_eq!(&buf[5..10], [0u8; 5]);
682
683            // Test resize to zero
684            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
685            let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
686            writer_zero
687                .write_at(b"some data".to_vec(), 0)
688                .await
689                .unwrap();
690            assert_eq!(writer_zero.size().await, 9);
691            writer_zero.sync().await.unwrap();
692            assert_eq!(writer_zero.size().await, 9);
693            writer_zero.resize(0).await.unwrap();
694            assert_eq!(writer_zero.size().await, 0);
695            writer_zero.sync().await.unwrap();
696            assert_eq!(writer_zero.size().await, 0);
697
698            // Ensure the blob is empty
699            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
700            assert_eq!(size_z, 0);
701        });
702    }
703
704    #[test_traced]
705    fn test_write_read_at_on_writer() {
706        let executor = deterministic::Runner::default();
707        executor.start(|context| async move {
708            // Test reading through writer's read_at method (buffer + blob reads)
709            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
710            let writer = Write::new(blob.clone(), size, NZUsize!(10));
711
712            // Write data that stays in buffer
713            writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
714            assert_eq!(writer.size().await, 8);
715
716            // Read from buffer via writer
717            let mut read_buf_vec = vec![0u8; 4].into();
718            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
719            assert_eq!(read_buf_vec.as_ref(), b"buff");
720
721            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
722            assert_eq!(read_buf_vec.as_ref(), b"ered");
723
724            // Reading past buffer end should fail
725            let small_buf_vec = vec![0u8; 1];
726            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
727
728            // Write large data that flushes buffer
729            writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
730            assert_eq!(writer.size().await, 20);
731            writer.sync().await.unwrap();
732            assert_eq!(writer.size().await, 20);
733
734            // Read from underlying blob through writer
735            let mut read_buf_vec_2 = vec![0u8; 4].into();
736            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
737            assert_eq!(read_buf_vec_2.as_ref(), b"buff");
738
739            let mut read_buf_7_vec = vec![0u8; 7].into();
740            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
741            assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
742
743            // Buffer new data at the end
744            writer.write_at(b" more data".to_vec(), 20).await.unwrap();
745            assert_eq!(writer.size().await, 30);
746
747            // Read newly buffered data
748            let mut read_buf_vec_3 = vec![0u8; 5].into();
749            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
750            assert_eq!(read_buf_vec_3.as_ref(), b" more");
751
752            // Read spanning both blob and buffer
753            let mut combo_read_buf_vec = vec![0u8; 12].into();
754            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
755            assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
756
757            // Verify complete content by reopening
758            writer.sync().await.unwrap();
759            assert_eq!(writer.size().await, 30);
760            let (final_blob, final_size) =
761                context.open("partition", b"read_at_writer").await.unwrap();
762            assert_eq!(final_size, 30);
763            let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
764            let mut full_content = vec![0u8; 30];
765            final_reader
766                .read_exact(&mut full_content, 30)
767                .await
768                .unwrap();
769            assert_eq!(&full_content, b"buffered and flushed more data");
770        });
771    }
772
773    #[test_traced]
774    fn test_write_straddling_non_mergeable() {
775        let executor = deterministic::Runner::default();
776        executor.start(|context| async move {
777            // Test writes that cannot be merged into buffer (non-contiguous/too large)
778            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
779            let writer = Write::new(blob.clone(), size, NZUsize!(10));
780
781            // Fill buffer completely
782            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
783            assert_eq!(writer.size().await, 10);
784
785            // Write at non-contiguous offset (should flush then write directly)
786            writer.write_at(b"abc".to_vec(), 15).await.unwrap();
787            assert_eq!(writer.size().await, 18);
788            writer.sync().await.unwrap();
789            assert_eq!(writer.size().await, 18);
790
791            // Verify data with gap
792            let (blob_check, size_check) =
793                context.open("partition", b"write_straddle").await.unwrap();
794            assert_eq!(size_check, 18);
795            let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
796            let mut buf = vec![0u8; 18];
797            reader.read_exact(&mut buf, 18).await.unwrap();
798
799            let mut expected = vec![0u8; 18];
800            expected[0..10].copy_from_slice(b"0123456789");
801            expected[15..18].copy_from_slice(b"abc");
802            assert_eq!(buf, expected);
803
804            // Test write that exceeds buffer capacity
805            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
806            let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
807            writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
808            assert_eq!(writer2.size().await, 10);
809
810            // Write large data that exceeds capacity
811            writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
812            assert_eq!(writer2.size().await, 17);
813            writer2.sync().await.unwrap();
814            assert_eq!(writer2.size().await, 17);
815
816            // Verify overwrite result
817            let (blob_check2, size_check2) =
818                context.open("partition", b"write_straddle2").await.unwrap();
819            assert_eq!(size_check2, 17);
820            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
821            let mut buf2 = vec![0u8; 17];
822            reader2.read_exact(&mut buf2, 17).await.unwrap();
823            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
824        });
825    }
826
827    #[test_traced]
828    fn test_write_close() {
829        let executor = deterministic::Runner::default();
830        executor.start(|context| async move {
831            // Test that closing writer flushes and persists buffered data
832            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
833            let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
834            writer.write_at(b"pending".to_vec(), 0).await.unwrap();
835            assert_eq!(writer.size().await, 7);
836
837            // Sync writer to persist data
838            writer.sync().await.unwrap();
839
840            // Verify data persistence
841            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
842            assert_eq!(size_check, 7);
843            let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
844            let mut buf = [0u8; 7];
845            reader.read_exact(&mut buf, 7).await.unwrap();
846            assert_eq!(&buf, b"pending");
847        });
848    }
849
850    #[test_traced]
851    fn test_write_direct_due_to_size() {
852        let executor = deterministic::Runner::default();
853        executor.start(|context| async move {
854            // Test direct writes when data exceeds buffer capacity
855            let (blob, size) = context
856                .open("partition", b"write_direct_size")
857                .await
858                .unwrap();
859            let writer = Write::new(blob.clone(), size, NZUsize!(5));
860
861            // Write data larger than buffer capacity (should write directly)
862            let data_large = b"0123456789";
863            writer.write_at(data_large.to_vec(), 0).await.unwrap();
864            assert_eq!(writer.size().await, 10);
865
866            // Sync to ensure data is persisted
867            writer.sync().await.unwrap();
868
869            // Verify direct write worked
870            let (blob_check, size_check) = context
871                .open("partition", b"write_direct_size")
872                .await
873                .unwrap();
874            assert_eq!(size_check, 10);
875            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
876            let mut buf = vec![0u8; 10];
877            reader.read_exact(&mut buf, 10).await.unwrap();
878            assert_eq!(&buf, data_large.as_slice());
879
880            // Now write small data that should be buffered
881            writer.write_at(b"abc".to_vec(), 10).await.unwrap();
882            assert_eq!(writer.size().await, 13);
883
884            // Verify it's in buffer by reading through writer
885            let mut read_small_buf_vec = vec![0u8; 3].into();
886            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
887            assert_eq!(read_small_buf_vec.as_ref(), b"abc");
888
889            writer.sync().await.unwrap();
890
891            // Verify final state
892            let (blob_check2, size_check2) = context
893                .open("partition", b"write_direct_size")
894                .await
895                .unwrap();
896            assert_eq!(size_check2, 13);
897            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
898            let mut buf2 = vec![0u8; 13];
899            reader2.read_exact(&mut buf2, 13).await.unwrap();
900            assert_eq!(&buf2[10..], b"abc".as_slice());
901        });
902    }
903
904    #[test_traced]
905    fn test_write_overwrite_and_extend_in_buffer() {
906        let executor = deterministic::Runner::default();
907        executor.start(|context| async move {
908            // Test complex buffer operations: overwrite and extend within capacity
909            let (blob, size) = context
910                .open("partition", b"overwrite_extend_buf")
911                .await
912                .unwrap();
913            let writer = Write::new(blob.clone(), size, NZUsize!(15));
914
915            // Write initial data
916            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
917            assert_eq!(writer.size().await, 10);
918
919            // Overwrite and extend within buffer capacity
920            writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
921            assert_eq!(writer.size().await, 15);
922
923            // Verify buffer content through writer
924            let mut read_buf_vec = vec![0u8; 15].into();
925            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
926            assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
927
928            writer.sync().await.unwrap();
929
930            // Verify persisted result
931            let (blob_check, size_check) = context
932                .open("partition", b"overwrite_extend_buf")
933                .await
934                .unwrap();
935            assert_eq!(size_check, 15);
936            let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
937            let mut final_buf = vec![0u8; 15];
938            reader.read_exact(&mut final_buf, 15).await.unwrap();
939            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
940        });
941    }
942
943    #[test_traced]
944    fn test_write_at_size() {
945        let executor = deterministic::Runner::default();
946        executor.start(|context| async move {
947            // Test writing at the current logical end of the blob
948            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
949            let writer = Write::new(blob.clone(), size, NZUsize!(20));
950
951            // Write initial data
952            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
953            assert_eq!(writer.size().await, 10);
954            writer.sync().await.unwrap();
955
956            // Append at the current size (logical end)
957            writer
958                .write_at(b"abc".to_vec(), writer.size().await)
959                .await
960                .unwrap();
961            assert_eq!(writer.size().await, 13);
962            writer.sync().await.unwrap();
963
964            // Verify complete result
965            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
966            assert_eq!(size_check, 13);
967            let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
968            let mut buf = vec![0u8; 13];
969            reader.read_exact(&mut buf, 13).await.unwrap();
970            assert_eq!(&buf, b"0123456789abc");
971        });
972    }
973
974    #[test_traced]
975    fn test_write_at_size_multiple_appends() {
976        let executor = deterministic::Runner::default();
977        executor.start(|context| async move {
978            // Test multiple appends using writer.size()
979            let (blob, size) = context
980                .open("partition", b"write_multiple_appends_at_size")
981                .await
982                .unwrap();
983            let writer = Write::new(blob.clone(), size, NZUsize!(5)); // Small buffer
984
985            // First write
986            writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
987            assert_eq!(writer.size().await, 3);
988            writer.sync().await.unwrap();
989            assert_eq!(writer.size().await, 3);
990
991            // Append using size()
992            writer
993                .write_at(b"BBB".to_vec(), writer.size().await)
994                .await
995                .unwrap();
996            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
997            writer.sync().await.unwrap();
998            assert_eq!(writer.size().await, 6);
999
1000            // Append again using size()
1001            writer
1002                .write_at(b"CCC".to_vec(), writer.size().await)
1003                .await
1004                .unwrap();
1005            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1006            writer.sync().await.unwrap();
1007            assert_eq!(writer.size().await, 9);
1008
1009            // Verify final content
1010            let (blob_check, size_check) = context
1011                .open("partition", b"write_multiple_appends_at_size")
1012                .await
1013                .unwrap();
1014            assert_eq!(size_check, 9);
1015            let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1016            let mut buf = vec![0u8; 9];
1017            reader.read_exact(&mut buf, 9).await.unwrap();
1018            assert_eq!(&buf, b"AAABBBCCC");
1019        });
1020    }
1021
1022    #[test_traced]
1023    fn test_write_non_contiguous_then_append_at_size() {
1024        let executor = deterministic::Runner::default();
1025        executor.start(|context| async move {
1026            // Test writing non-contiguously, then appending at the new size
1027            let (blob, size) = context
1028                .open("partition", b"write_non_contiguous_then_append")
1029                .await
1030                .unwrap();
1031            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1032
1033            // Initial buffered write
1034            writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); // 7 bytes
1035            assert_eq!(writer.size().await, 7);
1036            // Buffer contains "INITIAL", inner.position = 0
1037
1038            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1039            writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1040            assert_eq!(writer.size().await, 29);
1041            writer.sync().await.unwrap();
1042            assert_eq!(writer.size().await, 29);
1043
1044            // Append at the new size
1045            writer
1046                .write_at(b"APPEND".to_vec(), writer.size().await)
1047                .await
1048                .unwrap();
1049            assert_eq!(writer.size().await, 35); // 29 + 6
1050            writer.sync().await.unwrap();
1051            assert_eq!(writer.size().await, 35);
1052
1053            // Verify final content
1054            let (blob_check, size_check) = context
1055                .open("partition", b"write_non_contiguous_then_append")
1056                .await
1057                .unwrap();
1058            assert_eq!(size_check, 35);
1059            let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1060            let mut buf = vec![0u8; 35];
1061            reader.read_exact(&mut buf, 35).await.unwrap();
1062
1063            let mut expected = vec![0u8; 35];
1064            expected[0..7].copy_from_slice(b"INITIAL");
1065            expected[20..29].copy_from_slice(b"NONCONTIG");
1066            expected[29..35].copy_from_slice(b"APPEND");
1067            assert_eq!(buf, expected);
1068        });
1069    }
1070
1071    #[test_traced]
1072    fn test_resize_then_append_at_size() {
1073        let executor = deterministic::Runner::default();
1074        executor.start(|context| async move {
1075            // Test truncating, then appending at the new size
1076            let (blob, size) = context
1077                .open("partition", b"resize_then_append_at_size")
1078                .await
1079                .unwrap();
1080            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1081
1082            // Write initial data and sync
1083            writer
1084                .write_at(b"0123456789ABCDEF".to_vec(), 0)
1085                .await
1086                .unwrap(); // 16 bytes
1087            assert_eq!(writer.size().await, 16);
1088            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1089            assert_eq!(writer.size().await, 16);
1090
1091            // Resize
1092            let resize_to = 5;
1093            writer.resize(resize_to).await.unwrap();
1094            // after resize, inner.position should be `resize_to` (5)
1095            // buffer should be empty
1096            assert_eq!(writer.size().await, resize_to);
1097            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1098            assert_eq!(writer.size().await, resize_to);
1099
1100            // Append at the new (resized) size
1101            writer
1102                .write_at(b"XXXXX".to_vec(), writer.size().await)
1103                .await
1104                .unwrap(); // 5 bytes
1105                           // inner.buffer = "XXXXX", inner.position = 5
1106            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
1107            writer.sync().await.unwrap();
1108            assert_eq!(writer.size().await, 10);
1109
1110            // Verify final content
1111            let (blob_check, size_check) = context
1112                .open("partition", b"resize_then_append_at_size")
1113                .await
1114                .unwrap();
1115            assert_eq!(size_check, 10);
1116            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1117            let mut buf = vec![0u8; 10];
1118            reader.read_exact(&mut buf, 10).await.unwrap();
1119            assert_eq!(&buf, b"01234XXXXX");
1120        });
1121    }
1122}