holt 0.3.2

An adaptive-radix-tree metadata storage engine for path-shaped keys, with per-blob concurrency and crash-safe persistence.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
//! `io_uring`-backed I/O context for [`super::FileBlobStore`].
//!
//! Only compiled when **both** of the following hold:
//!
//! - Target is Linux (`cfg(target_os = "linux")`).
//! - The `io-uring` feature is enabled.
//!
//! Otherwise the persistent store stays on the `pread`/`pwrite`
//! syscall path. The feature gate keeps the `io-uring` crate out of
//! the default dependency closure (smaller build times, smaller
//! attack surface on platforms that don't use it) but lets Linux
//! users opt in to ring-backed data-file I/O.
//! The data file is registered as a fixed file, and data flushes
//! use `IORING_OP_FSYNC` with `DATASYNC` so the Linux fast path does
//! not bounce out to `File::sync_data`.
//!
//! ## Why a separate file
//!
//! The `io_uring` types (`IoUring`, `SubmissionQueueEntry`,
//! `CompletionQueueEntry`, …) are heavily `unsafe`-bound — keeping
//! them isolated here lets the rest of `FileBlobStore` stay
//! safe-Rust. The module exports only the store operations:
//! [`UringContext::pread_at`], [`UringContext::pwrite_at`],
//! [`UringContext::pwrite_many_at`],
//! [`UringContext::pwrite_many_and_sync_at`],
//! [`UringContext::sync_data`],
//! and [`UringContext::new`].
//!
//! ## Concurrency
//!
//! One [`UringContext`] per [`super::FileBlobStore`]. The
//! store wraps it in a `Mutex` so multiple writers serialise on
//! the submission queue. With a single I/O worker thread
//! (`holt-ckpt-io`) the lock is uncontended on the hot path.
//!
//! ## SQE depth
//!
//! `RING_DEPTH = 256` — enough to keep a local NVMe queue fed by
//! large checkpoint batches. Batches that fit in one ring keep
//! fixed-size CQ bookkeeping on the stack; larger checkpoint
//! bursts use a streaming refill/drain loop so the device sees a
//! sustained queue instead of chunk-sized stop-and-wait waves.
//! Batched writes are sorted by data-file offset first, matching
//! the default `pwritev` store's sequential shape.

use std::io;
use std::os::unix::io::AsRawFd;

use io_uring::{opcode, squeue, types, IoUring};

use crate::store::blob_store::{AlignedBlobBuf, BlobBufPool};

/// Number of SQEs / CQEs the ring is sized for. Each checkpoint
/// blob write is one SQE; larger dirty snapshots are submitted in
/// ring-sized chunks.
const RING_DEPTH: u32 = 256;
const RING_DEPTH_USIZE: usize = RING_DEPTH as usize;
const CQ_BITMAP_WORDS: usize = RING_DEPTH_USIZE.div_ceil(64);
/// Owns a single `io_uring` plus the `RawFd` of the file we
/// submit against. The file itself is owned by
/// [`super::FileBlobStore::data_file`]; this struct only
/// borrows its descriptor.
pub(super) struct UringContext {
    ring: IoUring,
    raw_fd: i32,
    fixed_fd: types::Fixed,
    fixed_buffers: bool,
}

#[derive(Clone, Copy)]
struct OrderedWrite<'a> {
    offset: u64,
    buf: &'a AlignedBlobBuf,
    order: usize,
}

impl std::fmt::Debug for UringContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Don't dump the ring — would print SQ/CQ internals;
        // the fd alone is enough to identify which store file
        // this context drives.
        f.debug_struct("UringContext")
            .field("fd", &self.raw_fd)
            .finish_non_exhaustive()
    }
}

impl UringContext {
    /// Build a fresh ring bound to `file`'s descriptor. Fails with
    /// `io::Error` if `IORING_SETUP_*` is rejected by the kernel
    /// (e.g. kernel too old).
    pub(super) fn new(file: &std::fs::File, buffers: Option<&BlobBufPool>) -> io::Result<Self> {
        let ring = build_ring()?;
        let raw_fd = file.as_raw_fd();
        ring.submitter().register_files(&[raw_fd])?;
        let fixed_buffers = if let Some(buffers) = buffers {
            let iovecs = buffers.iovecs();
            // SAFETY: BlobBufPool owns every iovec's backing memory
            // for at least as long as this ring is registered. The
            // store drops/unregisters the ring before its pool Arc
            // can release the slab.
            unsafe {
                ring.submitter().register_buffers(&iovecs)?;
            }
            true
        } else {
            false
        };
        Ok(Self {
            ring,
            raw_fd,
            fixed_fd: types::Fixed(0),
            fixed_buffers,
        })
    }

