commonware-runtime 2026.3.0

Execute asynchronous tasks with a configurable scheduler.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
//! This module provides an io_uring-based implementation of the [crate::Storage] trait,
//! offering fast, high-throughput file operations on Linux systems.
//!
//! ## Architecture
//!
//! I/O operations are sent via a [commonware_utils::channel::mpsc] channel to a dedicated io_uring event loop
//! running in another thread. Operation results are returned via a [commonware_utils::channel::oneshot] channel.
//!
//! ## Memory Safety
//!
//! We pass to the kernel, via io_uring, a pointer to the buffer being read from/written into.
//! Therefore, we ensure that the memory location is valid for the duration of the operation.
//! That is, it doesn't move or go out of scope until the operation completes.
//!
//! ## Feature Flag
//!
//! This implementation is enabled by using the `iouring-storage` feature.
//!
//! ## Linux Only
//!
//! This implementation is only available on Linux systems that support io_uring.
//! It requires Linux kernel 6.1 or newer. See [crate::iouring] for details.

use super::Header;
use crate::{
    iouring::{self, should_retry, OpBuffer, OpFd, OpIovecs},
    Buf, BufferPool, Error, IoBuf, IoBufs, IoBufsMut,
};
use commonware_codec::Encode;
use commonware_utils::{channel::oneshot, from_hex, hex};
use io_uring::{opcode, types::Fd};
use prometheus_client::registry::Registry;
use std::{
    fs::{self, File},
    io::{Error as IoError, Read, Seek, SeekFrom, Write},
    ops::RangeInclusive,
    os::fd::AsRawFd,
    path::{Path, PathBuf},
    sync::Arc,
};

/// Cap iovec batch size: larger iovecs reduce syscall count but increase
/// per-write kernel setup overhead.
///
/// A vectored write submits at most this many buffer chunks in a single
/// `Writev` operation; chunks beyond the cap are picked up by later
/// iterations of the write loop.
const IOVEC_BATCH_SIZE: usize = 32;

/// Syncs a directory to ensure directory entry changes are durable.
/// On Unix, directory metadata (file creation/deletion) must be explicitly fsynced.
fn sync_dir(path: &Path) -> Result<(), Error> {
    let dir = File::open(path).map_err(|e| {
        Error::BlobOpenFailed(
            path.to_string_lossy().to_string(),
            "directory".to_string(),
            e,
        )
    })?;
    dir.sync_all().map_err(|e| {
        Error::BlobSyncFailed(
            path.to_string_lossy().to_string(),
            "directory".to_string(),
            e,
        )
    })
}

#[derive(Clone, Debug)]
/// Configuration for a [Storage].
///
/// Consumed by [Storage::start].
pub struct Config {
    /// Where to store blobs. Each partition is a subdirectory of this path and
    /// each blob is a file inside its partition, named with the hex encoding
    /// of the blob name.
    pub storage_directory: PathBuf,
    /// Configuration for the iouring instance. [Storage::start] always sets
    /// `single_issuer` to `true` on this value before creating the ring.
    pub iouring_config: iouring::Config,
}

#[derive(Clone)]
/// io_uring-backed implementation of [crate::Storage].
///
/// Cloning yields another handle that submits to the same io_uring loop.
pub struct Storage {
    /// Root directory under which partition directories and blob files live.
    storage_directory: PathBuf,
    /// Handle used to send operations to the io_uring event loop.
    io_submitter: iouring::Submitter,
    /// Buffer pool handed to every [Blob] for read allocations.
    pool: BufferPool,
}

impl Storage {
    /// Returns a new `Storage` instance.
    pub fn start(mut cfg: Config, registry: &mut Registry, pool: BufferPool) -> Self {
        // Optimize performance by hinting the kernel that a single task will
        // submit requests. This is safe because each iouring instance runs in a
        // dedicated thread, which guarantees that the same thread that creates
        // the ring is the only thread submitting work to it.
        cfg.iouring_config.single_issuer = true;

        let (io_submitter, iouring_loop) = iouring::IoUringLoop::new(cfg.iouring_config, registry);

        let storage = Self {
            storage_directory: cfg.storage_directory,
            io_submitter,
            pool,
        };

        std::thread::spawn(move || iouring_loop.run());
        storage
    }
}

