Skip to main content

commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3pub mod paged;
4mod read;
5mod tip;
6mod write;
7
8pub use read::Read;
9pub use write::Write;
10
11#[cfg(test)]
12mod tests {
13    use super::*;
14    use crate::{deterministic, Blob as _, Error, IoBufMut, Runner, Storage};
15    use commonware_macros::test_traced;
16    use commonware_utils::NZUsize;
17
18    #[test_traced]
19    fn test_read_basic() {
20        let executor = deterministic::Runner::default();
21        executor.start(|context| async move {
22            // Sequential `read_exact` calls on a 29-byte blob through a reader with a
              // 10-byte buffer, finishing with a read that asks for more than is left.
23            let data = b"Hello, world! This is a test.";
24            let (blob, size) = context.open("partition", b"test").await.unwrap();
25            assert_eq!(size, 0);
26            blob.write_at(0, data).await.unwrap();
27            let size = data.len() as u64;
28
29            // Create a buffered reader with small buffer to test refilling
30            let mut reader = Read::new(blob, size, NZUsize!(10));
31
32            // Read the first 5 bytes
33            let mut buf = [0u8; 5];
34            reader.read_exact(&mut buf, 5).await.unwrap();
35            assert_eq!(&buf, b"Hello");
36
37            // Read 14 more bytes, which is more than the 10-byte buffer can hold at once
38            let mut buf = [0u8; 14];
39            reader.read_exact(&mut buf, 14).await.unwrap();
40            assert_eq!(&buf, b", world! This ");
41
42            // Verify position tracking (5 + 14 bytes consumed)
43            assert_eq!(reader.position(), 19);
44
45            // Read 7 of the 10 bytes still unread; `buf` is larger than the requested
              // length, so only its first 7 bytes are compared
46            let mut buf = [0u8; 10];
47            reader.read_exact(&mut buf, 7).await.unwrap();
48            assert_eq!(&buf[..7], b"is a te");
49
50            // Only 3 bytes ("st.") remain, so asking for 5 must fail
51            let mut buf = [0u8; 5];
52            let result = reader.read_exact(&mut buf, 5).await;
53            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
54        });
55    }
56
57    #[test_traced]
58    fn test_read_cross_boundary() {
59        let executor = deterministic::Runner::default();
60        executor.start(|context| async move {
61            // Test single reads that are longer than the reader's buffer (15 and 11 bytes
              // against a 10-byte buffer) over a 26-byte blob
62            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
63            let (blob, size) = context.open("partition", b"test").await.unwrap();
64            assert_eq!(size, 0);
65            blob.write_at(0, data).await.unwrap();
66            let size = data.len() as u64;
67
68            // Use a buffer smaller than the total data size
69            let mut reader = Read::new(blob, size, NZUsize!(10));
70
71            // Read 15 bytes in one call, more than the buffer capacity
72            let mut buf = [0u8; 15];
73            reader.read_exact(&mut buf, 15).await.unwrap();
74            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
75
76            // Verify position tracking
77            assert_eq!(reader.position(), 15);
78
79            // Read the remaining 11 bytes (26 - 15)
80            let mut buf = [0u8; 11];
81            reader.read_exact(&mut buf, 11).await.unwrap();
82            assert_eq!(&buf, b"PQRSTUVWXYZ");
83
84            // Verify we're at the end: position equals the blob length and nothing is left
85            assert_eq!(reader.position(), 26);
86            assert_eq!(reader.blob_remaining(), 0);
87        });
88    }
89
90    // Regression test for https://github.com/commonwarexyz/monorepo/issues/1348
91    #[test_traced]
92    fn test_read_to_end_then_rewind_and_read_again() {
93        let executor = deterministic::Runner::default();
94        executor.start(|context| async move {
              // Consume the whole 26-byte blob in two reads (the first one longer than the
              // 20-byte buffer), then seek back to offset 0 and repeat the first read.
95            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
96            let (blob, size) = context.open("partition", b"test").await.unwrap();
97            assert_eq!(size, 0);
98            blob.write_at(0, data).await.unwrap();
99            let size = data.len() as u64;
100
101            let mut reader = Read::new(blob, size, NZUsize!(20));
102
103            // Read 21 bytes, one more than the buffer capacity
104            let mut buf = [0u8; 21];
105            reader.read_exact(&mut buf, 21).await.unwrap();
106            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
107
108            // Verify position tracking
109            assert_eq!(reader.position(), 21);
110
111            // Read the remaining 5 bytes, which leaves the reader at the end of the blob
112            let mut buf = [0u8; 5];
113            reader.read_exact(&mut buf, 5).await.unwrap();
114            assert_eq!(&buf, b"VWXYZ");
115
116            // Rewind to the start and check the same 21-byte read yields the same bytes
117            reader.seek_to(0).unwrap();
118            let mut buf = [0u8; 21];
119            reader.read_exact(&mut buf, 21).await.unwrap();
120            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
121        });
122    }
123
124    #[test_traced]
125    fn test_read_with_known_size() {
126        let executor = deterministic::Runner::default();
127        executor.start(|context| async move {
128            // Test that `blob_remaining()` tracks the unread byte count and that a read is
               // rejected once the size handed to `Read::new` has been fully consumed
129            let data = b"This is a test with known size limitations.";
130            let (blob, size) = context.open("partition", b"test").await.unwrap();
131            assert_eq!(size, 0);
132            blob.write_at(0, data).await.unwrap();
133            let size = data.len() as u64;
134
135            // Create a buffered reader with buffer smaller than total data
136            let mut reader = Read::new(blob, size, NZUsize!(10));
137
138            // Nothing has been read yet, so everything remains
139            assert_eq!(reader.blob_remaining(), size);
140
141            // Read partial data
142            let mut buf = [0u8; 5];
143            reader.read_exact(&mut buf, 5).await.unwrap();
144            assert_eq!(&buf, b"This ");
145
146            // Check remaining bytes after partial read
147            assert_eq!(reader.blob_remaining(), size - 5);
148
149            // Read everything that is left (`size - 5` bytes) in a single call
150            let mut buf = vec![0u8; (size - 5) as usize];
151            reader
152                .read_exact(&mut buf, (size - 5) as usize)
153                .await
154                .unwrap();
155            assert_eq!(&buf, b"is a test with known size limitations.");
156
157            // Verify we're at the end
158            assert_eq!(reader.blob_remaining(), 0);
159
160            // Even a single extra byte must be rejected
161            let mut buf = [0u8; 1];
162            let result = reader.read_exact(&mut buf, 1).await;
163            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
164        });
165    }
166
167    #[test_traced]
168    fn test_read_large_data() {
169        let executor = deterministic::Runner::default();
170        executor.start(|context| async move {
171            // Test reading large amounts of data in chunks
172            let data_size = 1024 * 256; // 256KB of data
173            let data = vec![0x42; data_size];
174            let (blob, size) = context.open("partition", b"test").await.unwrap();
175            assert_eq!(size, 0);
176            blob.write_at(0, data.clone()).await.unwrap();
177            let size = data.len() as u64;
178
179            // Use a buffer much smaller than the total data
180            let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); // 64KB buffer
181
182            // Read all data in smaller chunks
183            let mut total_read = 0;
184            let chunk_size = 8 * 1024; // 8KB chunks
185            let mut buf = vec![0u8; chunk_size];
186
187            while total_read < data_size {
                   // `data_size` is an exact multiple of `chunk_size` (32 chunks), so with these
                   // constants `to_read` always equals `chunk_size`; the `min` only matters if
                   // the sizes are changed
188                let to_read = std::cmp::min(chunk_size, data_size - total_read);
189                reader
190                    .read_exact(&mut buf[..to_read], to_read)
191                    .await
192                    .unwrap();
193
194                // Verify data integrity: every byte written was 0x42
195                assert!(
196                    buf[..to_read].iter().all(|&b| b == 0x42),
197                    "Data at position {total_read} is not correct"
198                );
199
200                total_read += to_read;
201            }
202
203            // Verify we read everything
204            assert_eq!(total_read, data_size);
205
206            // Reading beyond the end should fail
207            let mut extra_buf = [0u8; 1];
208            let result = reader.read_exact(&mut extra_buf, 1).await;
209            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
210        });
211    }
212
213    #[test_traced]
214    fn test_read_exact_size_reads() {
215        let executor = deterministic::Runner::default();
216        executor.start(|context| async move {
               // Test reads whose length equals the buffer capacity exactly, followed by a
               // final half-capacity read that lands on the end of the blob.
217            // Create a blob with exactly 2.5 buffer sizes of data
218            let buffer_size = 1024;
219            let data_size = buffer_size * 5 / 2; // 2.5 buffers (2560 bytes)
220            let data = vec![0x37; data_size];
221
222            let (blob, size) = context.open("partition", b"test").await.unwrap();
223            assert_eq!(size, 0);
224            blob.write_at(0, data.clone()).await.unwrap();
225            let size = data.len() as u64;
226
227            let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
228
229            // Read exactly one buffer size
230            let mut buf1 = vec![0u8; buffer_size];
231            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
232            assert!(buf1.iter().all(|&b| b == 0x37));
233
234            // Read exactly one buffer size more
235            let mut buf2 = vec![0u8; buffer_size];
236            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
237            assert!(buf2.iter().all(|&b| b == 0x37));
238
239            // Read the remaining half buffer (512 bytes)
240            let half_buffer = buffer_size / 2;
241            let mut buf3 = vec![0u8; half_buffer];
242            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
243            assert!(buf3.iter().all(|&b| b == 0x37));
244
245            // Verify we're at the end
246            assert_eq!(reader.blob_remaining(), 0);
247            assert_eq!(reader.position(), size);
248        });
249    }
250
251    #[test_traced]
252    fn test_read_seek_to() {
253        let executor = deterministic::Runner::default();
254        executor.start(|context| async move {
               // Test `seek_to`: forward, back to the start, to the exact end, and past the end.
255            // Create a blob holding the 26 uppercase letters
256            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
257            let (blob, size) = context.open("partition", b"test").await.unwrap();
258            assert_eq!(size, 0);
259            blob.write_at(0, data).await.unwrap();
260            let size = data.len() as u64;
261
262            // Create a buffer reader
263            let mut reader = Read::new(blob, size, NZUsize!(10));
264
265            // Read some data to advance the position
266            let mut buf = [0u8; 5];
267            reader.read_exact(&mut buf, 5).await.unwrap();
268            assert_eq!(&buf, b"ABCDE");
269            assert_eq!(reader.position(), 5);
270
271            // Seek forward to offset 10, skipping bytes 5..10
272            reader.seek_to(10).unwrap();
273            assert_eq!(reader.position(), 10);
274
275            // Read data from the new position
276            let mut buf = [0u8; 5];
277            reader.read_exact(&mut buf, 5).await.unwrap();
278            assert_eq!(&buf, b"KLMNO");
279
280            // Seek to beginning
281            reader.seek_to(0).unwrap();
282            assert_eq!(reader.position(), 0);
283
               // The first 5 bytes must read back the same as before the seeks
284            let mut buf = [0u8; 5];
285            reader.read_exact(&mut buf, 5).await.unwrap();
286            assert_eq!(&buf, b"ABCDE");
287
288            // Seeking to exactly `size` (one past the last byte) is allowed
289            reader.seek_to(size).unwrap();
290            assert_eq!(reader.position(), size);
291
292            // Trying to read should fail
293            let mut buf = [0u8; 1];
294            let result = reader.read_exact(&mut buf, 1).await;
295            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
296
297            // Seek beyond end should fail
298            let result = reader.seek_to(size + 10);
299            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
300        });
301    }
302
303    #[test_traced]
304    fn test_read_seek_with_refill() {
305        let executor = deterministic::Runner::default();
306        executor.start(|context| async move {
               // Test seeking to offsets far from the last read (forwards and backwards) and
               // reading from there.
               // NOTE(review): every byte of the blob is 0x41, so content assertions cannot
               // tell one offset from another; the `position()` checks carry this test.
307            // Create a blob with longer data
308            let data = vec![0x41; 1000]; // 1000 'A' characters
309            let (blob, size) = context.open("partition", b"test").await.unwrap();
310            assert_eq!(size, 0);
311            blob.write_at(0, data.clone()).await.unwrap();
312            let size = data.len() as u64;
313
314            // Create a buffer reader with small buffer
315            let mut reader = Read::new(blob, size, NZUsize!(10));
316
317            // Read some data
318            let mut buf = [0u8; 5];
319            reader.read_exact(&mut buf, 5).await.unwrap();
320
321            // Seek far ahead, past the current buffer
322            reader.seek_to(500).unwrap();
323
324            // Read data - should get data from position 500
325            let mut buf = [0u8; 5];
326            reader.read_exact(&mut buf, 5).await.unwrap();
327            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
328            assert_eq!(reader.position(), 505);
329
330            // Seek backwards
331            reader.seek_to(100).unwrap();
332
333            // Read again - should be at position 100 (only the position is asserted here,
               // not the bytes read)
334            let mut buf = [0u8; 5];
335            reader.read_exact(&mut buf, 5).await.unwrap();
336            assert_eq!(reader.position(), 105);
337        });
338    }
339
340    #[test_traced]
341    fn test_read_resize() {
342        let executor = deterministic::Runner::default();
343        executor.start(|context| async move {
               // Test `Read::resize` in both directions: shrink a 26-byte blob to 13 bytes,
               // then grow it to 52, reopening the blob after each step to check the result.
344            // Create a blob with some test data
345            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
346            let (blob, size) = context.open("partition", b"test").await.unwrap();
347            assert_eq!(size, 0);
348            blob.write_at(0, data).await.unwrap();
349            let data_len = data.len() as u64;
350
351            // Create a buffer reader
352            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
353
354            // Resize the blob to half its size
355            let resize_len = data_len / 2;
356            reader.resize(resize_len).await.unwrap();
357
358            // Reopen to check truncation
359            let (blob, size) = context.open("partition", b"test").await.unwrap();
360            assert_eq!(size, resize_len, "Blob should be resized to half size");
361
362            // Create a new buffer and read to verify truncation
363            let mut new_reader = Read::new(blob, size, NZUsize!(10));
364
365            // Read the content: the first 13 letters must have survived the shrink
366            let mut buf = vec![0u8; size as usize];
367            new_reader
368                .read_exact(&mut buf, size as usize)
369                .await
370                .unwrap();
371            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
372
373            // Reading beyond resized size should fail
374            let mut extra_buf = [0u8; 1];
375            let result = new_reader.read_exact(&mut extra_buf, 1).await;
376            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
377
378            // Test resize to larger size
379            new_reader.resize(data_len * 2).await.unwrap();
380
381            // Reopen to check resize
382            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
383            assert_eq!(new_size, data_len * 2);
384
385            // Create a new buffer and read to verify resize; `size` is still the shrunken
               // length (13), so it splits `buf` into the old content and the grown tail
386            let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
387            let mut buf = vec![0u8; new_size as usize];
388            new_reader
389                .read_exact(&mut buf, new_size as usize)
390                .await
391                .unwrap();
392            assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
               // The bytes added by growing the blob must read back as zeros
393            assert_eq!(
394                &buf[size as usize..],
395                vec![0u8; new_size as usize - size as usize]
396            );
397        });
398    }
399
400    #[test_traced]
401    fn test_read_resize_to_zero() {
402        let executor = deterministic::Runner::default();
403        executor.start(|context| async move {
               // Test that `Read::resize(0)` empties the blob and that a reader over the
               // reopened (empty) blob rejects any read.
404            // Create a blob with some test data
405            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
406            let data_len = data.len() as u64;
407            let (blob, size) = context.open("partition", b"test").await.unwrap();
408            assert_eq!(size, 0);
409            blob.write_at(0, data).await.unwrap();
410
411            // Create a buffer reader
412            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
413
414            // Resize the blob to zero
415            reader.resize(0).await.unwrap();
416
417            // Reopen to check truncation
418            let (blob, size) = context.open("partition", b"test").await.unwrap();
419            assert_eq!(size, 0, "Blob should be resized to zero");
420
421            // Create a new reader over the now-empty blob
422            let mut new_reader = Read::new(blob, size, NZUsize!(10));
423
424            // Reading from resized blob should fail
425            let mut buf = [0u8; 1];
426            let result = new_reader.read_exact(&mut buf, 1).await;
427            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
428        });
429    }
430
431    #[test_traced]
432    fn test_write_basic() {
433        let executor = deterministic::Runner::default();
434        executor.start(|context| async move {
435            // Test basic buffered write and sync functionality
436            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
437            assert_eq!(size, 0);
438
               // 5 bytes fit within the capacity of 8; `size()` must report 5 both before
               // and after `sync()`
439            let writer = Write::new(blob.clone(), size, NZUsize!(8));
440            writer.write_at(0, b"hello").await.unwrap();
441            assert_eq!(writer.size().await, 5);
442            writer.sync().await.unwrap();
443            assert_eq!(writer.size().await, 5);
444
445            // Verify data was written correctly by reopening the blob and reading it back
446            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
447            assert_eq!(size, 5);
448            let mut reader = Read::new(blob, size, NZUsize!(8));
449            let mut buf = [0u8; 5];
450            reader.read_exact(&mut buf, 5).await.unwrap();
451            assert_eq!(&buf, b"hello");
452        });
453    }
454
455    #[test_traced]
456    fn test_write_multiple_flushes() {
457        let executor = deterministic::Runner::default();
458        executor.start(|context| async move {
459            // Test writes that cause buffer flushes due to capacity limits
460            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
461            assert_eq!(size, 0);
462
               // Capacity is 4: the first write (3 bytes) fits, and the second (4 bytes at
               // offset 3) brings the total to 7, more than the buffer can hold
463            let writer = Write::new(blob.clone(), size, NZUsize!(4));
464            writer.write_at(0, b"abc").await.unwrap();
465            assert_eq!(writer.size().await, 3);
466            writer.write_at(3, b"defg").await.unwrap();
467            assert_eq!(writer.size().await, 7);
468            writer.sync().await.unwrap();
469
470            // Verify the final result
471            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
472            assert_eq!(size, 7);
473            let mut reader = Read::new(blob, size, NZUsize!(4));
474            let mut buf = [0u8; 7];
475            reader.read_exact(&mut buf, 7).await.unwrap();
476            assert_eq!(&buf, b"abcdefg");
477        });
478    }
479
480    #[test_traced]
481    fn test_write_large_data() {
482        let executor = deterministic::Runner::default();
483        executor.start(|context| async move {
484            // Test writing data larger than buffer capacity (direct write)
485            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
486            assert_eq!(size, 0);
487
               // A 3-byte write that fits the capacity of 4, then a 23-byte write that
               // cannot fit in the buffer at all
488            let writer = Write::new(blob.clone(), size, NZUsize!(4));
489            writer.write_at(0, b"abc").await.unwrap();
490            assert_eq!(writer.size().await, 3);
491            writer
492                .write_at(3, b"defghijklmnopqrstuvwxyz")
493                .await
494                .unwrap();
495            assert_eq!(writer.size().await, 26);
496            writer.sync().await.unwrap();
497            assert_eq!(writer.size().await, 26);
498
499            // Verify the complete data
500            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
501            assert_eq!(size, 26);
502            let mut reader = Read::new(blob, size, NZUsize!(4));
503            let mut buf = [0u8; 26];
504            reader.read_exact(&mut buf, 26).await.unwrap();
505            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
506        });
507    }
508
509    #[test_traced]
510    fn test_write_append_to_buffer() {
511        let executor = deterministic::Runner::default();
512        executor.start(|context| async move {
513            // Test sequential appends that exceed buffer capacity
514            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
515            let writer = Write::new(blob.clone(), size, NZUsize!(10));
516
517            // Write data that fits in buffer (5 of 10 bytes)
518            writer.write_at(0, b"hello").await.unwrap();
519            assert_eq!(writer.size().await, 5);
520
521            // Append 6 more bytes: 5 + 6 = 11 exceeds the capacity of 10
522            writer.write_at(5, b" world").await.unwrap();
523            writer.sync().await.unwrap();
524            assert_eq!(writer.size().await, 11);
525
526            // Verify the complete result
527            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
528            assert_eq!(size, 11);
529            let mut reader = Read::new(blob, size, NZUsize!(10));
530            let mut buf = vec![0u8; 11];
531            reader.read_exact(&mut buf, 11).await.unwrap();
532            assert_eq!(&buf, b"hello world");
533        });
534    }
535
536    #[test_traced]
537    fn test_write_into_middle_of_buffer() {
538        let executor = deterministic::Runner::default();
539        executor.start(|context| async move {
540            // Test overwriting data within the buffer and extending it
541            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
542            let writer = Write::new(blob.clone(), size, NZUsize!(20));
543
544            // Initial write
545            writer.write_at(0, b"abcdefghij").await.unwrap();
546            assert_eq!(writer.size().await, 10);
547
548            // Overwrite bytes 2..7; the write stays inside existing data, so size is unchanged
549            writer.write_at(2, b"01234").await.unwrap();
550            assert_eq!(writer.size().await, 10);
551            writer.sync().await.unwrap();
552
553            // Verify overwrite result
554            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
555            assert_eq!(size, 10);
556            let mut reader = Read::new(blob, size, NZUsize!(10));
557            let mut buf = vec![0u8; 10];
558            reader.read_exact(&mut buf, 10).await.unwrap();
559            assert_eq!(&buf, b"ab01234hij");
560
561            // Append 10 bytes with the same writer, then overwrite bytes 9..13, a range
               // that covers the last byte of the first write and the start of the append
562            writer.write_at(10, b"klmnopqrst").await.unwrap();
563            assert_eq!(writer.size().await, 20);
564            writer.write_at(9, b"wxyz").await.unwrap();
565            assert_eq!(writer.size().await, 20);
566            writer.sync().await.unwrap();
567
568            // Verify final result
569            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
570            assert_eq!(size, 20);
571            let mut reader = Read::new(blob, size, NZUsize!(20));
572            let mut buf = vec![0u8; 20];
573            reader.read_exact(&mut buf, 20).await.unwrap();
574            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
575        });
576    }
577
578    #[test_traced]
579    fn test_write_before_buffer() {
580        let executor = deterministic::Runner::default();
581        executor.start(|context| async move {
582            // Test writing at offsets before the current buffer position
583            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
584            let writer = Write::new(blob.clone(), size, NZUsize!(10));
585
586            // Write data at a later offset first: 10 bytes at offset 10 of an empty blob
               // make the reported size 20
587            writer.write_at(10, b"0123456789").await.unwrap();
588            assert_eq!(writer.size().await, 20);
589
590            // Write at an earlier offset (should flush buffer first)
591            writer.write_at(0, b"abcde").await.unwrap();
592            assert_eq!(writer.size().await, 20);
593            writer.sync().await.unwrap();
594
595            // Verify data placement with gap: bytes 5..10 were never written and are
               // expected to read back as zeros
596            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
597            assert_eq!(size, 20);
598            let mut reader = Read::new(blob, size, NZUsize!(20));
599            let mut buf = vec![0u8; 20];
600            reader.read_exact(&mut buf, 20).await.unwrap();
601            let mut expected = vec![0u8; 20];
602            expected[0..5].copy_from_slice("abcde".as_bytes());
603            expected[10..20].copy_from_slice("0123456789".as_bytes());
604            assert_eq!(buf, expected);
605
606            // Fill the gap between existing data
607            writer.write_at(5, b"fghij").await.unwrap();
608            assert_eq!(writer.size().await, 20);
609            writer.sync().await.unwrap();
610            assert_eq!(writer.size().await, 20);
611
612            // Verify gap is filled; `expected` is updated in place, keeping bytes 10..20
613            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
614            assert_eq!(size, 20);
615            let mut reader = Read::new(blob, size, NZUsize!(20));
616            let mut buf = vec![0u8; 20];
617            reader.read_exact(&mut buf, 20).await.unwrap();
618            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
619            assert_eq!(buf, expected);
620        });
621    }
622
623    #[test_traced]
624    fn test_write_resize() {
625        let executor = deterministic::Runner::default();
626        executor.start(|context| async move {
627            // Test blob resize functionality and subsequent writes: shrink, overwrite,
               // grow, and (on a second blob) resize to zero
628            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
629            let writer = Write::new(blob, size, NZUsize!(10));
630
631            // Write initial data (11 bytes, one more than the capacity of 10)
632            writer.write_at(0, b"hello world").await.unwrap();
633            assert_eq!(writer.size().await, 11);
634            writer.sync().await.unwrap();
635            assert_eq!(writer.size().await, 11);
636
               // Reopen only to confirm the synced size, then release the handle
637            let (blob_check, size_check) =
638                context.open("partition", b"resize_write").await.unwrap();
639            assert_eq!(size_check, 11);
640            drop(blob_check);
641
642            // Resize to smaller size
643            writer.resize(5).await.unwrap();
644            assert_eq!(writer.size().await, 5);
645            writer.sync().await.unwrap();
646
647            // Verify resize: only the first 5 bytes remain
648            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
649            assert_eq!(size, 5);
650            let mut reader = Read::new(blob, size, NZUsize!(5));
651            let mut buf = vec![0u8; 5];
652            reader.read_exact(&mut buf, 5).await.unwrap();
653            assert_eq!(&buf, b"hello");
654
655            // Write to resized blob: a 1-byte overwrite at offset 0 must not change the size
656            writer.write_at(0, b"X").await.unwrap();
657            assert_eq!(writer.size().await, 5);
658            writer.sync().await.unwrap();
659
660            // Verify overwrite
661            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
662            assert_eq!(size, 5);
663            let mut reader = Read::new(blob, size, NZUsize!(5));
664            let mut buf = vec![0u8; 5];
665            reader.read_exact(&mut buf, 5).await.unwrap();
666            assert_eq!(&buf, b"Xello");
667
668            // Test resize to larger size
669            writer.resize(10).await.unwrap();
670            assert_eq!(writer.size().await, 10);
671            writer.sync().await.unwrap();
672
673            // Verify resize: existing bytes are kept and the new tail reads back as zeros
674            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
675            assert_eq!(size, 10);
676            let mut reader = Read::new(blob, size, NZUsize!(10));
677            let mut buf = vec![0u8; 10];
678            reader.read_exact(&mut buf, 10).await.unwrap();
679            assert_eq!(&buf[0..5], b"Xello");
680            assert_eq!(&buf[5..10], [0u8; 5]);
681
682            // Test resize to zero, using a separate blob ("resize_zero") and writer
683            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
684            let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
685            writer_zero.write_at(0, b"some data").await.unwrap();
686            assert_eq!(writer_zero.size().await, 9);
687            writer_zero.sync().await.unwrap();
688            assert_eq!(writer_zero.size().await, 9);
689            writer_zero.resize(0).await.unwrap();
690            assert_eq!(writer_zero.size().await, 0);
691            writer_zero.sync().await.unwrap();
692            assert_eq!(writer_zero.size().await, 0);
693
694            // Ensure the blob is empty
695            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
696            assert_eq!(size_z, 0);
697        });
698    }
699
700    #[test_traced]
701    fn test_write_read_at_on_writer() {
702        let executor = deterministic::Runner::default();
703        executor.start(|context| async move {
704            // Test reading through writer's read_at method (buffer + blob reads)
705            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
706            let writer = Write::new(blob.clone(), size, NZUsize!(10));
707
708            // Write data that stays in buffer (8 bytes, capacity 10; no sync yet)
709            writer.write_at(0, b"buffered").await.unwrap();
710            assert_eq!(writer.size().await, 8);
711
712            // Read from buffer via writer
713            let read_buf_vec = writer
714                .read_at(0, IoBufMut::zeroed(4))
715                .await
716                .unwrap()
717                .coalesce();
718            assert_eq!(read_buf_vec, b"buff");
719
720            let read_buf_vec = writer
721                .read_at(4, IoBufMut::zeroed(4))
722                .await
723                .unwrap()
724                .coalesce();
725            assert_eq!(read_buf_vec, b"ered");
726
727            // Offset 8 is the current end of the data (size is 8), so a 1-byte read must fail
728            assert!(writer.read_at(8, IoBufMut::zeroed(1)).await.is_err());
729
730            // Write large data that flushes buffer (12 bytes, more than the capacity of 10)
731            writer.write_at(8, b" and flushed").await.unwrap();
732            assert_eq!(writer.size().await, 20);
733            writer.sync().await.unwrap();
734            assert_eq!(writer.size().await, 20);
735
736            // Read from underlying blob through writer
737            let read_buf_vec_2 = writer
738                .read_at(0, IoBufMut::zeroed(4))
739                .await
740                .unwrap()
741                .coalesce();
742            assert_eq!(read_buf_vec_2, b"buff");
743
               // Bytes 13..20 are the word "flushed" from the second write
744            let read_buf_7_vec = writer
745                .read_at(13, IoBufMut::zeroed(7))
746                .await
747                .unwrap()
748                .coalesce();
749            assert_eq!(read_buf_7_vec, b"flushed");
750
751            // Buffer new data at the end (10 bytes at offset 20; not synced before the
               // reads below)
752            writer.write_at(20, b" more data").await.unwrap();
753            assert_eq!(writer.size().await, 30);
754
755            // Read newly buffered data
756            let read_buf_vec_3 = writer
757                .read_at(20, IoBufMut::zeroed(5))
758                .await
759                .unwrap()
760                .coalesce();
761            assert_eq!(read_buf_vec_3, b" more");
762
763            // Read spanning both blob and buffer: bytes 16..20 were synced earlier,
               // bytes 20..28 come from the unsynced write above
764            let combo_read_buf_vec = writer.read_at(16, IoBufMut::zeroed(12)).await.unwrap();
765            assert_eq!(combo_read_buf_vec.coalesce(), b"shed more da");
766
767            // Verify complete content by reopening
768            writer.sync().await.unwrap();
769            assert_eq!(writer.size().await, 30);
770            let (final_blob, final_size) =
771                context.open("partition", b"read_at_writer").await.unwrap();
772            assert_eq!(final_size, 30);
773            let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
774            let mut full_content = vec![0u8; 30];
775            final_reader
776                .read_exact(&mut full_content, 30)
777                .await
778                .unwrap();
779            assert_eq!(&full_content, b"buffered and flushed more data");
780        });
781    }
782
783    #[test_traced]
784    fn test_write_straddling_non_mergeable() {
785        let executor = deterministic::Runner::default();
786        executor.start(|context| async move {
787            // Test writes that cannot be merged into buffer (non-contiguous/too large)
788            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
789            let writer = Write::new(blob.clone(), size, NZUsize!(10));
790
791            // Fill buffer completely
792            writer.write_at(0, b"0123456789").await.unwrap();
793            assert_eq!(writer.size().await, 10);
794
795            // Write at non-contiguous offset (should flush then write directly)
796            writer.write_at(15, b"abc").await.unwrap();
797            assert_eq!(writer.size().await, 18);
798            writer.sync().await.unwrap();
799            assert_eq!(writer.size().await, 18);
800
801            // Verify data with gap
802            let (blob_check, size_check) =
803                context.open("partition", b"write_straddle").await.unwrap();
804            assert_eq!(size_check, 18);
805            let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
806            let mut buf = vec![0u8; 18];
807            reader.read_exact(&mut buf, 18).await.unwrap();
808
809            let mut expected = vec![0u8; 18];
810            expected[0..10].copy_from_slice(b"0123456789");
811            expected[15..18].copy_from_slice(b"abc");
812            assert_eq!(buf, expected);
813
814            // Test write that exceeds buffer capacity
815            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
816            let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
817            writer2.write_at(0, b"0123456789").await.unwrap();
818            assert_eq!(writer2.size().await, 10);
819
820            // Write large data that exceeds capacity
821            writer2.write_at(5, b"ABCDEFGHIJKL").await.unwrap();
822            assert_eq!(writer2.size().await, 17);
823            writer2.sync().await.unwrap();
824            assert_eq!(writer2.size().await, 17);
825
826            // Verify overwrite result
827            let (blob_check2, size_check2) =
828                context.open("partition", b"write_straddle2").await.unwrap();
829            assert_eq!(size_check2, 17);
830            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
831            let mut buf2 = vec![0u8; 17];
832            reader2.read_exact(&mut buf2, 17).await.unwrap();
833            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
834        });
835    }
836
837    #[test_traced]
838    fn test_write_close() {
839        let executor = deterministic::Runner::default();
840        executor.start(|context| async move {
841            // Test that closing writer flushes and persists buffered data
842            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
843            let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
844            writer.write_at(0, b"pending").await.unwrap();
845            assert_eq!(writer.size().await, 7);
846
847            // Sync writer to persist data
848            writer.sync().await.unwrap();
849
850            // Verify data persistence
851            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
852            assert_eq!(size_check, 7);
853            let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
854            let mut buf = [0u8; 7];
855            reader.read_exact(&mut buf, 7).await.unwrap();
856            assert_eq!(&buf, b"pending");
857        });
858    }
859
860    #[test_traced]
861    fn test_write_direct_due_to_size() {
862        let executor = deterministic::Runner::default();
863        executor.start(|context| async move {
864            // Test direct writes when data exceeds buffer capacity
865            let (blob, size) = context
866                .open("partition", b"write_direct_size")
867                .await
868                .unwrap();
869            let writer = Write::new(blob.clone(), size, NZUsize!(5));
870
871            // Write data larger than buffer capacity (should write directly)
872            let data_large = b"0123456789";
873            writer.write_at(0, data_large).await.unwrap();
874            assert_eq!(writer.size().await, 10);
875
876            // Sync to ensure data is persisted
877            writer.sync().await.unwrap();
878
879            // Verify direct write worked
880            let (blob_check, size_check) = context
881                .open("partition", b"write_direct_size")
882                .await
883                .unwrap();
884            assert_eq!(size_check, 10);
885            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
886            let mut buf = vec![0u8; 10];
887            reader.read_exact(&mut buf, 10).await.unwrap();
888            assert_eq!(&buf, data_large.as_slice());
889
890            // Now write small data that should be buffered
891            writer.write_at(10, b"abc").await.unwrap();
892            assert_eq!(writer.size().await, 13);
893
894            // Verify it's in buffer by reading through writer
895            let read_small_buf_vec = writer
896                .read_at(10, IoBufMut::zeroed(3))
897                .await
898                .unwrap()
899                .coalesce();
900            assert_eq!(read_small_buf_vec, b"abc");
901
902            writer.sync().await.unwrap();
903
904            // Verify final state
905            let (blob_check2, size_check2) = context
906                .open("partition", b"write_direct_size")
907                .await
908                .unwrap();
909            assert_eq!(size_check2, 13);
910            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
911            let mut buf2 = vec![0u8; 13];
912            reader2.read_exact(&mut buf2, 13).await.unwrap();
913            assert_eq!(&buf2[10..], b"abc".as_slice());
914        });
915    }
916
917    #[test_traced]
918    fn test_write_overwrite_and_extend_in_buffer() {
919        let executor = deterministic::Runner::default();
920        executor.start(|context| async move {
921            // Test complex buffer operations: overwrite and extend within capacity
922            let (blob, size) = context
923                .open("partition", b"overwrite_extend_buf")
924                .await
925                .unwrap();
926            let writer = Write::new(blob.clone(), size, NZUsize!(15));
927
928            // Write initial data
929            writer.write_at(0, b"0123456789").await.unwrap();
930            assert_eq!(writer.size().await, 10);
931
932            // Overwrite and extend within buffer capacity
933            writer.write_at(5, b"ABCDEFGHIJ").await.unwrap();
934            assert_eq!(writer.size().await, 15);
935
936            // Verify buffer content through writer
937            let read_buf_vec = writer
938                .read_at(0, IoBufMut::zeroed(15))
939                .await
940                .unwrap()
941                .coalesce();
942            assert_eq!(read_buf_vec, b"01234ABCDEFGHIJ");
943
944            writer.sync().await.unwrap();
945
946            // Verify persisted result
947            let (blob_check, size_check) = context
948                .open("partition", b"overwrite_extend_buf")
949                .await
950                .unwrap();
951            assert_eq!(size_check, 15);
952            let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
953            let mut final_buf = vec![0u8; 15];
954            reader.read_exact(&mut final_buf, 15).await.unwrap();
955            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
956        });
957    }
958
959    #[test_traced]
960    fn test_write_at_size() {
961        let executor = deterministic::Runner::default();
962        executor.start(|context| async move {
963            // Test writing at the current logical end of the blob
964            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
965            let writer = Write::new(blob.clone(), size, NZUsize!(20));
966
967            // Write initial data
968            writer.write_at(0, b"0123456789").await.unwrap();
969            assert_eq!(writer.size().await, 10);
970            writer.sync().await.unwrap();
971
972            // Append at the current size (logical end)
973            writer.write_at(writer.size().await, b"abc").await.unwrap();
974            assert_eq!(writer.size().await, 13);
975            writer.sync().await.unwrap();
976
977            // Verify complete result
978            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
979            assert_eq!(size_check, 13);
980            let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
981            let mut buf = vec![0u8; 13];
982            reader.read_exact(&mut buf, 13).await.unwrap();
983            assert_eq!(&buf, b"0123456789abc");
984        });
985    }
986
987    #[test_traced]
988    fn test_write_at_size_multiple_appends() {
989        let executor = deterministic::Runner::default();
990        executor.start(|context| async move {
991            // Test multiple appends using writer.size()
992            let (blob, size) = context
993                .open("partition", b"write_multiple_appends_at_size")
994                .await
995                .unwrap();
996            let writer = Write::new(blob.clone(), size, NZUsize!(5)); // Small buffer
997
998            // First write
999            writer.write_at(0, b"AAA").await.unwrap();
1000            assert_eq!(writer.size().await, 3);
1001            writer.sync().await.unwrap();
1002            assert_eq!(writer.size().await, 3);
1003
1004            // Append using size()
1005            writer.write_at(writer.size().await, b"BBB").await.unwrap();
1006            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
1007            writer.sync().await.unwrap();
1008            assert_eq!(writer.size().await, 6);
1009
1010            // Append again using size()
1011            writer.write_at(writer.size().await, b"CCC").await.unwrap();
1012            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1013            writer.sync().await.unwrap();
1014            assert_eq!(writer.size().await, 9);
1015
1016            // Verify final content
1017            let (blob_check, size_check) = context
1018                .open("partition", b"write_multiple_appends_at_size")
1019                .await
1020                .unwrap();
1021            assert_eq!(size_check, 9);
1022            let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1023            let mut buf = vec![0u8; 9];
1024            reader.read_exact(&mut buf, 9).await.unwrap();
1025            assert_eq!(&buf, b"AAABBBCCC");
1026        });
1027    }
1028
1029    #[test_traced]
1030    fn test_write_non_contiguous_then_append_at_size() {
1031        let executor = deterministic::Runner::default();
1032        executor.start(|context| async move {
1033            // Test writing non-contiguously, then appending at the new size
1034            let (blob, size) = context
1035                .open("partition", b"write_non_contiguous_then_append")
1036                .await
1037                .unwrap();
1038            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1039
1040            // Initial buffered write
1041            writer.write_at(0, b"INITIAL").await.unwrap(); // 7 bytes
1042            assert_eq!(writer.size().await, 7);
1043            // Buffer contains "INITIAL", inner.position = 0
1044
1045            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1046            writer.write_at(20, b"NONCONTIG").await.unwrap();
1047            assert_eq!(writer.size().await, 29);
1048            writer.sync().await.unwrap();
1049            assert_eq!(writer.size().await, 29);
1050
1051            // Append at the new size
1052            writer
1053                .write_at(writer.size().await, b"APPEND")
1054                .await
1055                .unwrap();
1056            assert_eq!(writer.size().await, 35); // 29 + 6
1057            writer.sync().await.unwrap();
1058            assert_eq!(writer.size().await, 35);
1059
1060            // Verify final content
1061            let (blob_check, size_check) = context
1062                .open("partition", b"write_non_contiguous_then_append")
1063                .await
1064                .unwrap();
1065            assert_eq!(size_check, 35);
1066            let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1067            let mut buf = vec![0u8; 35];
1068            reader.read_exact(&mut buf, 35).await.unwrap();
1069
1070            let mut expected = vec![0u8; 35];
1071            expected[0..7].copy_from_slice(b"INITIAL");
1072            expected[20..29].copy_from_slice(b"NONCONTIG");
1073            expected[29..35].copy_from_slice(b"APPEND");
1074            assert_eq!(buf, expected);
1075        });
1076    }
1077
1078    #[test_traced]
1079    fn test_resize_then_append_at_size() {
1080        let executor = deterministic::Runner::default();
1081        executor.start(|context| async move {
1082            // Test truncating, then appending at the new size
1083            let (blob, size) = context
1084                .open("partition", b"resize_then_append_at_size")
1085                .await
1086                .unwrap();
1087            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1088
1089            // Write initial data and sync
1090            writer.write_at(0, b"0123456789ABCDEF").await.unwrap(); // 16 bytes
1091            assert_eq!(writer.size().await, 16);
1092            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1093            assert_eq!(writer.size().await, 16);
1094
1095            // Resize
1096            let resize_to = 5;
1097            writer.resize(resize_to).await.unwrap();
1098            // after resize, inner.position should be `resize_to` (5)
1099            // buffer should be empty
1100            assert_eq!(writer.size().await, resize_to);
1101            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1102            assert_eq!(writer.size().await, resize_to);
1103
1104            // Append at the new (resized) size
1105            writer
1106                .write_at(writer.size().await, b"XXXXX")
1107                .await
1108                .unwrap(); // 5 bytes
1109                           // inner.buffer = "XXXXX", inner.position = 5
1110            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
1111            writer.sync().await.unwrap();
1112            assert_eq!(writer.size().await, 10);
1113
1114            // Verify final content
1115            let (blob_check, size_check) = context
1116                .open("partition", b"resize_then_append_at_size")
1117                .await
1118                .unwrap();
1119            assert_eq!(size_check, 10);
1120            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1121            let mut buf = vec![0u8; 10];
1122            reader.read_exact(&mut buf, 10).await.unwrap();
1123            assert_eq!(&buf, b"01234XXXXX");
1124        });
1125    }
1126}