kithara_storage/backend/mmap/
driver.rs1#![forbid(unsafe_code)]
2
3use std::{fmt, fs, ops::Range, path::PathBuf};
4
5use bon::Builder;
6use crossbeam_queue::SegQueue;
7use kithara_platform::Mutex;
8use mmap_io::MemoryMappedFile;
9use rangemap::RangeSet;
10
11use crate::{
12 StorageResult,
13 backend::{
14 resource::Resource,
15 traits::{Driver, DriverState},
16 },
17 resource::OpenMode,
18};
19
20#[derive(Debug, Clone, Builder)]
22#[builder(state_mod(vis = "pub"))]
23#[non_exhaustive]
24pub struct MmapOptions {
25 #[builder(default)]
27 pub mode: OpenMode,
28 pub initial_len: Option<u64>,
30 pub path: PathBuf,
32}
33
34impl MmapOptions {
35 #[must_use]
38 pub fn new(path: PathBuf) -> Self {
39 Self::for_path(path).build()
40 }
41
42 pub fn for_path(path: PathBuf) -> MmapOptionsBuilder<mmap_options_builder::SetPath> {
46 Self::builder().path(path)
47 }
48}
49
50pub(super) enum MmapState {
56 Active(MemoryMappedFile),
57 Committed(MemoryMappedFile),
58 Empty,
59}
60
61impl MmapState {
62 pub(super) fn as_readable(&self) -> Option<&MemoryMappedFile> {
64 match self {
65 Self::Active(m) | Self::Committed(m) => Some(m),
66 Self::Empty => None,
67 }
68 }
69 pub(super) fn len(&self) -> u64 {
71 match self {
72 Self::Active(m) | Self::Committed(m) => m.len(),
73 Self::Empty => 0,
74 }
75 }
76}
77
78pub struct MmapDriver {
83 pub(super) mmap: Mutex<MmapState>,
84 pub(super) mode: OpenMode,
85 pub(super) path: PathBuf,
86 pub(super) ready_ranges: SegQueue<Range<u64>>,
88}
89
90impl fmt::Debug for MmapDriver {
91 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
92 f.debug_struct("MmapDriver")
93 .field("path", &self.path)
94 .field("mode", &self.mode)
95 .finish_non_exhaustive()
96 }
97}
98
99pub(super) struct Consts;
100impl Consts {
101 pub(super) const DEFAULT_INITIAL_SIZE: u64 = 64 * 1024;
103
104 pub(super) const MMAP_GROWTH_FACTOR: u64 = 2;
106}
107
108impl Driver for MmapDriver {
109 type Options = MmapOptions;
110
111 fn open(opts: MmapOptions) -> StorageResult<(Self, DriverState)> {
112 let mode = opts.mode;
113
114 let (mmap_state, init) = if opts.path.exists() && fs::metadata(&opts.path)?.len() > 0 {
115 let len;
116 let mmap_state = if mode == OpenMode::ReadWrite {
117 let mmap = MemoryMappedFile::open_rw(&opts.path)?;
118 len = mmap.len();
119 MmapState::Active(mmap)
120 } else {
121 let mmap = MemoryMappedFile::open_ro(&opts.path)?;
122 len = mmap.len();
123 MmapState::Committed(mmap)
124 };
125 let mut available = RangeSet::new();
126 available.insert(0..len);
127 let init = DriverState {
128 available,
129 is_committed: true,
130 final_len: Some(len),
131 };
132 (mmap_state, init)
133 } else if mode == OpenMode::ReadOnly {
134 (
135 MmapState::Empty,
136 DriverState {
137 available: RangeSet::new(),
138 is_committed: true,
139 final_len: Some(0),
140 },
141 )
142 } else {
143 if let Some(parent) = opts.path.parent() {
144 fs::create_dir_all(parent)?;
145 }
146 let size = opts.initial_len.unwrap_or(Consts::DEFAULT_INITIAL_SIZE);
147 let mmap_state = if size == 0 {
148 MmapState::Empty
149 } else {
150 let mmap = MemoryMappedFile::create_rw(&opts.path, size)?;
151 MmapState::Active(mmap)
152 };
153 (mmap_state, DriverState::default())
154 };
155
156 let driver = Self {
157 mode,
158 mmap: Mutex::new(mmap_state),
159 path: opts.path,
160 ready_ranges: SegQueue::new(),
161 };
162
163 Ok((driver, init))
164 }
165}
166
167pub type MmapResource = Resource<MmapDriver>;
171
#[cfg(test)]
mod tests {
    mod kithara {
        pub(crate) use kithara_test_macros::test;
    }

    use kithara_platform::{thread, time::Duration};
    use tempfile::TempDir;
    use tokio_util::sync::CancellationToken;

    use super::*;
    use crate::{
        StorageError,
        resource::{ResourceExt, ResourceStatus, WaitOutcome},
    };

    /// Builds `OpenMode::Auto` options for `file_name` inside `dir`.
    fn auto_options(dir: &TempDir, file_name: &str, initial_len: Option<u64>) -> MmapOptions {
        MmapOptions {
            mode: OpenMode::Auto,
            initial_len,
            path: dir.path().join(file_name),
        }
    }