impl crate::Storage for Storage {
    type Blob = Blob;

    /// Opens (creating if necessary) the blob `name` inside `partition`.
    ///
    /// Returns the blob handle, its logical length (the length reported by the
    /// header handling, `0` for a freshly initialized blob) and the blob version.
    ///
    /// All filesystem work here is done with blocking `std::fs` calls; only the
    /// returned [Blob] uses the io_uring loop.
    ///
    /// # Errors
    ///
    /// Fails if the partition name is invalid, the partition directory cannot be
    /// created, the file cannot be opened/resized/synced, the header cannot be
    /// read or written, or an existing header is rejected by `Header::from` for
    /// the given `versions`.
    async fn open_versioned(
        &self,
        partition: &str,
        name: &[u8],
        versions: RangeInclusive<u16>,
    ) -> Result<(Blob, u64, u16), Error> {
        super::validate_partition_name(partition)?;

        // Construct the full path: <storage_directory>/<partition>/<hex(name)>
        let path = self.storage_directory.join(partition).join(hex(name));
        let parent = path
            .parent()
            .ok_or_else(|| Error::PartitionMissing(partition.into()))?;

        // Check if partition exists before creating, so we know below whether the
        // storage directory gained a new entry that must be synced as well.
        let parent_existed = parent.exists();

        // Create the partition directory if it does not exist
        fs::create_dir_all(parent).map_err(|_| Error::PartitionCreationFailed(partition.into()))?;

        // Open the file, creating it if it doesn't exist (existing content is kept)
        let mut file = fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(false)
            .open(&path)
            .map_err(|e| Error::BlobOpenFailed(partition.into(), hex(name), e))?;

        // Assume empty files are newly created. Existing empty files will be synced too; that's OK.
        let raw_len = file.metadata().map_err(|_| Error::ReadFailed)?.len();

        // Handle header: new/corrupted blobs get a fresh header written,
        // existing blobs have their header read.
        let (blob_version, logical_len) = if Header::missing(raw_len) {
            // New (or corrupted) blob - truncate and write header with latest version
            let (header, blob_version) = Header::new(&versions);
            file.set_len(Header::SIZE_U64)
                .map_err(|e| Error::BlobResizeFailed(partition.into(), hex(name), e))?;
            file.seek(SeekFrom::Start(0))
                .map_err(|_| Error::WriteFailed)?;
            file.write_all(&header.encode())
                .map_err(|_| Error::WriteFailed)?;
            // Make the header durable before handing the blob out.
            file.sync_all()
                .map_err(|e| Error::BlobSyncFailed(partition.into(), hex(name), e))?;

            // For new files, sync the parent directory to ensure the directory entry is durable.
            // If the partition directory itself was just created, its entry in the storage
            // directory must be made durable too.
            if raw_len == 0 {
                sync_dir(parent)?;
                if !parent_existed {
                    sync_dir(&self.storage_directory)?;
                }
            }

            // A blob that only contains a header has no logical content.
            (blob_version, 0)
        } else {
            // Existing blob - read and validate header
            file.seek(SeekFrom::Start(0))
                .map_err(|_| Error::ReadFailed)?;
            let mut header_bytes = [0u8; Header::SIZE];
            file.read_exact(&mut header_bytes)
                .map_err(|_| Error::ReadFailed)?;
            Header::from(header_bytes, raw_len, &versions)
                .map_err(|e| e.into_error(partition, name))?
        };

        let blob = Blob::new(
            partition.into(),
            name,
            file,
            self.io_submitter.clone(),
            self.pool.clone(),
        );
        Ok((blob, logical_len, blob_version))
    }

