Skip to main content

kithara_storage/backend/mmap/
driver.rs

1#![forbid(unsafe_code)]
2
3use std::{fmt, fs, ops::Range, path::PathBuf};
4
5use bon::Builder;
6use crossbeam_queue::SegQueue;
7use kithara_platform::Mutex;
8use mmap_io::MemoryMappedFile;
9use rangemap::RangeSet;
10
11use crate::{
12    StorageResult,
13    backend::{
14        resource::Resource,
15        traits::{Driver, DriverState},
16    },
17    resource::OpenMode,
18};
19
20/// Options for opening a [`MmapResource`].
21#[derive(Debug, Clone, Builder)]
22#[builder(state_mod(vis = "pub"))]
23#[non_exhaustive]
24pub struct MmapOptions {
25    /// Open mode controlling read/write behavior for existing files.
26    #[builder(default)]
27    pub mode: OpenMode,
28    /// Initial file size for new files. Ignored for existing files.
29    pub initial_len: Option<u64>,
30    /// Path to the backing file.
31    pub path: PathBuf,
32}
33
34impl MmapOptions {
35    /// Options for the given backing-file path with all other fields at
36    /// their builder defaults. For chains use [`MmapOptions::for_path`].
37    #[must_use]
38    pub fn new(path: PathBuf) -> Self {
39        Self::for_path(path).build()
40    }
41
42    /// Chainable counterpart to [`MmapOptions::new`]: returns a builder
43    /// with `path` set so callers can attach `.mode(...)` /
44    /// `.initial_len(...)` then `.build()`.
45    pub fn for_path(path: PathBuf) -> MmapOptionsBuilder<mmap_options_builder::SetPath> {
46        Self::builder().path(path)
47    }
48}
49
50/// Mmap state machine.
51///
52/// - `Active`: read-write mmap, used during streaming/writing.
53/// - `Committed`: read-only mmap, after commit (no writes allowed).
54/// - `Empty`: zero-length committed resource (no mmap needed).
55pub(super) enum MmapState {
56    Active(MemoryMappedFile),
57    Committed(MemoryMappedFile),
58    Empty,
59}
60
61impl MmapState {
62    // ast-grep-ignore: idioms.match-self-conversion
63    pub(super) fn as_readable(&self) -> Option<&MemoryMappedFile> {
64        match self {
65            Self::Active(m) | Self::Committed(m) => Some(m),
66            Self::Empty => None,
67        }
68    }
69    // ast-grep-ignore: idioms.match-self-conversion
70    pub(super) fn len(&self) -> u64 {
71        match self {
72            Self::Active(m) | Self::Committed(m) => m.len(),
73            Self::Empty => 0,
74        }
75    }
76}
77
78/// Mmap-backed storage driver.
79///
80/// Uses `mmap-io` for file-backed storage with a lock-free `SegQueue`
81/// for fast-path wait notifications.
82pub struct MmapDriver {
83    pub(super) mmap: Mutex<MmapState>,
84    pub(super) mode: OpenMode,
85    pub(super) path: PathBuf,
86    /// Lock-free queue for fast-path range notifications.
87    pub(super) ready_ranges: SegQueue<Range<u64>>,
88}
89
90impl fmt::Debug for MmapDriver {
91    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92        f.debug_struct("MmapDriver")
93            .field("path", &self.path)
94            .field("mode", &self.mode)
95            .finish_non_exhaustive()
96    }
97}
98
99pub(super) struct Consts;
100impl Consts {
101    /// Default initial size for new mmap files (64 KB).
102    pub(super) const DEFAULT_INITIAL_SIZE: u64 = 64 * 1024;
103
104    /// Growth factor when the mmap file needs to be resized.
105    pub(super) const MMAP_GROWTH_FACTOR: u64 = 2;
106}
107
108impl Driver for MmapDriver {
109    type Options = MmapOptions;
110
111    fn open(opts: MmapOptions) -> StorageResult<(Self, DriverState)> {
112        let mode = opts.mode;
113
114        let (mmap_state, init) = if opts.path.exists() && fs::metadata(&opts.path)?.len() > 0 {
115            let len;
116            let mmap_state = if mode == OpenMode::ReadWrite {
117                let mmap = MemoryMappedFile::open_rw(&opts.path)?;
118                len = mmap.len();
119                MmapState::Active(mmap)
120            } else {
121                let mmap = MemoryMappedFile::open_ro(&opts.path)?;
122                len = mmap.len();
123                MmapState::Committed(mmap)
124            };
125            let mut available = RangeSet::new();
126            available.insert(0..len);
127            let init = DriverState {
128                available,
129                is_committed: true,
130                final_len: Some(len),
131            };
132            (mmap_state, init)
133        } else if mode == OpenMode::ReadOnly {
134            (
135                MmapState::Empty,
136                DriverState {
137                    available: RangeSet::new(),
138                    is_committed: true,
139                    final_len: Some(0),
140                },
141            )
142        } else {
143            if let Some(parent) = opts.path.parent() {
144                fs::create_dir_all(parent)?;
145            }
146            let size = opts.initial_len.unwrap_or(Consts::DEFAULT_INITIAL_SIZE);
147            let mmap_state = if size == 0 {
148                MmapState::Empty
149            } else {
150                let mmap = MemoryMappedFile::create_rw(&opts.path, size)?;
151                MmapState::Active(mmap)
152            };
153            (mmap_state, DriverState::default())
154        };
155
156        let driver = Self {
157            mode,
158            mmap: Mutex::new(mmap_state),
159            path: opts.path,
160            ready_ranges: SegQueue::new(),
161        };
162
163        Ok((driver, init))
164    }
165}
166
167/// Mmap-backed storage resource.
168///
169/// Type alias for [`Resource<MmapDriver>`].
170pub type MmapResource = Resource<MmapDriver>;
171
#[cfg(test)]
mod tests {
    // Lets `#[kithara::test(...)]` resolve to the project's test attribute macro.
    mod kithara {
        pub(crate) use kithara_test_macros::test;
    }

