Skip to main content

nodedb_wal/
align.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! O_DIRECT alignment utilities.
4//!
5//! O_DIRECT requires that:
6//! - The file offset is aligned to the logical block size (typically 512 or 4096).
7//! - The memory buffer address is aligned to the logical block size.
8//! - The I/O size is a multiple of the logical block size.
9//!
10//! This module provides an aligned buffer type that satisfies these constraints.
11
12use crate::error::{Result, WalError};
13
/// Alignment used for O_DIRECT I/O when the caller does not pick one: 4 KiB.
///
/// 4 KiB is the usual NVMe logical block size and also the Linux page size.
/// A different value can be supplied at runtime through
/// `WalWriterConfig::alignment`, which is populated from `WalTuning::alignment`.
pub const DEFAULT_ALIGNMENT: usize = 4 * 1024;
20
/// A heap-allocated byte buffer that meets the O_DIRECT buffer constraints.
///
/// The start address is aligned to `alignment`, and the capacity is always a
/// whole number of `alignment`-sized blocks.
pub struct AlignedBuf {
    /// Start of the raw allocation; `layout` guarantees its alignment.
    ptr: *mut u8,

    /// Layout the allocation was created with; needed again to free it.
    layout: std::alloc::Layout,

    /// Usable size in bytes (always a multiple of `alignment`).
    capacity: usize,

    /// Alignment in bytes (a power of two).
    alignment: usize,

    /// Write cursor: number of bytes written so far.
    len: usize,
}

