commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3mod append;
4pub mod pool;
5mod read;
6mod tip;
7mod write;
8
9pub use append::Append;
10pub use pool::{Pool, PoolRef};
11pub use read::Read;
12pub use write::Write;
13
14#[cfg(test)]
15mod tests {
16    use super::*;
17    use crate::{deterministic, Blob as _, Error, Runner, Storage};
18    use commonware_macros::test_traced;
19
20    #[test_traced]
21    fn test_read_basic() {
22        let executor = deterministic::Runner::default();
23        executor.start(|context| async move {
24            // Test basic buffered reading functionality with sequential reads
25            let data = b"Hello, world! This is a test.";
26            let (blob, size) = context.open("partition", b"test").await.unwrap();
27            assert_eq!(size, 0);
28            blob.write_at(data.to_vec(), 0).await.unwrap();
29            let size = data.len() as u64;
30
31            // Create a buffered reader with small buffer to test refilling
32            let buffer_size = 10;
33            let mut reader = Read::new(blob, size, buffer_size);
34
35            // Read some data
36            let mut buf = [0u8; 5];
37            reader.read_exact(&mut buf, 5).await.unwrap();
38            assert_eq!(&buf, b"Hello");
39
40            // Read more data that requires a buffer refill
41            let mut buf = [0u8; 14];
42            reader.read_exact(&mut buf, 14).await.unwrap();
43            assert_eq!(&buf, b", world! This ");
44
45            // Verify position tracking
46            assert_eq!(reader.position(), 19);
47
48            // Read the remaining data
49            let mut buf = [0u8; 10];
50            reader.read_exact(&mut buf, 7).await.unwrap();
51            assert_eq!(&buf[..7], b"is a te");
52
53            // Attempt to read beyond the end should fail
54            let mut buf = [0u8; 5];
55            let result = reader.read_exact(&mut buf, 5).await;
56            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
57        });
58    }
59
60    #[test_traced]
61    #[should_panic(expected = "buffer size must be greater than zero")]
62    fn test_read_empty() {
63        let executor = deterministic::Runner::default();
64        executor.start(|context| async move {
65            // Test that creating a reader with zero buffer size panics
66            let data = b"Hello, world! This is a test.";
67            let (blob, size) = context.open("partition", b"test").await.unwrap();
68            assert_eq!(size, 0);
69            blob.write_at(data.to_vec(), 0).await.unwrap();
70            let size = data.len() as u64;
71
72            // This should panic
73            let buffer_size = 0;
74            Read::new(blob, size, buffer_size);
75        });
76    }
77
78    #[test_traced]
79    fn test_read_cross_boundary() {
80        let executor = deterministic::Runner::default();
81        executor.start(|context| async move {
82            // Test reading data that spans multiple buffer refills
83            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
84            let (blob, size) = context.open("partition", b"test").await.unwrap();
85            assert_eq!(size, 0);
86            blob.write_at(data.to_vec(), 0).await.unwrap();
87            let size = data.len() as u64;
88
89            // Use a buffer smaller than the total data size
90            let buffer_size = 10;
91            let mut reader = Read::new(blob, size, buffer_size);
92
93            // Read data that crosses buffer boundaries
94            let mut buf = [0u8; 15];
95            reader.read_exact(&mut buf, 15).await.unwrap();
96            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
97
98            // Verify position tracking
99            assert_eq!(reader.position(), 15);
100
101            // Read the remaining data
102            let mut buf = [0u8; 11];
103            reader.read_exact(&mut buf, 11).await.unwrap();
104            assert_eq!(&buf, b"PQRSTUVWXYZ");
105
106            // Verify we're at the end
107            assert_eq!(reader.position(), 26);
108            assert_eq!(reader.blob_remaining(), 0);
109        });
110    }
111
112    #[test_traced]
113    fn test_read_with_known_size() {
114        let executor = deterministic::Runner::default();
115        executor.start(|context| async move {
116            // Test reader behavior with known blob size limits
117            let data = b"This is a test with known size limitations.";
118            let (blob, size) = context.open("partition", b"test").await.unwrap();
119            assert_eq!(size, 0);
120            blob.write_at(data.to_vec(), 0).await.unwrap();
121            let size = data.len() as u64;
122
123            // Create a buffered reader with buffer smaller than total data
124            let buffer_size = 10;
125            let mut reader = Read::new(blob, size, buffer_size);
126
127            // Check initial remaining bytes
128            assert_eq!(reader.blob_remaining(), size);
129
130            // Read partial data
131            let mut buf = [0u8; 5];
132            reader.read_exact(&mut buf, 5).await.unwrap();
133            assert_eq!(&buf, b"This ");
134
135            // Check remaining bytes after partial read
136            assert_eq!(reader.blob_remaining(), size - 5);
137
138            // Read exactly up to the size limit
139            let mut buf = vec![0u8; (size - 5) as usize];
140            reader
141                .read_exact(&mut buf, (size - 5) as usize)
142                .await
143                .unwrap();
144            assert_eq!(&buf, b"is a test with known size limitations.");
145
146            // Verify we're at the end
147            assert_eq!(reader.blob_remaining(), 0);
148
149            // Reading beyond the end should fail
150            let mut buf = [0u8; 1];
151            let result = reader.read_exact(&mut buf, 1).await;
152            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
153        });
154    }
155
156    #[test_traced]
157    fn test_read_large_data() {
158        let executor = deterministic::Runner::default();
159        executor.start(|context| async move {
160            // Test reading large amounts of data in chunks
161            let data_size = 1024 * 256; // 256KB of data
162            let data = vec![0x42; data_size];
163            let (blob, size) = context.open("partition", b"test").await.unwrap();
164            assert_eq!(size, 0);
165            blob.write_at(data.clone(), 0).await.unwrap();
166            let size = data.len() as u64;
167
168            // Use a buffer much smaller than the total data
169            let buffer_size = 64 * 1024; // 64KB buffer
170            let mut reader = Read::new(blob, size, buffer_size);
171
172            // Read all data in smaller chunks
173            let mut total_read = 0;
174            let chunk_size = 8 * 1024; // 8KB chunks
175            let mut buf = vec![0u8; chunk_size];
176
177            while total_read < data_size {
178                let to_read = std::cmp::min(chunk_size, data_size - total_read);
179                reader
180                    .read_exact(&mut buf[..to_read], to_read)
181                    .await
182                    .unwrap();
183
184                // Verify data integrity
185                assert!(
186                    buf[..to_read].iter().all(|&b| b == 0x42),
187                    "Data at position {total_read} is not correct"
188                );
189
190                total_read += to_read;
191            }
192
193            // Verify we read everything
194            assert_eq!(total_read, data_size);
195
196            // Reading beyond the end should fail
197            let mut extra_buf = [0u8; 1];
198            let result = reader.read_exact(&mut extra_buf, 1).await;
199            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
200        });
201    }
202
203    #[test_traced]
204    fn test_read_exact_size_reads() {
205        let executor = deterministic::Runner::default();
206        executor.start(|context| async move {
207            // Create a blob with exactly 2.5 buffer sizes of data
208            let buffer_size = 1024;
209            let data_size = buffer_size * 5 / 2; // 2.5 buffers
210            let data = vec![0x37; data_size];
211
212            let (blob, size) = context.open("partition", b"test").await.unwrap();
213            assert_eq!(size, 0);
214            blob.write_at(data.clone(), 0).await.unwrap();
215            let size = data.len() as u64;
216
217            let mut reader = Read::new(blob, size, buffer_size);
218
219            // Read exactly one buffer size
220            let mut buf1 = vec![0u8; buffer_size];
221            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
222            assert!(buf1.iter().all(|&b| b == 0x37));
223
224            // Read exactly one buffer size more
225            let mut buf2 = vec![0u8; buffer_size];
226            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
227            assert!(buf2.iter().all(|&b| b == 0x37));
228
229            // Read the remaining half buffer
230            let half_buffer = buffer_size / 2;
231            let mut buf3 = vec![0u8; half_buffer];
232            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
233            assert!(buf3.iter().all(|&b| b == 0x37));
234
235            // Verify we're at the end
236            assert_eq!(reader.blob_remaining(), 0);
237            assert_eq!(reader.position(), size);
238        });
239    }
240
241    #[test_traced]
242    fn test_read_seek_to() {
243        let executor = deterministic::Runner::default();
244        executor.start(|context| async move {
245            // Create a memory blob with some test data
246            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
247            let (blob, size) = context.open("partition", b"test").await.unwrap();
248            assert_eq!(size, 0);
249            blob.write_at(data.to_vec(), 0).await.unwrap();
250            let size = data.len() as u64;
251
252            // Create a buffer reader
253            let buffer_size = 10;
254            let mut reader = Read::new(blob, size, buffer_size);
255
256            // Read some data to advance the position
257            let mut buf = [0u8; 5];
258            reader.read_exact(&mut buf, 5).await.unwrap();
259            assert_eq!(&buf, b"ABCDE");
260            assert_eq!(reader.position(), 5);
261
262            // Seek to a specific position
263            reader.seek_to(10).unwrap();
264            assert_eq!(reader.position(), 10);
265
266            // Read data from the new position
267            let mut buf = [0u8; 5];
268            reader.read_exact(&mut buf, 5).await.unwrap();
269            assert_eq!(&buf, b"KLMNO");
270
271            // Seek to beginning
272            reader.seek_to(0).unwrap();
273            assert_eq!(reader.position(), 0);
274
275            let mut buf = [0u8; 5];
276            reader.read_exact(&mut buf, 5).await.unwrap();
277            assert_eq!(&buf, b"ABCDE");
278
279            // Seek to end
280            reader.seek_to(size).unwrap();
281            assert_eq!(reader.position(), size);
282
283            // Trying to read should fail
284            let mut buf = [0u8; 1];
285            let result = reader.read_exact(&mut buf, 1).await;
286            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287
288            // Seek beyond end should fail
289            let result = reader.seek_to(size + 10);
290            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
291        });
292    }
293
294    #[test_traced]
295    fn test_read_seek_with_refill() {
296        let executor = deterministic::Runner::default();
297        executor.start(|context| async move {
298            // Create a memory blob with longer data
299            let data = vec![0x41; 1000]; // 1000 'A' characters
300            let (blob, size) = context.open("partition", b"test").await.unwrap();
301            assert_eq!(size, 0);
302            blob.write_at(data.clone(), 0).await.unwrap();
303            let size = data.len() as u64;
304
305            // Create a buffer reader with small buffer
306            let buffer_size = 10;
307            let mut reader = Read::new(blob, size, buffer_size);
308
309            // Read some data
310            let mut buf = [0u8; 5];
311            reader.read_exact(&mut buf, 5).await.unwrap();
312
313            // Seek far ahead, past the current buffer
314            reader.seek_to(500).unwrap();
315
316            // Read data - should get data from position 500
317            let mut buf = [0u8; 5];
318            reader.read_exact(&mut buf, 5).await.unwrap();
319            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
320            assert_eq!(reader.position(), 505);
321
322            // Seek backwards
323            reader.seek_to(100).unwrap();
324
325            // Read again - should be at position 100
326            let mut buf = [0u8; 5];
327            reader.read_exact(&mut buf, 5).await.unwrap();
328            assert_eq!(reader.position(), 105);
329        });
330    }
331
332    #[test_traced]
333    fn test_read_resize() {
334        let executor = deterministic::Runner::default();
335        executor.start(|context| async move {
336            // Create a memory blob with some test data
337            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
338            let (blob, size) = context.open("partition", b"test").await.unwrap();
339            assert_eq!(size, 0);
340            blob.write_at(data.to_vec(), 0).await.unwrap();
341            let data_len = data.len() as u64;
342
343            // Create a buffer reader
344            let buffer_size = 10;
345            let reader = Read::new(blob.clone(), data_len, buffer_size);
346
347            // Resize the blob to half its size
348            let resize_len = data_len / 2;
349            reader.resize(resize_len).await.unwrap();
350
351            // Reopen to check truncation
352            let (blob, size) = context.open("partition", b"test").await.unwrap();
353            assert_eq!(size, resize_len, "Blob should be resized to half size");
354
355            // Create a new buffer and read to verify truncation
356            let mut new_reader = Read::new(blob, size, buffer_size);
357
358            // Read the content
359            let mut buf = vec![0u8; size as usize];
360            new_reader
361                .read_exact(&mut buf, size as usize)
362                .await
363                .unwrap();
364            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
365
366            // Reading beyond resized size should fail
367            let mut extra_buf = [0u8; 1];
368            let result = new_reader.read_exact(&mut extra_buf, 1).await;
369            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
370
371            // Test resize to larger size
372            new_reader.resize(data_len * 2).await.unwrap();
373
374            // Reopen to check resize
375            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
376            assert_eq!(new_size, data_len * 2);
377
378            // Create a new buffer and read to verify resize
379            let mut new_reader = Read::new(blob, new_size, buffer_size);
380            let mut buf = vec![0u8; new_size as usize];
381            new_reader
382                .read_exact(&mut buf, new_size as usize)
383                .await
384                .unwrap();
385            assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
386            assert_eq!(
387                &buf[size as usize..],
388                vec![0u8; new_size as usize - size as usize]
389            );
390        });
391    }
392
393    #[test_traced]
394    fn test_read_resize_to_zero() {
395        let executor = deterministic::Runner::default();
396        executor.start(|context| async move {
397            // Create a memory blob with some test data
398            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
399            let data_len = data.len() as u64;
400            let (blob, size) = context.open("partition", b"test").await.unwrap();
401            assert_eq!(size, 0);
402            blob.write_at(data.to_vec(), 0).await.unwrap();
403
404            // Create a buffer reader
405            let buffer_size = 10;
406            let reader = Read::new(blob.clone(), data_len, buffer_size);
407
408            // Resize the blob to zero
409            reader.resize(0).await.unwrap();
410
411            // Reopen to check truncation
412            let (blob, size) = context.open("partition", b"test").await.unwrap();
413            assert_eq!(size, 0, "Blob should be resized to zero");
414
415            // Create a new buffer and try to read (should fail)
416            let mut new_reader = Read::new(blob, size, buffer_size);
417
418            // Reading from resized blob should fail
419            let mut buf = [0u8; 1];
420            let result = new_reader.read_exact(&mut buf, 1).await;
421            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
422        });
423    }
424
425    #[test_traced]
426    fn test_write_basic() {
427        let executor = deterministic::Runner::default();
428        executor.start(|context| async move {
429            // Test basic buffered write and sync functionality
430            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
431            assert_eq!(size, 0);
432
433            let writer = Write::new(blob.clone(), size, 8);
434            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
435            assert_eq!(writer.size().await, 5);
436            writer.sync().await.unwrap();
437            assert_eq!(writer.size().await, 5);
438
439            // Verify data was written correctly
440            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
441            assert_eq!(size, 5);
442            let mut reader = Read::new(blob, size, 8);
443            let mut buf = [0u8; 5];
444            reader.read_exact(&mut buf, 5).await.unwrap();
445            assert_eq!(&buf, b"hello");
446        });
447    }
448
449    #[test_traced]
450    fn test_write_multiple_flushes() {
451        let executor = deterministic::Runner::default();
452        executor.start(|context| async move {
453            // Test writes that cause buffer flushes due to capacity limits
454            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
455            assert_eq!(size, 0);
456
457            let writer = Write::new(blob.clone(), size, 4);
458            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
459            assert_eq!(writer.size().await, 3);
460            writer.write_at(b"defg".to_vec(), 3).await.unwrap();
461            assert_eq!(writer.size().await, 7);
462            writer.sync().await.unwrap();
463
464            // Verify the final result
465            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
466            assert_eq!(size, 7);
467            let mut reader = Read::new(blob, size, 4);
468            let mut buf = [0u8; 7];
469            reader.read_exact(&mut buf, 7).await.unwrap();
470            assert_eq!(&buf, b"abcdefg");
471        });
472    }
473
474    #[test_traced]
475    fn test_write_large_data() {
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            // Test writing data larger than buffer capacity (direct write)
479            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
480            assert_eq!(size, 0);
481
482            let writer = Write::new(blob.clone(), size, 4);
483            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
484            assert_eq!(writer.size().await, 3);
485            writer
486                .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
487                .await
488                .unwrap();
489            assert_eq!(writer.size().await, 26);
490            writer.sync().await.unwrap();
491            assert_eq!(writer.size().await, 26);
492
493            // Verify the complete data
494            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
495            assert_eq!(size, 26);
496            let mut reader = Read::new(blob, size, 4);
497            let mut buf = [0u8; 26];
498            reader.read_exact(&mut buf, 26).await.unwrap();
499            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
500        });
501    }
502
503    #[test_traced]
504    #[should_panic(expected = "buffer capacity must be greater than zero")]
505    fn test_write_empty() {
506        let executor = deterministic::Runner::default();
507        executor.start(|context| async move {
508            // Test that creating a writer with zero buffer capacity panics
509            let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
510            assert_eq!(size, 0);
511            Write::new(blob, size, 0);
512        });
513    }
514
515    #[test_traced]
516    fn test_write_append_to_buffer() {
517        let executor = deterministic::Runner::default();
518        executor.start(|context| async move {
519            // Test sequential appends that exceed buffer capacity
520            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
521            let writer = Write::new(blob.clone(), size, 10);
522
523            // Write data that fits in buffer
524            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
525            assert_eq!(writer.size().await, 5);
526
527            // Append data that causes buffer flush
528            writer.write_at(b" world".to_vec(), 5).await.unwrap();
529            writer.sync().await.unwrap();
530            assert_eq!(writer.size().await, 11);
531
532            // Verify the complete result
533            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
534            assert_eq!(size, 11);
535            let mut reader = Read::new(blob, size, 10);
536            let mut buf = vec![0u8; 11];
537            reader.read_exact(&mut buf, 11).await.unwrap();
538            assert_eq!(&buf, b"hello world");
539        });
540    }
541
542    #[test_traced]
543    fn test_write_into_middle_of_buffer() {
544        let executor = deterministic::Runner::default();
545        executor.start(|context| async move {
546            // Test overwriting data within the buffer and extending it
547            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
548            let writer = Write::new(blob.clone(), size, 20);
549
550            // Initial write
551            writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
552            assert_eq!(writer.size().await, 10);
553
554            // Overwrite middle section
555            writer.write_at(b"01234".to_vec(), 2).await.unwrap();
556            assert_eq!(writer.size().await, 10);
557            writer.sync().await.unwrap();
558
559            // Verify overwrite result
560            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
561            assert_eq!(size, 10);
562            let mut reader = Read::new(blob, size, 10);
563            let mut buf = vec![0u8; 10];
564            reader.read_exact(&mut buf, 10).await.unwrap();
565            assert_eq!(&buf, b"ab01234hij");
566
567            // Extend buffer and do partial overwrite
568            writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
569            assert_eq!(writer.size().await, 20);
570            writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
571            assert_eq!(writer.size().await, 20);
572            writer.sync().await.unwrap();
573
574            // Verify final result
575            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
576            assert_eq!(size, 20);
577            let mut reader = Read::new(blob, size, 20);
578            let mut buf = vec![0u8; 20];
579            reader.read_exact(&mut buf, 20).await.unwrap();
580            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
581        });
582    }
583
584    #[test_traced]
585    fn test_write_before_buffer() {
586        let executor = deterministic::Runner::default();
587        executor.start(|context| async move {
588            // Test writing at offsets before the current buffer position
589            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
590            let writer = Write::new(blob.clone(), size, 10);
591
592            // Write data at a later offset first
593            writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
594            assert_eq!(writer.size().await, 20);
595
596            // Write at an earlier offset (should flush buffer first)
597            writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
598            assert_eq!(writer.size().await, 20);
599            writer.sync().await.unwrap();
600
601            // Verify data placement with gap
602            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
603            assert_eq!(size, 20);
604            let mut reader = Read::new(blob, size, 20);
605            let mut buf = vec![0u8; 20];
606            reader.read_exact(&mut buf, 20).await.unwrap();
607            let mut expected = vec![0u8; 20];
608            expected[0..5].copy_from_slice("abcde".as_bytes());
609            expected[10..20].copy_from_slice("0123456789".as_bytes());
610            assert_eq!(buf, expected);
611
612            // Fill the gap between existing data
613            writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
614            assert_eq!(writer.size().await, 20);
615            writer.sync().await.unwrap();
616            assert_eq!(writer.size().await, 20);
617
618            // Verify gap is filled
619            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
620            assert_eq!(size, 20);
621            let mut reader = Read::new(blob, size, 20);
622            let mut buf = vec![0u8; 20];
623            reader.read_exact(&mut buf, 20).await.unwrap();
624            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
625            assert_eq!(buf, expected);
626        });
627    }
628
629    #[test_traced]
630    fn test_write_resize() {
631        let executor = deterministic::Runner::default();
632        executor.start(|context| async move {
633            // Test blob resize functionality and subsequent writes
634            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
635            let writer = Write::new(blob, size, 10);
636
637            // Write initial data
638            writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
639            assert_eq!(writer.size().await, 11);
640            writer.sync().await.unwrap();
641            assert_eq!(writer.size().await, 11);
642
643            let (blob_check, size_check) =
644                context.open("partition", b"resize_write").await.unwrap();
645            assert_eq!(size_check, 11);
646            drop(blob_check);
647
648            // Resize to smaller size
649            writer.resize(5).await.unwrap();
650            assert_eq!(writer.size().await, 5);
651            writer.sync().await.unwrap();
652
653            // Verify resize
654            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
655            assert_eq!(size, 5);
656            let mut reader = Read::new(blob, size, 5);
657            let mut buf = vec![0u8; 5];
658            reader.read_exact(&mut buf, 5).await.unwrap();
659            assert_eq!(&buf, b"hello");
660
661            // Write to resized blob
662            writer.write_at(b"X".to_vec(), 0).await.unwrap();
663            assert_eq!(writer.size().await, 5);
664            writer.sync().await.unwrap();
665
666            // Verify overwrite
667            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
668            assert_eq!(size, 5);
669            let mut reader = Read::new(blob, size, 5);
670            let mut buf = vec![0u8; 5];
671            reader.read_exact(&mut buf, 5).await.unwrap();
672            assert_eq!(&buf, b"Xello");
673
674            // Test resize to larger size
675            writer.resize(10).await.unwrap();
676            assert_eq!(writer.size().await, 10);
677            writer.sync().await.unwrap();
678
679            // Verify resize
680            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
681            assert_eq!(size, 10);
682            let mut reader = Read::new(blob, size, 10);
683            let mut buf = vec![0u8; 10];
684            reader.read_exact(&mut buf, 10).await.unwrap();
685            assert_eq!(&buf[0..5], b"Xello");
686            assert_eq!(&buf[5..10], [0u8; 5]);
687
688            // Test resize to zero
689            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
690            let writer_zero = Write::new(blob_zero.clone(), size, 10);
691            writer_zero
692                .write_at(b"some data".to_vec(), 0)
693                .await
694                .unwrap();
695            assert_eq!(writer_zero.size().await, 9);
696            writer_zero.sync().await.unwrap();
697            assert_eq!(writer_zero.size().await, 9);
698            writer_zero.resize(0).await.unwrap();
699            assert_eq!(writer_zero.size().await, 0);
700            writer_zero.sync().await.unwrap();
701            assert_eq!(writer_zero.size().await, 0);
702
703            // Ensure the blob is empty
704            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
705            assert_eq!(size_z, 0);
706        });
707    }
708
709    #[test_traced]
710    fn test_write_read_at_on_writer() {
711        let executor = deterministic::Runner::default();
712        executor.start(|context| async move {
713            // Test reading through writer's read_at method (buffer + blob reads)
714            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
715            let writer = Write::new(blob.clone(), size, 10);
716
717            // Write data that stays in buffer
718            writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
719            assert_eq!(writer.size().await, 8);
720
721            // Read from buffer via writer
722            let mut read_buf_vec = vec![0u8; 4].into();
723            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
724            assert_eq!(read_buf_vec.as_ref(), b"buff");
725
726            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
727            assert_eq!(read_buf_vec.as_ref(), b"ered");
728
729            // Reading past buffer end should fail
730            let small_buf_vec = vec![0u8; 1];
731            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
732
733            // Write large data that flushes buffer
734            writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
735            assert_eq!(writer.size().await, 20);
736            writer.sync().await.unwrap();
737            assert_eq!(writer.size().await, 20);
738
739            // Read from underlying blob through writer
740            let mut read_buf_vec_2 = vec![0u8; 4].into();
741            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
742            assert_eq!(read_buf_vec_2.as_ref(), b"buff");
743
744            let mut read_buf_7_vec = vec![0u8; 7].into();
745            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
746            assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
747
748            // Buffer new data at the end
749            writer.write_at(b" more data".to_vec(), 20).await.unwrap();
750            assert_eq!(writer.size().await, 30);
751
752            // Read newly buffered data
753            let mut read_buf_vec_3 = vec![0u8; 5].into();
754            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
755            assert_eq!(read_buf_vec_3.as_ref(), b" more");
756
757            // Read spanning both blob and buffer
758            let mut combo_read_buf_vec = vec![0u8; 12].into();
759            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
760            assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
761
762            // Verify complete content by reopening
763            writer.sync().await.unwrap();
764            assert_eq!(writer.size().await, 30);
765            let (final_blob, final_size) =
766                context.open("partition", b"read_at_writer").await.unwrap();
767            assert_eq!(final_size, 30);
768            let mut final_reader = Read::new(final_blob, final_size, 30);
769            let mut full_content = vec![0u8; 30];
770            final_reader
771                .read_exact(&mut full_content, 30)
772                .await
773                .unwrap();
774            assert_eq!(&full_content, b"buffered and flushed more data");
775        });
776    }
777
778    #[test_traced]
779    fn test_write_straddling_non_mergeable() {
780        let executor = deterministic::Runner::default();
781        executor.start(|context| async move {
782            // Test writes that cannot be merged into buffer (non-contiguous/too large)
783            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
784            let writer = Write::new(blob.clone(), size, 10);
785
786            // Fill buffer completely
787            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
788            assert_eq!(writer.size().await, 10);
789
790            // Write at non-contiguous offset (should flush then write directly)
791            writer.write_at(b"abc".to_vec(), 15).await.unwrap();
792            assert_eq!(writer.size().await, 18);
793            writer.sync().await.unwrap();
794            assert_eq!(writer.size().await, 18);
795
796            // Verify data with gap
797            let (blob_check, size_check) =
798                context.open("partition", b"write_straddle").await.unwrap();
799            assert_eq!(size_check, 18);
800            let mut reader = Read::new(blob_check, size_check, 20);
801            let mut buf = vec![0u8; 18];
802            reader.read_exact(&mut buf, 18).await.unwrap();
803
804            let mut expected = vec![0u8; 18];
805            expected[0..10].copy_from_slice(b"0123456789");
806            expected[15..18].copy_from_slice(b"abc");
807            assert_eq!(buf, expected);
808
809            // Test write that exceeds buffer capacity
810            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
811            let writer2 = Write::new(blob2.clone(), size, 10);
812            writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
813            assert_eq!(writer2.size().await, 10);
814
815            // Write large data that exceeds capacity
816            writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
817            assert_eq!(writer2.size().await, 17);
818            writer2.sync().await.unwrap();
819            assert_eq!(writer2.size().await, 17);
820
821            // Verify overwrite result
822            let (blob_check2, size_check2) =
823                context.open("partition", b"write_straddle2").await.unwrap();
824            assert_eq!(size_check2, 17);
825            let mut reader2 = Read::new(blob_check2, size_check2, 20);
826            let mut buf2 = vec![0u8; 17];
827            reader2.read_exact(&mut buf2, 17).await.unwrap();
828            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
829        });
830    }
831
832    #[test_traced]
833    fn test_write_close() {
834        let executor = deterministic::Runner::default();
835        executor.start(|context| async move {
836            // Test that closing writer flushes and persists buffered data
837            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
838            let writer = Write::new(blob_orig.clone(), size, 8);
839            writer.write_at(b"pending".to_vec(), 0).await.unwrap();
840            assert_eq!(writer.size().await, 7);
841
842            // Close should flush and sync data
843            writer.close().await.unwrap();
844
845            // Verify data persistence
846            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
847            assert_eq!(size_check, 7);
848            let mut reader = Read::new(blob_check, size_check, 8);
849            let mut buf = [0u8; 7];
850            reader.read_exact(&mut buf, 7).await.unwrap();
851            assert_eq!(&buf, b"pending");
852        });
853    }
854
855    #[test_traced]
856    fn test_write_direct_due_to_size() {
857        let executor = deterministic::Runner::default();
858        executor.start(|context| async move {
859            // Test direct writes when data exceeds buffer capacity
860            let (blob, size) = context
861                .open("partition", b"write_direct_size")
862                .await
863                .unwrap();
864            let writer = Write::new(blob.clone(), size, 5);
865
866            // Write data larger than buffer capacity (should write directly)
867            let data_large = b"0123456789";
868            writer.write_at(data_large.to_vec(), 0).await.unwrap();
869            assert_eq!(writer.size().await, 10);
870
871            // Sync to ensure data is persisted
872            writer.sync().await.unwrap();
873
874            // Verify direct write worked
875            let (blob_check, size_check) = context
876                .open("partition", b"write_direct_size")
877                .await
878                .unwrap();
879            assert_eq!(size_check, 10);
880            let mut reader = Read::new(blob_check, size_check, 10);
881            let mut buf = vec![0u8; 10];
882            reader.read_exact(&mut buf, 10).await.unwrap();
883            assert_eq!(&buf, data_large.as_slice());
884
885            // Now write small data that should be buffered
886            writer.write_at(b"abc".to_vec(), 10).await.unwrap();
887            assert_eq!(writer.size().await, 13);
888
889            // Verify it's in buffer by reading through writer
890            let mut read_small_buf_vec = vec![0u8; 3].into();
891            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
892            assert_eq!(read_small_buf_vec.as_ref(), b"abc");
893
894            writer.sync().await.unwrap();
895
896            // Verify final state
897            let (blob_check2, size_check2) = context
898                .open("partition", b"write_direct_size")
899                .await
900                .unwrap();
901            assert_eq!(size_check2, 13);
902            let mut reader2 = Read::new(blob_check2, size_check2, 13);
903            let mut buf2 = vec![0u8; 13];
904            reader2.read_exact(&mut buf2, 13).await.unwrap();
905            assert_eq!(&buf2[10..], b"abc".as_slice());
906        });
907    }
908
909    #[test_traced]
910    fn test_write_overwrite_and_extend_in_buffer() {
911        let executor = deterministic::Runner::default();
912        executor.start(|context| async move {
913            // Test complex buffer operations: overwrite and extend within capacity
914            let (blob, size) = context
915                .open("partition", b"overwrite_extend_buf")
916                .await
917                .unwrap();
918            let writer = Write::new(blob.clone(), size, 15);
919
920            // Write initial data
921            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
922            assert_eq!(writer.size().await, 10);
923
924            // Overwrite and extend within buffer capacity
925            writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
926            assert_eq!(writer.size().await, 15);
927
928            // Verify buffer content through writer
929            let mut read_buf_vec = vec![0u8; 15].into();
930            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
931            assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
932
933            writer.sync().await.unwrap();
934
935            // Verify persisted result
936            let (blob_check, size_check) = context
937                .open("partition", b"overwrite_extend_buf")
938                .await
939                .unwrap();
940            assert_eq!(size_check, 15);
941            let mut reader = Read::new(blob_check, size_check, 15);
942            let mut final_buf = vec![0u8; 15];
943            reader.read_exact(&mut final_buf, 15).await.unwrap();
944            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
945        });
946    }
947
948    #[test_traced]
949    fn test_write_at_size() {
950        let executor = deterministic::Runner::default();
951        executor.start(|context| async move {
952            // Test writing at the current logical end of the blob
953            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
954            let writer = Write::new(blob.clone(), size, 20);
955
956            // Write initial data
957            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
958            assert_eq!(writer.size().await, 10);
959            writer.sync().await.unwrap();
960
961            // Append at the current size (logical end)
962            writer
963                .write_at(b"abc".to_vec(), writer.size().await)
964                .await
965                .unwrap();
966            assert_eq!(writer.size().await, 13);
967            writer.sync().await.unwrap();
968
969            // Verify complete result
970            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
971            assert_eq!(size_check, 13);
972            let mut reader = Read::new(blob_check, size_check, 13);
973            let mut buf = vec![0u8; 13];
974            reader.read_exact(&mut buf, 13).await.unwrap();
975            assert_eq!(&buf, b"0123456789abc");
976        });
977    }
978
979    #[test_traced]
980    fn test_write_at_size_multiple_appends() {
981        let executor = deterministic::Runner::default();
982        executor.start(|context| async move {
983            // Test multiple appends using writer.size()
984            let (blob, size) = context
985                .open("partition", b"write_multiple_appends_at_size")
986                .await
987                .unwrap();
988            let writer = Write::new(blob.clone(), size, 5); // Small buffer
989
990            // First write
991            writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
992            assert_eq!(writer.size().await, 3);
993            writer.sync().await.unwrap();
994            assert_eq!(writer.size().await, 3);
995
996            // Append using size()
997            writer
998                .write_at(b"BBB".to_vec(), writer.size().await)
999                .await
1000                .unwrap();
1001            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
1002            writer.sync().await.unwrap();
1003            assert_eq!(writer.size().await, 6);
1004
1005            // Append again using size()
1006            writer
1007                .write_at(b"CCC".to_vec(), writer.size().await)
1008                .await
1009                .unwrap();
1010            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1011            writer.sync().await.unwrap();
1012            assert_eq!(writer.size().await, 9);
1013
1014            // Verify final content
1015            let (blob_check, size_check) = context
1016                .open("partition", b"write_multiple_appends_at_size")
1017                .await
1018                .unwrap();
1019            assert_eq!(size_check, 9);
1020            let mut reader = Read::new(blob_check, size_check, 9);
1021            let mut buf = vec![0u8; 9];
1022            reader.read_exact(&mut buf, 9).await.unwrap();
1023            assert_eq!(&buf, b"AAABBBCCC");
1024        });
1025    }
1026
1027    #[test_traced]
1028    fn test_write_non_contiguous_then_append_at_size() {
1029        let executor = deterministic::Runner::default();
1030        executor.start(|context| async move {
1031            // Test writing non-contiguously, then appending at the new size
1032            let (blob, size) = context
1033                .open("partition", b"write_non_contiguous_then_append")
1034                .await
1035                .unwrap();
1036            let writer = Write::new(blob.clone(), size, 10);
1037
1038            // Initial buffered write
1039            writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); // 7 bytes
1040            assert_eq!(writer.size().await, 7);
1041            // Buffer contains "INITIAL", inner.position = 0
1042
1043            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1044            writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1045            assert_eq!(writer.size().await, 29);
1046            writer.sync().await.unwrap();
1047            assert_eq!(writer.size().await, 29);
1048
1049            // Append at the new size
1050            writer
1051                .write_at(b"APPEND".to_vec(), writer.size().await)
1052                .await
1053                .unwrap();
1054            assert_eq!(writer.size().await, 35); // 29 + 6
1055            writer.sync().await.unwrap();
1056            assert_eq!(writer.size().await, 35);
1057
1058            // Verify final content
1059            let (blob_check, size_check) = context
1060                .open("partition", b"write_non_contiguous_then_append")
1061                .await
1062                .unwrap();
1063            assert_eq!(size_check, 35);
1064            let mut reader = Read::new(blob_check, size_check, 35);
1065            let mut buf = vec![0u8; 35];
1066            reader.read_exact(&mut buf, 35).await.unwrap();
1067
1068            let mut expected = vec![0u8; 35];
1069            expected[0..7].copy_from_slice(b"INITIAL");
1070            expected[20..29].copy_from_slice(b"NONCONTIG");
1071            expected[29..35].copy_from_slice(b"APPEND");
1072            assert_eq!(buf, expected);
1073        });
1074    }
1075
1076    #[test_traced]
1077    fn test_resize_then_append_at_size() {
1078        let executor = deterministic::Runner::default();
1079        executor.start(|context| async move {
1080            // Test truncating, then appending at the new size
1081            let (blob, size) = context
1082                .open("partition", b"resize_then_append_at_size")
1083                .await
1084                .unwrap();
1085            let writer = Write::new(blob.clone(), size, 10);
1086
1087            // Write initial data and sync
1088            writer
1089                .write_at(b"0123456789ABCDEF".to_vec(), 0)
1090                .await
1091                .unwrap(); // 16 bytes
1092            assert_eq!(writer.size().await, 16);
1093            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1094            assert_eq!(writer.size().await, 16);
1095
1096            // Resize
1097            let resize_to = 5;
1098            writer.resize(resize_to).await.unwrap();
1099            // after resize, inner.position should be `resize_to` (5)
1100            // buffer should be empty
1101            assert_eq!(writer.size().await, resize_to);
1102            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1103            assert_eq!(writer.size().await, resize_to);
1104
1105            // Append at the new (resized) size
1106            writer
1107                .write_at(b"XXXXX".to_vec(), writer.size().await)
1108                .await
1109                .unwrap(); // 5 bytes
1110                           // inner.buffer = "XXXXX", inner.position = 5
1111            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
1112            writer.sync().await.unwrap();
1113            assert_eq!(writer.size().await, 10);
1114
1115            // Verify final content
1116            let (blob_check, size_check) = context
1117                .open("partition", b"resize_then_append_at_size")
1118                .await
1119                .unwrap();
1120            assert_eq!(size_check, 10);
1121            let mut reader = Read::new(blob_check, size_check, 10);
1122            let mut buf = vec![0u8; 10];
1123            reader.read_exact(&mut buf, 10).await.unwrap();
1124            assert_eq!(&buf, b"01234XXXXX");
1125        });
1126    }
1127}