    async fn remove(&self, partition: &str, name: Option<&[u8]>) -> Result<(), Error> {
        super::validate_partition_name(partition)?;

        let path = self.storage_directory.join(partition);
        if let Some(name) = name {
            let blob_path = path.join(hex(name));
            fs::remove_file(blob_path)
                .map_err(|_| Error::BlobMissing(partition.into(), hex(name)))?;

            // Sync the partition directory to ensure the removal is durable.
            sync_dir(&path)?;
        } else {
            fs::remove_dir_all(&path).map_err(|_| Error::PartitionMissing(partition.into()))?;

            // Sync the storage directory to ensure the removal is durable.
            sync_dir(&self.storage_directory)?;
        }
        Ok(())
    }

    async fn scan(&self, partition: &str) -> Result<Vec<Vec<u8>>, Error> {
        super::validate_partition_name(partition)?;

        let path = self.storage_directory.join(partition);

        let entries =
            std::fs::read_dir(&path).map_err(|_| Error::PartitionMissing(partition.into()))?;

        let mut blobs = Vec::new();
        for entry in entries {
            let entry = entry.map_err(|_| Error::ReadFailed)?;
            let file_type = entry.file_type().map_err(|_| Error::ReadFailed)?;

            if !file_type.is_file() {
                return Err(Error::PartitionCorrupt(partition.into()));
            }

            if let Some(name) = entry.file_name().to_str() {
                let name = from_hex(name).ok_or(Error::PartitionCorrupt(partition.into()))?;
                blobs.push(name);
            }
        }

        Ok(blobs)
    }
}

/// A handle to a single blob stored as a file and accessed through io_uring.
///
/// Offsets and lengths exposed through [crate::Blob] are logical: the
/// implementation shifts them by the header size before touching the file.
pub struct Blob {
    /// The partition this blob lives in
    partition: String,
    /// The name of the blob
    name: Vec<u8>,
    /// The underlying file. Shared via `Arc` so clones of this blob and every
    /// submitted operation can hold a reference to the same open file.
    file: Arc<File>,
    /// Where to send IO operations to be executed
    io_submitter: iouring::Submitter,
    /// Buffer pool for read allocations
    pool: BufferPool,
}

impl Clone for Blob {
    fn clone(&self) -> Self {
        Self {
            partition: self.partition.clone(),
            name: self.name.clone(),
            file: self.file.clone(),
            io_submitter: self.io_submitter.clone(),
            pool: self.pool.clone(),
        }
    }
}

impl Blob {
    /// Wraps an already-opened `file` as a blob of `partition` called `name`.
    ///
    /// The name is copied into an owned buffer and the file is placed behind an
    /// `Arc` so it can be shared with clones and in-flight operations.
    fn new(
        partition: String,
        name: &[u8],
        file: File,
        io_submitter: iouring::Submitter,
        pool: BufferPool,
    ) -> Self {
        let name = name.to_vec();
        let file = Arc::new(file);
        Self {
            partition,
            name,
            file,
            io_submitter,
            pool,
        }
    }

    /// Returns the underlying file's raw descriptor wrapped in the io_uring `Fd` type
    /// expected by the opcode builders.
    fn as_raw_fd(&self) -> Fd {
        let raw = self.file.as_raw_fd();
        Fd(raw)
    }