// SAFETY: `AlignedBuf` holds plain bytes with no interior mutability; every
// method that mutates the contents takes `&mut self`, so the buffer has a
// single writer at a time and can be handed off to another thread.
unsafe impl Send for AlignedBuf {}
45
46impl AlignedBuf {
47    /// Allocate a new aligned buffer.
48    ///
49    /// `min_capacity` is rounded up to the next multiple of `alignment`.
50    /// `alignment` must be a power of two.
51    pub fn new(min_capacity: usize, alignment: usize) -> Result<Self> {
52        assert!(
53            alignment.is_power_of_two(),
54            "alignment must be power of two"
55        );
56        assert!(alignment > 0, "alignment must be > 0");
57
58        // Round capacity up to alignment boundary.
59        let capacity = round_up(min_capacity.max(alignment), alignment);
60
61        let layout = std::alloc::Layout::from_size_align(capacity, alignment).map_err(|_| {
62            WalError::AlignmentViolation {
63                context: "buffer allocation",
64                required: alignment,
65                actual: min_capacity,
66            }
67        })?;
68
69        // SAFETY: Layout has non-zero size (capacity >= alignment > 0).
70        let ptr = unsafe { std::alloc::alloc_zeroed(layout) };
71        if ptr.is_null() {
72            std::alloc::handle_alloc_error(layout);
73        }
74
75        Ok(Self {
76            ptr,
77            capacity,
78            len: 0,
79            alignment,
80            layout,
81        })
82    }
83
84    /// Allocate with the default 4 KiB alignment.
85    pub fn with_default_alignment(min_capacity: usize) -> Result<Self> {
86        Self::new(min_capacity, DEFAULT_ALIGNMENT)
87    }
88
89    /// Write bytes into the buffer. Returns the number of bytes written.
90    ///
91    /// If the buffer doesn't have enough remaining capacity, writes as much
92    /// as possible and returns the count.
93    pub fn write(&mut self, data: &[u8]) -> usize {
94        let available = self.capacity - self.len;
95        let to_write = data.len().min(available);
96        if to_write > 0 {
97            // SAFETY: ptr + len is within the allocation, and to_write <= available.
98            unsafe {
99                std::ptr::copy_nonoverlapping(data.as_ptr(), self.ptr.add(self.len), to_write);
100            }
101            self.len += to_write;
102        }
103        to_write
104    }
105
106    /// Get the written portion of the buffer as a byte slice.
107    ///
108    /// The returned slice starts at an aligned address.
109    pub fn as_slice(&self) -> &[u8] {
110        // SAFETY: ptr is valid for `len` bytes, and we have shared access.
111        unsafe { std::slice::from_raw_parts(self.ptr, self.len) }
112    }
113
114    /// Get the written portion padded up to the next alignment boundary.
115    ///
116    /// O_DIRECT requires the I/O size to be a multiple of the block size.
117    /// The padding bytes are zeroed (from `alloc_zeroed`).
118    pub fn as_aligned_slice(&self) -> &[u8] {
119        let aligned_len = round_up(self.len, self.alignment);
120        let actual_len = aligned_len.min(self.capacity);
121        // SAFETY: ptr is valid for `capacity` bytes, and actual_len <= capacity.
122        unsafe { std::slice::from_raw_parts(self.ptr, actual_len) }
123    }
124
125    /// Reset the buffer for reuse without deallocating.
126    pub fn clear(&mut self) {
127        self.len = 0;
128    }
129
130    /// Current number of bytes written.
131    pub fn len(&self) -> usize {
132        self.len
133    }
134
135    /// Whether the buffer has no written data.
136    pub fn is_empty(&self) -> bool {
137        self.len == 0
138    }
139
140    /// Total capacity of the buffer.
141    pub fn capacity(&self) -> usize {
142        self.capacity
143    }
144
145    /// Remaining capacity.
146    pub fn remaining(&self) -> usize {
147        self.capacity - self.len
148    }
149
150    /// The alignment of this buffer.
151    pub fn alignment(&self) -> usize {
152        self.alignment
153    }
154
155    /// Raw pointer to the buffer start (for io_uring submission).
156    pub fn as_ptr(&self) -> *const u8 {
157        self.ptr
158    }
159
160    /// Mutable raw pointer to the buffer start.
161    pub fn as_mut_ptr(&mut self) -> *mut u8 {
162        self.ptr
163    }
164}
165
166impl Drop for AlignedBuf {
167    fn drop(&mut self) {
168        // SAFETY: ptr was allocated with this layout in `new()`.
169        unsafe {
170            std::alloc::dealloc(self.ptr, self.layout);
171        }
172    }
173}
174
/// Round `value` up to the nearest multiple of `align`.
///
/// `align` must be a non-zero power of two; the bit-mask used here is only
/// meaningful in that case. `value + align - 1` must also fit in a `usize`.
#[inline]
pub const fn round_up(value: usize, align: usize) -> usize {
    // Bump past the boundary, then clear the low bits to snap back onto it.
    let bumped = value + align - 1;
    let keep_high_bits = !(align - 1);
    bumped & keep_high_bits
}
180
/// Report whether `value` is a multiple of `align`.
///
/// `align` is expected to be a non-zero power of two, since the test is done
/// by masking the low bits.
#[inline]
pub const fn is_aligned(value: usize, align: usize) -> bool {
    let low_bits = value & (align - 1);
    low_bits == 0
}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// Multiples stay put; everything else moves to the next boundary.
    #[test]
    fn round_up_works() {
        let cases = [
            (0, 0),
            (1, 4096),
            (4096, 4096),
            (4097, 8192),
            (8192, 8192),
        ];
        for &(input, expected) in cases.iter() {
            assert_eq!(round_up(input, 4096), expected);
        }
    }

    /// Exact multiples (including zero) are aligned; off-by-one values are not.
    #[test]
    fn is_aligned_works() {
        for &value in [0usize, 4096, 8192].iter() {
            assert!(is_aligned(value, 4096));
        }
        for &value in [1usize, 4097].iter() {
            assert!(!is_aligned(value, 4096));
        }
    }

    /// The allocation itself starts on an alignment boundary.
    #[test]
    fn aligned_buf_address_is_aligned() {
        let buf = AlignedBuf::with_default_alignment(1).unwrap();
        let address = buf.as_ptr() as usize;
        assert!(is_aligned(address, DEFAULT_ALIGNMENT));
    }

    /// A tiny request still yields at least one full, aligned block.
    #[test]
    fn aligned_buf_capacity_is_aligned() {
        let buf = AlignedBuf::with_default_alignment(1).unwrap();
        let capacity = buf.capacity();
        assert!(is_aligned(capacity, DEFAULT_ALIGNMENT));
        assert!(capacity >= DEFAULT_ALIGNMENT);
    }

    /// Bytes written come back unchanged through `as_slice`.
    #[test]
    fn aligned_buf_write_and_read() {
        let payload = b"hello nodedb WAL";
        let mut buf = AlignedBuf::with_default_alignment(8192).unwrap();
        let written = buf.write(payload);
        assert_eq!(written, payload.len());
        assert_eq!(&buf.as_slice()[..payload.len()], payload);
    }

    /// A short write is padded out to one full alignment block.
    #[test]
    fn aligned_slice_pads_to_boundary() {
        let mut buf = AlignedBuf::with_default_alignment(8192).unwrap();
        buf.write(b"short");
        assert_eq!(buf.len(), 5);
        let padded = buf.as_aligned_slice();
        assert_eq!(padded.len(), DEFAULT_ALIGNMENT);
    }

    /// `clear` empties the buffer but keeps the same allocation.
    #[test]
    fn clear_resets_without_dealloc() {
        let mut buf = AlignedBuf::with_default_alignment(8192).unwrap();
        let ptr_before = buf.as_ptr();
        buf.write(b"some data");
        buf.clear();
        assert_eq!(buf.len(), 0);
        assert!(buf.is_empty());
        // Same allocation.
        assert_eq!(buf.as_ptr(), ptr_before);
    }
}