reflex/storage/nvme/
mod.rs1pub mod error;
5
6#[cfg(test)]
7mod tests;
8
9pub use error::{NvmeError, NvmeResult};
10
11use std::fs::{self, File};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
15use rkyv::rancor::Error as RkyvError;
16use rkyv::to_bytes;
17
18use crate::storage::CacheEntry;
19use crate::storage::mmap::MmapFileHandle;
20
21const RKYV_EXTENSION: &str = "rkyv";
22
23const TEMP_EXTENSION: &str = "rkyv.tmp";
24
25#[derive(Debug, Clone)]
26pub struct NvmeStorage {
28 storage_path: PathBuf,
29}
30
31impl NvmeStorage {
32 pub fn new(storage_path: PathBuf) -> Self {
34 Self { storage_path }
35 }
36
37 pub fn storage_path(&self) -> &Path {
39 &self.storage_path
40 }
41
42 pub fn ensure_storage_path(&self) -> NvmeResult<()> {
44 if !self.storage_path.exists() {
45 fs::create_dir_all(&self.storage_path).map_err(|_| NvmeError::StorageUnavailable {
46 path: self.storage_path.clone(),
47 })?;
48 }
49 Ok(())
50 }
51
52 fn tenant_path(&self, tenant_id: u64) -> PathBuf {
53 self.storage_path.join(tenant_id.to_string())
54 }
55
56 fn entry_path(&self, tenant_id: u64, entry_id: u64) -> PathBuf {
57 self.tenant_path(tenant_id)
58 .join(format!("{}.{}", entry_id, RKYV_EXTENSION))
59 }
60
61 fn temp_entry_path(&self, tenant_id: u64, entry_id: u64) -> PathBuf {
62 self.tenant_path(tenant_id)
63 .join(format!("{}.{}", entry_id, TEMP_EXTENSION))
64 }
65
66 fn ensure_tenant_dir(&self, tenant_id: u64) -> NvmeResult<()> {
67 let tenant_path = self.tenant_path(tenant_id);
68 if !tenant_path.exists() {
69 fs::create_dir_all(&tenant_path)
70 .map_err(|_| NvmeError::TenantDirCreationFailed { path: tenant_path })?;
71 }
72 Ok(())
73 }
74
75 pub fn store(&self, id: u64, entry: &CacheEntry) -> NvmeResult<MmapFileHandle> {
77 let tenant_id = entry.tenant_id;
78
79 self.ensure_storage_path()?;
80 self.ensure_tenant_dir(tenant_id)?;
81
82 let bytes = to_bytes::<RkyvError>(entry)
83 .map_err(|e| NvmeError::Serialization(format!("{:?}", e)))?;
84
85 let temp_path = self.temp_entry_path(tenant_id, id);
86 let final_path = self.entry_path(tenant_id, id);
87
88 {
89 let mut file = File::create(&temp_path)?;
90 file.write_all(&bytes)?;
91 file.sync_all()?;
92 }
93
94 fs::rename(&temp_path, &final_path)?;
95
96 let handle = MmapFileHandle::open(&final_path)?;
97 Ok(handle)
98 }
99
100 pub fn load(&self, id: u64, tenant_id: u64) -> NvmeResult<MmapFileHandle> {
102 let path = self.entry_path(tenant_id, id);
103
104 if !path.exists() {
105 return Err(NvmeError::NotFound {
106 tenant_id,
107 entry_id: id,
108 });
109 }
110
111 let handle = MmapFileHandle::open(&path)?;
112 Ok(handle)
113 }
114
115 pub fn delete(&self, id: u64, tenant_id: u64) -> NvmeResult<()> {
117 let path = self.entry_path(tenant_id, id);
118
119 if !path.exists() {
120 return Err(NvmeError::NotFound {
121 tenant_id,
122 entry_id: id,
123 });
124 }
125
126 fs::remove_file(&path)?;
127 Ok(())
128 }
129
130 pub fn exists(&self, id: u64, tenant_id: u64) -> bool {
132 self.entry_path(tenant_id, id).exists()
133 }
134
135 pub fn list_entries(&self, tenant_id: u64) -> NvmeResult<Vec<u64>> {
137 let tenant_path = self.tenant_path(tenant_id);
138
139 if !tenant_path.exists() {
140 return Ok(Vec::new());
141 }
142
143 let mut entries = Vec::new();
144
145 for entry in fs::read_dir(&tenant_path)? {
146 let entry = entry?;
147 let path = entry.path();
148
149 if let Some(ext) = path.extension()
150 && ext == RKYV_EXTENSION
151 && let Some(stem) = path.file_stem()
152 && let Some(stem_str) = stem.to_str()
153 && let Ok(id) = stem_str.parse::<u64>()
154 {
155 entries.push(id);
156 }
157 }
158
159 Ok(entries)
160 }
161
162 pub fn list_tenants(&self) -> NvmeResult<Vec<u64>> {
164 if !self.storage_path.exists() {
165 return Ok(Vec::new());
166 }
167
168 let mut tenants = Vec::new();
169
170 for entry in fs::read_dir(&self.storage_path)? {
171 let entry = entry?;
172 let path = entry.path();
173
174 if path.is_dir()
175 && let Some(name) = path.file_name()
176 && let Some(name_str) = name.to_str()
177 && let Ok(id) = name_str.parse::<u64>()
178 {
179 tenants.push(id);
180 }
181 }
182
183 Ok(tenants)
184 }
185
186 pub fn cleanup_empty_tenant_dirs(&self) -> NvmeResult<usize> {
188 if !self.storage_path.exists() {
189 return Ok(0);
190 }
191
192 let mut removed = 0;
193
194 for entry in fs::read_dir(&self.storage_path)? {
195 let entry = entry?;
196 let path = entry.path();
197
198 if path.is_dir() {
199 let is_empty = fs::read_dir(&path)?.next().is_none();
200
201 if is_empty {
202 fs::remove_dir(&path)?;
203 removed += 1;
204 }
205 }
206 }
207
208 Ok(removed)
209 }
210
211 pub fn stats(&self) -> NvmeResult<StorageStats> {
213 let tenants = self.list_tenants()?;
214 let mut total_entries = 0;
215 let mut total_bytes = 0;
216
217 for tenant_id in &tenants {
218 let entries = self.list_entries(*tenant_id)?;
219 total_entries += entries.len();
220
221 for entry_id in entries {
222 let path = self.entry_path(*tenant_id, entry_id);
223 if let Ok(metadata) = fs::metadata(&path) {
224 total_bytes += metadata.len();
225 }
226 }
227 }
228
229 Ok(StorageStats {
230 tenant_count: tenants.len(),
231 entry_count: total_entries,
232 total_bytes,
233 })
234 }
235}
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238pub struct StorageStats {
240 pub tenant_count: usize,
242 pub entry_count: usize,
244 pub total_bytes: u64,
246}