    /// Writes the whole of `buf` to the file, starting at raw file offset `offset`.
    ///
    /// `offset` is used as given; `write_at` adds the header size before calling
    /// this. Short writes are handled by re-submitting the unwritten tail until
    /// every byte has been written, and operations for which `should_retry`
    /// returns true are re-submitted unchanged.
    ///
    /// # Errors
    ///
    /// Returns `Error::WriteFailed` if the operation cannot be submitted, its
    /// result cannot be received, or the kernel reports an error or a zero-byte
    /// write.
    async fn write_single_at(&self, mut offset: u64, mut buf: IoBuf) -> Result<(), Error> {
        let mut bytes_written = 0;
        let buf_len = buf.len();
        while bytes_written < buf_len {
            // Figure out how much is left to write and where to write from.
            //
            // SAFETY: IoBuf wraps Bytes which has stable memory addresses.
            // `bytes_written` is always < `buf_len` due to the loop condition, so
            // `add(bytes_written)` stays within bounds and `buf_len - bytes_written`
            // correctly represents the remaining valid bytes.
            let ptr = unsafe { buf.as_ptr().add(bytes_written) };
            let remaining_len = buf_len - bytes_written;

            // The operation's length field is a `u32`. Clamp rather than cast: a
            // truncating cast turns a remaining length that is an exact multiple
            // of 2^32 into a zero-length write, which would then be misreported
            // as a failure below. Anything beyond the clamp is written by later
            // iterations of this loop.
            let op_len = u32::try_from(remaining_len).unwrap_or(u32::MAX);

            // Create an operation to do the write
            let op = opcode::Write::new(self.as_raw_fd(), ptr, op_len)
                .offset(offset as _)
                .build();

            // Submit the operation. The buffer and a reference to the file travel
            // with the operation so both stay alive while it is in flight.
            let (sender, receiver) = oneshot::channel();
            self.io_submitter
                .send(iouring::Op {
                    work: op,
                    sender,
                    buffer: Some(OpBuffer::Write(buf)),
                    fd: Some(OpFd::File(self.file.clone())),
                    iovecs: None,
                })
                .await
                .map_err(|_| Error::WriteFailed)?;

            // Wait for the result and take the buffer back for the next iteration
            let (return_value, return_buf) = receiver.await.map_err(|_| Error::WriteFailed)?;
            buf = match return_buf {
                Some(OpBuffer::Write(b)) => b,
                _ => unreachable!("io_uring loop returns the same OpBuffer that was submitted"),
            };
            if should_retry(return_value) {
                continue;
            }

            // A negative or zero return value indicates an error.
            let op_bytes_written: usize =
                return_value.try_into().map_err(|_| Error::WriteFailed)?;
            if op_bytes_written == 0 {
                return Err(Error::WriteFailed);
            }

            bytes_written += op_bytes_written;
            offset += op_bytes_written as u64;
        }

        Ok(())
    }

