Skip to main content

kithara_storage/backend/mmap/
io.rs

1#![forbid(unsafe_code)]
2
3use std::{
4    fs::{self, OpenOptions},
5    ops::Range,
6    path::Path,
7};
8
9use mmap_io::MemoryMappedFile;
10
11use crate::{
12    StorageError, StorageResult,
13    backend::{
14        mmap::driver::{Consts, MmapDriver, MmapState},
15        traits::DriverIo,
16    },
17    resource::OpenMode,
18};
19
20impl DriverIo for MmapDriver {
21    fn commit(&self, final_len: Option<u64>) -> StorageResult<()> {
22        let mut mmap_guard = self.mmap.lock_sync();
23
24        if let Some(len) = final_len {
25            if len > 0 {
26                let needs_truncate = matches!(
27                    &*mmap_guard,
28                    MmapState::Active(mmap) if len < mmap.len()
29                );
30
31                *mmap_guard = MmapState::Empty;
32
33                if needs_truncate {
34                    let file_len = fs::metadata(&self.path).map_or(0, |m| m.len());
35                    if file_len > len {
36                        let f = OpenOptions::new()
37                            .write(true)
38                            .open(&self.path)
39                            .map_err(|e| StorageError::Failed(format!("truncate open: {e}")))?;
40                        f.set_len(len)
41                            .map_err(|e| StorageError::Failed(format!("truncate: {e}")))?;
42                    }
43                }
44
45                let ro = MemoryMappedFile::open_ro(&self.path)?;
46                *mmap_guard = MmapState::Committed(ro);
47            } else {
48                *mmap_guard = MmapState::Empty;
49                let _ = fs::write(&self.path, b"");
50            }
51        } else {
52            let is_active = matches!(*mmap_guard, MmapState::Active(_));
53            if is_active {
54                *mmap_guard = MmapState::Empty;
55                if self.path.exists() && fs::metadata(&self.path).is_ok_and(|m| m.len() > 0) {
56                    let ro = MemoryMappedFile::open_ro(&self.path)?;
57                    *mmap_guard = MmapState::Committed(ro);
58                }
59            }
60        }
61
62        drop(mmap_guard);
63        Ok(())
64    }
65
66    fn notify_write(&self, range: &Range<u64>) {
67        self.ready_ranges.push(range.clone());
68    }
69
70    fn path(&self) -> Option<&Path> {
71        Some(&self.path)
72    }
73
74    fn reactivate(&self) -> StorageResult<()> {
75        let mut mmap_guard = self.mmap.lock_sync();
76
77        match &*mmap_guard {
78            MmapState::Active(_) => {}
79            MmapState::Committed(_) | MmapState::Empty => {
80                *mmap_guard = MmapState::Empty;
81                if self.path.exists() && fs::metadata(&self.path).is_ok_and(|m| m.len() > 0) {
82                    let rw = MemoryMappedFile::open_rw(&self.path)?;
83                    *mmap_guard = MmapState::Active(rw);
84                }
85            }
86        }
87
88        drop(mmap_guard);
89        Ok(())
90    }
91
92    #[cfg_attr(feature = "perf", hotpath::measure)]
93    fn read_at(&self, offset: u64, buf: &mut [u8], _effective_len: u64) -> StorageResult<usize> {
94        {
95            let mmap_guard = self.mmap.lock_sync();
96            if let Some(mmap) = mmap_guard.as_readable() {
97                mmap.read_into(offset, buf)?;
98            }
99        }
100        Ok(buf.len())
101    }
102
103    fn storage_len(&self) -> u64 {
104        let mmap_guard = self.mmap.lock_sync();
105        mmap_guard.len()
106    }
107
108    fn try_fast_check(&self, range: &Range<u64>) -> bool {
109        let mut found_match = false;
110        while let Some(ready) = self.ready_ranges.pop() {
111            if ready.start <= range.start && ready.end >= range.end {
112                found_match = true;
113                break;
114            }
115        }
116        found_match
117    }
118
119    #[cfg_attr(feature = "perf", hotpath::measure)]
120    fn write_at(&self, offset: u64, data: &[u8], committed: bool) -> StorageResult<()> {
121        let end = offset + data.len() as u64;
122        let mut mmap_guard = self.mmap.lock_sync();
123
124        if committed {
125            match (&*mmap_guard, self.mode) {
126                (MmapState::Committed(_), OpenMode::ReadWrite) => {
127                    *mmap_guard = MmapState::Empty;
128                    let rw = MemoryMappedFile::open_rw(&self.path)?;
129                    *mmap_guard = MmapState::Active(rw);
130                }
131                (MmapState::Active(_), _)
132                | (MmapState::Empty, OpenMode::Auto | OpenMode::ReadWrite) => {}
133                _ => {
134                    return Err(StorageError::Failed(
135                        "cannot write to committed resource".to_string(),
136                    ));
137                }
138            }
139        }
140
141        let mmap = match &*mmap_guard {
142            MmapState::Active(m) => {
143                if end > m.len() {
144                    let new_size = end.max(m.len() * Consts::MMAP_GROWTH_FACTOR);
145                    m.resize(new_size)?;
146                }
147                m
148            }
149            MmapState::Committed(_) => {
150                return Err(StorageError::Failed(
151                    "cannot write to committed resource".to_string(),
152                ));
153            }
154            MmapState::Empty => {
155                let size = end.max(Consts::DEFAULT_INITIAL_SIZE);
156                let m = MemoryMappedFile::create_rw(&self.path, size)?;
157                *mmap_guard = MmapState::Active(m);
158                match &*mmap_guard {
159                    MmapState::Active(m) => m,
160                    _ => {
161                        return Err(StorageError::Failed(
162                            "mmap not available after create".to_string(),
163                        ));
164                    }
165                }
166            }
167        };
168
169        mmap.update_region(offset, data)?;
170        Ok(())
171    }
172}