    /// Synchronous `pwrite` via `io_uring`: push one SQE,
    /// `submit_and_wait(1)`, drain the CQE.
    ///
    /// The caller's `Mutex` over the `UringContext` ensures we
    /// never push a second SQE before the first's CQE has been
    /// drained — i.e. the SQ + CQ never get out of sync.
    pub(super) fn pwrite_at(&mut self, offset: u64, buf: &AlignedBlobBuf) -> io::Result<()> {
        let write = [OrderedWrite {
            offset,
            buf,
            order: 0,
        }];
        self.submit_write_batch(&write)
    }

    /// Synchronous batched `pwrite` via `io_uring`.
    ///
    /// Small batches push once and drain once. Larger checkpoint
    /// bursts keep the ring filled until every write completes,
    /// avoiding a hard wait at each `RING_DEPTH` boundary.
    pub(super) fn pwrite_many_at(&mut self, writes: &[(u64, &AlignedBlobBuf)]) -> io::Result<()> {
        let ordered = ordered_writes(writes);
        if ordered.is_empty() {
            return Ok(());
        }
        if ordered.len() <= RING_DEPTH_USIZE {
            return self.submit_write_batch(&ordered);
        }
        self.submit_write_stream(&ordered)
    }

    /// Submit a checkpoint write batch and then run a data-only
    /// fsync on the same ring.
    ///
    /// This deliberately keeps write completion and durability
    /// completion separate. Linked/drained fsync modes looked
    /// attractive in the code, but did not improve Holt's short
    /// checkpoint batches and made the completion path harder to
    /// reason about.
    pub(super) fn pwrite_many_and_sync_at(
        &mut self,
        writes: &[(u64, &AlignedBlobBuf)],
    ) -> io::Result<()> {
        if writes.is_empty() {
            return Ok(());
        }
        let ordered = ordered_writes(writes);
        if ordered.is_empty() {
            return Ok(());
        }
        if ordered.len() <= RING_DEPTH_USIZE {
            self.submit_write_batch(&ordered)?;
        } else {
            self.submit_write_stream(&ordered)?;
        }
        self.sync_data()
    }

    /// Synchronous `pread` via `io_uring`: same shape as
    /// [`Self::pwrite_at`].
    pub(super) fn pread_at(&mut self, offset: u64, buf: &mut AlignedBlobBuf) -> io::Result<()> {
        let entry = if self.fixed_buffers {
            if let Some(buffer_index) = buf.fixed_buffer_index() {
                opcode::ReadFixed::new(
                    self.fixed_fd,
                    buf.as_mut_ptr(),
                    buf.len() as u32,
                    buffer_index,
                )
                .offset(offset)
                .build()
            } else {
                opcode::Read::new(self.fixed_fd, buf.as_mut_ptr(), buf.len() as u32)
                    .offset(offset)
                    .build()
            }
        } else {
            opcode::Read::new(self.fixed_fd, buf.as_mut_ptr(), buf.len() as u32)
                .offset(offset)
                .build()
        }
        .user_data(0);

        // SAFETY: same argument as `pwrite_at` — `buf` outlives the
        // synchronous `submit_and_wait`.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| io::Error::other("uring SQ full"))?;
        }
        self.ring.submit_and_wait(1)?;