    /// Writes every remaining byte of the multi-chunk `bufs` to the file,
    /// starting at raw file offset `offset`, using vectored (`Writev`) operations.
    ///
    /// Each loop iteration describes up to `IOVEC_BATCH_SIZE` chunks in a fresh
    /// iovec array, submits one operation, and then advances `bufs` by the number
    /// of bytes the kernel reports as written. The loop ends once `bufs` has no
    /// bytes remaining.
    ///
    /// # Errors
    ///
    /// Returns `Error::WriteFailed` if the operation cannot be submitted, its
    /// result cannot be received, or the kernel reports an error or a zero-byte
    /// write.
    ///
    /// # Panics
    ///
    /// Panics if `bufs` reports remaining bytes but yields no chunks.
    async fn write_vectored_at(&self, mut offset: u64, mut bufs: IoBufs) -> Result<(), Error> {
        while bufs.has_remaining() {
            let (iovecs, iovecs_len) = {
                // Figure out how much is left to write and where to write from.
                //
                // Use one pre-initialized `libc::iovec` array as scratch space and
                // view it as `IoSlice` to fill via `chunks_vectored`, since
                // `IoSlice` is ABI-compatible with `libc::iovec` on Unix.
                let max_iovecs = bufs.chunk_count().min(IOVEC_BATCH_SIZE);
                assert!(
                    max_iovecs > 0,
                    "chunk_count should be > 0 if bufs.has_remaining() is true"
                );
                // Every entry starts as an empty slice with a dangling (non-null) base.
                let mut iovecs: Box<[libc::iovec]> = std::iter::repeat_n(
                    libc::iovec {
                        iov_base: std::ptr::NonNull::<u8>::dangling().as_ptr().cast(),
                        iov_len: 0,
                    },
                    max_iovecs,
                )
                .collect();

                // SAFETY: `IoSlice` is ABI-compatible with `libc::iovec` on Unix.
                // `iovecs` is initialized with valid empty entries, so `io_slices`
                // starts in a valid state for `chunks_vectored` to overwrite.
                let io_slices: &mut [std::io::IoSlice<'_>] = unsafe {
                    std::slice::from_raw_parts_mut(
                        iovecs.as_mut_ptr().cast::<std::io::IoSlice<'_>>(),
                        iovecs.len(),
                    )
                };
                // Number of entries actually filled in; only these are handed to the kernel.
                let io_slices_len = bufs.chunks_vectored(io_slices);
                assert!(
                    io_slices_len > 0,
                    "chunks_vectored should produce at least one slice when bufs has remaining"
                );

                (OpIovecs::new(iovecs), io_slices_len)
            };

            // Create an operation to do the write
            let op = opcode::Writev::new(self.as_raw_fd(), iovecs.as_ptr(), iovecs_len as _)
                .offset(offset as _)
                .build();

            // Submit the operation. The buffers, the iovec array describing them and a
            // reference to the file all travel with the operation.
            let (sender, receiver) = oneshot::channel();
            self.io_submitter
                .send(iouring::Op {
                    work: op,
                    sender,
                    buffer: Some(OpBuffer::WriteVectored(bufs)),
                    fd: Some(OpFd::File(self.file.clone())),
                    iovecs: Some(iovecs),
                })
                .await
                .map_err(|_| Error::WriteFailed)?;

            // Wait for the result and take the buffers back for the next iteration
            let (return_value, return_bufs) = receiver.await.map_err(|_| Error::WriteFailed)?;
            bufs = match return_bufs {
                Some(OpBuffer::WriteVectored(b)) => b,
                _ => unreachable!("io_uring loop returns the same OpBuffer that was submitted"),
            };
            if should_retry(return_value) {
                continue;
            }

            // A negative or zero return value indicates an error.
            let op_bytes_written: usize =
                return_value.try_into().map_err(|_| Error::WriteFailed)?;
            if op_bytes_written == 0 {
                return Err(Error::WriteFailed);
            }

            // Skip past what was written; a partial write simply leaves more for the next pass.
            bufs.advance(op_bytes_written);
            offset += op_bytes_written as u64;
        }

        Ok(())
    }
}

impl crate::Blob for Blob {
    /// Reads `len` bytes at logical `offset` into a buffer freshly allocated from the pool.
    async fn read_at(&self, offset: u64, len: usize) -> Result<IoBufsMut, Error> {
        let buf = self.pool.alloc(len);
        self.read_at_buf(offset, len, buf).await
    }

