commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3mod append;
4pub mod pool;
5mod read;
6mod tip;
7mod write;
8
9pub use append::Append;
10pub use pool::{Pool, PoolRef};
11pub use read::Read;
12pub use write::Write;
13
14#[cfg(test)]
15mod tests {
16    use super::*;
17    use crate::{deterministic, Blob as _, Error, Runner, Storage};
18    use commonware_macros::test_traced;
19    use commonware_utils::NZUsize;
20
21    #[test_traced]
22    fn test_read_basic() {
23        let executor = deterministic::Runner::default();
24        executor.start(|context| async move {
25            // Test basic buffered reading functionality with sequential reads
26            let data = b"Hello, world! This is a test.";
27            let (blob, size) = context.open("partition", b"test").await.unwrap();
28            assert_eq!(size, 0);
29            blob.write_at(data.to_vec(), 0).await.unwrap();
30            let size = data.len() as u64;
31
32            // Create a buffered reader with small buffer to test refilling
33            let mut reader = Read::new(blob, size, NZUsize!(10));
34
35            // Read some data
36            let mut buf = [0u8; 5];
37            reader.read_exact(&mut buf, 5).await.unwrap();
38            assert_eq!(&buf, b"Hello");
39
40            // Read more data that requires a buffer refill
41            let mut buf = [0u8; 14];
42            reader.read_exact(&mut buf, 14).await.unwrap();
43            assert_eq!(&buf, b", world! This ");
44
45            // Verify position tracking
46            assert_eq!(reader.position(), 19);
47
48            // Read the remaining data
49            let mut buf = [0u8; 10];
50            reader.read_exact(&mut buf, 7).await.unwrap();
51            assert_eq!(&buf[..7], b"is a te");
52
53            // Attempt to read beyond the end should fail
54            let mut buf = [0u8; 5];
55            let result = reader.read_exact(&mut buf, 5).await;
56            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
57        });
58    }
59
60    #[test_traced]
61    fn test_read_cross_boundary() {
62        let executor = deterministic::Runner::default();
63        executor.start(|context| async move {
64            // Test reading data that spans multiple buffer refills
65            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
66            let (blob, size) = context.open("partition", b"test").await.unwrap();
67            assert_eq!(size, 0);
68            blob.write_at(data.to_vec(), 0).await.unwrap();
69            let size = data.len() as u64;
70
71            // Use a buffer smaller than the total data size
72            let mut reader = Read::new(blob, size, NZUsize!(10));
73
74            // Read data that crosses buffer boundaries
75            let mut buf = [0u8; 15];
76            reader.read_exact(&mut buf, 15).await.unwrap();
77            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
78
79            // Verify position tracking
80            assert_eq!(reader.position(), 15);
81
82            // Read the remaining data
83            let mut buf = [0u8; 11];
84            reader.read_exact(&mut buf, 11).await.unwrap();
85            assert_eq!(&buf, b"PQRSTUVWXYZ");
86
87            // Verify we're at the end
88            assert_eq!(reader.position(), 26);
89            assert_eq!(reader.blob_remaining(), 0);
90        });
91    }
92
93    // Regression test for https://github.com/commonwarexyz/monorepo/issues/1348
94    #[test_traced]
95    fn test_read_to_end_then_rewind_and_read_again() {
96        let executor = deterministic::Runner::default();
97        executor.start(|context| async move {
98            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
99            let (blob, size) = context.open("partition", b"test").await.unwrap();
100            assert_eq!(size, 0);
101            blob.write_at(data.to_vec(), 0).await.unwrap();
102            let size = data.len() as u64;
103
104            let mut reader = Read::new(blob, size, NZUsize!(20));
105
106            // Read data that crosses buffer boundaries
107            let mut buf = [0u8; 21];
108            reader.read_exact(&mut buf, 21).await.unwrap();
109            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
110
111            // Verify position tracking
112            assert_eq!(reader.position(), 21);
113
114            // Read the remaining data
115            let mut buf = [0u8; 5];
116            reader.read_exact(&mut buf, 5).await.unwrap();
117            assert_eq!(&buf, b"VWXYZ");
118
119            // Rewind and read again
120            reader.seek_to(0).unwrap();
121            let mut buf = [0u8; 21];
122            reader.read_exact(&mut buf, 21).await.unwrap();
123            assert_eq!(&buf, b"ABCDEFGHIJKLMNOPQRSTU");
124        });
125    }
126
127    #[test_traced]
128    fn test_read_with_known_size() {
129        let executor = deterministic::Runner::default();
130        executor.start(|context| async move {
131            // Test reader behavior with known blob size limits
132            let data = b"This is a test with known size limitations.";
133            let (blob, size) = context.open("partition", b"test").await.unwrap();
134            assert_eq!(size, 0);
135            blob.write_at(data.to_vec(), 0).await.unwrap();
136            let size = data.len() as u64;
137
138            // Create a buffered reader with buffer smaller than total data
139            let mut reader = Read::new(blob, size, NZUsize!(10));
140
141            // Check initial remaining bytes
142            assert_eq!(reader.blob_remaining(), size);
143
144            // Read partial data
145            let mut buf = [0u8; 5];
146            reader.read_exact(&mut buf, 5).await.unwrap();
147            assert_eq!(&buf, b"This ");
148
149            // Check remaining bytes after partial read
150            assert_eq!(reader.blob_remaining(), size - 5);
151
152            // Read exactly up to the size limit
153            let mut buf = vec![0u8; (size - 5) as usize];
154            reader
155                .read_exact(&mut buf, (size - 5) as usize)
156                .await
157                .unwrap();
158            assert_eq!(&buf, b"is a test with known size limitations.");
159
160            // Verify we're at the end
161            assert_eq!(reader.blob_remaining(), 0);
162
163            // Reading beyond the end should fail
164            let mut buf = [0u8; 1];
165            let result = reader.read_exact(&mut buf, 1).await;
166            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
167        });
168    }
169
170    #[test_traced]
171    fn test_read_large_data() {
172        let executor = deterministic::Runner::default();
173        executor.start(|context| async move {
174            // Test reading large amounts of data in chunks
175            let data_size = 1024 * 256; // 256KB of data
176            let data = vec![0x42; data_size];
177            let (blob, size) = context.open("partition", b"test").await.unwrap();
178            assert_eq!(size, 0);
179            blob.write_at(data.clone(), 0).await.unwrap();
180            let size = data.len() as u64;
181
182            // Use a buffer much smaller than the total data
183            let mut reader = Read::new(blob, size, NZUsize!(64 * 1024)); // 64KB buffer
184
185            // Read all data in smaller chunks
186            let mut total_read = 0;
187            let chunk_size = 8 * 1024; // 8KB chunks
188            let mut buf = vec![0u8; chunk_size];
189
190            while total_read < data_size {
191                let to_read = std::cmp::min(chunk_size, data_size - total_read);
192                reader
193                    .read_exact(&mut buf[..to_read], to_read)
194                    .await
195                    .unwrap();
196
197                // Verify data integrity
198                assert!(
199                    buf[..to_read].iter().all(|&b| b == 0x42),
200                    "Data at position {total_read} is not correct"
201                );
202
203                total_read += to_read;
204            }
205
206            // Verify we read everything
207            assert_eq!(total_read, data_size);
208
209            // Reading beyond the end should fail
210            let mut extra_buf = [0u8; 1];
211            let result = reader.read_exact(&mut extra_buf, 1).await;
212            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
213        });
214    }
215
216    #[test_traced]
217    fn test_read_exact_size_reads() {
218        let executor = deterministic::Runner::default();
219        executor.start(|context| async move {
220            // Create a blob with exactly 2.5 buffer sizes of data
221            let buffer_size = 1024;
222            let data_size = buffer_size * 5 / 2; // 2.5 buffers
223            let data = vec![0x37; data_size];
224
225            let (blob, size) = context.open("partition", b"test").await.unwrap();
226            assert_eq!(size, 0);
227            blob.write_at(data.clone(), 0).await.unwrap();
228            let size = data.len() as u64;
229
230            let mut reader = Read::new(blob, size, NZUsize!(buffer_size));
231
232            // Read exactly one buffer size
233            let mut buf1 = vec![0u8; buffer_size];
234            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
235            assert!(buf1.iter().all(|&b| b == 0x37));
236
237            // Read exactly one buffer size more
238            let mut buf2 = vec![0u8; buffer_size];
239            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
240            assert!(buf2.iter().all(|&b| b == 0x37));
241
242            // Read the remaining half buffer
243            let half_buffer = buffer_size / 2;
244            let mut buf3 = vec![0u8; half_buffer];
245            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
246            assert!(buf3.iter().all(|&b| b == 0x37));
247
248            // Verify we're at the end
249            assert_eq!(reader.blob_remaining(), 0);
250            assert_eq!(reader.position(), size);
251        });
252    }
253
254    #[test_traced]
255    fn test_read_seek_to() {
256        let executor = deterministic::Runner::default();
257        executor.start(|context| async move {
258            // Create a memory blob with some test data
259            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
260            let (blob, size) = context.open("partition", b"test").await.unwrap();
261            assert_eq!(size, 0);
262            blob.write_at(data.to_vec(), 0).await.unwrap();
263            let size = data.len() as u64;
264
265            // Create a buffer reader
266            let mut reader = Read::new(blob, size, NZUsize!(10));
267
268            // Read some data to advance the position
269            let mut buf = [0u8; 5];
270            reader.read_exact(&mut buf, 5).await.unwrap();
271            assert_eq!(&buf, b"ABCDE");
272            assert_eq!(reader.position(), 5);
273
274            // Seek to a specific position
275            reader.seek_to(10).unwrap();
276            assert_eq!(reader.position(), 10);
277
278            // Read data from the new position
279            let mut buf = [0u8; 5];
280            reader.read_exact(&mut buf, 5).await.unwrap();
281            assert_eq!(&buf, b"KLMNO");
282
283            // Seek to beginning
284            reader.seek_to(0).unwrap();
285            assert_eq!(reader.position(), 0);
286
287            let mut buf = [0u8; 5];
288            reader.read_exact(&mut buf, 5).await.unwrap();
289            assert_eq!(&buf, b"ABCDE");
290
291            // Seek to end
292            reader.seek_to(size).unwrap();
293            assert_eq!(reader.position(), size);
294
295            // Trying to read should fail
296            let mut buf = [0u8; 1];
297            let result = reader.read_exact(&mut buf, 1).await;
298            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
299
300            // Seek beyond end should fail
301            let result = reader.seek_to(size + 10);
302            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
303        });
304    }
305
306    #[test_traced]
307    fn test_read_seek_with_refill() {
308        let executor = deterministic::Runner::default();
309        executor.start(|context| async move {
310            // Create a memory blob with longer data
311            let data = vec![0x41; 1000]; // 1000 'A' characters
312            let (blob, size) = context.open("partition", b"test").await.unwrap();
313            assert_eq!(size, 0);
314            blob.write_at(data.clone(), 0).await.unwrap();
315            let size = data.len() as u64;
316
317            // Create a buffer reader with small buffer
318            let mut reader = Read::new(blob, size, NZUsize!(10));
319
320            // Read some data
321            let mut buf = [0u8; 5];
322            reader.read_exact(&mut buf, 5).await.unwrap();
323
324            // Seek far ahead, past the current buffer
325            reader.seek_to(500).unwrap();
326
327            // Read data - should get data from position 500
328            let mut buf = [0u8; 5];
329            reader.read_exact(&mut buf, 5).await.unwrap();
330            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
331            assert_eq!(reader.position(), 505);
332
333            // Seek backwards
334            reader.seek_to(100).unwrap();
335
336            // Read again - should be at position 100
337            let mut buf = [0u8; 5];
338            reader.read_exact(&mut buf, 5).await.unwrap();
339            assert_eq!(reader.position(), 105);
340        });
341    }
342
343    #[test_traced]
344    fn test_read_resize() {
345        let executor = deterministic::Runner::default();
346        executor.start(|context| async move {
347            // Create a memory blob with some test data
348            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
349            let (blob, size) = context.open("partition", b"test").await.unwrap();
350            assert_eq!(size, 0);
351            blob.write_at(data.to_vec(), 0).await.unwrap();
352            let data_len = data.len() as u64;
353
354            // Create a buffer reader
355            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
356
357            // Resize the blob to half its size
358            let resize_len = data_len / 2;
359            reader.resize(resize_len).await.unwrap();
360
361            // Reopen to check truncation
362            let (blob, size) = context.open("partition", b"test").await.unwrap();
363            assert_eq!(size, resize_len, "Blob should be resized to half size");
364
365            // Create a new buffer and read to verify truncation
366            let mut new_reader = Read::new(blob, size, NZUsize!(10));
367
368            // Read the content
369            let mut buf = vec![0u8; size as usize];
370            new_reader
371                .read_exact(&mut buf, size as usize)
372                .await
373                .unwrap();
374            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Resized content should match");
375
376            // Reading beyond resized size should fail
377            let mut extra_buf = [0u8; 1];
378            let result = new_reader.read_exact(&mut extra_buf, 1).await;
379            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
380
381            // Test resize to larger size
382            new_reader.resize(data_len * 2).await.unwrap();
383
384            // Reopen to check resize
385            let (blob, new_size) = context.open("partition", b"test").await.unwrap();
386            assert_eq!(new_size, data_len * 2);
387
388            // Create a new buffer and read to verify resize
389            let mut new_reader = Read::new(blob, new_size, NZUsize!(10));
390            let mut buf = vec![0u8; new_size as usize];
391            new_reader
392                .read_exact(&mut buf, new_size as usize)
393                .await
394                .unwrap();
395            assert_eq!(&buf[..size as usize], b"ABCDEFGHIJKLM");
396            assert_eq!(
397                &buf[size as usize..],
398                vec![0u8; new_size as usize - size as usize]
399            );
400        });
401    }
402
403    #[test_traced]
404    fn test_read_resize_to_zero() {
405        let executor = deterministic::Runner::default();
406        executor.start(|context| async move {
407            // Create a memory blob with some test data
408            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
409            let data_len = data.len() as u64;
410            let (blob, size) = context.open("partition", b"test").await.unwrap();
411            assert_eq!(size, 0);
412            blob.write_at(data.to_vec(), 0).await.unwrap();
413
414            // Create a buffer reader
415            let reader = Read::new(blob.clone(), data_len, NZUsize!(10));
416
417            // Resize the blob to zero
418            reader.resize(0).await.unwrap();
419
420            // Reopen to check truncation
421            let (blob, size) = context.open("partition", b"test").await.unwrap();
422            assert_eq!(size, 0, "Blob should be resized to zero");
423
424            // Create a new buffer and try to read (should fail)
425            let mut new_reader = Read::new(blob, size, NZUsize!(10));
426
427            // Reading from resized blob should fail
428            let mut buf = [0u8; 1];
429            let result = new_reader.read_exact(&mut buf, 1).await;
430            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
431        });
432    }
433
434    #[test_traced]
435    fn test_write_basic() {
436        let executor = deterministic::Runner::default();
437        executor.start(|context| async move {
438            // Test basic buffered write and sync functionality
439            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
440            assert_eq!(size, 0);
441
442            let writer = Write::new(blob.clone(), size, NZUsize!(8));
443            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
444            assert_eq!(writer.size().await, 5);
445            writer.sync().await.unwrap();
446            assert_eq!(writer.size().await, 5);
447
448            // Verify data was written correctly
449            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
450            assert_eq!(size, 5);
451            let mut reader = Read::new(blob, size, NZUsize!(8));
452            let mut buf = [0u8; 5];
453            reader.read_exact(&mut buf, 5).await.unwrap();
454            assert_eq!(&buf, b"hello");
455        });
456    }
457
458    #[test_traced]
459    fn test_write_multiple_flushes() {
460        let executor = deterministic::Runner::default();
461        executor.start(|context| async move {
462            // Test writes that cause buffer flushes due to capacity limits
463            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
464            assert_eq!(size, 0);
465
466            let writer = Write::new(blob.clone(), size, NZUsize!(4));
467            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
468            assert_eq!(writer.size().await, 3);
469            writer.write_at(b"defg".to_vec(), 3).await.unwrap();
470            assert_eq!(writer.size().await, 7);
471            writer.sync().await.unwrap();
472
473            // Verify the final result
474            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
475            assert_eq!(size, 7);
476            let mut reader = Read::new(blob, size, NZUsize!(4));
477            let mut buf = [0u8; 7];
478            reader.read_exact(&mut buf, 7).await.unwrap();
479            assert_eq!(&buf, b"abcdefg");
480        });
481    }
482
483    #[test_traced]
484    fn test_write_large_data() {
485        let executor = deterministic::Runner::default();
486        executor.start(|context| async move {
487            // Test writing data larger than buffer capacity (direct write)
488            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
489            assert_eq!(size, 0);
490
491            let writer = Write::new(blob.clone(), size, NZUsize!(4));
492            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
493            assert_eq!(writer.size().await, 3);
494            writer
495                .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
496                .await
497                .unwrap();
498            assert_eq!(writer.size().await, 26);
499            writer.sync().await.unwrap();
500            assert_eq!(writer.size().await, 26);
501
502            // Verify the complete data
503            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
504            assert_eq!(size, 26);
505            let mut reader = Read::new(blob, size, NZUsize!(4));
506            let mut buf = [0u8; 26];
507            reader.read_exact(&mut buf, 26).await.unwrap();
508            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
509        });
510    }
511
512    #[test_traced]
513    fn test_write_append_to_buffer() {
514        let executor = deterministic::Runner::default();
515        executor.start(|context| async move {
516            // Test sequential appends that exceed buffer capacity
517            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
518            let writer = Write::new(blob.clone(), size, NZUsize!(10));
519
520            // Write data that fits in buffer
521            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
522            assert_eq!(writer.size().await, 5);
523
524            // Append data that causes buffer flush
525            writer.write_at(b" world".to_vec(), 5).await.unwrap();
526            writer.sync().await.unwrap();
527            assert_eq!(writer.size().await, 11);
528
529            // Verify the complete result
530            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
531            assert_eq!(size, 11);
532            let mut reader = Read::new(blob, size, NZUsize!(10));
533            let mut buf = vec![0u8; 11];
534            reader.read_exact(&mut buf, 11).await.unwrap();
535            assert_eq!(&buf, b"hello world");
536        });
537    }
538
539    #[test_traced]
540    fn test_write_into_middle_of_buffer() {
541        let executor = deterministic::Runner::default();
542        executor.start(|context| async move {
543            // Test overwriting data within the buffer and extending it
544            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
545            let writer = Write::new(blob.clone(), size, NZUsize!(20));
546
547            // Initial write
548            writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
549            assert_eq!(writer.size().await, 10);
550
551            // Overwrite middle section
552            writer.write_at(b"01234".to_vec(), 2).await.unwrap();
553            assert_eq!(writer.size().await, 10);
554            writer.sync().await.unwrap();
555
556            // Verify overwrite result
557            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
558            assert_eq!(size, 10);
559            let mut reader = Read::new(blob, size, NZUsize!(10));
560            let mut buf = vec![0u8; 10];
561            reader.read_exact(&mut buf, 10).await.unwrap();
562            assert_eq!(&buf, b"ab01234hij");
563
564            // Extend buffer and do partial overwrite
565            writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
566            assert_eq!(writer.size().await, 20);
567            writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
568            assert_eq!(writer.size().await, 20);
569            writer.sync().await.unwrap();
570
571            // Verify final result
572            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
573            assert_eq!(size, 20);
574            let mut reader = Read::new(blob, size, NZUsize!(20));
575            let mut buf = vec![0u8; 20];
576            reader.read_exact(&mut buf, 20).await.unwrap();
577            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
578        });
579    }
580
581    #[test_traced]
582    fn test_write_before_buffer() {
583        let executor = deterministic::Runner::default();
584        executor.start(|context| async move {
585            // Test writing at offsets before the current buffer position
586            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
587            let writer = Write::new(blob.clone(), size, NZUsize!(10));
588
589            // Write data at a later offset first
590            writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
591            assert_eq!(writer.size().await, 20);
592
593            // Write at an earlier offset (should flush buffer first)
594            writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
595            assert_eq!(writer.size().await, 20);
596            writer.sync().await.unwrap();
597
598            // Verify data placement with gap
599            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
600            assert_eq!(size, 20);
601            let mut reader = Read::new(blob, size, NZUsize!(20));
602            let mut buf = vec![0u8; 20];
603            reader.read_exact(&mut buf, 20).await.unwrap();
604            let mut expected = vec![0u8; 20];
605            expected[0..5].copy_from_slice("abcde".as_bytes());
606            expected[10..20].copy_from_slice("0123456789".as_bytes());
607            assert_eq!(buf, expected);
608
609            // Fill the gap between existing data
610            writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
611            assert_eq!(writer.size().await, 20);
612            writer.sync().await.unwrap();
613            assert_eq!(writer.size().await, 20);
614
615            // Verify gap is filled
616            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
617            assert_eq!(size, 20);
618            let mut reader = Read::new(blob, size, NZUsize!(20));
619            let mut buf = vec![0u8; 20];
620            reader.read_exact(&mut buf, 20).await.unwrap();
621            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
622            assert_eq!(buf, expected);
623        });
624    }
625
626    #[test_traced]
627    fn test_write_resize() {
628        let executor = deterministic::Runner::default();
629        executor.start(|context| async move {
630            // Test blob resize functionality and subsequent writes
631            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
632            let writer = Write::new(blob, size, NZUsize!(10));
633
634            // Write initial data
635            writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
636            assert_eq!(writer.size().await, 11);
637            writer.sync().await.unwrap();
638            assert_eq!(writer.size().await, 11);
639
640            let (blob_check, size_check) =
641                context.open("partition", b"resize_write").await.unwrap();
642            assert_eq!(size_check, 11);
643            drop(blob_check);
644
645            // Resize to smaller size
646            writer.resize(5).await.unwrap();
647            assert_eq!(writer.size().await, 5);
648            writer.sync().await.unwrap();
649
650            // Verify resize
651            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
652            assert_eq!(size, 5);
653            let mut reader = Read::new(blob, size, NZUsize!(5));
654            let mut buf = vec![0u8; 5];
655            reader.read_exact(&mut buf, 5).await.unwrap();
656            assert_eq!(&buf, b"hello");
657
658            // Write to resized blob
659            writer.write_at(b"X".to_vec(), 0).await.unwrap();
660            assert_eq!(writer.size().await, 5);
661            writer.sync().await.unwrap();
662
663            // Verify overwrite
664            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
665            assert_eq!(size, 5);
666            let mut reader = Read::new(blob, size, NZUsize!(5));
667            let mut buf = vec![0u8; 5];
668            reader.read_exact(&mut buf, 5).await.unwrap();
669            assert_eq!(&buf, b"Xello");
670
671            // Test resize to larger size
672            writer.resize(10).await.unwrap();
673            assert_eq!(writer.size().await, 10);
674            writer.sync().await.unwrap();
675
676            // Verify resize
677            let (blob, size) = context.open("partition", b"resize_write").await.unwrap();
678            assert_eq!(size, 10);
679            let mut reader = Read::new(blob, size, NZUsize!(10));
680            let mut buf = vec![0u8; 10];
681            reader.read_exact(&mut buf, 10).await.unwrap();
682            assert_eq!(&buf[0..5], b"Xello");
683            assert_eq!(&buf[5..10], [0u8; 5]);
684
685            // Test resize to zero
686            let (blob_zero, size) = context.open("partition", b"resize_zero").await.unwrap();
687            let writer_zero = Write::new(blob_zero.clone(), size, NZUsize!(10));
688            writer_zero
689                .write_at(b"some data".to_vec(), 0)
690                .await
691                .unwrap();
692            assert_eq!(writer_zero.size().await, 9);
693            writer_zero.sync().await.unwrap();
694            assert_eq!(writer_zero.size().await, 9);
695            writer_zero.resize(0).await.unwrap();
696            assert_eq!(writer_zero.size().await, 0);
697            writer_zero.sync().await.unwrap();
698            assert_eq!(writer_zero.size().await, 0);
699
700            // Ensure the blob is empty
701            let (_, size_z) = context.open("partition", b"resize_zero").await.unwrap();
702            assert_eq!(size_z, 0);
703        });
704    }
705
706    #[test_traced]
707    fn test_write_read_at_on_writer() {
708        let executor = deterministic::Runner::default();
709        executor.start(|context| async move {
710            // Test reading through writer's read_at method (buffer + blob reads)
711            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
712            let writer = Write::new(blob.clone(), size, NZUsize!(10));
713
714            // Write data that stays in buffer
715            writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
716            assert_eq!(writer.size().await, 8);
717
718            // Read from buffer via writer
719            let mut read_buf_vec = vec![0u8; 4].into();
720            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
721            assert_eq!(read_buf_vec.as_ref(), b"buff");
722
723            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
724            assert_eq!(read_buf_vec.as_ref(), b"ered");
725
726            // Reading past buffer end should fail
727            let small_buf_vec = vec![0u8; 1];
728            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
729
730            // Write large data that flushes buffer
731            writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
732            assert_eq!(writer.size().await, 20);
733            writer.sync().await.unwrap();
734            assert_eq!(writer.size().await, 20);
735
736            // Read from underlying blob through writer
737            let mut read_buf_vec_2 = vec![0u8; 4].into();
738            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
739            assert_eq!(read_buf_vec_2.as_ref(), b"buff");
740
741            let mut read_buf_7_vec = vec![0u8; 7].into();
742            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
743            assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
744
745            // Buffer new data at the end
746            writer.write_at(b" more data".to_vec(), 20).await.unwrap();
747            assert_eq!(writer.size().await, 30);
748
749            // Read newly buffered data
750            let mut read_buf_vec_3 = vec![0u8; 5].into();
751            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
752            assert_eq!(read_buf_vec_3.as_ref(), b" more");
753
754            // Read spanning both blob and buffer
755            let mut combo_read_buf_vec = vec![0u8; 12].into();
756            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
757            assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
758
759            // Verify complete content by reopening
760            writer.sync().await.unwrap();
761            assert_eq!(writer.size().await, 30);
762            let (final_blob, final_size) =
763                context.open("partition", b"read_at_writer").await.unwrap();
764            assert_eq!(final_size, 30);
765            let mut final_reader = Read::new(final_blob, final_size, NZUsize!(30));
766            let mut full_content = vec![0u8; 30];
767            final_reader
768                .read_exact(&mut full_content, 30)
769                .await
770                .unwrap();
771            assert_eq!(&full_content, b"buffered and flushed more data");
772        });
773    }
774
775    #[test_traced]
776    fn test_write_straddling_non_mergeable() {
777        let executor = deterministic::Runner::default();
778        executor.start(|context| async move {
779            // Test writes that cannot be merged into buffer (non-contiguous/too large)
780            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
781            let writer = Write::new(blob.clone(), size, NZUsize!(10));
782
783            // Fill buffer completely
784            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
785            assert_eq!(writer.size().await, 10);
786
787            // Write at non-contiguous offset (should flush then write directly)
788            writer.write_at(b"abc".to_vec(), 15).await.unwrap();
789            assert_eq!(writer.size().await, 18);
790            writer.sync().await.unwrap();
791            assert_eq!(writer.size().await, 18);
792
793            // Verify data with gap
794            let (blob_check, size_check) =
795                context.open("partition", b"write_straddle").await.unwrap();
796            assert_eq!(size_check, 18);
797            let mut reader = Read::new(blob_check, size_check, NZUsize!(20));
798            let mut buf = vec![0u8; 18];
799            reader.read_exact(&mut buf, 18).await.unwrap();
800
801            let mut expected = vec![0u8; 18];
802            expected[0..10].copy_from_slice(b"0123456789");
803            expected[15..18].copy_from_slice(b"abc");
804            assert_eq!(buf, expected);
805
806            // Test write that exceeds buffer capacity
807            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
808            let writer2 = Write::new(blob2.clone(), size, NZUsize!(10));
809            writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
810            assert_eq!(writer2.size().await, 10);
811
812            // Write large data that exceeds capacity
813            writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
814            assert_eq!(writer2.size().await, 17);
815            writer2.sync().await.unwrap();
816            assert_eq!(writer2.size().await, 17);
817
818            // Verify overwrite result
819            let (blob_check2, size_check2) =
820                context.open("partition", b"write_straddle2").await.unwrap();
821            assert_eq!(size_check2, 17);
822            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(20));
823            let mut buf2 = vec![0u8; 17];
824            reader2.read_exact(&mut buf2, 17).await.unwrap();
825            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
826        });
827    }
828
829    #[test_traced]
830    fn test_write_close() {
831        let executor = deterministic::Runner::default();
832        executor.start(|context| async move {
833            // Test that closing writer flushes and persists buffered data
834            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
835            let writer = Write::new(blob_orig.clone(), size, NZUsize!(8));
836            writer.write_at(b"pending".to_vec(), 0).await.unwrap();
837            assert_eq!(writer.size().await, 7);
838
839            // Sync writer to persist data
840            writer.sync().await.unwrap();
841
842            // Verify data persistence
843            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
844            assert_eq!(size_check, 7);
845            let mut reader = Read::new(blob_check, size_check, NZUsize!(8));
846            let mut buf = [0u8; 7];
847            reader.read_exact(&mut buf, 7).await.unwrap();
848            assert_eq!(&buf, b"pending");
849        });
850    }
851
852    #[test_traced]
853    fn test_write_direct_due_to_size() {
854        let executor = deterministic::Runner::default();
855        executor.start(|context| async move {
856            // Test direct writes when data exceeds buffer capacity
857            let (blob, size) = context
858                .open("partition", b"write_direct_size")
859                .await
860                .unwrap();
861            let writer = Write::new(blob.clone(), size, NZUsize!(5));
862
863            // Write data larger than buffer capacity (should write directly)
864            let data_large = b"0123456789";
865            writer.write_at(data_large.to_vec(), 0).await.unwrap();
866            assert_eq!(writer.size().await, 10);
867
868            // Sync to ensure data is persisted
869            writer.sync().await.unwrap();
870
871            // Verify direct write worked
872            let (blob_check, size_check) = context
873                .open("partition", b"write_direct_size")
874                .await
875                .unwrap();
876            assert_eq!(size_check, 10);
877            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
878            let mut buf = vec![0u8; 10];
879            reader.read_exact(&mut buf, 10).await.unwrap();
880            assert_eq!(&buf, data_large.as_slice());
881
882            // Now write small data that should be buffered
883            writer.write_at(b"abc".to_vec(), 10).await.unwrap();
884            assert_eq!(writer.size().await, 13);
885
886            // Verify it's in buffer by reading through writer
887            let mut read_small_buf_vec = vec![0u8; 3].into();
888            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
889            assert_eq!(read_small_buf_vec.as_ref(), b"abc");
890
891            writer.sync().await.unwrap();
892
893            // Verify final state
894            let (blob_check2, size_check2) = context
895                .open("partition", b"write_direct_size")
896                .await
897                .unwrap();
898            assert_eq!(size_check2, 13);
899            let mut reader2 = Read::new(blob_check2, size_check2, NZUsize!(13));
900            let mut buf2 = vec![0u8; 13];
901            reader2.read_exact(&mut buf2, 13).await.unwrap();
902            assert_eq!(&buf2[10..], b"abc".as_slice());
903        });
904    }
905
906    #[test_traced]
907    fn test_write_overwrite_and_extend_in_buffer() {
908        let executor = deterministic::Runner::default();
909        executor.start(|context| async move {
910            // Test complex buffer operations: overwrite and extend within capacity
911            let (blob, size) = context
912                .open("partition", b"overwrite_extend_buf")
913                .await
914                .unwrap();
915            let writer = Write::new(blob.clone(), size, NZUsize!(15));
916
917            // Write initial data
918            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
919            assert_eq!(writer.size().await, 10);
920
921            // Overwrite and extend within buffer capacity
922            writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
923            assert_eq!(writer.size().await, 15);
924
925            // Verify buffer content through writer
926            let mut read_buf_vec = vec![0u8; 15].into();
927            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
928            assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
929
930            writer.sync().await.unwrap();
931
932            // Verify persisted result
933            let (blob_check, size_check) = context
934                .open("partition", b"overwrite_extend_buf")
935                .await
936                .unwrap();
937            assert_eq!(size_check, 15);
938            let mut reader = Read::new(blob_check, size_check, NZUsize!(15));
939            let mut final_buf = vec![0u8; 15];
940            reader.read_exact(&mut final_buf, 15).await.unwrap();
941            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
942        });
943    }
944
945    #[test_traced]
946    fn test_write_at_size() {
947        let executor = deterministic::Runner::default();
948        executor.start(|context| async move {
949            // Test writing at the current logical end of the blob
950            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
951            let writer = Write::new(blob.clone(), size, NZUsize!(20));
952
953            // Write initial data
954            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
955            assert_eq!(writer.size().await, 10);
956            writer.sync().await.unwrap();
957
958            // Append at the current size (logical end)
959            writer
960                .write_at(b"abc".to_vec(), writer.size().await)
961                .await
962                .unwrap();
963            assert_eq!(writer.size().await, 13);
964            writer.sync().await.unwrap();
965
966            // Verify complete result
967            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
968            assert_eq!(size_check, 13);
969            let mut reader = Read::new(blob_check, size_check, NZUsize!(13));
970            let mut buf = vec![0u8; 13];
971            reader.read_exact(&mut buf, 13).await.unwrap();
972            assert_eq!(&buf, b"0123456789abc");
973        });
974    }
975
976    #[test_traced]
977    fn test_write_at_size_multiple_appends() {
978        let executor = deterministic::Runner::default();
979        executor.start(|context| async move {
980            // Test multiple appends using writer.size()
981            let (blob, size) = context
982                .open("partition", b"write_multiple_appends_at_size")
983                .await
984                .unwrap();
985            let writer = Write::new(blob.clone(), size, NZUsize!(5)); // Small buffer
986
987            // First write
988            writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
989            assert_eq!(writer.size().await, 3);
990            writer.sync().await.unwrap();
991            assert_eq!(writer.size().await, 3);
992
993            // Append using size()
994            writer
995                .write_at(b"BBB".to_vec(), writer.size().await)
996                .await
997                .unwrap();
998            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
999            writer.sync().await.unwrap();
1000            assert_eq!(writer.size().await, 6);
1001
1002            // Append again using size()
1003            writer
1004                .write_at(b"CCC".to_vec(), writer.size().await)
1005                .await
1006                .unwrap();
1007            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
1008            writer.sync().await.unwrap();
1009            assert_eq!(writer.size().await, 9);
1010
1011            // Verify final content
1012            let (blob_check, size_check) = context
1013                .open("partition", b"write_multiple_appends_at_size")
1014                .await
1015                .unwrap();
1016            assert_eq!(size_check, 9);
1017            let mut reader = Read::new(blob_check, size_check, NZUsize!(9));
1018            let mut buf = vec![0u8; 9];
1019            reader.read_exact(&mut buf, 9).await.unwrap();
1020            assert_eq!(&buf, b"AAABBBCCC");
1021        });
1022    }
1023
1024    #[test_traced]
1025    fn test_write_non_contiguous_then_append_at_size() {
1026        let executor = deterministic::Runner::default();
1027        executor.start(|context| async move {
1028            // Test writing non-contiguously, then appending at the new size
1029            let (blob, size) = context
1030                .open("partition", b"write_non_contiguous_then_append")
1031                .await
1032                .unwrap();
1033            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1034
1035            // Initial buffered write
1036            writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); // 7 bytes
1037            assert_eq!(writer.size().await, 7);
1038            // Buffer contains "INITIAL", inner.position = 0
1039
1040            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1041            writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1042            assert_eq!(writer.size().await, 29);
1043            writer.sync().await.unwrap();
1044            assert_eq!(writer.size().await, 29);
1045
1046            // Append at the new size
1047            writer
1048                .write_at(b"APPEND".to_vec(), writer.size().await)
1049                .await
1050                .unwrap();
1051            assert_eq!(writer.size().await, 35); // 29 + 6
1052            writer.sync().await.unwrap();
1053            assert_eq!(writer.size().await, 35);
1054
1055            // Verify final content
1056            let (blob_check, size_check) = context
1057                .open("partition", b"write_non_contiguous_then_append")
1058                .await
1059                .unwrap();
1060            assert_eq!(size_check, 35);
1061            let mut reader = Read::new(blob_check, size_check, NZUsize!(35));
1062            let mut buf = vec![0u8; 35];
1063            reader.read_exact(&mut buf, 35).await.unwrap();
1064
1065            let mut expected = vec![0u8; 35];
1066            expected[0..7].copy_from_slice(b"INITIAL");
1067            expected[20..29].copy_from_slice(b"NONCONTIG");
1068            expected[29..35].copy_from_slice(b"APPEND");
1069            assert_eq!(buf, expected);
1070        });
1071    }
1072
1073    #[test_traced]
1074    fn test_resize_then_append_at_size() {
1075        let executor = deterministic::Runner::default();
1076        executor.start(|context| async move {
1077            // Test truncating, then appending at the new size
1078            let (blob, size) = context
1079                .open("partition", b"resize_then_append_at_size")
1080                .await
1081                .unwrap();
1082            let writer = Write::new(blob.clone(), size, NZUsize!(10));
1083
1084            // Write initial data and sync
1085            writer
1086                .write_at(b"0123456789ABCDEF".to_vec(), 0)
1087                .await
1088                .unwrap(); // 16 bytes
1089            assert_eq!(writer.size().await, 16);
1090            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1091            assert_eq!(writer.size().await, 16);
1092
1093            // Resize
1094            let resize_to = 5;
1095            writer.resize(resize_to).await.unwrap();
1096            // after resize, inner.position should be `resize_to` (5)
1097            // buffer should be empty
1098            assert_eq!(writer.size().await, resize_to);
1099            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1100            assert_eq!(writer.size().await, resize_to);
1101
1102            // Append at the new (resized) size
1103            writer
1104                .write_at(b"XXXXX".to_vec(), writer.size().await)
1105                .await
1106                .unwrap(); // 5 bytes
1107                           // inner.buffer = "XXXXX", inner.position = 5
1108            assert_eq!(writer.size().await, 10); // 5 (resized) + 5 (XXXXX)
1109            writer.sync().await.unwrap();
1110            assert_eq!(writer.size().await, 10);
1111
1112            // Verify final content
1113            let (blob_check, size_check) = context
1114                .open("partition", b"resize_then_append_at_size")
1115                .await
1116                .unwrap();
1117            assert_eq!(size_check, 10);
1118            let mut reader = Read::new(blob_check, size_check, NZUsize!(10));
1119            let mut buf = vec![0u8; 10];
1120            reader.read_exact(&mut buf, 10).await.unwrap();
1121            assert_eq!(&buf, b"01234XXXXX");
1122        });
1123    }
1124}