commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3mod read;
4mod write;
5
6pub use read::Read;
7pub use write::Write;
8
9#[cfg(test)]
10mod tests {
11    use super::*;
12    use crate::{deterministic, Blob as _, Error, Runner, Storage};
13    use commonware_macros::test_traced;
14
15    #[test_traced]
16    fn test_read_basic() {
17        let executor = deterministic::Runner::default();
18        executor.start(|context| async move {
19            // Create a memory blob with some test data
20            let data = b"Hello, world! This is a test.";
21            let (blob, size) = context.open("partition", b"test").await.unwrap();
22            assert_eq!(size, 0);
23            blob.write_at(data.to_vec(), 0).await.unwrap();
24            let size = data.len() as u64;
25
26            // Create a buffer reader with a small buffer size
27            let lookahead = 10;
28            let mut reader = Read::new(blob, size, lookahead);
29
30            // Read some data
31            let mut buf = [0u8; 5];
32            reader.read_exact(&mut buf, 5).await.unwrap();
33            assert_eq!(&buf, b"Hello");
34
35            // Read more data that requires a refill
36            let mut buf = [0u8; 14];
37            reader.read_exact(&mut buf, 14).await.unwrap();
38            assert_eq!(&buf, b", world! This ");
39
40            // Verify position
41            assert_eq!(reader.position(), 19);
42
43            // Read the rest
44            let mut buf = [0u8; 10];
45            reader.read_exact(&mut buf, 7).await.unwrap();
46            assert_eq!(&buf[..7], b"is a te");
47
48            // Try to read beyond the end
49            let mut buf = [0u8; 5];
50            let result = reader.read_exact(&mut buf, 5).await;
51            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
52        });
53    }
54
55    #[test_traced]
56    #[should_panic(expected = "buffer size must be greater than zero")]
57    fn test_read_empty() {
58        let executor = deterministic::Runner::default();
59        executor.start(|context| async move {
60            // Create a memory blob with some test data
61            let data = b"Hello, world! This is a test.";
62            let (blob, size) = context.open("partition", b"test").await.unwrap();
63            assert_eq!(size, 0);
64            blob.write_at(data.to_vec(), 0).await.unwrap();
65            let size = data.len() as u64;
66
67            // Create a buffer reader with a small buffer size
68            let lookahead = 0;
69            Read::new(blob, size, lookahead);
70        });
71    }
72
73    #[test_traced]
74    fn test_read_cross_boundary() {
75        let executor = deterministic::Runner::default();
76        executor.start(|context| async move {
77            // Create a memory blob with some test data
78            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
79            let (blob, size) = context.open("partition", b"test").await.unwrap();
80            assert_eq!(size, 0);
81            blob.write_at(data.to_vec(), 0).await.unwrap();
82            let size = data.len() as u64;
83
84            // Create a buffer reader with buffer size 10
85            let buffer_size = 10;
86            let mut reader = Read::new(blob, size, buffer_size);
87
88            // Read data that crosses a buffer boundary
89            let mut buf = [0u8; 15];
90            reader.read_exact(&mut buf, 15).await.unwrap();
91            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
92
93            // Position should be 15
94            assert_eq!(reader.position(), 15);
95
96            // Read the rest
97            let mut buf = [0u8; 11];
98            reader.read_exact(&mut buf, 11).await.unwrap();
99            assert_eq!(&buf, b"PQRSTUVWXYZ");
100
101            // Position should be 26
102            assert_eq!(reader.position(), 26);
103            assert_eq!(reader.blob_remaining(), 0);
104        });
105    }
106
107    #[test_traced]
108    fn test_read_with_known_size() {
109        let executor = deterministic::Runner::default();
110        executor.start(|context| async move {
111            // Create a memory blob with some test data
112            let data = b"This is a test with known size limitations.";
113            let (blob, size) = context.open("partition", b"test").await.unwrap();
114            assert_eq!(size, 0);
115            blob.write_at(data.to_vec(), 0).await.unwrap();
116            let size = data.len() as u64;
117
118            // Create a buffer reader with a buffer smaller than the data
119            let buffer_size = 10;
120            let mut reader = Read::new(blob, size, buffer_size);
121
122            // Check remaining bytes in the blob
123            assert_eq!(reader.blob_remaining(), size);
124
125            // Read half the buffer size
126            let mut buf = [0u8; 5];
127            reader.read_exact(&mut buf, 5).await.unwrap();
128            assert_eq!(&buf, b"This ");
129
130            // Check remaining after read
131            assert_eq!(reader.blob_remaining(), size - 5);
132
133            // Try to read exactly up to the size limit
134            let mut buf = vec![0u8; (size - 5) as usize];
135            reader
136                .read_exact(&mut buf, (size - 5) as usize)
137                .await
138                .unwrap();
139            assert_eq!(&buf, b"is a test with known size limitations.");
140
141            // Now we should be at the end
142            assert_eq!(reader.blob_remaining(), 0);
143
144            // Trying to read more should fail
145            let mut buf = [0u8; 1];
146            let result = reader.read_exact(&mut buf, 1).await;
147            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
148        });
149    }
150
151    #[test_traced]
152    fn test_read_large_data() {
153        let executor = deterministic::Runner::default();
154        executor.start(|context| async move {
155            // Create a larger blob for testing with larger data
156            let data_size = 1024 * 256; // 256KB of data
157            let data = vec![0x42; data_size];
158            let (blob, size) = context.open("partition", b"test").await.unwrap();
159            assert_eq!(size, 0);
160            blob.write_at(data.clone(), 0).await.unwrap();
161            let size = data.len() as u64;
162
163            // Create a buffer with size smaller than the data
164            let buffer_size = 64 * 1024; // 64KB
165            let mut reader = Read::new(blob, size, buffer_size);
166
167            // Read all the data in chunks
168            let mut total_read = 0;
169            let chunk_size = 8 * 1024; // 8KB chunks
170            let mut buf = vec![0u8; chunk_size];
171
172            while total_read < data_size {
173                let to_read = std::cmp::min(chunk_size, data_size - total_read);
174                reader
175                    .read_exact(&mut buf[..to_read], to_read)
176                    .await
177                    .unwrap();
178
179                // Verify the data is correct (all bytes should be 0x42)
180                assert!(
181                    buf[..to_read].iter().all(|&b| b == 0x42),
182                    "Data at position {} is not correct",
183                    total_read
184                );
185
186                total_read += to_read;
187            }
188
189            // Verify we read everything
190            assert_eq!(total_read, data_size);
191
192            // Trying to read more should fail
193            let mut extra_buf = [0u8; 1];
194            let result = reader.read_exact(&mut extra_buf, 1).await;
195            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
196        });
197    }
198
199    #[test_traced]
200    fn test_read_exact_size_reads() {
201        let executor = deterministic::Runner::default();
202        executor.start(|context| async move {
203            // Create a blob with exactly 2.5 buffer sizes of data
204            let buffer_size = 1024;
205            let data_size = buffer_size * 5 / 2; // 2.5 buffers
206            let data = vec![0x37; data_size];
207
208            let (blob, size) = context.open("partition", b"test").await.unwrap();
209            assert_eq!(size, 0);
210            blob.write_at(data.clone(), 0).await.unwrap();
211            let size = data.len() as u64;
212
213            let mut reader = Read::new(blob, size, buffer_size);
214
215            // Read exactly one buffer size
216            let mut buf1 = vec![0u8; buffer_size];
217            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
218            assert!(buf1.iter().all(|&b| b == 0x37));
219
220            // Read exactly one buffer size more
221            let mut buf2 = vec![0u8; buffer_size];
222            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
223            assert!(buf2.iter().all(|&b| b == 0x37));
224
225            // Read the remaining half buffer
226            let half_buffer = buffer_size / 2;
227            let mut buf3 = vec![0u8; half_buffer];
228            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
229            assert!(buf3.iter().all(|&b| b == 0x37));
230
231            // Verify we're at the end
232            assert_eq!(reader.blob_remaining(), 0);
233            assert_eq!(reader.position(), size);
234        });
235    }
236
237    #[test_traced]
238    fn test_read_seek_to() {
239        let executor = deterministic::Runner::default();
240        executor.start(|context| async move {
241            // Create a memory blob with some test data
242            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
243            let (blob, size) = context.open("partition", b"test").await.unwrap();
244            assert_eq!(size, 0);
245            blob.write_at(data.to_vec(), 0).await.unwrap();
246            let size = data.len() as u64;
247
248            // Create a buffer reader
249            let buffer_size = 10;
250            let mut reader = Read::new(blob, size, buffer_size);
251
252            // Read some data to advance the position
253            let mut buf = [0u8; 5];
254            reader.read_exact(&mut buf, 5).await.unwrap();
255            assert_eq!(&buf, b"ABCDE");
256            assert_eq!(reader.position(), 5);
257
258            // Seek to a specific position
259            reader.seek_to(10).unwrap();
260            assert_eq!(reader.position(), 10);
261
262            // Read data from the new position
263            let mut buf = [0u8; 5];
264            reader.read_exact(&mut buf, 5).await.unwrap();
265            assert_eq!(&buf, b"KLMNO");
266
267            // Seek to beginning
268            reader.seek_to(0).unwrap();
269            assert_eq!(reader.position(), 0);
270
271            let mut buf = [0u8; 5];
272            reader.read_exact(&mut buf, 5).await.unwrap();
273            assert_eq!(&buf, b"ABCDE");
274
275            // Seek to end
276            reader.seek_to(size).unwrap();
277            assert_eq!(reader.position(), size);
278
279            // Trying to read should fail
280            let mut buf = [0u8; 1];
281            let result = reader.read_exact(&mut buf, 1).await;
282            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
283
284            // Seek beyond end should fail
285            let result = reader.seek_to(size + 10);
286            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287        });
288    }
289
290    #[test_traced]
291    fn test_read_seek_with_refill() {
292        let executor = deterministic::Runner::default();
293        executor.start(|context| async move {
294            // Create a memory blob with longer data
295            let data = vec![0x41; 1000]; // 1000 'A' characters
296            let (blob, size) = context.open("partition", b"test").await.unwrap();
297            assert_eq!(size, 0);
298            blob.write_at(data.clone(), 0).await.unwrap();
299            let size = data.len() as u64;
300
301            // Create a buffer reader with small buffer
302            let buffer_size = 10;
303            let mut reader = Read::new(blob, size, buffer_size);
304
305            // Read some data
306            let mut buf = [0u8; 5];
307            reader.read_exact(&mut buf, 5).await.unwrap();
308
309            // Seek far ahead, past the current buffer
310            reader.seek_to(500).unwrap();
311
312            // Refill the buffer at the new position
313            reader.refill().await.unwrap();
314
315            // Read data - should get data from position 500
316            let mut buf = [0u8; 5];
317            reader.read_exact(&mut buf, 5).await.unwrap();
318            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
319            assert_eq!(reader.position(), 505);
320
321            // Seek backwards
322            reader.seek_to(100).unwrap();
323            reader.refill().await.unwrap();
324
325            // Read again - should be at position 100
326            let mut buf = [0u8; 5];
327            reader.read_exact(&mut buf, 5).await.unwrap();
328            assert_eq!(reader.position(), 105);
329        });
330    }
331
332    #[test_traced]
333    fn test_read_truncate() {
334        let executor = deterministic::Runner::default();
335        executor.start(|context| async move {
336            // Create a memory blob with some test data
337            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
338            let (blob, size) = context.open("partition", b"test").await.unwrap();
339            assert_eq!(size, 0);
340            blob.write_at(data.to_vec(), 0).await.unwrap();
341            let data_len = data.len() as u64;
342
343            // Create a buffer reader
344            let buffer_size = 10;
345            let reader = Read::new(blob.clone(), data_len, buffer_size);
346
347            // Truncate the blob to half its size
348            let truncate_len = data_len / 2;
349            reader.truncate(truncate_len).await.unwrap();
350
351            // Reopen to check truncation
352            let (blob, size) = context.open("partition", b"test").await.unwrap();
353            assert_eq!(size, truncate_len, "Blob should be truncated to half size");
354
355            // Create a new buffer and read to verify truncation
356            let mut new_reader = Read::new(blob, size, buffer_size);
357
358            // Read the content
359            let mut buf = vec![0u8; size as usize];
360            new_reader
361                .read_exact(&mut buf, size as usize)
362                .await
363                .unwrap();
364            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Truncated content should match");
365
366            // Reading beyond truncated size should fail
367            let mut extra_buf = [0u8; 1];
368            let result = new_reader.read_exact(&mut extra_buf, 1).await;
369            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
370        });
371    }
372
373    #[test_traced]
374    fn test_read_truncate_to_zero() {
375        let executor = deterministic::Runner::default();
376        executor.start(|context| async move {
377            // Create a memory blob with some test data
378            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
379            let data_len = data.len() as u64;
380            let (blob, size) = context.open("partition", b"test").await.unwrap();
381            assert_eq!(size, 0);
382            blob.write_at(data.to_vec(), 0).await.unwrap();
383
384            // Create a buffer reader
385            let buffer_size = 10;
386            let reader = Read::new(blob.clone(), data_len, buffer_size);
387
388            // Truncate the blob to zero
389            reader.truncate(0).await.unwrap();
390
391            // Reopen to check truncation
392            let (blob, size) = context.open("partition", b"test").await.unwrap();
393            assert_eq!(size, 0, "Blob should be truncated to zero");
394
395            // Create a new buffer and try to read (should fail)
396            let mut new_reader = Read::new(blob, size, buffer_size);
397
398            // Reading from truncated blob should fail
399            let mut buf = [0u8; 1];
400            let result = new_reader.read_exact(&mut buf, 1).await;
401            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
402        });
403    }
404
405    #[test_traced]
406    fn test_write_basic() {
407        let executor = deterministic::Runner::default();
408        executor.start(|context| async move {
409            // Test basic write_at and sync functionality.
410            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
411            assert_eq!(size, 0);
412
413            let writer = Write::new(blob.clone(), 0, 8);
414            writer.write_at("hello".as_bytes(), 0).await.unwrap();
415            writer.sync().await.unwrap();
416
417            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
418            assert_eq!(size, 5);
419            let mut reader = Read::new(blob, size, 8);
420            let mut buf = [0u8; 5];
421            reader.read_exact(&mut buf, 5).await.unwrap();
422            assert_eq!(&buf, b"hello");
423        });
424    }
425
426    #[test_traced]
427    fn test_write_multiple_flushes() {
428        let executor = deterministic::Runner::default();
429        executor.start(|context| async move {
430            // Test writing data that causes multiple buffer flushes.
431            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
432            assert_eq!(size, 0);
433
434            let writer = Write::new(blob.clone(), 0, 4);
435            writer.write_at("abc".as_bytes(), 0).await.unwrap();
436            writer.write_at("defg".as_bytes(), 3).await.unwrap();
437            writer.sync().await.unwrap();
438
439            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
440            assert_eq!(size, 7);
441            let mut reader = Read::new(blob, size, 4);
442            let mut buf = [0u8; 7];
443            reader.read_exact(&mut buf, 7).await.unwrap();
444            assert_eq!(&buf, b"abcdefg");
445        });
446    }
447
448    #[test_traced]
449    fn test_write_large_data() {
450        let executor = deterministic::Runner::default();
451        executor.start(|context| async move {
452            // Test writing data significantly larger than the buffer capacity.
453            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
454            assert_eq!(size, 0);
455
456            let writer = Write::new(blob.clone(), 0, 4);
457            writer.write_at("abc".as_bytes(), 0).await.unwrap();
458            writer
459                .write_at("defghijklmnopqrstuvwxyz".as_bytes(), 3)
460                .await
461                .unwrap();
462            writer.sync().await.unwrap();
463
464            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
465            assert_eq!(size, 26);
466            let mut reader = Read::new(blob, size, 4);
467            let mut buf = [0u8; 26];
468            reader.read_exact(&mut buf, 26).await.unwrap();
469            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
470        });
471    }
472
473    #[test_traced]
474    #[should_panic(expected = "buffer capacity must be greater than zero")]
475    fn test_write_empty() {
476        let executor = deterministic::Runner::default();
477        executor.start(|context| async move {
478            // Test creating a writer with zero buffer capacity.
479            let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
480            assert_eq!(size, 0);
481            Write::new(blob, 0, 0);
482        });
483    }
484
485    #[test_traced]
486    fn test_write_append_to_buffer() {
487        let executor = deterministic::Runner::default();
488        executor.start(|context| async move {
489            // Test appending data that partially fits and then exceeds buffer capacity, causing a flush.
490            let (blob, _) = context.open("partition", b"append_buf").await.unwrap();
491            let writer = Write::new(blob.clone(), 0, 10);
492
493            // Write "hello" (5 bytes) - fits in buffer
494            writer.write_at("hello".as_bytes(), 0).await.unwrap();
495            // Append " world" (6 bytes) - "hello world" is 11 bytes, exceeds buffer
496            // "hello" is flushed, " world" is buffered
497            writer.write_at(" world".as_bytes(), 5).await.unwrap();
498            writer.sync().await.unwrap();
499
500            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
501            assert_eq!(size, 11);
502            let mut reader = Read::new(blob, size, 10);
503            let mut buf = vec![0u8; 11];
504            reader.read_exact(&mut buf, 11).await.unwrap();
505            assert_eq!(&buf, b"hello world");
506        });
507    }
508
509    #[test_traced]
510    fn test_write_into_middle_of_buffer() {
511        let executor = deterministic::Runner::default();
512        executor.start(|context| async move {
513            // Test writing data into the middle of an existing, partially filled buffer.
514            let (blob, _) = context.open("partition", b"middle_buf").await.unwrap();
515            let writer = Write::new(blob.clone(), 0, 20);
516
517            // Write "abcdefghij" (10 bytes)
518            writer.write_at("abcdefghij".as_bytes(), 0).await.unwrap();
519            // Write "01234" into the middle (offset 2, 5 bytes) -> "ab01234hij"
520            writer.write_at("01234".as_bytes(), 2).await.unwrap();
521            writer.sync().await.unwrap();
522
523            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
524            assert_eq!(size, 10); // Original length, as it's an overwrite
525            let mut reader = Read::new(blob, size, 10);
526            let mut buf = vec![0u8; 10];
527            reader.read_exact(&mut buf, 10).await.unwrap();
528            assert_eq!(&buf, b"ab01234hij");
529
530            // Write "klmnopqrst" (10 bytes) - buffer becomes "ab01234hijklmnopqrst" (20 bytes)
531            writer.write_at("klmnopqrst".as_bytes(), 10).await.unwrap();
532            // Overwrite "jklm" with "wxyz" -> buffer becomes "ab01234hiwxyzopqrst"
533            writer.write_at("wxyz".as_bytes(), 9).await.unwrap();
534            writer.sync().await.unwrap();
535
536            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
537            assert_eq!(size, 20);
538            let mut reader = Read::new(blob, size, 20);
539            let mut buf = vec![0u8; 20];
540            reader.read_exact(&mut buf, 20).await.unwrap();
541            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
542        });
543    }
544
545    #[test_traced]
546    fn test_write_before_buffer() {
547        let executor = deterministic::Runner::default();
548        executor.start(|context| async move {
549            // Test writing data at an offset that precedes the current buffered data range.
550            let (blob, _) = context.open("partition", b"before_buf").await.unwrap();
551            let writer = Write::new(blob.clone(), 10, 10); // Buffer starts at blob offset 10
552
553            // Buffer some data at offset 10: "0123456789"
554            writer.write_at("0123456789".as_bytes(), 10).await.unwrap();
555
556            // Write "abcde" at offset 0. This is before the current buffer.
557            // Current buffer should be flushed. "abcde" written directly.
558            // New buffer position will be 5.
559            writer.write_at("abcde".as_bytes(), 0).await.unwrap();
560            writer.sync().await.unwrap();
561
562            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
563            // Expected: "abcde" at 0 (5 bytes) + "0123456789" at 10 (10 bytes) = 20 total size.
564            // The underlying blob previously had "0123456789" written starting at 10.
565            // Then "abcde" was written starting at 0.
566            // The blob size should be 20 if the original data at 10 was preserved.
567            assert_eq!(size, 20);
568            let mut reader = Read::new(blob, size, 20);
569            let mut buf = vec![0u8; 20];
570            reader.read_exact(&mut buf, 20).await.unwrap();
571
572            let mut expected = vec![0u8; 20];
573            expected[0..5].copy_from_slice("abcde".as_bytes());
574            expected[10..20].copy_from_slice("0123456789".as_bytes());
575            assert_eq!(buf, expected);
576
577            // Write "fghij" at offset 5. This will append to the "abcde"
578            // The buffer for writer is now at position 5 with capacity 10.
579            // This write will be buffered.
580            writer.write_at("fghij".as_bytes(), 5).await.unwrap();
581            writer.sync().await.unwrap();
582
583            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
584            assert_eq!(size, 20); // Size remains 20 as "fghij" overwrites part of the gap + start of old data.
585            let mut reader = Read::new(blob, size, 20);
586            let mut buf = vec![0u8; 20];
587            reader.read_exact(&mut buf, 20).await.unwrap();
588
589            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
590            // The "0123456789" part originally at 10-19 is still there.
591            assert_eq!(buf, expected);
592        });
593    }
594
595    #[test_traced]
596    fn test_write_truncate() {
597        let executor = deterministic::Runner::default();
598        executor.start(|context| async move {
599            // Test truncating the blob via the writer and subsequent write behaviors.
600            let (blob, _) = context.open("partition", b"truncate_write").await.unwrap();
601            let writer = Write::new(blob, 0, 10);
602
603            writer.write_at("hello world".as_bytes(), 0).await.unwrap(); // 11 bytes
604            writer.sync().await.unwrap();
605
606            let (blob_check, size_check) =
607                context.open("partition", b"truncate_write").await.unwrap();
608            assert_eq!(size_check, 11);
609            drop(blob_check);
610
611            // Truncate to 5 bytes ("hello")
612            writer.truncate(5).await.unwrap();
613            writer.sync().await.unwrap();
614
615            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
616            assert_eq!(size, 5);
617            let mut reader = Read::new(blob, size, 5);
618            let mut buf = vec![0u8; 5];
619            reader.read_exact(&mut buf, 5).await.unwrap();
620            assert_eq!(&buf, b"hello");
621
622            // Write more data, buffer position should be reset by truncate implicitly
623            // or write_at should handle it.
624            // After truncate, the writer's internal state (position, buffer) needs to be consistent.
625            // Let's assume truncate implies a flush and reset of position to 0 for the writer if not specified.
626            // The `Blob::truncate` itself doesn't dictate writer state.
627            // The current `Write::truncate` flushes then truncates. The writer's `position` is not reset.
628            // This means subsequent writes might be to unexpected locations if not careful.
629            // Let's test current behavior:
630            // inner.position was 11 after "hello world". Flush happens. Blob truncated to 5.
631            // writer.inner.position is still 11.
632            // This is a bit tricky. For `Write::new(blob, position, capacity)`, `position` is the *start* of the buffer.
633            // After flush, `inner.position` becomes `inner.position + len_flushed`.
634            // So after writing "hello world" (11 bytes) with buffer 10:
635            // 1. "hello worl" (10 bytes) written to buffer. inner.buffer.len() = 10, inner.position = 0.
636            // 2. "d" (1 byte) written. Buffer has "hello worl". write("d") called.
637            //    buffer.len (10) + "d".len (1) > capacity (10). So flush.
638            //    blob.write_at("hello worl", 0). inner.position becomes 10. inner.buffer is empty.
639            //    Then "d" is buffered. inner.buffer = "d".
640            // 3. sync() called. Flushes "d". blob.write_at("d", 10). inner.position becomes 11. inner.buffer empty.
641            // 4. truncate(5). Flushes (empty buffer). inner.blob.truncate(5). inner.position is still 11.
642
643            // If we now write "X" at offset 0:
644            // write_start = 0. buffer_start = 11. buffer_end = 11.
645            // Not scenario 1 (0 != 11).
646            // Not scenario 2 (0 < 11).
647            // Scenario 3: flush (empty). blob.write_at("X", 0). inner.position = 0 + 1 = 1.
648            writer.write_at("X".as_bytes(), 0).await.unwrap();
649            writer.sync().await.unwrap();
650
651            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
652            assert_eq!(size, 5); // Blob was "hello", truncated to 5, now overwritten at 0 with "X", size remains 5.
653            let mut reader = Read::new(blob, size, 5);
654            let mut buf = vec![0u8; 5];
655            reader.read_exact(&mut buf, 5).await.unwrap();
656            assert_eq!(&buf, b"Xello");
657
658            // Test truncate to 0
659            let (blob_zero, _) = context.open("partition", b"truncate_zero").await.unwrap();
660            let writer_zero = Write::new(blob_zero.clone(), 0, 10);
661            writer_zero
662                .write_at("some data".as_bytes(), 0)
663                .await
664                .unwrap();
665            writer_zero.sync().await.unwrap();
666            writer_zero.truncate(0).await.unwrap();
667            writer_zero.sync().await.unwrap();
668
669            let (_, size_z) = context.open("partition", b"truncate_zero").await.unwrap();
670            assert_eq!(size_z, 0);
671        });
672    }
673
674    #[test_traced]
675    fn test_write_read_at_on_writer() {
676        let executor = deterministic::Runner::default();
677        executor.start(|context| async move {
678            // Test reading data through the writer's read_at method, covering buffer and blob reads.
679            let (blob, _) = context.open("partition", b"read_at_writer").await.unwrap();
680            let writer = Write::new(blob.clone(), 0, 10); // Buffer capacity 10, starts at blob offset 0
681
682            // 1. Write "buffered" (8 bytes) - stays in buffer
683            writer.write_at("buffered".as_bytes(), 0).await.unwrap();
684
685            // Read from buffer: "buff" at offset 0
686            let mut read_buf_vec = vec![0u8; 4];
687            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
688            assert_eq!(&read_buf_vec, b"buff");
689
690            // Read from buffer: "ered" at offset 4
691            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
692            assert_eq!(&read_buf_vec, b"ered");
693
694            // Read past buffer end (buffered data is "buffered" at 0-7)
695            let small_buf_vec = vec![0u8; 1];
696            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
697
698            // 2. Write " and flushed" (12 bytes) at offset 8.
699            // "buffered" (8 bytes) is at 0-7. New write starts at 8.
700            // This is an append to buffer. writer.write_at(" and flushed", 8)
701            // inner.write will be called. buffer.len (8) + " and flushed".len (12) = 20 > capacity (10)
702            // So, "buffered" is flushed. blob.write_at("buffered", 0). inner.position = 8.
703            // Then " and flushed" (12 bytes) is written. Since 12 > 10 (capacity), it's a direct write.
704            // blob.write_at(" and flushed", 8). inner.position = 8 + 12 = 20.
705            writer.write_at(" and flushed".as_bytes(), 8).await.unwrap();
706            writer.sync().await.unwrap(); // Syncs any remaining (should be none from this op)
707
708            // Blob now contains "buffered and flushed" (8 + 12 = 20 bytes)
709            // Writer's inner.position = 20. Buffer is empty.
710
711            // Read from underlying blob through writer: "buff" at offset 0
712            let mut read_buf_vec_2 = vec![0u8; 4];
713            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
714            assert_eq!(&read_buf_vec_2, b"buff");
715
716            // Read from underlying blob: "flushed" at offset 13
717            let mut read_buf_7_vec = vec![0u8; 7];
718            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
719            assert_eq!(&read_buf_7_vec, b"flushed");
720
721            // 3. Buffer new data without flushing previous
722            // Writer inner.position = 20. Buffer is empty.
723            // Write " more data" (9 bytes) at offset 20. This fits in buffer.
724            writer.write_at(" more data".as_bytes(), 20).await.unwrap();
725
726            // Read the newly buffered data: "more"
727            let mut read_buf_vec_3 = vec![0u8; 5];
728            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
729            assert_eq!(&read_buf_vec_3, b" more");
730
731            // Read part from blob, part from buffer (not directly supported by current read_at logic, it seems)
732            // `read_at` logic:
733            // If data_end <= buffer_start: read from blob. (buffer_start is inner.position where buffer *would* start if written)
734            // If offset >= buffer_start: read from buffer (if it contains the data)
735            // Combination case: read blob part, then buffer part.
736
737            // Current state:
738            // Blob: "buffered and flushed" (0-19)
739            // Buffer: " more data" (starts at blob offset 20, covers 20-29)
740            // writer.inner.position = 20 (start of buffered data " more data")
741            // writer.inner.buffer = " more data"
742
743            // Try to read "flushed more " (12 bytes: ("shed " - 5) from blob, ("more da" - 7) from buffer) starting at offset 16
744            // offset = 16, data_len = 12. data_end = 28.
745            // buffer_start (inner.position) = 20. buffer_end = 20 + 9 = 29.
746            // data_end (28) > buffer_start (20) is true.
747            // offset (16) < buffer_start (20) is true. -> This is the combined case.
748            let mut combo_read_buf_vec = vec![0u8; 12];
749            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
750            assert_eq!(&combo_read_buf_vec, b"shed more da");
751
752            // Verify full content by reopening and reading
753            writer.sync().await.unwrap(); // Flush "more data"
754            let (final_blob, final_size) =
755                context.open("partition", b"read_at_writer").await.unwrap();
756            assert_eq!(final_size, 30); // "buffered and flushed more data"
757            let mut final_reader = Read::new(final_blob, final_size, 30);
758            let mut full_content = vec![0u8; 30];
759            final_reader
760                .read_exact(&mut full_content, 30)
761                .await
762                .unwrap();
763            assert_eq!(&full_content, b"buffered and flushed more data");
764        });
765    }
766
767    #[test_traced]
768    fn test_write_straddling_non_mergable() {
769        let executor = deterministic::Runner::default();
770        executor.start(|context| async move {
771            // Test write operations that are non-contiguous with the current buffer, forcing flushes.
772            let (blob, _) = context.open("partition", b"write_straddle").await.unwrap();
773            let writer = Write::new(blob.clone(), 0, 10); // buffer capacity 10
774
775            // Buffer "0123456789" (10 bytes)
776            writer.write_at("0123456789".as_bytes(), 0).await.unwrap();
777            // At this point, inner.buffer = "0123456789", inner.position = 0
778
779            // Write "abc" at offset 15.
780            // This is scenario 3: write is after buffer, non-contiguous.
781            // Current buffer "0123456789" is flushed. blob gets "0123456789". inner.position becomes 10.
782            // Then "abc" is written directly to blob at offset 15. inner.position becomes 15 + 3 = 18.
783            writer.write_at("abc".as_bytes(), 15).await.unwrap();
784            writer.sync().await.unwrap(); // syncs blob state
785
786            let (blob_check, size_check) =
787                context.open("partition", b"write_straddle").await.unwrap();
788            // Expected: "0123456789" at 0-9, then "abc" at 15-17. Size should be 18.
789            // The space between 9 and 15 will be undefined or zeros depending on blob impl.
790            // For memory blob, it extends and fills with zeros.
791            assert_eq!(size_check, 18);
792            let mut reader = Read::new(blob_check, size_check, 20);
793            let mut buf = vec![0u8; 18];
794            reader.read_exact(&mut buf, 18).await.unwrap();
795
796            let mut expected = vec![0u8; 18];
797            expected[0..10].copy_from_slice(b"0123456789");
798            // Bytes 10-14 are zeros for memory blob
799            expected[15..18].copy_from_slice(b"abc");
800            assert_eq!(buf, expected);
801
802            // Reset for a new scenario: Write that overwrites end of buffer and extends
803            let (blob2, _) = context.open("partition", b"write_straddle2").await.unwrap();
804            let writer2 = Write::new(blob2.clone(), 0, 10);
805            writer2.write_at("0123456789".as_bytes(), 0).await.unwrap(); // Buffer full: "0123456789", position 0
806
807            // Write "ABCDEFGHIJKL" (12 bytes) at offset 5.
808            // write_start = 5. data_len = 12.
809            // buffer_start = 0. buffer_end = 10.
810            // Scenario 1: no (5 != 10)
811            // Scenario 2: can_write_into_buffer?
812            //   write_start (5) >= buffer_start (0) -> true
813            //   (write_start - buffer_start) (5) + data_len (12) = 17 <= capacity (10) -> false
814            // So, Scenario 3.
815            // Flush "0123456789". blob gets it. writer2.inner.position becomes 10.
816            // blob.write_at("ABCDEFGHIJKL", 5).
817            // Underlying blob becomes "01234ABCDEFGHIJKL" (len 17).
818            // writer2.inner.position becomes 5 + 12 = 17.
819            writer2
820                .write_at("ABCDEFGHIJKL".as_bytes(), 5)
821                .await
822                .unwrap();
823            writer2.sync().await.unwrap();
824
825            let (blob_check2, size_check2) =
826                context.open("partition", b"write_straddle2").await.unwrap();
827            assert_eq!(size_check2, 17);
828            let mut reader2 = Read::new(blob_check2, size_check2, 20);
829            let mut buf2 = vec![0u8; 17];
830            reader2.read_exact(&mut buf2, 17).await.unwrap();
831            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
832        });
833    }
834
835    #[test_traced]
836    fn test_write_close() {
837        let executor = deterministic::Runner::default();
838        executor.start(|context| async move {
839            // Test that closing the writer flushes any pending data in the buffer.
840            let (blob_orig, _) = context.open("partition", b"write_close").await.unwrap();
841            let writer = Write::new(blob_orig.clone(), 0, 8);
842            writer.write_at("pending".as_bytes(), 0).await.unwrap(); // 7 bytes, buffered
843                                                                     // Data "pending" is in the writer's buffer, not yet on disk.
844
845            // Closing the writer should flush and sync the data.
846            writer.close().await.unwrap();
847
848            // Reopen and verify
849            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
850            assert_eq!(size_check, 7);
851            let mut reader = Read::new(blob_check, size_check, 8);
852            let mut buf = [0u8; 7];
853            reader.read_exact(&mut buf, 7).await.unwrap();
854            assert_eq!(&buf, b"pending");
855        });
856    }
857
858    #[test_traced]
859    fn test_write_direct_due_to_size() {
860        let executor = deterministic::Runner::default();
861        executor.start(|context| async move {
862            let (blob, _) = context
863                .open("partition", b"write_direct_size")
864                .await
865                .unwrap();
866            // Buffer capacity 5, initial position 0
867            let writer = Write::new(blob.clone(), 0, 5);
868
869            // Write 10 bytes, which is > capacity. Should be a direct write.
870            let data_large = b"0123456789";
871            writer.write_at(data_large.as_slice(), 0).await.unwrap();
872            // Inner state: buffer should be empty, position should be 10.
873            // We can't directly check inner state here, so we rely on observable behavior.
874
875            // Sync to ensure data is on disk
876            writer.sync().await.unwrap();
877
878            let (blob_check, size_check) = context
879                .open("partition", b"write_direct_size")
880                .await
881                .unwrap();
882            assert_eq!(size_check, 10);
883            let mut reader = Read::new(blob_check, size_check, 10);
884            let mut buf = vec![0u8; 10];
885            reader.read_exact(&mut buf, 10).await.unwrap();
886            assert_eq!(&buf, data_large.as_slice());
887
888            // Now, buffer something small
889            writer.write_at(b"abc".as_slice(), 10).await.unwrap(); // This should be buffered
890                                                                   // Attempt to read it back using writer.read_at to see if it's in buffer
891            let mut read_small_buf_vec = vec![0u8; 3];
892            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
893            assert_eq!(&read_small_buf_vec, b"abc".as_slice());
894
895            writer.sync().await.unwrap();
896            let (blob_check2, size_check2) = context
897                .open("partition", b"write_direct_size")
898                .await
899                .unwrap();
900            assert_eq!(size_check2, 13);
901            let mut reader2 = Read::new(blob_check2, size_check2, 13);
902            let mut buf2 = vec![0u8; 13];
903            reader2.read_exact(&mut buf2, 13).await.unwrap();
904            assert_eq!(&buf2[10..], b"abc".as_slice());
905        });
906    }
907
908    #[test_traced]
909    fn test_write_overwrite_and_extend_in_buffer() {
910        let executor = deterministic::Runner::default();
911        executor.start(|context| async move {
912            let (blob, _) = context
913                .open("partition", b"overwrite_extend_buf")
914                .await
915                .unwrap();
916            let writer = Write::new(blob.clone(), 0, 15); // buffer capacity 15
917
918            // 1. Buffer initial data: "0123456789" (10 bytes) at offset 0
919            writer.write_at("0123456789".as_bytes(), 0).await.unwrap();
920            // Inner buffer: "0123456789", position 0
921
922            // 2. Overwrite and extend: write "ABCDEFGHIJ" (10 bytes) at offset 5
923            // This should result in "01234ABCDEFGHIJ" (15 bytes) in the buffer.
924            // write_start = 5, data_len = 10.
925            // buffer_start = 0, buffer_end = 0 + 10 = 10 (current buffer data length)
926            // Scenario 2: can_write_into_buffer
927            //   write_start (5) >= buffer_start (0) -> true
928            //   (write_start - buffer_start) (5) + data_len (10) = 15 <= capacity (15) -> true
929            // Buffer internal offset = 5.
930            // Required buffer len = 5 + 10 = 15.
931            // Current buffer len is 10. Resize to 15.
932            // buffer[5..15] gets "ABCDEFGHIJ"
933            writer.write_at("ABCDEFGHIJ".as_bytes(), 5).await.unwrap();
934
935            // Check buffer content via read_at on writer
936            let mut read_buf_vec = vec![0u8; 15];
937            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
938            assert_eq!(&read_buf_vec, b"01234ABCDEFGHIJ".as_slice());
939
940            writer.sync().await.unwrap();
941
942            let (blob_check, size_check) = context
943                .open("partition", b"overwrite_extend_buf")
944                .await
945                .unwrap();
946            assert_eq!(size_check, 15);
947            let mut reader = Read::new(blob_check, size_check, 15);
948            let mut final_buf = vec![0u8; 15];
949            reader.read_exact(&mut final_buf, 15).await.unwrap();
950            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
951        });
952    }
953}