commonware_runtime/utils/buffer/
mod.rs

1//! Buffers for reading and writing to [crate::Blob]s.
2
3mod read;
4mod write;
5
6pub use read::Read;
7pub use write::Write;
8
9#[cfg(test)]
10mod tests {
11    use super::*;
12    use crate::{deterministic, Blob as _, Error, Runner, Storage};
13    use commonware_macros::test_traced;
14
15    #[test_traced]
16    fn test_read_basic() {
17        let executor = deterministic::Runner::default();
18        executor.start(|context| async move {
19            // Create a memory blob with some test data
20            let data = b"Hello, world! This is a test.";
21            let (blob, size) = context.open("partition", b"test").await.unwrap();
22            assert_eq!(size, 0);
23            blob.write_at(data.to_vec(), 0).await.unwrap();
24            let size = data.len() as u64;
25
26            // Create a buffer reader with a small buffer size
27            let lookahead = 10;
28            let mut reader = Read::new(blob, size, lookahead);
29
30            // Read some data
31            let mut buf = [0u8; 5];
32            reader.read_exact(&mut buf, 5).await.unwrap();
33            assert_eq!(&buf, b"Hello");
34
35            // Read more data that requires a refill
36            let mut buf = [0u8; 14];
37            reader.read_exact(&mut buf, 14).await.unwrap();
38            assert_eq!(&buf, b", world! This ");
39
40            // Verify position
41            assert_eq!(reader.position(), 19);
42
43            // Read the rest
44            let mut buf = [0u8; 10];
45            reader.read_exact(&mut buf, 7).await.unwrap();
46            assert_eq!(&buf[..7], b"is a te");
47
48            // Try to read beyond the end
49            let mut buf = [0u8; 5];
50            let result = reader.read_exact(&mut buf, 5).await;
51            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
52        });
53    }
54
55    #[test_traced]
56    #[should_panic(expected = "buffer size must be greater than zero")]
57    fn test_read_empty() {
58        let executor = deterministic::Runner::default();
59        executor.start(|context| async move {
60            // Create a memory blob with some test data
61            let data = b"Hello, world! This is a test.";
62            let (blob, size) = context.open("partition", b"test").await.unwrap();
63            assert_eq!(size, 0);
64            blob.write_at(data.to_vec(), 0).await.unwrap();
65            let size = data.len() as u64;
66
67            // Create a buffer reader with a small buffer size
68            let lookahead = 0;
69            Read::new(blob, size, lookahead);
70        });
71    }
72
73    #[test_traced]
74    fn test_read_cross_boundary() {
75        let executor = deterministic::Runner::default();
76        executor.start(|context| async move {
77            // Create a memory blob with some test data
78            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
79            let (blob, size) = context.open("partition", b"test").await.unwrap();
80            assert_eq!(size, 0);
81            blob.write_at(data.to_vec(), 0).await.unwrap();
82            let size = data.len() as u64;
83
84            // Create a buffer reader with buffer size 10
85            let buffer_size = 10;
86            let mut reader = Read::new(blob, size, buffer_size);
87
88            // Read data that crosses a buffer boundary
89            let mut buf = [0u8; 15];
90            reader.read_exact(&mut buf, 15).await.unwrap();
91            assert_eq!(&buf, b"ABCDEFGHIJKLMNO");
92
93            // Position should be 15
94            assert_eq!(reader.position(), 15);
95
96            // Read the rest
97            let mut buf = [0u8; 11];
98            reader.read_exact(&mut buf, 11).await.unwrap();
99            assert_eq!(&buf, b"PQRSTUVWXYZ");
100
101            // Position should be 26
102            assert_eq!(reader.position(), 26);
103            assert_eq!(reader.blob_remaining(), 0);
104        });
105    }
106
107    #[test_traced]
108    fn test_read_with_known_size() {
109        let executor = deterministic::Runner::default();
110        executor.start(|context| async move {
111            // Create a memory blob with some test data
112            let data = b"This is a test with known size limitations.";
113            let (blob, size) = context.open("partition", b"test").await.unwrap();
114            assert_eq!(size, 0);
115            blob.write_at(data.to_vec(), 0).await.unwrap();
116            let size = data.len() as u64;
117
118            // Create a buffer reader with a buffer smaller than the data
119            let buffer_size = 10;
120            let mut reader = Read::new(blob, size, buffer_size);
121
122            // Check remaining bytes in the blob
123            assert_eq!(reader.blob_remaining(), size);
124
125            // Read half the buffer size
126            let mut buf = [0u8; 5];
127            reader.read_exact(&mut buf, 5).await.unwrap();
128            assert_eq!(&buf, b"This ");
129
130            // Check remaining after read
131            assert_eq!(reader.blob_remaining(), size - 5);
132
133            // Try to read exactly up to the size limit
134            let mut buf = vec![0u8; (size - 5) as usize];
135            reader
136                .read_exact(&mut buf, (size - 5) as usize)
137                .await
138                .unwrap();
139            assert_eq!(&buf, b"is a test with known size limitations.");
140
141            // Now we should be at the end
142            assert_eq!(reader.blob_remaining(), 0);
143
144            // Trying to read more should fail
145            let mut buf = [0u8; 1];
146            let result = reader.read_exact(&mut buf, 1).await;
147            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
148        });
149    }
150
151    #[test_traced]
152    fn test_read_large_data() {
153        let executor = deterministic::Runner::default();
154        executor.start(|context| async move {
155            // Create a larger blob for testing with larger data
156            let data_size = 1024 * 256; // 256KB of data
157            let data = vec![0x42; data_size];
158            let (blob, size) = context.open("partition", b"test").await.unwrap();
159            assert_eq!(size, 0);
160            blob.write_at(data.clone(), 0).await.unwrap();
161            let size = data.len() as u64;
162
163            // Create a buffer with size smaller than the data
164            let buffer_size = 64 * 1024; // 64KB
165            let mut reader = Read::new(blob, size, buffer_size);
166
167            // Read all the data in chunks
168            let mut total_read = 0;
169            let chunk_size = 8 * 1024; // 8KB chunks
170            let mut buf = vec![0u8; chunk_size];
171
172            while total_read < data_size {
173                let to_read = std::cmp::min(chunk_size, data_size - total_read);
174                reader
175                    .read_exact(&mut buf[..to_read], to_read)
176                    .await
177                    .unwrap();
178
179                // Verify the data is correct (all bytes should be 0x42)
180                assert!(
181                    buf[..to_read].iter().all(|&b| b == 0x42),
182                    "Data at position {} is not correct",
183                    total_read
184                );
185
186                total_read += to_read;
187            }
188
189            // Verify we read everything
190            assert_eq!(total_read, data_size);
191
192            // Trying to read more should fail
193            let mut extra_buf = [0u8; 1];
194            let result = reader.read_exact(&mut extra_buf, 1).await;
195            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
196        });
197    }
198
199    #[test_traced]
200    fn test_read_exact_size_reads() {
201        let executor = deterministic::Runner::default();
202        executor.start(|context| async move {
203            // Create a blob with exactly 2.5 buffer sizes of data
204            let buffer_size = 1024;
205            let data_size = buffer_size * 5 / 2; // 2.5 buffers
206            let data = vec![0x37; data_size];
207
208            let (blob, size) = context.open("partition", b"test").await.unwrap();
209            assert_eq!(size, 0);
210            blob.write_at(data.clone(), 0).await.unwrap();
211            let size = data.len() as u64;
212
213            let mut reader = Read::new(blob, size, buffer_size);
214
215            // Read exactly one buffer size
216            let mut buf1 = vec![0u8; buffer_size];
217            reader.read_exact(&mut buf1, buffer_size).await.unwrap();
218            assert!(buf1.iter().all(|&b| b == 0x37));
219
220            // Read exactly one buffer size more
221            let mut buf2 = vec![0u8; buffer_size];
222            reader.read_exact(&mut buf2, buffer_size).await.unwrap();
223            assert!(buf2.iter().all(|&b| b == 0x37));
224
225            // Read the remaining half buffer
226            let half_buffer = buffer_size / 2;
227            let mut buf3 = vec![0u8; half_buffer];
228            reader.read_exact(&mut buf3, half_buffer).await.unwrap();
229            assert!(buf3.iter().all(|&b| b == 0x37));
230
231            // Verify we're at the end
232            assert_eq!(reader.blob_remaining(), 0);
233            assert_eq!(reader.position(), size);
234        });
235    }
236
237    #[test_traced]
238    fn test_read_seek_to() {
239        let executor = deterministic::Runner::default();
240        executor.start(|context| async move {
241            // Create a memory blob with some test data
242            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
243            let (blob, size) = context.open("partition", b"test").await.unwrap();
244            assert_eq!(size, 0);
245            blob.write_at(data.to_vec(), 0).await.unwrap();
246            let size = data.len() as u64;
247
248            // Create a buffer reader
249            let buffer_size = 10;
250            let mut reader = Read::new(blob, size, buffer_size);
251
252            // Read some data to advance the position
253            let mut buf = [0u8; 5];
254            reader.read_exact(&mut buf, 5).await.unwrap();
255            assert_eq!(&buf, b"ABCDE");
256            assert_eq!(reader.position(), 5);
257
258            // Seek to a specific position
259            reader.seek_to(10).unwrap();
260            assert_eq!(reader.position(), 10);
261
262            // Read data from the new position
263            let mut buf = [0u8; 5];
264            reader.read_exact(&mut buf, 5).await.unwrap();
265            assert_eq!(&buf, b"KLMNO");
266
267            // Seek to beginning
268            reader.seek_to(0).unwrap();
269            assert_eq!(reader.position(), 0);
270
271            let mut buf = [0u8; 5];
272            reader.read_exact(&mut buf, 5).await.unwrap();
273            assert_eq!(&buf, b"ABCDE");
274
275            // Seek to end
276            reader.seek_to(size).unwrap();
277            assert_eq!(reader.position(), size);
278
279            // Trying to read should fail
280            let mut buf = [0u8; 1];
281            let result = reader.read_exact(&mut buf, 1).await;
282            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
283
284            // Seek beyond end should fail
285            let result = reader.seek_to(size + 10);
286            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
287        });
288    }
289
290    #[test_traced]
291    fn test_read_seek_with_refill() {
292        let executor = deterministic::Runner::default();
293        executor.start(|context| async move {
294            // Create a memory blob with longer data
295            let data = vec![0x41; 1000]; // 1000 'A' characters
296            let (blob, size) = context.open("partition", b"test").await.unwrap();
297            assert_eq!(size, 0);
298            blob.write_at(data.clone(), 0).await.unwrap();
299            let size = data.len() as u64;
300
301            // Create a buffer reader with small buffer
302            let buffer_size = 10;
303            let mut reader = Read::new(blob, size, buffer_size);
304
305            // Read some data
306            let mut buf = [0u8; 5];
307            reader.read_exact(&mut buf, 5).await.unwrap();
308
309            // Seek far ahead, past the current buffer
310            reader.seek_to(500).unwrap();
311
312            // Read data - should get data from position 500
313            let mut buf = [0u8; 5];
314            reader.read_exact(&mut buf, 5).await.unwrap();
315            assert_eq!(&buf, b"AAAAA"); // Should still be 'A's
316            assert_eq!(reader.position(), 505);
317
318            // Seek backwards
319            reader.seek_to(100).unwrap();
320
321            // Read again - should be at position 100
322            let mut buf = [0u8; 5];
323            reader.read_exact(&mut buf, 5).await.unwrap();
324            assert_eq!(reader.position(), 105);
325        });
326    }
327
328    #[test_traced]
329    fn test_read_truncate() {
330        let executor = deterministic::Runner::default();
331        executor.start(|context| async move {
332            // Create a memory blob with some test data
333            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
334            let (blob, size) = context.open("partition", b"test").await.unwrap();
335            assert_eq!(size, 0);
336            blob.write_at(data.to_vec(), 0).await.unwrap();
337            let data_len = data.len() as u64;
338
339            // Create a buffer reader
340            let buffer_size = 10;
341            let reader = Read::new(blob.clone(), data_len, buffer_size);
342
343            // Truncate the blob to half its size
344            let truncate_len = data_len / 2;
345            reader.truncate(truncate_len).await.unwrap();
346
347            // Reopen to check truncation
348            let (blob, size) = context.open("partition", b"test").await.unwrap();
349            assert_eq!(size, truncate_len, "Blob should be truncated to half size");
350
351            // Create a new buffer and read to verify truncation
352            let mut new_reader = Read::new(blob, size, buffer_size);
353
354            // Read the content
355            let mut buf = vec![0u8; size as usize];
356            new_reader
357                .read_exact(&mut buf, size as usize)
358                .await
359                .unwrap();
360            assert_eq!(&buf, b"ABCDEFGHIJKLM", "Truncated content should match");
361
362            // Reading beyond truncated size should fail
363            let mut extra_buf = [0u8; 1];
364            let result = new_reader.read_exact(&mut extra_buf, 1).await;
365            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
366        });
367    }
368
369    #[test_traced]
370    fn test_read_truncate_to_zero() {
371        let executor = deterministic::Runner::default();
372        executor.start(|context| async move {
373            // Create a memory blob with some test data
374            let data = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
375            let data_len = data.len() as u64;
376            let (blob, size) = context.open("partition", b"test").await.unwrap();
377            assert_eq!(size, 0);
378            blob.write_at(data.to_vec(), 0).await.unwrap();
379
380            // Create a buffer reader
381            let buffer_size = 10;
382            let reader = Read::new(blob.clone(), data_len, buffer_size);
383
384            // Truncate the blob to zero
385            reader.truncate(0).await.unwrap();
386
387            // Reopen to check truncation
388            let (blob, size) = context.open("partition", b"test").await.unwrap();
389            assert_eq!(size, 0, "Blob should be truncated to zero");
390
391            // Create a new buffer and try to read (should fail)
392            let mut new_reader = Read::new(blob, size, buffer_size);
393
394            // Reading from truncated blob should fail
395            let mut buf = [0u8; 1];
396            let result = new_reader.read_exact(&mut buf, 1).await;
397            assert!(matches!(result, Err(Error::BlobInsufficientLength)));
398        });
399    }
400
401    #[test_traced]
402    fn test_write_basic() {
403        let executor = deterministic::Runner::default();
404        executor.start(|context| async move {
405            // Test basic write_at and sync functionality.
406            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
407            assert_eq!(size, 0);
408
409            let writer = Write::new(blob.clone(), 0, 8);
410            writer.write_at("hello".as_bytes(), 0).await.unwrap();
411            writer.sync().await.unwrap();
412
413            let (blob, size) = context.open("partition", b"write_basic").await.unwrap();
414            assert_eq!(size, 5);
415            let mut reader = Read::new(blob, size, 8);
416            let mut buf = [0u8; 5];
417            reader.read_exact(&mut buf, 5).await.unwrap();
418            assert_eq!(&buf, b"hello");
419        });
420    }
421
422    #[test_traced]
423    fn test_write_multiple_flushes() {
424        let executor = deterministic::Runner::default();
425        executor.start(|context| async move {
426            // Test writing data that causes multiple buffer flushes.
427            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
428            assert_eq!(size, 0);
429
430            let writer = Write::new(blob.clone(), 0, 4);
431            writer.write_at("abc".as_bytes(), 0).await.unwrap();
432            writer.write_at("defg".as_bytes(), 3).await.unwrap();
433            writer.sync().await.unwrap();
434
435            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
436            assert_eq!(size, 7);
437            let mut reader = Read::new(blob, size, 4);
438            let mut buf = [0u8; 7];
439            reader.read_exact(&mut buf, 7).await.unwrap();
440            assert_eq!(&buf, b"abcdefg");
441        });
442    }
443
444    #[test_traced]
445    fn test_write_large_data() {
446        let executor = deterministic::Runner::default();
447        executor.start(|context| async move {
448            // Test writing data significantly larger than the buffer capacity.
449            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
450            assert_eq!(size, 0);
451
452            let writer = Write::new(blob.clone(), 0, 4);
453            writer.write_at("abc".as_bytes(), 0).await.unwrap();
454            writer
455                .write_at("defghijklmnopqrstuvwxyz".as_bytes(), 3)
456                .await
457                .unwrap();
458            writer.sync().await.unwrap();
459
460            let (blob, size) = context.open("partition", b"write_multi").await.unwrap();
461            assert_eq!(size, 26);
462            let mut reader = Read::new(blob, size, 4);
463            let mut buf = [0u8; 26];
464            reader.read_exact(&mut buf, 26).await.unwrap();
465            assert_eq!(&buf, b"abcdefghijklmnopqrstuvwxyz");
466        });
467    }
468
469    #[test_traced]
470    #[should_panic(expected = "buffer capacity must be greater than zero")]
471    fn test_write_empty() {
472        let executor = deterministic::Runner::default();
473        executor.start(|context| async move {
474            // Test creating a writer with zero buffer capacity.
475            let (blob, size) = context.open("partition", b"write_empty").await.unwrap();
476            assert_eq!(size, 0);
477            Write::new(blob, 0, 0);
478        });
479    }
480
481    #[test_traced]
482    fn test_write_append_to_buffer() {
483        let executor = deterministic::Runner::default();
484        executor.start(|context| async move {
485            // Test appending data that partially fits and then exceeds buffer capacity, causing a flush.
486            let (blob, _) = context.open("partition", b"append_buf").await.unwrap();
487            let writer = Write::new(blob.clone(), 0, 10);
488
489            // Write "hello" (5 bytes) - fits in buffer
490            writer.write_at("hello".as_bytes(), 0).await.unwrap();
491            // Append " world" (6 bytes) - "hello world" is 11 bytes, exceeds buffer
492            // "hello" is flushed, " world" is buffered
493            writer.write_at(" world".as_bytes(), 5).await.unwrap();
494            writer.sync().await.unwrap();
495
496            let (blob, size) = context.open("partition", b"append_buf").await.unwrap();
497            assert_eq!(size, 11);
498            let mut reader = Read::new(blob, size, 10);
499            let mut buf = vec![0u8; 11];
500            reader.read_exact(&mut buf, 11).await.unwrap();
501            assert_eq!(&buf, b"hello world");
502        });
503    }
504
505    #[test_traced]
506    fn test_write_into_middle_of_buffer() {
507        let executor = deterministic::Runner::default();
508        executor.start(|context| async move {
509            // Test writing data into the middle of an existing, partially filled buffer.
510            let (blob, _) = context.open("partition", b"middle_buf").await.unwrap();
511            let writer = Write::new(blob.clone(), 0, 20);
512
513            // Write "abcdefghij" (10 bytes)
514            writer.write_at("abcdefghij".as_bytes(), 0).await.unwrap();
515            // Write "01234" into the middle (offset 2, 5 bytes) -> "ab01234hij"
516            writer.write_at("01234".as_bytes(), 2).await.unwrap();
517            writer.sync().await.unwrap();
518
519            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
520            assert_eq!(size, 10); // Original length, as it's an overwrite
521            let mut reader = Read::new(blob, size, 10);
522            let mut buf = vec![0u8; 10];
523            reader.read_exact(&mut buf, 10).await.unwrap();
524            assert_eq!(&buf, b"ab01234hij");
525
526            // Write "klmnopqrst" (10 bytes) - buffer becomes "ab01234hijklmnopqrst" (20 bytes)
527            writer.write_at("klmnopqrst".as_bytes(), 10).await.unwrap();
528            // Overwrite "jklm" with "wxyz" -> buffer becomes "ab01234hiwxyzopqrst"
529            writer.write_at("wxyz".as_bytes(), 9).await.unwrap();
530            writer.sync().await.unwrap();
531
532            let (blob, size) = context.open("partition", b"middle_buf").await.unwrap();
533            assert_eq!(size, 20);
534            let mut reader = Read::new(blob, size, 20);
535            let mut buf = vec![0u8; 20];
536            reader.read_exact(&mut buf, 20).await.unwrap();
537            assert_eq!(&buf, b"ab01234hiwxyznopqrst");
538        });
539    }
540
541    #[test_traced]
542    fn test_write_before_buffer() {
543        let executor = deterministic::Runner::default();
544        executor.start(|context| async move {
545            // Test writing data at an offset that precedes the current buffered data range.
546            let (blob, _) = context.open("partition", b"before_buf").await.unwrap();
547            let writer = Write::new(blob.clone(), 10, 10); // Buffer starts at blob offset 10
548
549            // Buffer some data at offset 10: "0123456789"
550            writer.write_at("0123456789".as_bytes(), 10).await.unwrap();
551
552            // Write "abcde" at offset 0. This is before the current buffer.
553            // Current buffer should be flushed. "abcde" written directly.
554            // New buffer position will be 5.
555            writer.write_at("abcde".as_bytes(), 0).await.unwrap();
556            writer.sync().await.unwrap();
557
558            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
559            // Expected: "abcde" at 0 (5 bytes) + "0123456789" at 10 (10 bytes) = 20 total size.
560            // The underlying blob previously had "0123456789" written starting at 10.
561            // Then "abcde" was written starting at 0.
562            // The blob size should be 20 if the original data at 10 was preserved.
563            assert_eq!(size, 20);
564            let mut reader = Read::new(blob, size, 20);
565            let mut buf = vec![0u8; 20];
566            reader.read_exact(&mut buf, 20).await.unwrap();
567
568            let mut expected = vec![0u8; 20];
569            expected[0..5].copy_from_slice("abcde".as_bytes());
570            expected[10..20].copy_from_slice("0123456789".as_bytes());
571            assert_eq!(buf, expected);
572
573            // Write "fghij" at offset 5. This will append to the "abcde"
574            // The buffer for writer is now at position 5 with capacity 10.
575            // This write will be buffered.
576            writer.write_at("fghij".as_bytes(), 5).await.unwrap();
577            writer.sync().await.unwrap();
578
579            let (blob, size) = context.open("partition", b"before_buf").await.unwrap();
580            assert_eq!(size, 20); // Size remains 20 as "fghij" overwrites part of the gap + start of old data.
581            let mut reader = Read::new(blob, size, 20);
582            let mut buf = vec![0u8; 20];
583            reader.read_exact(&mut buf, 20).await.unwrap();
584
585            expected[0..10].copy_from_slice("abcdefghij".as_bytes());
586            // The "0123456789" part originally at 10-19 is still there.
587            assert_eq!(buf, expected);
588        });
589    }
590
591    #[test_traced]
592    fn test_write_truncate() {
593        let executor = deterministic::Runner::default();
594        executor.start(|context| async move {
595            // Test truncating the blob via the writer and subsequent write behaviors.
596            let (blob, _) = context.open("partition", b"truncate_write").await.unwrap();
597            let writer = Write::new(blob, 0, 10);
598
599            writer.write_at("hello world".as_bytes(), 0).await.unwrap(); // 11 bytes
600            writer.sync().await.unwrap();
601
602            let (blob_check, size_check) =
603                context.open("partition", b"truncate_write").await.unwrap();
604            assert_eq!(size_check, 11);
605            drop(blob_check);
606
607            // Truncate to 5 bytes ("hello")
608            writer.truncate(5).await.unwrap();
609            writer.sync().await.unwrap();
610
611            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
612            assert_eq!(size, 5);
613            let mut reader = Read::new(blob, size, 5);
614            let mut buf = vec![0u8; 5];
615            reader.read_exact(&mut buf, 5).await.unwrap();
616            assert_eq!(&buf, b"hello");
617
618            // Write more data, buffer position should be reset by truncate implicitly
619            // or write_at should handle it.
620            // After truncate, the writer's internal state (position, buffer) needs to be consistent.
621            // Let's assume truncate implies a flush and reset of position to 0 for the writer if not specified.
622            // The `Blob::truncate` itself doesn't dictate writer state.
623            // The current `Write::truncate` flushes then truncates. The writer's `position` is not reset.
624            // This means subsequent writes might be to unexpected locations if not careful.
625            // Let's test current behavior:
626            // inner.position was 11 after "hello world". Flush happens. Blob truncated to 5.
627            // writer.inner.position is still 11.
628            // This is a bit tricky. For `Write::new(blob, position, capacity)`, `position` is the *start* of the buffer.
629            // After flush, `inner.position` becomes `inner.position + len_flushed`.
630            // So after writing "hello world" (11 bytes) with buffer 10:
631            // 1. "hello worl" (10 bytes) written to buffer. inner.buffer.len() = 10, inner.position = 0.
632            // 2. "d" (1 byte) written. Buffer has "hello worl". write("d") called.
633            //    buffer.len (10) + "d".len (1) > capacity (10). So flush.
634            //    blob.write_at("hello worl", 0). inner.position becomes 10. inner.buffer is empty.
635            //    Then "d" is buffered. inner.buffer = "d".
636            // 3. sync() called. Flushes "d". blob.write_at("d", 10). inner.position becomes 11. inner.buffer empty.
637            // 4. truncate(5). Flushes (empty buffer). inner.blob.truncate(5). inner.position is still 11.
638
639            // If we now write "X" at offset 0:
640            // write_start = 0. buffer_start = 11. buffer_end = 11.
641            // Not scenario 1 (0 != 11).
642            // Not scenario 2 (0 < 11).
643            // Scenario 3: flush (empty). blob.write_at("X", 0). inner.position = 0 + 1 = 1.
644            writer.write_at("X".as_bytes(), 0).await.unwrap();
645            writer.sync().await.unwrap();
646
647            let (blob, size) = context.open("partition", b"truncate_write").await.unwrap();
648            assert_eq!(size, 5); // Blob was "hello", truncated to 5, now overwritten at 0 with "X", size remains 5.
649            let mut reader = Read::new(blob, size, 5);
650            let mut buf = vec![0u8; 5];
651            reader.read_exact(&mut buf, 5).await.unwrap();
652            assert_eq!(&buf, b"Xello");
653
654            // Test truncate to 0
655            let (blob_zero, _) = context.open("partition", b"truncate_zero").await.unwrap();
656            let writer_zero = Write::new(blob_zero.clone(), 0, 10);
657            writer_zero
658                .write_at("some data".as_bytes(), 0)
659                .await
660                .unwrap();
661            writer_zero.sync().await.unwrap();
662            writer_zero.truncate(0).await.unwrap();
663            writer_zero.sync().await.unwrap();
664
665            let (_, size_z) = context.open("partition", b"truncate_zero").await.unwrap();
666            assert_eq!(size_z, 0);
667        });
668    }
669
670    #[test_traced]
671    fn test_write_read_at_on_writer() {
672        let executor = deterministic::Runner::default();
673        executor.start(|context| async move {
674            // Test reading data through the writer's read_at method, covering buffer and blob reads.
675            let (blob, _) = context.open("partition", b"read_at_writer").await.unwrap();
676            let writer = Write::new(blob.clone(), 0, 10); // Buffer capacity 10, starts at blob offset 0
677
678            // 1. Write "buffered" (8 bytes) - stays in buffer
679            writer.write_at("buffered".as_bytes(), 0).await.unwrap();
680
681            // Read from buffer: "buff" at offset 0
682            let mut read_buf_vec = vec![0u8; 4];
683            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
684            assert_eq!(&read_buf_vec, b"buff");
685
686            // Read from buffer: "ered" at offset 4
687            read_buf_vec = writer.read_at(read_buf_vec, 4).await.unwrap();
688            assert_eq!(&read_buf_vec, b"ered");
689
690            // Read past buffer end (buffered data is "buffered" at 0-7)
691            let small_buf_vec = vec![0u8; 1];
692            assert!(writer.read_at(small_buf_vec, 8).await.is_err());
693
694            // 2. Write " and flushed" (12 bytes) at offset 8.
695            // "buffered" (8 bytes) is at 0-7. New write starts at 8.
696            // This is an append to buffer. writer.write_at(" and flushed", 8)
697            // inner.write will be called. buffer.len (8) + " and flushed".len (12) = 20 > capacity (10)
698            // So, "buffered" is flushed. blob.write_at("buffered", 0). inner.position = 8.
699            // Then " and flushed" (12 bytes) is written. Since 12 > 10 (capacity), it's a direct write.
700            // blob.write_at(" and flushed", 8). inner.position = 8 + 12 = 20.
701            writer.write_at(" and flushed".as_bytes(), 8).await.unwrap();
702            writer.sync().await.unwrap(); // Syncs any remaining (should be none from this op)
703
704            // Blob now contains "buffered and flushed" (8 + 12 = 20 bytes)
705            // Writer's inner.position = 20. Buffer is empty.
706
707            // Read from underlying blob through writer: "buff" at offset 0
708            let mut read_buf_vec_2 = vec![0u8; 4];
709            read_buf_vec_2 = writer.read_at(read_buf_vec_2, 0).await.unwrap();
710            assert_eq!(&read_buf_vec_2, b"buff");
711
712            // Read from underlying blob: "flushed" at offset 13
713            let mut read_buf_7_vec = vec![0u8; 7];
714            read_buf_7_vec = writer.read_at(read_buf_7_vec, 13).await.unwrap();
715            assert_eq!(&read_buf_7_vec, b"flushed");
716
717            // 3. Buffer new data without flushing previous
718            // Writer inner.position = 20. Buffer is empty.
719            // Write " more data" (9 bytes) at offset 20. This fits in buffer.
720            writer.write_at(" more data".as_bytes(), 20).await.unwrap();
721
722            // Read the newly buffered data: "more"
723            let mut read_buf_vec_3 = vec![0u8; 5];
724            read_buf_vec_3 = writer.read_at(read_buf_vec_3, 20).await.unwrap();
725            assert_eq!(&read_buf_vec_3, b" more");
726
727            // Read part from blob, part from buffer (not directly supported by current read_at logic, it seems)
728            // `read_at` logic:
729            // If data_end <= buffer_start: read from blob. (buffer_start is inner.position where buffer *would* start if written)
730            // If offset >= buffer_start: read from buffer (if it contains the data)
731            // Combination case: read blob part, then buffer part.
732
733            // Current state:
734            // Blob: "buffered and flushed" (0-19)
735            // Buffer: " more data" (starts at blob offset 20, covers 20-29)
736            // writer.inner.position = 20 (start of buffered data " more data")
737            // writer.inner.buffer = " more data"
738
739            // Try to read "flushed more " (12 bytes: ("shed " - 5) from blob, ("more da" - 7) from buffer) starting at offset 16
740            // offset = 16, data_len = 12. data_end = 28.
741            // buffer_start (inner.position) = 20. buffer_end = 20 + 9 = 29.
742            // data_end (28) > buffer_start (20) is true.
743            // offset (16) < buffer_start (20) is true. -> This is the combined case.
744            let mut combo_read_buf_vec = vec![0u8; 12];
745            combo_read_buf_vec = writer.read_at(combo_read_buf_vec, 16).await.unwrap();
746            assert_eq!(&combo_read_buf_vec, b"shed more da");
747
748            // Verify full content by reopening and reading
749            writer.sync().await.unwrap(); // Flush "more data"
750            let (final_blob, final_size) =
751                context.open("partition", b"read_at_writer").await.unwrap();
752            assert_eq!(final_size, 30); // "buffered and flushed more data"
753            let mut final_reader = Read::new(final_blob, final_size, 30);
754            let mut full_content = vec![0u8; 30];
755            final_reader
756                .read_exact(&mut full_content, 30)
757                .await
758                .unwrap();
759            assert_eq!(&full_content, b"buffered and flushed more data");
760        });
761    }
762
763    #[test_traced]
764    fn test_write_straddling_non_mergeable() {
765        let executor = deterministic::Runner::default();
766        executor.start(|context| async move {
767            // Test write operations that are non-contiguous with the current buffer, forcing flushes.
768            let (blob, _) = context.open("partition", b"write_straddle").await.unwrap();
769            let writer = Write::new(blob.clone(), 0, 10); // buffer capacity 10
770
771            // Buffer "0123456789" (10 bytes)
772            writer.write_at("0123456789".as_bytes(), 0).await.unwrap();
773            // At this point, inner.buffer = "0123456789", inner.position = 0
774
775            // Write "abc" at offset 15.
776            // This is scenario 3: write is after buffer, non-contiguous.
777            // Current buffer "0123456789" is flushed. blob gets "0123456789". inner.position becomes 10.
778            // Then "abc" is written directly to blob at offset 15. inner.position becomes 15 + 3 = 18.
779            writer.write_at("abc".as_bytes(), 15).await.unwrap();
780            writer.sync().await.unwrap(); // syncs blob state
781
782            let (blob_check, size_check) =
783                context.open("partition", b"write_straddle").await.unwrap();
784            // Expected: "0123456789" at 0-9, then "abc" at 15-17. Size should be 18.
785            // The space between 9 and 15 will be undefined or zeros depending on blob impl.
786            // For memory blob, it extends and fills with zeros.
787            assert_eq!(size_check, 18);
788            let mut reader = Read::new(blob_check, size_check, 20);
789            let mut buf = vec![0u8; 18];
790            reader.read_exact(&mut buf, 18).await.unwrap();
791
792            let mut expected = vec![0u8; 18];
793            expected[0..10].copy_from_slice(b"0123456789");
794            // Bytes 10-14 are zeros for memory blob
795            expected[15..18].copy_from_slice(b"abc");
796            assert_eq!(buf, expected);
797
798            // Reset for a new scenario: Write that overwrites end of buffer and extends
799            let (blob2, _) = context.open("partition", b"write_straddle2").await.unwrap();
800            let writer2 = Write::new(blob2.clone(), 0, 10);
801            writer2.write_at("0123456789".as_bytes(), 0).await.unwrap(); // Buffer full: "0123456789", position 0
802
803            // Write "ABCDEFGHIJKL" (12 bytes) at offset 5.
804            // write_start = 5. data_len = 12.
805            // buffer_start = 0. buffer_end = 10.
806            // Scenario 1: no (5 != 10)
807            // Scenario 2: can_write_into_buffer?
808            //   write_start (5) >= buffer_start (0) -> true
809            //   (write_start - buffer_start) (5) + data_len (12) = 17 <= capacity (10) -> false
810            // So, Scenario 3.
811            // Flush "0123456789". blob gets it. writer2.inner.position becomes 10.
812            // blob.write_at("ABCDEFGHIJKL", 5).
813            // Underlying blob becomes "01234ABCDEFGHIJKL" (len 17).
814            // writer2.inner.position becomes 5 + 12 = 17.
815            writer2
816                .write_at("ABCDEFGHIJKL".as_bytes(), 5)
817                .await
818                .unwrap();
819            writer2.sync().await.unwrap();
820
821            let (blob_check2, size_check2) =
822                context.open("partition", b"write_straddle2").await.unwrap();
823            assert_eq!(size_check2, 17);
824            let mut reader2 = Read::new(blob_check2, size_check2, 20);
825            let mut buf2 = vec![0u8; 17];
826            reader2.read_exact(&mut buf2, 17).await.unwrap();
827            assert_eq!(&buf2, b"01234ABCDEFGHIJKL");
828        });
829    }
830
831    #[test_traced]
832    fn test_write_close() {
833        let executor = deterministic::Runner::default();
834        executor.start(|context| async move {
835            // Test that closing the writer flushes any pending data in the buffer.
836            let (blob_orig, _) = context.open("partition", b"write_close").await.unwrap();
837            let writer = Write::new(blob_orig.clone(), 0, 8);
838            writer.write_at("pending".as_bytes(), 0).await.unwrap(); // 7 bytes, buffered
839                                                                     // Data "pending" is in the writer's buffer, not yet on disk.
840
841            // Closing the writer should flush and sync the data.
842            writer.close().await.unwrap();
843
844            // Reopen and verify
845            let (blob_check, size_check) = context.open("partition", b"write_close").await.unwrap();
846            assert_eq!(size_check, 7);
847            let mut reader = Read::new(blob_check, size_check, 8);
848            let mut buf = [0u8; 7];
849            reader.read_exact(&mut buf, 7).await.unwrap();
850            assert_eq!(&buf, b"pending");
851        });
852    }
853
854    #[test_traced]
855    fn test_write_direct_due_to_size() {
856        let executor = deterministic::Runner::default();
857        executor.start(|context| async move {
858            let (blob, _) = context
859                .open("partition", b"write_direct_size")
860                .await
861                .unwrap();
862            // Buffer capacity 5, initial position 0
863            let writer = Write::new(blob.clone(), 0, 5);
864
865            // Write 10 bytes, which is > capacity. Should be a direct write.
866            let data_large = b"0123456789";
867            writer.write_at(data_large.as_slice(), 0).await.unwrap();
868            // Inner state: buffer should be empty, position should be 10.
869            // We can't directly check inner state here, so we rely on observable behavior.
870
871            // Sync to ensure data is on disk
872            writer.sync().await.unwrap();
873
874            let (blob_check, size_check) = context
875                .open("partition", b"write_direct_size")
876                .await
877                .unwrap();
878            assert_eq!(size_check, 10);
879            let mut reader = Read::new(blob_check, size_check, 10);
880            let mut buf = vec![0u8; 10];
881            reader.read_exact(&mut buf, 10).await.unwrap();
882            assert_eq!(&buf, data_large.as_slice());
883
884            // Now, buffer something small
885            writer.write_at(b"abc".as_slice(), 10).await.unwrap(); // This should be buffered
886                                                                   // Attempt to read it back using writer.read_at to see if it's in buffer
887            let mut read_small_buf_vec = vec![0u8; 3];
888            read_small_buf_vec = writer.read_at(read_small_buf_vec, 10).await.unwrap();
889            assert_eq!(&read_small_buf_vec, b"abc".as_slice());
890
891            writer.sync().await.unwrap();
892            let (blob_check2, size_check2) = context
893                .open("partition", b"write_direct_size")
894                .await
895                .unwrap();
896            assert_eq!(size_check2, 13);
897            let mut reader2 = Read::new(blob_check2, size_check2, 13);
898            let mut buf2 = vec![0u8; 13];
899            reader2.read_exact(&mut buf2, 13).await.unwrap();
900            assert_eq!(&buf2[10..], b"abc".as_slice());
901        });
902    }
903
904    #[test_traced]
905    fn test_write_overwrite_and_extend_in_buffer() {
906        let executor = deterministic::Runner::default();
907        executor.start(|context| async move {
908            let (blob, _) = context
909                .open("partition", b"overwrite_extend_buf")
910                .await
911                .unwrap();
912            let writer = Write::new(blob.clone(), 0, 15); // buffer capacity 15
913
914            // 1. Buffer initial data: "0123456789" (10 bytes) at offset 0
915            writer.write_at("0123456789".as_bytes(), 0).await.unwrap();
916            // Inner buffer: "0123456789", position 0
917
918            // 2. Overwrite and extend: write "ABCDEFGHIJ" (10 bytes) at offset 5
919            // This should result in "01234ABCDEFGHIJ" (15 bytes) in the buffer.
920            // write_start = 5, data_len = 10.
921            // buffer_start = 0, buffer_end = 0 + 10 = 10 (current buffer data length)
922            // Scenario 2: can_write_into_buffer
923            //   write_start (5) >= buffer_start (0) -> true
924            //   (write_start - buffer_start) (5) + data_len (10) = 15 <= capacity (15) -> true
925            // Buffer internal offset = 5.
926            // Required buffer len = 5 + 10 = 15.
927            // Current buffer len is 10. Resize to 15.
928            // buffer[5..15] gets "ABCDEFGHIJ"
929            writer.write_at("ABCDEFGHIJ".as_bytes(), 5).await.unwrap();
930
931            // Check buffer content via read_at on writer
932            let mut read_buf_vec = vec![0u8; 15];
933            read_buf_vec = writer.read_at(read_buf_vec, 0).await.unwrap();
934            assert_eq!(&read_buf_vec, b"01234ABCDEFGHIJ".as_slice());
935
936            writer.sync().await.unwrap();
937
938            let (blob_check, size_check) = context
939                .open("partition", b"overwrite_extend_buf")
940                .await
941                .unwrap();
942            assert_eq!(size_check, 15);
943            let mut reader = Read::new(blob_check, size_check, 15);
944            let mut final_buf = vec![0u8; 15];
945            reader.read_exact(&mut final_buf, 15).await.unwrap();
946            assert_eq!(&final_buf, b"01234ABCDEFGHIJ".as_slice());
947        });
948    }
949}