commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3mod read;
4mod write;
5
6pub use read::Read;
7pub use write::Write;
8
9#[cfg(test)]
10mod tests {
11    use super::*;
12    use crate::{deterministic, Blob as _, Error, Runner, Storage};
13    use commonware_macros::test_traced;
14
15    #[test_traced]
16    fn test_read_basic() {
17        let executor = deterministic::Runner::default();
18        executor.start(|context| async move {
19            // Test basic buffered reading functionality with sequential reads
20            let data = b"Hello, world! This is a test.";
21            let (blob, size) = context.open("partition", b"test").await.unwrap();
22            assert_eq!(size, 0);
23            blob.write_at(data.to_vec(), 0).await.unwrap();
24            let size = data.len() as u64;
25
26            // Create a buffered reader with small buffer to test refilling
27            let buffer_size = 10;
28            let mut reader = Read::new(blob, size, buffer_size);
29
30            // Read some data
31            let mut buf = [0u8; 5];
32            reader.read_exact(&mut buf, 5).await.unwrap();
33            assert_eq!(&buf, b"Hello");
34
35            // Read more data that requires a buffer refill
36            let mut buf = [0u8; 14];
37            reader.read_exact(&mut buf, 14).await.unwrap();
38            assert_eq!(&buf, b", world! This ");
39
40            // Verify position tracking
41            assert_eq!(reader.position(), 19);
42
43            // Read the remaining data
44            let mut buf = [0u8; 10];
45            reader.read_exact(&mut buf, 7).await.unwrap();
46            assert_eq!(&buf[..7], b"is a te");
47
48            // Attempt to read beyond the end should fail
49            let mut buf = [0u8; 5];
50            let result = reader.read_exact(&mut buf, 5).await;
51            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
52        });
53    }
54
55    #[test_traced]
56    #[should_panic(expected = "buffer size must be greater than zero")]
57    fn test_read_empty() {
58        let executor = deterministic::Runner::default();
59        executor.start(|context| async move {
60            // Test that creating a reader with zero buffer size panics
61            let data = b"Hello, world! This is a test.";
62            let (blob, size) = context.open("partition", b"test").await.unwrap();
63            assert_eq!(size, 0);
64            blob.write_at(data.to_vec(), 0).await.unwrap();
65            let size = data.len() as u64;
66
67            // This should panic
68            let buffer_size = 0;
69            Read::new(blob, size, buffer_size);
70        });
71    }
72
73    #[test_traced]
74    fn test_read_cross_boundary() {
75        let executor = deterministic::Runner::default();
76        executor.start(|context| async move {
77            // Test reading data that spans multiple buffer refills
78            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
79            let (blob, size) = context.open("partition", b"test").await.unwrap();
80            assert_eq!(size, 0);
81            blob.write_at(data.to_vec(), 0).await.unwrap();
82            let size = data.len() as u64;
83
84            // Use a buffer smaller than the total data size
85            let buffer_size = 10;
86            let mut reader = Read::new(blob, size, buffer_size);
87
88            // Read data that crosses buffer boundaries
89            let mut buf = [0u8; 15];
90            reader.read_exact(&mut buf, 15).await.unwrap();
91            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
92
93            // Verify position tracking
94            assert_eq!(reader.position(), 15);
95
96            // Read the remaining data
97            let mut buf = [0u8; 11];
98            reader.read_exact(&mut buf, 11).await.unwrap();
99            assert_eq!(&buf, b"PQRSTUVWXYZ");
100
101            // Verify we're at the end
102            assert_eq!(reader.position(), 26);
103            assert_eq!(reader.blob_remaining(), 0);
104        });
105    }
106
107    #[test_traced]
108    fn test_read_with_known_size() {
109        let executor = deterministic::Runner::default();
110        executor.start(|context| async move {
111            // Test reader behavior with known blob size limits
112            let data = b"This is a test with known size limitations.";
113            let (blob, size) = context.open("partition", b"test").await.unwrap();
114            assert_eq!(size, 0);
115            blob.write_at(data.to_vec(), 0).await.unwrap();
116            let size = data.len() as u64;
117
118            // Create a buffered reader with buffer smaller than total data
119            let buffer_size = 10;
120            let mut reader = Read::new(blob, size, buffer_size);
121
122            // Check initial remaining bytes
123            assert_eq!(reader.blob_remaining(), size);
124
125            // Read partial data
126            let mut buf = [0u8; 5];
127            reader.read_exact(&mut buf, 5).await.unwrap();
128            assert_eq!(&buf, b"This ");
129
130            // Check remaining bytes after partial read
131            assert_eq!(reader.blob_remaining(), size - 5);
132
133            // Read exactly up to the size limit
134            let mut buf = vec![0u8; (size - 5) as usize];
135            reader
136                .read_exact(&mut buf, (size - 5) as usize)
137                .await
138                .unwrap();
139            assert_eq!(&buf, b"is a test with known size limitations.");
140
141            // Verify we're at the end
142            assert_eq!(reader.blob_remaining(), 0);
143
144            // Reading beyond the end should fail
145            let mut buf = [0u8; 1];
146            let result = reader.read_exact(&mut buf, 1).await;
147            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
148        });
149    }
150
151    #[test_traced]
152    fn test_read_large_data() {
153        let executor = deterministic::Runner::default();
154        executor.start(|context| async move {
155            // Test reading large amounts of data in chunks
156            let data_size = 1024 * 256; // 256KB of data
157            let data = vec![0x42; data_size];
158            let (blob, size) = context.open("partition", b"test").await.unwrap();
159            assert_eq!(size, 0);
160            blob.write_at(data.clone(), 0).await.unwrap();
161            let size = data.len() as u64;
162
163            // Use a buffer much smaller than the total data
164            let buffer_size = 64 * 1024; // 64KB buffer
165            let mut reader = Read::new(blob, size, buffer_size);
166
167            // Read all data in smaller chunks
168            let mut total_read = 0;
169            let chunk_size = 8 * 1024; // 8KB chunks
170            let mut buf = vec![0u8; chunk_size];
171
172            while total_read < data_size {
173                let to_read = std::cmp::min(chunk_size, data_size - total_read);
174                reader
175                    .read_exact(&mut buf[..to_read], to_read)
176                    .await
177                    .unwrap();
178
179                // Verify data integrity
180                assert!(
181                    buf[..to_read].iter().all(|&b| b == 0x42),
182                    "Data at position {} is not correct",
183                    total_read
184                );
185
186                total_read += to_read;
187            }
188
189            // Verify we read everything
190            assert_eq!(total_read, data_size);
191
192            // Reading beyond the end should fail
193            let mut extra_buf = [0u8; 1];
194            let result = reader.read_exact(&mut extra_buf, 1).await;
195            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
196        });
197    }
198
199    #[test_traced]
200    fn test_read_exact_size_reads() {
201        let executor = deterministic::Runner::default();
202        executor.start(|context| async move {
203            // Create a blob with exactly 2.5 buffer sizes of data
204            let buffer_size = 1024;
205            let data_size = buffer_size * 5 / 2; // 2.5 buffers
206            let data = vec![0x37; data_size];
207
208            let (blob, size) = context.open("partition", b"test").await.unwrap();
209            assert_eq!(size, 0);
210            blob.write_at(data.clone(), 0).await.unwrap();
211            let size = data.len() as u64;
212
213            let mut reader = Read::new(blob, size, buffer_size);
214
215            // Read exactly one buffer size
216            let mut buf1 = vec![0u8; buffer_size];
217            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
218            assert!(buf1.iter().all(|&b| b == 0x37));
219
220            // Read exactly one buffer size more
221            let mut buf2 = vec![0u8; buffer_size];
222            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
223            assert!(buf2.iter().all(|&b| b == 0x37));
224
225            // Read the remaining half buffer
226            let half_buffer = buffer_size / 2;
227            let mut buf3 = vec![0u8; half_buffer];
228            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
229            assert!(buf3.iter().all(|&b| b == 0x37));
230
231            // Verify we're at the end
232            assert_eq!(reader.blob_remaining(), 0);
233            assert_eq!(reader.position(), size);
234        });
235    }
236
237    #[test_traced]
238    fn test_read_seek_to() {
239        let executor = deterministic::Runner::default();
240        executor.start(|context| async move {
241            // Create a memory blob with some test data
242            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
243            let (blob, size) = context.open("partition", b"test").await.unwrap();
244            assert_eq!(size, 0);
245            blob.write_at(data.to_vec(), 0).await.unwrap();
246            let size = data.len() as u64;
247
248            // Create a buffer reader
249            let buffer_size = 10;
250            let mut reader = Read::new(blob, size, buffer_size);
251
252            // Read some data to advance the position
253            let mut buf = [0u8; 5];
254            reader.read_exact(&mut buf, 5).await.unwrap();
255            assert_eq!(&buf, b"ABCDE");
256            assert_eq!(reader.position(), 5);
257
258            // Seek to a specific position
259            reader.seek_to(10).unwrap();
260            assert_eq!(reader.position(), 10);
261
262            // Read data from the new position
263            let mut buf = [0u8; 5];
264            reader.read_exact(&mut buf, 5).await.unwrap();
265            assert_eq!(&buf, b"KLMNO");
266
267            // Seek to beginning
268            reader.seek_to(0).unwrap();
269            assert_eq!(reader.position(), 0);
270
271            let mut buf = [0u8; 5];
272            reader.read_exact(&mut buf, 5).await.unwrap();
273            assert_eq!(&buf, b"ABCDE");
274
275            // Seek to end
276            reader.seek_to(size).unwrap();
277            assert_eq!(reader.position(), size);
278
279            // Trying to read should fail
280            let mut buf = [0u8; 1];
281            let result = reader.read_exact(&mut buf, 1).await;
282            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
283
284            // Seek beyond end should fail
285            let result = reader.seek_to(size + 10);
286            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287        });
288    }
289
290    #[test_traced]
291    fn test_read_seek_with_refill() {
292        let executor = deterministic::Runner::default();
293        executor.start(|context| async move {
294            // Create a memory blob with longer data
295            let data = vec![0x41; 1000]; // 1000 'A' characters
296            let (blob, size) = context.open("partition", b"test").await.unwrap();
297            assert_eq!(size, 0);
298            blob.write_at(data.clone(), 0).await.unwrap();
299            let size = data.len() as u64;
300
301            // Create a buffer reader with small buffer
302            let buffer_size = 10;
303            let mut reader = Read::new(blob, size, buffer_size);
304
305            // Read some data
306            let mut buf = [0u8; 5];
307            reader.read_exact(&mut buf, 5).await.unwrap();
308
309            // Seek far ahead, past the current buffer
310            reader.seek_to(500).unwrap();
311
312            // Read data - should get data from position 500
313            let mut buf = [0u8; 5];
314            reader.read_exact(&mut buf, 5).await.unwrap();
315            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
316            assert_eq!(reader.position(), 505);
317
318            // Seek backwards
319            reader.seek_to(100).unwrap();
320
321            // Read again - should be at position 100
322            let mut buf = [0u8; 5];
323            reader.read_exact(&mut buf, 5).await.unwrap();
324            assert_eq!(reader.position(), 105);
325        });
326    }
327
328    #[test_traced]
329    fn test_read_truncate() {
330        let executor = deterministic::Runner::default();
331        executor.start(|context| async move {
332            // Create a memory blob with some test data
333            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
334            let (blob, size) = context.open("partition", b"test").await.unwrap();
335            assert_eq!(size, 0);
336            blob.write_at(data.to_vec(), 0).await.unwrap();
337            let data_len = data.len() as u64;
338
339            // Create a buffer reader
340            let buffer_size = 10;
341            let reader = Read::new(blob.clone(), data_len, buffer_size);
342
343            // Truncate the blob to half its size
344            let truncate_len = data_len / 2;
345            reader.truncate(truncate_len).await.unwrap();
346
347            // Reopen to check truncation
348            let (blob, size) = context.open("partition", b"test").await.unwrap();
349            assert_eq!(size, truncate_len, "Blob should be truncated to half size");
350
351            // Create a new buffer and read to verify truncation
352            let mut new_reader = Read::new(blob, size, buffer_size);
353
354            // Read the content
355            let mut buf = vec![0u8; size as usize];
356            new_reader
357                .read_exact(&mut buf, size as usize)
358                .await
359                .unwrap();
360            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Truncated content should match");
361
362            // Reading beyond truncated size should fail
363            let mut extra_buf = [0u8; 1];
364            let result = new_reader.read_exact(&mut extra_buf, 1).await;
365            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
366        });
367    }
368
369    #[test_traced]
370    fn test_read_truncate_to_zero() {
371        let executor = deterministic::Runner::default();
372        executor.start(|context| async move {
373            // Create a memory blob with some test data
374            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
375            let data_len = data.len() as u64;
376            let (blob, size) = context.open("partition", b"test").await.unwrap();
377            assert_eq!(size, 0);
378            blob.write_at(data.to_vec(), 0).await.unwrap();
379
380            // Create a buffer reader
381            let buffer_size = 10;
382            let reader = Read::new(blob.clone(), data_len, buffer_size);
383
384            // Truncate the blob to zero
385            reader.truncate(0).await.unwrap();
386
387            // Reopen to check truncation
388            let (blob, size) = context.open("partition", b"test").await.unwrap();
389            assert_eq!(size, 0, "Blob should be truncated to zero");
390
391            // Create a new buffer and try to read (should fail)
392            let mut new_reader = Read::new(blob, size, buffer_size);
393
394            // Reading from truncated blob should fail
395            let mut buf = [0u8; 1];
396            let result = new_reader.read_exact(&mut buf, 1).await;
397            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
398        });
399    }
400
401    #[test_traced]
402    fn test_write_basic() {
403        let executor = deterministic::Runner::default();
404        executor.start(|context| async move {
405            // Test basic buffered write and sync functionality
406            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
407            assert_eq!(size, 0);
408
409            let writer = Write::new(blob.clone(), size, 8);
410            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
411            assert_eq!(writer.size().await, 5);
412            writer.sync().await.unwrap();
413            assert_eq!(writer.size().await, 5);
414
415            // Verify data was written correctly
416            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
417            assert_eq!(size, 5);
418            let mut reader = Read::new(blob, size, 8);
419            let mut buf = [0u8; 5];
420            reader.read_exact(&mut buf, 5).await.unwrap();
421            assert_eq!(&buf, b"hello");
422        });
423    }
424
425    #[test_traced]
426    fn test_write_multiple_flushes() {
427        let executor = deterministic::Runner::default();
428        executor.start(|context| async move {
429            // Test writes that cause buffer flushes due to capacity limits
430            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
431            assert_eq!(size, 0);
432
433            let writer = Write::new(blob.clone(), size, 4);
434            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
435            assert_eq!(writer.size().await, 3);
436            writer.write_at(b"defg".to_vec(), 3).await.unwrap();
437            assert_eq!(writer.size().await, 7);
438            writer.sync().await.unwrap();
439
440            // Verify the final result
441            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
442            assert_eq!(size, 7);
443            let mut reader = Read::new(blob, size, 4);
444            let mut buf = [0u8; 7];
445            reader.read_exact(&mut buf, 7).await.unwrap();
446            assert_eq!(&buf, b"abcdefg");
447        });
448    }
449
450    #[test_traced]
451    fn test_write_large_data() {
452        let executor = deterministic::Runner::default();
453        executor.start(|context| async move {
454            // Test writing data larger than buffer capacity (direct write)
455            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
456            assert_eq!(size, 0);
457
458            let writer = Write::new(blob.clone(), size, 4);
459            writer.write_at(b"abc".to_vec(), 0).await.unwrap();
460            assert_eq!(writer.size().await, 3);
461            writer
462                .write_at(b"defghijklmnopqrstuvwxyz".to_vec(), 3)
463                .await
464                .unwrap();
465            assert_eq!(writer.size().await, 26);
466            writer.sync().await.unwrap();
467            assert_eq!(writer.size().await, 26);
468
469            // Verify the complete data
470            let (blob, size) = context.open("partition", b"write_large").await.unwrap();
471            assert_eq!(size, 26);
472            let mut reader = Read::new(blob, size, 4);
473            let mut buf = [0u8; 26];
474            reader.read_exact(&mut buf, 26).await.unwrap();
475            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
476        });
477    }
478
479    #[test_traced]
480    #[should_panic(expected = "buffer capacity must be greater than zero")]
481    fn test_write_empty() {
482        let executor = deterministic::Runner::default();
483        executor.start(|context| async move {
484            // Test that creating a writer with zero buffer capacity panics
485            let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
486            assert_eq!(size, 0);
487            Write::new(blob, size, 0);
488        });
489    }
490
491    #[test_traced]
492    fn test_write_append_to_buffer() {
493        let executor = deterministic::Runner::default();
494        executor.start(|context| async move {
495            // Test sequential appends that exceed buffer capacity
496            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
497            let writer = Write::new(blob.clone(), size, 10);
498
499            // Write data that fits in buffer
500            writer.write_at(b"hello".to_vec(), 0).await.unwrap();
501            assert_eq!(writer.size().await, 5);
502
503            // Append data that causes buffer flush
504            writer.write_at(b" world".to_vec(), 5).await.unwrap();
505            writer.sync().await.unwrap();
506            assert_eq!(writer.size().await, 11);
507
508            // Verify the complete result
509            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
510            assert_eq!(size, 11);
511            let mut reader = Read::new(blob, size, 10);
512            let mut buf = vec![0u8; 11];
513            reader.read_exact(&mut buf, 11).await.unwrap();
514            assert_eq!(&buf, b"hello world");
515        });
516    }
517
518    #[test_traced]
519    fn test_write_into_middle_of_buffer() {
520        let executor = deterministic::Runner::default();
521        executor.start(|context| async move {
522            // Test overwriting data within the buffer and extending it
523            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
524            let writer = Write::new(blob.clone(), size, 20);
525
526            // Initial write
527            writer.write_at(b"abcdefghij".to_vec(), 0).await.unwrap();
528            assert_eq!(writer.size().await, 10);
529
530            // Overwrite middle section
531            writer.write_at(b"01234".to_vec(), 2).await.unwrap();
532            assert_eq!(writer.size().await, 10);
533            writer.sync().await.unwrap();
534
535            // Verify overwrite result
536            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
537            assert_eq!(size, 10);
538            let mut reader = Read::new(blob, size, 10);
539            let mut buf = vec![0u8; 10];
540            reader.read_exact(&mut buf, 10).await.unwrap();
541            assert_eq!(&buf, b"ab01234hij");
542
543            // Extend buffer and do partial overwrite
544            writer.write_at(b"klmnopqrst".to_vec(), 10).await.unwrap();
545            assert_eq!(writer.size().await, 20);
546            writer.write_at(b"wxyz".to_vec(), 9).await.unwrap();
547            assert_eq!(writer.size().await, 20);
548            writer.sync().await.unwrap();
549
550            // Verify final result
551            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
552            assert_eq!(size, 20);
553            let mut reader = Read::new(blob, size, 20);
554            let mut buf = vec![0u8; 20];
555            reader.read_exact(&mut buf, 20).await.unwrap();
556            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
557        });
558    }
559
560    #[test_traced]
561    fn test_write_before_buffer() {
562        let executor = deterministic::Runner::default();
563        executor.start(|context| async move {
564            // Test writing at offsets before the current buffer position
565            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
566            let writer = Write::new(blob.clone(), size, 10);
567
568            // Write data at a later offset first
569            writer.write_at(b"0123456789".to_vec(), 10).await.unwrap();
570            assert_eq!(writer.size().await, 20);
571
572            // Write at an earlier offset (should flush buffer first)
573            writer.write_at(b"abcde".to_vec(), 0).await.unwrap();
574            assert_eq!(writer.size().await, 20);
575            writer.sync().await.unwrap();
576
577            // Verify data placement with gap
578            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
579            assert_eq!(size, 20);
580            let mut reader = Read::new(blob, size, 20);
581            let mut buf = vec![0u8; 20];
582            reader.read_exact(&mut buf, 20).await.unwrap();
583            let mut expected = vec![0u8; 20];
584            expected[0..5].copy_from_slice("abcde".as_bytes());
585            expected[10..20].copy_from_slice("0123456789".as_bytes());
586            assert_eq!(buf, expected);
587
588            // Fill the gap between existing data
589            writer.write_at(b"fghij".to_vec(), 5).await.unwrap();
590            assert_eq!(writer.size().await, 20);
591            writer.sync().await.unwrap();
592            assert_eq!(writer.size().await, 20);
593
594            // Verify gap is filled
595            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
596            assert_eq!(size, 20);
597            let mut reader = Read::new(blob, size, 20);
598            let mut buf = vec![0u8; 20];
599            reader.read_exact(&mut buf, 20).await.unwrap();
600            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
601            assert_eq!(buf, expected);
602        });
603    }
604
605    #[test_traced]
606    fn test_write_truncate() {
607        let executor = deterministic::Runner::default();
608        executor.start(|context| async move {
609            // Test blob truncation functionality and subsequent writes
610            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
611            let writer = Write::new(blob, size, 10);
612
613            // Write initial data
614            writer.write_at(b"hello world".to_vec(), 0).await.unwrap();
615            assert_eq!(writer.size().await, 11);
616            writer.sync().await.unwrap();
617            assert_eq!(writer.size().await, 11);
618
619            let (blob_check, size_check) =
620                context.open("partition", b"truncate_write").await.unwrap();
621            assert_eq!(size_check, 11);
622            drop(blob_check);
623
624            // Truncate to smaller size
625            writer.truncate(5).await.unwrap();
626            assert_eq!(writer.size().await, 5);
627            writer.sync().await.unwrap();
628
629            // Verify truncation
630            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
631            assert_eq!(size, 5);
632            let mut reader = Read::new(blob, size, 5);
633            let mut buf = vec![0u8; 5];
634            reader.read_exact(&mut buf, 5).await.unwrap();
635            assert_eq!(&buf, b"hello");
636
637            // Write to truncated blob
638            writer.write_at(b"X".to_vec(), 0).await.unwrap();
639            assert_eq!(writer.size().await, 5);
640            writer.sync().await.unwrap();
641
642            // Verify overwrite
643            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
644            assert_eq!(size, 5);
645            let mut reader = Read::new(blob, size, 5);
646            let mut buf = vec![0u8; 5];
647            reader.read_exact(&mut buf, 5).await.unwrap();
648            assert_eq!(&buf, b"Xello");
649
650            // Test truncate to zero
651            let (blob_zero, size) = context.open("partition", b"truncate_zero").await.unwrap();
652            let writer_zero = Write::new(blob_zero.clone(), size, 10);
653            writer_zero
654                .write_at(b"some data".to_vec(), 0)
655                .await
656                .unwrap();
657            assert_eq!(writer_zero.size().await, 9);
658            writer_zero.sync().await.unwrap();
659            assert_eq!(writer_zero.size().await, 9);
660            writer_zero.truncate(0).await.unwrap();
661            assert_eq!(writer_zero.size().await, 0);
662            writer_zero.sync().await.unwrap();
663            assert_eq!(writer_zero.size().await, 0);
664
665            // Ensure the blob is empty
666            let (_, size_z) = context.open("partition", b"truncate_zero").await.unwrap();
667            assert_eq!(size_z, 0);
668        });
669    }
670
671    #[test_traced]
672    fn test_write_read_at_on_writer() {
673        let executor = deterministic::Runner::default();
674        executor.start(|context| async move {
675            // Test reading through writer's read_at method (buffer + blob reads)
676            let (blob, size) = context.open("partition", b"read_at_writer").await.unwrap();
677            let writer = Write::new(blob.clone(), size, 10);
678
679            // Write data that stays in buffer
680            writer.write_at(b"buffered".to_vec(), 0).await.unwrap();
681            assert_eq!(writer.size().await, 8);
682
683            // Read from buffer via writer
684            let mut read_buf_vec = vec![0u8; 4].into();
685            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
686            assert_eq!(read_buf_vec.as_ref(), b"buff");
687
688            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
689            assert_eq!(read_buf_vec.as_ref(), b"ered");
690
691            // Reading past buffer end should fail
692            let small_buf_vec = vec![0u8; 1];
693            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
694
695            // Write large data that flushes buffer
696            writer.write_at(b" and flushed".to_vec(), 8).await.unwrap();
697            assert_eq!(writer.size().await, 20);
698            writer.sync().await.unwrap();
699            assert_eq!(writer.size().await, 20);
700
701            // Read from underlying blob through writer
702            let mut read_buf_vec_2 = vec![0u8; 4].into();
703            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
704            assert_eq!(read_buf_vec_2.as_ref(), b"buff");
705
706            let mut read_buf_7_vec = vec![0u8; 7].into();
707            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
708            assert_eq!(read_buf_7_vec.as_ref(), b"flushed");
709
710            // Buffer new data at the end
711            writer.write_at(b" more data".to_vec(), 20).await.unwrap();
712            assert_eq!(writer.size().await, 30);
713
714            // Read newly buffered data
715            let mut read_buf_vec_3 = vec![0u8; 5].into();
716            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
717            assert_eq!(read_buf_vec_3.as_ref(), b" more");
718
719            // Read spanning both blob and buffer
720            let mut combo_read_buf_vec = vec![0u8; 12].into();
721            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
722            assert_eq!(combo_read_buf_vec.as_ref(), b"shed more da");
723
724            // Verify complete content by reopening
725            writer.sync().await.unwrap();
726            assert_eq!(writer.size().await, 30);
727            let (final_blob, final_size) =
728                context.open("partition", b"read_at_writer").await.unwrap();
729            assert_eq!(final_size, 30);
730            let mut final_reader = Read::new(final_blob, final_size, 30);
731            let mut full_content = vec![0u8; 30];
732            final_reader
733                .read_exact(&mut full_content, 30)
734                .await
735                .unwrap();
736            assert_eq!(&full_content, b"buffered and flushed more data");
737        });
738    }
739
740    #[test_traced]
741    fn test_write_straddling_non_mergeable() {
742        let executor = deterministic::Runner::default();
743        executor.start(|context| async move {
744            // Test writes that cannot be merged into buffer (non-contiguous/too large)
745            let (blob, size) = context.open("partition", b"write_straddle").await.unwrap();
746            let writer = Write::new(blob.clone(), size, 10);
747
748            // Fill buffer completely
749            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
750            assert_eq!(writer.size().await, 10);
751
752            // Write at non-contiguous offset (should flush then write directly)
753            writer.write_at(b"abc".to_vec(), 15).await.unwrap();
754            assert_eq!(writer.size().await, 18);
755            writer.sync().await.unwrap();
756            assert_eq!(writer.size().await, 18);
757
758            // Verify data with gap
759            let (blob_check, size_check) =
760                context.open("partition", b"write_straddle").await.unwrap();
761            assert_eq!(size_check, 18);
762            let mut reader = Read::new(blob_check, size_check, 20);
763            let mut buf = vec![0u8; 18];
764            reader.read_exact(&mut buf, 18).await.unwrap();
765
766            let mut expected = vec![0u8; 18];
767            expected[0..10].copy_from_slice(b"0123456789");
768            expected[15..18].copy_from_slice(b"abc");
769            assert_eq!(buf, expected);
770
771            // Test write that exceeds buffer capacity
772            let (blob2, size) = context.open("partition", b"write_straddle2").await.unwrap();
773            let writer2 = Write::new(blob2.clone(), size, 10);
774            writer2.write_at(b"0123456789".to_vec(), 0).await.unwrap();
775            assert_eq!(writer2.size().await, 10);
776
777            // Write large data that exceeds capacity
778            writer2.write_at(b"ABCDEFGHIJKL".to_vec(), 5).await.unwrap();
779            assert_eq!(writer2.size().await, 17);
780            writer2.sync().await.unwrap();
781            assert_eq!(writer2.size().await, 17);
782
783            // Verify overwrite result
784            let (blob_check2, size_check2) =
785                context.open("partition", b"write_straddle2").await.unwrap();
786            assert_eq!(size_check2, 17);
787            let mut reader2 = Read::new(blob_check2, size_check2, 20);
788            let mut buf2 = vec![0u8; 17];
789            reader2.read_exact(&mut buf2, 17).await.unwrap();
790            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
791        });
792    }
793
794    #[test_traced]
795    fn test_write_close() {
796        let executor = deterministic::Runner::default();
797        executor.start(|context| async move {
798            // Test that closing writer flushes and persists buffered data
799            let (blob_orig, size) = context.open("partition", b"write_close").await.unwrap();
800            let writer = Write::new(blob_orig.clone(), size, 8);
801            writer.write_at(b"pending".to_vec(), 0).await.unwrap();
802            assert_eq!(writer.size().await, 7);
803
804            // Close should flush and sync data
805            writer.close().await.unwrap();
806
807            // Verify data persistence
808            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
809            assert_eq!(size_check, 7);
810            let mut reader = Read::new(blob_check, size_check, 8);
811            let mut buf = [0u8; 7];
812            reader.read_exact(&mut buf, 7).await.unwrap();
813            assert_eq!(&buf, b"pending");
814        });
815    }
816
817    #[test_traced]
818    fn test_write_direct_due_to_size() {
819        let executor = deterministic::Runner::default();
820        executor.start(|context| async move {
821            // Test direct writes when data exceeds buffer capacity
822            let (blob, size) = context
823                .open("partition", b"write_direct_size")
824                .await
825                .unwrap();
826            let writer = Write::new(blob.clone(), size, 5);
827
828            // Write data larger than buffer capacity (should write directly)
829            let data_large = b"0123456789";
830            writer.write_at(data_large.to_vec(), 0).await.unwrap();
831            assert_eq!(writer.size().await, 10);
832
833            // Sync to ensure data is persisted
834            writer.sync().await.unwrap();
835
836            // Verify direct write worked
837            let (blob_check, size_check) = context
838                .open("partition", b"write_direct_size")
839                .await
840                .unwrap();
841            assert_eq!(size_check, 10);
842            let mut reader = Read::new(blob_check, size_check, 10);
843            let mut buf = vec![0u8; 10];
844            reader.read_exact(&mut buf, 10).await.unwrap();
845            assert_eq!(&buf, data_large.as_slice());
846
847            // Now write small data that should be buffered
848            writer.write_at(b"abc".to_vec(), 10).await.unwrap();
849            assert_eq!(writer.size().await, 13);
850
851            // Verify it's in buffer by reading through writer
852            let mut read_small_buf_vec = vec![0u8; 3].into();
853            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
854            assert_eq!(read_small_buf_vec.as_ref(), b"abc");
855
856            writer.sync().await.unwrap();
857
858            // Verify final state
859            let (blob_check2, size_check2) = context
860                .open("partition", b"write_direct_size")
861                .await
862                .unwrap();
863            assert_eq!(size_check2, 13);
864            let mut reader2 = Read::new(blob_check2, size_check2, 13);
865            let mut buf2 = vec![0u8; 13];
866            reader2.read_exact(&mut buf2, 13).await.unwrap();
867            assert_eq!(&buf2[10..], b"abc".as_slice());
868        });
869    }
870
871    #[test_traced]
872    fn test_write_overwrite_and_extend_in_buffer() {
873        let executor = deterministic::Runner::default();
874        executor.start(|context| async move {
875            // Test complex buffer operations: overwrite and extend within capacity
876            let (blob, size) = context
877                .open("partition", b"overwrite_extend_buf")
878                .await
879                .unwrap();
880            let writer = Write::new(blob.clone(), size, 15);
881
882            // Write initial data
883            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
884            assert_eq!(writer.size().await, 10);
885
886            // Overwrite and extend within buffer capacity
887            writer.write_at(b"ABCDEFGHIJ".to_vec(), 5).await.unwrap();
888            assert_eq!(writer.size().await, 15);
889
890            // Verify buffer content through writer
891            let mut read_buf_vec = vec![0u8; 15].into();
892            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
893            assert_eq!(read_buf_vec.as_ref(), b"01234ABCDEFGHIJ");
894
895            writer.sync().await.unwrap();
896
897            // Verify persisted result
898            let (blob_check, size_check) = context
899                .open("partition", b"overwrite_extend_buf")
900                .await
901                .unwrap();
902            assert_eq!(size_check, 15);
903            let mut reader = Read::new(blob_check, size_check, 15);
904            let mut final_buf = vec![0u8; 15];
905            reader.read_exact(&mut final_buf, 15).await.unwrap();
906            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
907        });
908    }
909
910    #[test_traced]
911    fn test_write_at_size() {
912        let executor = deterministic::Runner::default();
913        executor.start(|context| async move {
914            // Test writing at the current logical end of the blob
915            let (blob, size) = context.open("partition", b"write_end").await.unwrap();
916            let writer = Write::new(blob.clone(), size, 20);
917
918            // Write initial data
919            writer.write_at(b"0123456789".to_vec(), 0).await.unwrap();
920            assert_eq!(writer.size().await, 10);
921            writer.sync().await.unwrap();
922
923            // Append at the current size (logical end)
924            writer
925                .write_at(b"abc".to_vec(), writer.size().await)
926                .await
927                .unwrap();
928            assert_eq!(writer.size().await, 13);
929            writer.sync().await.unwrap();
930
931            // Verify complete result
932            let (blob_check, size_check) = context.open("partition", b"write_end").await.unwrap();
933            assert_eq!(size_check, 13);
934            let mut reader = Read::new(blob_check, size_check, 13);
935            let mut buf = vec![0u8; 13];
936            reader.read_exact(&mut buf, 13).await.unwrap();
937            assert_eq!(&buf, b"0123456789abc");
938        });
939    }
940
941    #[test_traced]
942    fn test_write_at_size_multiple_appends() {
943        let executor = deterministic::Runner::default();
944        executor.start(|context| async move {
945            // Test multiple appends using writer.size()
946            let (blob, size) = context
947                .open("partition", b"write_multiple_appends_at_size")
948                .await
949                .unwrap();
950            let writer = Write::new(blob.clone(), size, 5); // Small buffer
951
952            // First write
953            writer.write_at(b"AAA".to_vec(), 0).await.unwrap();
954            assert_eq!(writer.size().await, 3);
955            writer.sync().await.unwrap();
956            assert_eq!(writer.size().await, 3);
957
958            // Append using size()
959            writer
960                .write_at(b"BBB".to_vec(), writer.size().await)
961                .await
962                .unwrap();
963            assert_eq!(writer.size().await, 6); // 3 (AAA) + 3 (BBB)
964            writer.sync().await.unwrap();
965            assert_eq!(writer.size().await, 6);
966
967            // Append again using size()
968            writer
969                .write_at(b"CCC".to_vec(), writer.size().await)
970                .await
971                .unwrap();
972            assert_eq!(writer.size().await, 9); // 6 + 3 (CCC)
973            writer.sync().await.unwrap();
974            assert_eq!(writer.size().await, 9);
975
976            // Verify final content
977            let (blob_check, size_check) = context
978                .open("partition", b"write_multiple_appends_at_size")
979                .await
980                .unwrap();
981            assert_eq!(size_check, 9);
982            let mut reader = Read::new(blob_check, size_check, 9);
983            let mut buf = vec![0u8; 9];
984            reader.read_exact(&mut buf, 9).await.unwrap();
985            assert_eq!(&buf, b"AAABBBCCC");
986        });
987    }
988
989    #[test_traced]
990    fn test_write_non_contiguous_then_append_at_size() {
991        let executor = deterministic::Runner::default();
992        executor.start(|context| async move {
993            // Test writing non-contiguously, then appending at the new size
994            let (blob, size) = context
995                .open("partition", b"write_non_contiguous_then_append")
996                .await
997                .unwrap();
998            let writer = Write::new(blob.clone(), size, 10);
999
1000            // Initial buffered write
1001            writer.write_at(b"INITIAL".to_vec(), 0).await.unwrap(); // 7 bytes
1002            assert_eq!(writer.size().await, 7);
1003            // Buffer contains "INITIAL", inner.position = 0
1004
1005            // Non-contiguous write, forces flush of "INITIAL" and direct write of "NONCONTIG"
1006            writer.write_at(b"NONCONTIG".to_vec(), 20).await.unwrap();
1007            assert_eq!(writer.size().await, 29);
1008            writer.sync().await.unwrap();
1009            assert_eq!(writer.size().await, 29);
1010
1011            // Append at the new size
1012            writer
1013                .write_at(b"APPEND".to_vec(), writer.size().await)
1014                .await
1015                .unwrap();
1016            assert_eq!(writer.size().await, 35); // 29 + 6
1017            writer.sync().await.unwrap();
1018            assert_eq!(writer.size().await, 35);
1019
1020            // Verify final content
1021            let (blob_check, size_check) = context
1022                .open("partition", b"write_non_contiguous_then_append")
1023                .await
1024                .unwrap();
1025            assert_eq!(size_check, 35);
1026            let mut reader = Read::new(blob_check, size_check, 35);
1027            let mut buf = vec![0u8; 35];
1028            reader.read_exact(&mut buf, 35).await.unwrap();
1029
1030            let mut expected = vec![0u8; 35];
1031            expected[0..7].copy_from_slice(b"INITIAL");
1032            expected[20..29].copy_from_slice(b"NONCONTIG");
1033            expected[29..35].copy_from_slice(b"APPEND");
1034            assert_eq!(buf, expected);
1035        });
1036    }
1037
1038    #[test_traced]
1039    fn test_truncate_then_append_at_size() {
1040        let executor = deterministic::Runner::default();
1041        executor.start(|context| async move {
1042            // Test truncating, then appending at the new size
1043            let (blob, size) = context
1044                .open("partition", b"truncate_then_append_at_size")
1045                .await
1046                .unwrap();
1047            let writer = Write::new(blob.clone(), size, 10);
1048
1049            // Write initial data and sync
1050            writer
1051                .write_at(b"0123456789ABCDEF".to_vec(), 0)
1052                .await
1053                .unwrap(); // 16 bytes
1054            assert_eq!(writer.size().await, 16);
1055            writer.sync().await.unwrap(); // inner.position = 16, buffer empty
1056            assert_eq!(writer.size().await, 16);
1057
1058            // Truncate
1059            let truncate_to = 5;
1060            writer.truncate(truncate_to).await.unwrap();
1061            // after truncate, inner.position should be `truncate_to` (5)
1062            // buffer should be empty
1063            assert_eq!(writer.size().await, truncate_to);
1064            writer.sync().await.unwrap(); // Ensure truncation is persisted for verify step
1065            assert_eq!(writer.size().await, truncate_to);
1066
1067            // Append at the new (truncated) size
1068            writer
1069                .write_at(b"XXXXX".to_vec(), writer.size().await)
1070                .await
1071                .unwrap(); // 5 bytes
1072                           // inner.buffer = "XXXXX", inner.position = 5
1073            assert_eq!(writer.size().await, 10); // 5 (truncated) + 5 (XXXXX)
1074            writer.sync().await.unwrap();
1075            assert_eq!(writer.size().await, 10);
1076
1077            // Verify final content
1078            let (blob_check, size_check) = context
1079                .open("partition", b"truncate_then_append_at_size")
1080                .await
1081                .unwrap();
1082            assert_eq!(size_check, 10);
1083            let mut reader = Read::new(blob_check, size_check, 10);
1084            let mut buf = vec![0u8; 10];
1085            reader.read_exact(&mut buf, 10).await.unwrap();
1086            assert_eq!(&buf, b"01234XXXXX");
1087        });
1088    }
1089}