kithara_storage/backend/mmap/
io.rs1#![forbid(unsafe_code)]
2
3use std::{
4 fs::{self, OpenOptions},
5 ops::Range,
6 path::Path,
7};
8
9use mmap_io::MemoryMappedFile;
10
11use crate::{
12 StorageError, StorageResult,
13 backend::{
14 mmap::driver::{Consts, MmapDriver, MmapState},
15 traits::DriverIo,
16 },
17 resource::OpenMode,
18};
19
20impl DriverIo for MmapDriver {
21 fn commit(&self, final_len: Option<u64>) -> StorageResult<()> {
22 let mut mmap_guard = self.mmap.lock_sync();
23
24 if let Some(len) = final_len {
25 if len > 0 {
26 let needs_truncate = matches!(
27 &*mmap_guard,
28 MmapState::Active(mmap) if len < mmap.len()
29 );
30
31 *mmap_guard = MmapState::Empty;
32
33 if needs_truncate {
34 let file_len = fs::metadata(&self.path).map_or(0, |m| m.len());
35 if file_len > len {
36 let f = OpenOptions::new()
37 .write(true)
38 .open(&self.path)
39 .map_err(|e| StorageError::Failed(format!("truncate open: {e}")))?;
40 f.set_len(len)
41 .map_err(|e| StorageError::Failed(format!("truncate: {e}")))?;
42 }
43 }
44
45 let ro = MemoryMappedFile::open_ro(&self.path)?;
46 *mmap_guard = MmapState::Committed(ro);
47 } else {
48 *mmap_guard = MmapState::Empty;
49 let _ = fs::write(&self.path, b"");
50 }
51 } else {
52 let is_active = matches!(*mmap_guard, MmapState::Active(_));
53 if is_active {
54 *mmap_guard = MmapState::Empty;
55 if self.path.exists() && fs::metadata(&self.path).is_ok_and(|m| m.len() > 0) {
56 let ro = MemoryMappedFile::open_ro(&self.path)?;
57 *mmap_guard = MmapState::Committed(ro);
58 }
59 }
60 }
61
62 drop(mmap_guard);
63 Ok(())
64 }
65
66 fn notify_write(&self, range: &Range<u64>) {
67 self.ready_ranges.push(range.clone());
68 }
69
70 fn path(&self) -> Option<&Path> {
71 Some(&self.path)
72 }
73
74 fn reactivate(&self) -> StorageResult<()> {
75 let mut mmap_guard = self.mmap.lock_sync();
76
77 match &*mmap_guard {
78 MmapState::Active(_) => {}
79 MmapState::Committed(_) | MmapState::Empty => {
80 *mmap_guard = MmapState::Empty;
81 if self.path.exists() && fs::metadata(&self.path).is_ok_and(|m| m.len() > 0) {
82 let rw = MemoryMappedFile::open_rw(&self.path)?;
83 *mmap_guard = MmapState::Active(rw);
84 }
85 }
86 }
87
88 drop(mmap_guard);
89 Ok(())
90 }
91
92 #[cfg_attr(feature = "perf", hotpath::measure)]
93 fn read_at(&self, offset: u64, buf: &mut [u8], _effective_len: u64) -> StorageResult<usize> {
94 {
95 let mmap_guard = self.mmap.lock_sync();
96 if let Some(mmap) = mmap_guard.as_readable() {
97 mmap.read_into(offset, buf)?;
98 }
99 }
100 Ok(buf.len())
101 }
102
103 fn storage_len(&self) -> u64 {
104 let mmap_guard = self.mmap.lock_sync();
105 mmap_guard.len()
106 }
107
108 fn try_fast_check(&self, range: &Range<u64>) -> bool {
109 let mut found_match = false;
110 while let Some(ready) = self.ready_ranges.pop() {
111 if ready.start <= range.start && ready.end >= range.end {
112 found_match = true;
113 break;
114 }
115 }
116 found_match
117 }
118
119 #[cfg_attr(feature = "perf", hotpath::measure)]
120 fn write_at(&self, offset: u64, data: &[u8], committed: bool) -> StorageResult<()> {
121 let end = offset + data.len() as u64;
122 let mut mmap_guard = self.mmap.lock_sync();
123
124 if committed {
125 match (&*mmap_guard, self.mode) {
126 (MmapState::Committed(_), OpenMode::ReadWrite) => {
127 *mmap_guard = MmapState::Empty;
128 let rw = MemoryMappedFile::open_rw(&self.path)?;
129 *mmap_guard = MmapState::Active(rw);
130 }
131 (MmapState::Active(_), _)
132 | (MmapState::Empty, OpenMode::Auto | OpenMode::ReadWrite) => {}
133 _ => {
134 return Err(StorageError::Failed(
135 "cannot write to committed resource".to_string(),
136 ));
137 }
138 }
139 }
140
141 let mmap = match &*mmap_guard {
142 MmapState::Active(m) => {
143 if end > m.len() {
144 let new_size = end.max(m.len() * Consts::MMAP_GROWTH_FACTOR);
145 m.resize(new_size)?;
146 }
147 m
148 }
149 MmapState::Committed(_) => {
150 return Err(StorageError::Failed(
151 "cannot write to committed resource".to_string(),
152 ));
153 }
154 MmapState::Empty => {
155 let size = end.max(Consts::DEFAULT_INITIAL_SIZE);
156 let m = MemoryMappedFile::create_rw(&self.path, size)?;
157 *mmap_guard = MmapState::Active(m);
158 match &*mmap_guard {
159 MmapState::Active(m) => m,
160 _ => {
161 return Err(StorageError::Failed(
162 "mmap not available after create".to_string(),
163 ));
164 }
165 }
166 }
167 };
168
169 mmap.update_region(offset, data)?;
170 Ok(())
171 }
172}