    /// Reads exactly `len` bytes starting at logical `offset` into `bufs` and
    /// returns the filled buffers.
    ///
    /// The header size is added to `offset` to obtain the raw file position.
    /// A single-chunk buffer is read into directly; a multi-chunk buffer is
    /// filled through a temporary buffer from the pool that is copied into the
    /// caller's chunks once the read completes, so the returned value has the
    /// same structure as the input. Short reads are continued until `len`
    /// bytes have arrived.
    ///
    /// # Errors
    ///
    /// - `Error::OffsetOverflow` if adding the header size overflows.
    /// - `Error::ReadFailed` if the operation cannot be submitted, its result
    ///   cannot be received, or the kernel reports an error.
    /// - `Error::BlobInsufficientLength` if end-of-file is reached before `len`
    ///   bytes were read.
    async fn read_at_buf(
        &self,
        offset: u64,
        len: usize,
        bufs: impl Into<IoBufsMut> + Send,
    ) -> Result<IoBufsMut, Error> {
        let mut input_bufs = bufs.into();
        // SAFETY: `len` bytes are filled via io_uring read loop below.
        // NOTE(review): assumes the supplied buffers have capacity for `len` bytes.
        unsafe { input_bufs.set_len(len) };

        // For single buffers, read directly into them (zero-copy).
        // For multi-chunk buffers, use a temporary and copy to preserve the input structure.
        let (mut io_buf, original_bufs) = if input_bufs.is_single() {
            (input_bufs.coalesce(), None)
        } else {
            // SAFETY: `len` bytes are filled via io_uring read loop below.
            let tmp = unsafe { self.pool.alloc_len(len) };
            (tmp, Some(input_bufs))
        };

        let mut bytes_read = 0;
        let offset = offset
            .checked_add(Header::SIZE_U64)
            .ok_or(Error::OffsetOverflow)?;
        while bytes_read < len {
            // Figure out how much is left to read and where to read into.
            //
            // SAFETY: IoBufMut wraps BytesMut which has stable memory addresses.
            // `bytes_read` is always < `len` due to the loop condition, so
            // `add(bytes_read)` stays within bounds and `len - bytes_read`
            // correctly represents the remaining valid bytes.
            let ptr = unsafe { io_buf.as_mut_ptr().add(bytes_read) };
            let remaining_len = len - bytes_read;
            let offset = offset + bytes_read as u64;

            // The operation's length field is a `u32`. Clamp rather than cast: a
            // truncating cast turns a remaining length that is an exact multiple
            // of 2^32 into a zero-length read, whose 0 result would be mistaken
            // for end-of-file below. Anything beyond the clamp is read by later
            // iterations of this loop.
            let op_len = u32::try_from(remaining_len).unwrap_or(u32::MAX);

            // Create an operation to do the read
            let op = opcode::Read::new(self.as_raw_fd(), ptr, op_len)
                .offset(offset as _)
                .build();

            // Submit the operation. The buffer and a reference to the file travel
            // with the operation so both stay alive while it is in flight.
            let (sender, receiver) = oneshot::channel();
            self.io_submitter
                .send(iouring::Op {
                    work: op,
                    sender,
                    buffer: Some(OpBuffer::Read(io_buf)),
                    fd: Some(OpFd::File(self.file.clone())),
                    iovecs: None,
                })
                .await
                .map_err(|_| Error::ReadFailed)?;

            // Wait for the result and take the buffer back for the next iteration
            let (return_value, return_buf) = receiver.await.map_err(|_| Error::ReadFailed)?;
            io_buf = match return_buf {
                Some(OpBuffer::Read(b)) => b,
                _ => unreachable!("io_uring loop returns the same OpBuffer that was submitted"),
            };
            if should_retry(return_value) {
                continue;
            }

            // A non-positive return value indicates an error.
            let op_bytes_read: usize = return_value.try_into().map_err(|_| Error::ReadFailed)?;
            if op_bytes_read == 0 {
                // A return value of 0 indicates EOF, which shouldn't happen because we
                // aren't done reading into `buf`. See `man pread`.
                return Err(Error::BlobInsufficientLength);
            }
            bytes_read += op_bytes_read;
        }

        // Return the same buffer structure as input
        match original_bufs {
            None => Ok(io_buf.into()),
            Some(mut bufs) => {
                // Copy from temporary buffer to the original multi-chunk buffers.
                bufs.copy_from_slice(io_buf.as_ref());
                Ok(bufs)
            }
        }
    }

    async fn write_at(&self, offset: u64, bufs: impl Into<IoBufs> + Send) -> Result<(), Error> {
        let bufs = bufs.into();
        let offset = offset
            .checked_add(Header::SIZE_U64)
            .ok_or(Error::OffsetOverflow)?;

        match bufs.try_into_single() {
            Ok(buf) => self.write_single_at(offset, buf).await,
            Err(bufs) => self.write_vectored_at(offset, bufs).await,
        }
    }

    // TODO: Make this async. See https://github.com/commonwarexyz/monorepo/issues/831
    async fn resize(&self, len: u64) -> Result<(), Error> {
        let len = len
            .checked_add(Header::SIZE_U64)
            .ok_or(Error::OffsetOverflow)?;
        self.file.set_len(len).map_err(|e| {
            Error::BlobResizeFailed(self.partition.clone(), hex(&self.name), IoError::other(e))
        })
    }