    use kithara_platform::{thread, time::Duration};
    use tempfile::TempDir;
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::{
        StorageError,
        resource::{ResourceExt, ResourceStatus, WaitOutcome},
    };

    /// Opens an [`MmapResource`] at `path` in `OpenMode::Auto`, panicking
    /// with `context` if the open fails.
    fn open_auto(
        cancel: CancellationToken,
        path: PathBuf,
        initial_len: Option<u64>,
        context: &str,
    ) -> MmapResource {
        let opts = MmapOptions {
            path,
            initial_len,
            mode: OpenMode::Auto,
        };
        Resource::open(cancel, opts).expect(context)
    }

    fn create_resource(dir: &TempDir) -> MmapResource {
        create_resource_with_size(dir, None)
    }

    fn create_resource_with_size(dir: &TempDir, size: Option<u64>) -> MmapResource {
        open_auto(
            CancellationToken::new(),
            dir.path().join("test.dat"),
            size,
            "BUG: open test resource with hard-coded params must succeed",
        )
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_create_new_resource() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        assert_eq!(resource.len(), None);
        assert_eq!(resource.status(), ResourceStatus::Active);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_write_and_read() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);
        let payload = b"hello world";

        resource.write_at(0, payload).unwrap();
        resource.commit(Some(11)).unwrap();

        let mut out = [0u8; 11];
        let read = resource.read_at(0, &mut out).unwrap();
        assert_eq!(read, 11);
        assert_eq!(&out, payload);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_write_all_read_into() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.write_all(b"atomic data").unwrap();

        let mut contents = Vec::new();
        let read = resource.read_into(&mut contents).unwrap();
        assert_eq!(read, 11);
        assert_eq!(&contents[..], b"atomic data");
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_wait_range_ready() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.write_at(0, b"data").unwrap();

        assert_eq!(resource.wait_range(0..4).unwrap(), WaitOutcome::Ready);
    }

    #[kithara::test(timeout(Duration::from_secs(2)))]
    fn test_wait_range_blocks_then_ready() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        // The writer fills the range after a short delay, so the main thread
        // is most likely already blocked in `wait_range`.
        let writer = resource.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            writer.write_at(0, b"delayed data").unwrap();
        });

        assert_eq!(resource.wait_range(0..12).unwrap(), WaitOutcome::Ready);
        handle.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_wait_range_eof() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.write_at(0, b"short").unwrap();
        resource.commit(Some(5)).unwrap();

        // The requested range starts exactly at the committed length.
        assert_eq!(resource.wait_range(5..10).unwrap(), WaitOutcome::Eof);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_fail_wakes_waiters() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        let failer = resource.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            failer.fail(String::from("test error"));
        });

        assert!(resource.wait_range(0..100).is_err());
        handle.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(2)))]
    fn test_cancel_wakes_waiters() {
        let tmp = TempDir::new().unwrap();
        let cancel = CancellationToken::new();
        let resource = open_auto(
            cancel.clone(),
            tmp.path().join("cancel_test.dat"),
            None,
            "BUG: open cancel-test resource with hard-coded params must succeed",
        );

        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            cancel.cancel();
        });

        let outcome = resource.wait_range(0..100);
        assert!(matches!(outcome, Err(StorageError::Cancelled)));
        handle.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_open_existing_file() {
        let tmp = TempDir::new().unwrap();
        let path = tmp.path().join("existing.dat");

        // The first handle writes the data and is dropped before reopening.
        {
            let writer = open_auto(
                CancellationToken::new(),
                path.clone(),
                None,
                "BUG: opening the first resource in this test setup must succeed",
            );
            writer.write_all(b"persisted data").unwrap();
        }

        let reopened = open_auto(
            CancellationToken::new(),
            path,
            None,
            "BUG: re-opening the resource in this test setup must succeed",
        );

        assert_eq!(
            reopened.status(),
            ResourceStatus::Committed {
                final_len: Some(14)
            }
        );
        let mut contents = Vec::new();
        reopened.read_into(&mut contents).unwrap();
        assert_eq!(&contents[..], b"persisted data");
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_resize_on_large_write() {
        let tmp = TempDir::new().unwrap();
        // A 16-byte initial file, so the 1 KiB write below must enlarge it.
        let resource = create_resource_with_size(&tmp, Some(16));

        let big = vec![42u8; 1024];
        resource.write_at(0, &big).unwrap();
        resource.commit(Some(1024)).unwrap();

        let mut out = vec![0u8; 1024];
        let read = resource.read_at(0, &mut out).unwrap();
        assert_eq!(read, 1024);
        assert!(out.iter().copied().all(|byte| byte == 42));
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_status_transitions() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);
        assert_eq!(resource.status(), ResourceStatus::Active);

        // Writing alone leaves the resource active...
        resource.write_at(0, b"data").unwrap();
        assert_eq!(resource.status(), ResourceStatus::Active);

        // ...committing moves it to `Committed`.
        resource.commit(Some(4)).unwrap();
        assert_eq!(
            resource.status(),
            ResourceStatus::Committed { final_len: Some(4) }
        );
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_status_failed() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.fail(String::from("boom"));
        assert_eq!(resource.status(), ResourceStatus::Failed(String::from("boom")));
    }
}