        let cqe = self
            .ring
            .completion()
            .next()
            .ok_or_else(|| io::Error::other("uring CQE missing"))?;
        let n = cqe.result();
        if n < 0 {
            return Err(io::Error::from_raw_os_error(-n));
        }
        if (n as usize) != buf.len() {
            return Err(io::Error::other(format!(
                "short uring read: read {} of {}",
                n,
                buf.len()
            )));
        }
        Ok(())
    }

    /// Synchronous `fdatasync` equivalent via `io_uring`.
    ///
    /// Callers only submit this after every prior write in the
    /// checkpoint batch has completed, matching `File::sync_data`
    /// ordering while keeping the Linux fast path on the ring.
    pub(super) fn sync_data(&mut self) -> io::Result<()> {
        let entry = self.fdatasync_entry();

        // SAFETY: no borrowed user buffer is attached to this SQE.
        unsafe {
            self.ring
                .submission()
                .push(&entry)
                .map_err(|_| io::Error::other("uring SQ full"))?;
        }
        self.ring.submit_and_wait(1)?;

        let cqe = self
            .ring
            .completion()
            .next()
            .ok_or_else(|| io::Error::other("uring CQE missing"))?;
        let n = cqe.result();
        check_fsync_result(n)
    }

    fn submit_write_batch(&mut self, chunk: &[OrderedWrite<'_>]) -> io::Result<()> {
        debug_assert!(!chunk.is_empty());
        debug_assert!(chunk.len() <= RING_DEPTH_USIZE);

        for (idx, write) in chunk.iter().enumerate() {
            let entry = self
                .write_entry(write.offset, write.buf)
                .user_data(idx as u64);

            // SAFETY: every SQE references a slice borrowed from
            // `chunk`; this function synchronously waits for all
            // completions before returning, so all buffers outlive
            // the kernel reads.
            unsafe {
                self.ring
                    .submission()
                    .push(&entry)
                    .map_err(|_| io::Error::other("uring SQ full"))?;
            }
        }

        self.ring.submit_and_wait(chunk.len())?;
        self.drain_write_batch(chunk)
    }

    fn submit_write_stream(&mut self, writes: &[OrderedWrite<'_>]) -> io::Result<()> {
        debug_assert!(writes.len() > RING_DEPTH_USIZE);

        let mut next = 0usize;
        let mut in_flight = 0usize;
        let mut completed = 0usize;
        let mut seen = vec![0u64; writes.len().div_ceil(64)];
        let mut first_err: Option<io::Error> = None;

        while completed < writes.len() {
            let mut pushed = 0usize;
            if first_err.is_none() {
                while next < writes.len() && in_flight < RING_DEPTH_USIZE {
                    let write = writes[next];
                    let entry = self
                        .write_entry(write.offset, write.buf)
                        .user_data(next as u64);

                    // SAFETY: each SQE borrows from `writes`; this
                    // function synchronously drains every submitted
                    // CQE before returning, so buffers outlive the
                    // kernel reads.
                    let pushed_ok = unsafe { self.ring.submission().push(&entry).is_ok() };
                    if !pushed_ok {
                        break;
                    }
                    next += 1;
                    in_flight += 1;
                    pushed += 1;
                }
            }

            if pushed > 0 {
                self.ring.submit()?;
            }

            if in_flight == 0 {
                if let Some(err) = first_err {
                    return Err(err);
                }
                return Err(io::Error::other("uring SQ full with no writes in flight"));
            }

            let drained = self.drain_write_stream_available(
                writes,
                &mut seen,
                &mut first_err,
                &mut in_flight,
                &mut completed,
            )?;
            if drained == 0 {
                self.ring.submit_and_wait(1)?;
                self.drain_write_stream_available(
                    writes,
                    &mut seen,
                    &mut first_err,
                    &mut in_flight,
                    &mut completed,
                )?;
            }

            if first_err.is_some() && in_flight == 0 {
                break;
            }
        }

        if let Some(err) = first_err {
            return Err(err);
        }
        if completed != writes.len() {
            return Err(io::Error::other(format!(
                "missing uring write completions: completed {completed} of {}",
                writes.len()
            )));
        }
        Ok(())
    }

    fn write_entry(&self, offset: u64, buf: &AlignedBlobBuf) -> squeue::Entry {
        if self.fixed_buffers {
            if let Some(buffer_index) = buf.fixed_buffer_index() {
                return opcode::WriteFixed::new(
                    self.fixed_fd,
                    buf.as_ptr(),
                    buf.len() as u32,
                    buffer_index,
                )
                .offset(offset)
                .build();
            }
        }
        opcode::Write::new(self.fixed_fd, buf.as_ptr(), buf.len() as u32)
            .offset(offset)
            .build()
    }

    fn fdatasync_entry(&self) -> squeue::Entry {
        opcode::Fsync::new(self.fixed_fd)
            .flags(types::FsyncFlags::DATASYNC)
            .build()
    }

    fn drain_write_batch(&mut self, chunk: &[OrderedWrite<'_>]) -> io::Result<()> {
        let mut seen = [0u64; CQ_BITMAP_WORDS];
        let mut first_err: Option<io::Error> = None;

        for _ in 0..chunk.len() {
            let cqe = self
                .ring
                .completion()
                .next()
                .ok_or_else(|| io::Error::other("uring CQE missing"))?;
            let user_data = cqe.user_data();
            let n = cqe.result();

            let Ok(idx) = usize::try_from(user_data) else {
                record_err(
                    &mut first_err,
                    io::Error::other("uring CQE user_data overflow"),
                );
                continue;
            };
            if idx >= chunk.len() {
                record_err(
                    &mut first_err,
                    io::Error::other("uring CQE user_data out of batch"),
                );
                continue;
            }
            if let Err(e) = mark_seen(&mut seen, idx) {
                record_err(&mut first_err, e);
                continue;
            }
            if n < 0 {
                record_err(&mut first_err, io::Error::from_raw_os_error(-n));
                continue;
            }
            let expected = chunk[idx].buf.len();
            if (n as usize) != expected {
                record_err(
                    &mut first_err,
                    io::Error::other(format!("short uring write: wrote {n} of {expected}")),
                );
            }
        }

        if let Some(e) = first_err {
            return Err(e);
        }
        Ok(())
    }

    fn drain_write_stream_available(
        &mut self,
        writes: &[OrderedWrite<'_>],
        seen: &mut [u64],
        first_err: &mut Option<io::Error>,
        in_flight: &mut usize,
        completed: &mut usize,
    ) -> io::Result<usize> {
        let mut drained = 0usize;
        while let Some(cqe) = self.ring.completion().next() {
            drained += 1;
            *completed += 1;
            *in_flight = in_flight
                .checked_sub(1)
                .ok_or_else(|| io::Error::other("uring CQE without matching in-flight write"))?;

            let user_data = cqe.user_data();
            let n = cqe.result();
            let Ok(idx) = usize::try_from(user_data) else {
                record_err(first_err, io::Error::other("uring CQE user_data overflow"));
                continue;
            };
            if idx >= writes.len() {
                record_err(
                    first_err,
                    io::Error::other("uring CQE user_data out of stream"),
                );
                continue;
            }
            if let Err(e) = mark_seen_dynamic(seen, idx) {
                record_err(first_err, e);
                continue;
            }
            if n < 0 {
                record_err(first_err, io::Error::from_raw_os_error(-n));
                continue;
            }
            let expected = writes[idx].buf.len();
            if (n as usize) != expected {
                record_err(
                    first_err,
                    io::Error::other(format!("short uring write: wrote {n} of {expected}")),
                );
            }
        }
        Ok(drained)
    }
}

impl Drop for UringContext {
    fn drop(&mut self) {
        if self.fixed_buffers {
            let _ = self.ring.submitter().unregister_buffers();
        }
        let _ = self.ring.submitter().unregister_files();
    }
}

fn build_ring() -> io::Result<IoUring> {
    let mut builder = IoUring::builder();
    builder.setup_cqsize(RING_DEPTH * 2).setup_clamp();
    builder.build(RING_DEPTH)
}

fn ordered_writes<'a>(writes: &'a [(u64, &'a AlignedBlobBuf)]) -> Vec<OrderedWrite<'a>> {
    let mut ordered: Vec<_> = writes
        .iter()
        .enumerate()
        .map(|(order, (offset, buf))| OrderedWrite {
            offset: *offset,
            buf,
            order,
        })
        .collect();
    ordered.sort_by(|a, b| a.offset.cmp(&b.offset).then(a.order.cmp(&b.order)));

    let mut deduped = Vec::with_capacity(ordered.len());
    let mut idx = 0usize;
    while idx < ordered.len() {
        let offset = ordered[idx].offset;
        let mut last = ordered[idx];
        idx += 1;
        while idx < ordered.len() && ordered[idx].offset == offset {
            last = ordered[idx];
            idx += 1;
        }
        deduped.push(last);
    }
    deduped
}

fn check_fsync_result(n: i32) -> io::Result<()> {
    if n < 0 {
        return Err(io::Error::from_raw_os_error(-n));
    }
    if n != 0 {
        return Err(io::Error::other(format!(
            "unexpected uring fdatasync result: {n}",
        )));
    }
    Ok(())
}

fn record_err(slot: &mut Option<io::Error>, err: io::Error) {
    if slot.is_none() {
        *slot = Some(err);
    }
}

fn mark_seen(seen: &mut [u64; CQ_BITMAP_WORDS], idx: usize) -> io::Result<()> {
    let word = idx / 64;
    let bit = 1u64 << (idx % 64);
    if seen[word] & bit != 0 {
        return Err(io::Error::other("duplicate uring CQE user_data"));
    }
    seen[word] |= bit;
    Ok(())
}

fn mark_seen_dynamic(seen: &mut [u64], idx: usize) -> io::Result<()> {
    let word = idx / 64;
    let bit = 1u64 << (idx % 64);
    let Some(seen_word) = seen.get_mut(word) else {
        return Err(io::Error::other("uring CQE user_data out of bitmap"));
    };
    if *seen_word & bit != 0 {
        return Err(io::Error::other("duplicate uring CQE user_data"));
    }
    *seen_word |= bit;
    Ok(())
}