    /// Opens `test.dat` in `dir` with the default initial size.
    fn create_resource(dir: &TempDir) -> MmapResource {
        create_resource_with_size(dir, None)
    }

    /// Opens `test.dat` in `dir` with an explicit initial size.
    fn create_resource_with_size(dir: &TempDir, size: Option<u64>) -> MmapResource {
        Resource::open(
            CancellationToken::new(),
            auto_options(dir, "test.dat", size),
        )
        .expect("BUG: open test resource with hard-coded params must succeed")
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_create_new_resource() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        assert_eq!(resource.len(), None);
        assert_eq!(resource.status(), ResourceStatus::Active);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_write_and_read() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);
        let payload = b"hello world";

        resource.write_at(0, payload).unwrap();
        resource.commit(Some(11)).unwrap();

        let mut buf = [0u8; 11];
        let read = resource.read_at(0, &mut buf).unwrap();
        assert_eq!(read, 11);
        assert_eq!(&buf, payload);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_write_all_read_into() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);
        let payload = b"atomic data";

        resource.write_all(payload).unwrap();

        let mut buf = Vec::new();
        let read = resource.read_into(&mut buf).unwrap();
        assert_eq!(read, 11);
        assert_eq!(buf.as_slice(), payload);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_wait_range_ready() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.write_at(0, b"data").unwrap();

        assert_eq!(resource.wait_range(0..4).unwrap(), WaitOutcome::Ready);
    }

    #[kithara::test(timeout(Duration::from_secs(2)))]
    fn test_wait_range_blocks_then_ready() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        // The writer publishes the data only after the waiter is blocked.
        let writer = {
            let resource = resource.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                resource.write_at(0, b"delayed data").unwrap();
            })
        };

        let outcome = resource.wait_range(0..12).unwrap();
        assert_eq!(outcome, WaitOutcome::Ready);
        writer.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_wait_range_eof() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.write_at(0, b"short").unwrap();
        resource.commit(Some(5)).unwrap();

        // The requested range starts at the committed end of the resource.
        assert_eq!(resource.wait_range(5..10).unwrap(), WaitOutcome::Eof);
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_fail_wakes_waiters() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        let failer = {
            let resource = resource.clone();
            thread::spawn(move || {
                thread::sleep(Duration::from_millis(50));
                resource.fail("test error".to_string());
            })
        };

        let waited = resource.wait_range(0..100);
        assert!(waited.is_err());
        failer.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(2)))]
    fn test_cancel_wakes_waiters() {
        let tmp = TempDir::new().unwrap();
        let cancel = CancellationToken::new();

        let resource: MmapResource = Resource::open(
            cancel.clone(),
            auto_options(&tmp, "cancel_test.dat", None),
        )
        .expect("BUG: open cancel-test resource with hard-coded params must succeed");

        let canceller = cancel.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(50));
            canceller.cancel();
        });

        let waited = resource.wait_range(0..100);
        assert!(matches!(waited, Err(StorageError::Cancelled)));
        handle.join().unwrap();
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_open_existing_file() {
        let tmp = TempDir::new().unwrap();

        // First handle writes the file and is dropped before re-opening.
        let first: MmapResource = Resource::open(
            CancellationToken::new(),
            auto_options(&tmp, "existing.dat", None),
        )
        .expect("BUG: opening the first resource in this test setup must succeed");
        first.write_all(b"persisted data").unwrap();
        drop(first);

        let reopened: MmapResource = Resource::open(
            CancellationToken::new(),
            auto_options(&tmp, "existing.dat", None),
        )
        .expect("BUG: re-opening the resource in this test setup must succeed");

        assert_eq!(
            reopened.status(),
            ResourceStatus::Committed {
                final_len: Some(14)
            }
        );

        let mut contents = Vec::new();
        reopened.read_into(&mut contents).unwrap();
        assert_eq!(contents.as_slice(), b"persisted data");
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_resize_on_large_write() {
        let tmp = TempDir::new().unwrap();
        // Start far smaller than the payload so the write has to outgrow it.
        let resource = create_resource_with_size(&tmp, Some(16));

        let big_data = vec![42u8; 1024];
        resource.write_at(0, &big_data).unwrap();
        resource.commit(Some(1024)).unwrap();

        let mut buf = vec![0u8; 1024];
        let read = resource.read_at(0, &mut buf).unwrap();
        assert_eq!(read, 1024);
        assert!(buf.iter().all(|&byte| byte == 42));
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_status_transitions() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        assert_eq!(resource.status(), ResourceStatus::Active);

        resource.write_at(0, b"data").unwrap();
        assert_eq!(resource.status(), ResourceStatus::Active);

        resource.commit(Some(4)).unwrap();
        assert_eq!(
            resource.status(),
            ResourceStatus::Committed { final_len: Some(4) }
        );
    }

    #[kithara::test(timeout(Duration::from_secs(1)))]
    fn test_status_failed() {
        let tmp = TempDir::new().unwrap();
        let resource = create_resource(&tmp);

        resource.fail("boom".to_string());
        assert_eq!(
            resource.status(),
            ResourceStatus::Failed("boom".to_string())
        );
    }
}