    /// Flushes the blob to disk by submitting an `Fsync` operation to the io_uring loop.
    ///
    /// The operation is re-submitted for as long as `should_retry` says the
    /// result is retryable.
    ///
    /// # Errors
    ///
    /// Returns `Error::BlobSyncFailed` if the operation cannot be submitted, its
    /// result cannot be received, or the kernel reports a negative return value.
    async fn sync(&self) -> Result<(), Error> {
        // All failure paths produce the same error variant and differ only in the reason.
        let failure = |reason: String| {
            Error::BlobSyncFailed(
                self.partition.clone(),
                hex(&self.name),
                IoError::other(reason),
            )
        };

        loop {
            // Build the fsync request; there is no data buffer, only the file reference.
            let (sender, receiver) = oneshot::channel();
            let request = iouring::Op {
                work: opcode::Fsync::new(self.as_raw_fd()).build(),
                sender,
                buffer: None,
                fd: Some(OpFd::File(self.file.clone())),
                iovecs: None,
            };

            // Hand it to the event loop
            if self.io_submitter.send(request).await.is_err() {
                return Err(failure("failed to send work".into()));
            }

            // Wait for the completion
            let return_value = match receiver.await {
                Ok((code, _)) => code,
                Err(_) => return Err(failure("failed to read result".into())),
            };
            if should_retry(return_value) {
                continue;
            }

            // A negative return value indicates an error.
            return if return_value < 0 {
                Err(failure(format!("error code: {return_value}")))
            } else {
                Ok(())
            };
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{Header, *};
    use crate::{
        storage::tests::run_storage_tests, Blob, BufferPool, BufferPoolConfig, Storage as _,
    };
    use rand::{Rng as _, SeedableRng as _};
    use std::env;

    // Builds a `Storage` rooted at a randomly named directory under the system
    // temp dir and returns it together with that directory so tests can clean up.
    fn create_test_storage() -> (Storage, PathBuf) {
        let mut rng = rand::rngs::StdRng::from_entropy();
        let suffix = rng.gen::<u64>();
        let storage_directory =
            env::temp_dir().join(format!("commonware_iouring_storage_{}", suffix));

        let pool = BufferPool::new(BufferPoolConfig::for_storage(), &mut Registry::default());
        let cfg = Config {
            storage_directory: storage_directory.clone(),
            iouring_config: Default::default(),
        };
        let storage = Storage::start(cfg, &mut Registry::default(), pool);

        (storage, storage_directory)
    }

    /// Runs the shared storage test suite against the io_uring backend.
    #[tokio::test]
    async fn test_iouring_storage() {
        let (storage, root) = create_test_storage();
        run_storage_tests(storage).await;

        // Best-effort cleanup; a failure to remove the directory is ignored.
        std::fs::remove_dir_all(root).ok();
    }

    /// Exercises header handling end to end: header creation for new blobs,
    /// logical-to-raw offset translation for writes, reads and resizes,
    /// reopening an existing blob, and recovery of a file too short to hold a header.
    #[tokio::test]
    async fn test_blob_header_handling() {
        let (storage, storage_directory) = create_test_storage();

        // Test 1: New blob returns logical size 0
        let (blob, size) = storage.open("partition", b"test").await.unwrap();
        assert_eq!(size, 0, "new blob should have logical size 0");

        // Verify raw file contains only the header
        let file_path = storage_directory.join("partition").join(hex(b"test"));
        let metadata = std::fs::metadata(&file_path).unwrap();
        assert_eq!(
            metadata.len(),
            Header::SIZE_U64,
            "raw file should have 8-byte header"
        );

        // Test 2: Logical offset handling - write at offset 0 stores right after the header
        let data = b"hello world";
        blob.write_at(0, data.to_vec()).await.unwrap();
        blob.sync().await.unwrap();

        // Verify raw file size
        let metadata = std::fs::metadata(&file_path).unwrap();
        assert_eq!(metadata.len(), Header::SIZE_U64 + data.len() as u64);

        // Verify raw file layout
        let raw_content = std::fs::read(&file_path).unwrap();
        assert_eq!(&raw_content[..Header::MAGIC_LENGTH], &Header::MAGIC);
        // The bytes directly after the magic hold the runtime (header) version.
        // The remaining header bytes are not asserted here.
        assert_eq!(
            &raw_content[Header::MAGIC_LENGTH..Header::MAGIC_LENGTH + Header::VERSION_LENGTH],
            &Header::RUNTIME_VERSION.to_be_bytes()
        );
        // Data should start immediately after the header
        assert_eq!(&raw_content[Header::SIZE..], data);

        // Test 3: Read at logical offset 0 returns the data stored after the header
        let read_buf = blob.read_at(0, data.len()).await.unwrap().coalesce();
        assert_eq!(read_buf, data);

        // Test 4: Resize with logical length
        blob.resize(5).await.unwrap();
        blob.sync().await.unwrap();
        let metadata = std::fs::metadata(&file_path).unwrap();
        assert_eq!(
            metadata.len(),
            Header::SIZE_U64 + 5,
            "resize(5) should result in 13 raw bytes"
        );

        // resize(0) should leave only header
        blob.resize(0).await.unwrap();
        blob.sync().await.unwrap();
        let metadata = std::fs::metadata(&file_path).unwrap();
        assert_eq!(
            metadata.len(),
            Header::SIZE_U64,
            "resize(0) should leave only header"
        );

        // Test 5: Reopen existing blob preserves header and returns correct logical size
        blob.write_at(0, b"test data".to_vec()).await.unwrap();
        blob.sync().await.unwrap();
        drop(blob);

        let (blob2, size2) = storage.open("partition", b"test").await.unwrap();
        assert_eq!(size2, 9, "reopened blob should have logical size 9");
        let read_buf = blob2.read_at(0, 9).await.unwrap().coalesce();
        assert_eq!(read_buf, b"test data");
        drop(blob2);

        // Test 6: Corrupted blob recovery (file shorter than a header)
        // Manually create a corrupted file with only 4 bytes
        let corrupted_path = storage_directory.join("partition").join(hex(b"corrupted"));
        std::fs::write(&corrupted_path, vec![0u8; 4]).unwrap();

        // Opening should truncate and write fresh header
        let (blob3, size3) = storage.open("partition", b"corrupted").await.unwrap();
        assert_eq!(size3, 0, "corrupted blob should return logical size 0");

        // Verify raw file now contains exactly one header
        let metadata = std::fs::metadata(&corrupted_path).unwrap();
        assert_eq!(
            metadata.len(),
            Header::SIZE_U64,
            "corrupted blob should be reset to header-only"
        );

        // Cleanup (best effort)
        drop(blob3);
        let _ = std::fs::remove_dir_all(&storage_directory);
    }

    /// A header-sized file whose magic bytes are wrong must be rejected as corrupt.
    #[tokio::test]
    async fn test_blob_magic_mismatch() {
        let (storage, storage_directory) = create_test_storage();

        // Prepare the partition by hand so a bogus blob file can be planted in it
        let partition_dir = storage_directory.join("partition");
        std::fs::create_dir_all(&partition_dir).unwrap();

        // A full-size header consisting only of zero bytes has no valid magic
        let planted = partition_dir.join(hex(b"bad_magic"));
        let zeroed_header = vec![0u8; Header::SIZE];
        std::fs::write(&planted, zeroed_header).unwrap();

        // Opening should fail with corrupt error
        let result = storage.open("partition", b"bad_magic").await;
        assert!(
            matches!(result, Err(crate::Error::BlobCorrupt(_, _, reason)) if reason.contains("invalid magic"))
        );

        // Best-effort cleanup
        let _ = std::fs::remove_dir_all(&storage_directory);
    }
}