reflex/storage/nvme/
mod.rs

1//! NVMe-backed storage (simple file-per-entry layout).
2
3/// NVMe backend error types.
4pub mod error;
5
6#[cfg(test)]
7mod tests;
8
9pub use error::{NvmeError, NvmeResult};
10
11use std::fs::{self, File};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
15use rkyv::rancor::Error as RkyvError;
16use rkyv::to_bytes;
17
18use crate::storage::CacheEntry;
19use crate::storage::mmap::MmapFileHandle;
20
21const RKYV_EXTENSION: &str = "rkyv";
22
23const TEMP_EXTENSION: &str = "rkyv.tmp";
24
25#[derive(Debug, Clone)]
26/// Stores and retrieves [`CacheEntry`] records on disk.
27pub struct NvmeStorage {
28    storage_path: PathBuf,
29}
30
31impl NvmeStorage {
32    /// Creates a storage rooted at `storage_path`.
33    pub fn new(storage_path: PathBuf) -> Self {
34        Self { storage_path }
35    }
36
37    /// Returns the root storage directory.
38    pub fn storage_path(&self) -> &Path {
39        &self.storage_path
40    }
41
42    /// Ensures the root storage directory exists.
43    pub fn ensure_storage_path(&self) -> NvmeResult<()> {
44        if !self.storage_path.exists() {
45            fs::create_dir_all(&self.storage_path).map_err(|_| NvmeError::StorageUnavailable {
46                path: self.storage_path.clone(),
47            })?;
48        }
49        Ok(())
50    }
51
52    fn tenant_path(&self, tenant_id: u64) -> PathBuf {
53        self.storage_path.join(tenant_id.to_string())
54    }
55
56    fn entry_path(&self, tenant_id: u64, entry_id: u64) -> PathBuf {
57        self.tenant_path(tenant_id)
58            .join(format!("{}.{}", entry_id, RKYV_EXTENSION))
59    }
60
61    fn temp_entry_path(&self, tenant_id: u64, entry_id: u64) -> PathBuf {
62        self.tenant_path(tenant_id)
63            .join(format!("{}.{}", entry_id, TEMP_EXTENSION))
64    }
65
66    fn ensure_tenant_dir(&self, tenant_id: u64) -> NvmeResult<()> {
67        let tenant_path = self.tenant_path(tenant_id);
68        if !tenant_path.exists() {
69            fs::create_dir_all(&tenant_path)
70                .map_err(|_| NvmeError::TenantDirCreationFailed { path: tenant_path })?;
71        }
72        Ok(())
73    }
74
75    /// Writes `entry` under `(tenant_id, id)` and returns an mmap handle to the bytes.
76    pub fn store(&self, id: u64, entry: &CacheEntry) -> NvmeResult<MmapFileHandle> {
77        let tenant_id = entry.tenant_id;
78
79        self.ensure_storage_path()?;
80        self.ensure_tenant_dir(tenant_id)?;
81
82        let bytes = to_bytes::<RkyvError>(entry)
83            .map_err(|e| NvmeError::Serialization(format!("{:?}", e)))?;
84
85        let temp_path = self.temp_entry_path(tenant_id, id);
86        let final_path = self.entry_path(tenant_id, id);
87
88        {
89            let mut file = File::create(&temp_path)?;
90            file.write_all(&bytes)?;
91            file.sync_all()?;
92        }
93
94        fs::rename(&temp_path, &final_path)?;
95
96        let handle = MmapFileHandle::open(&final_path)?;
97        Ok(handle)
98    }
99
100    /// Loads the `(tenant_id, id)` entry as an mmap handle.
101    pub fn load(&self, id: u64, tenant_id: u64) -> NvmeResult<MmapFileHandle> {
102        let path = self.entry_path(tenant_id, id);
103
104        if !path.exists() {
105            return Err(NvmeError::NotFound {
106                tenant_id,
107                entry_id: id,
108            });
109        }
110
111        let handle = MmapFileHandle::open(&path)?;
112        Ok(handle)
113    }
114
115    /// Deletes the `(tenant_id, id)` entry.
116    pub fn delete(&self, id: u64, tenant_id: u64) -> NvmeResult<()> {
117        let path = self.entry_path(tenant_id, id);
118
119        if !path.exists() {
120            return Err(NvmeError::NotFound {
121                tenant_id,
122                entry_id: id,
123            });
124        }
125
126        fs::remove_file(&path)?;
127        Ok(())
128    }
129
130    /// Returns `true` if `(tenant_id, id)` exists.
131    pub fn exists(&self, id: u64, tenant_id: u64) -> bool {
132        self.entry_path(tenant_id, id).exists()
133    }
134
135    /// Lists entry ids for `tenant_id`.
136    pub fn list_entries(&self, tenant_id: u64) -> NvmeResult<Vec<u64>> {
137        let tenant_path = self.tenant_path(tenant_id);
138
139        if !tenant_path.exists() {
140            return Ok(Vec::new());
141        }
142
143        let mut entries = Vec::new();
144
145        for entry in fs::read_dir(&tenant_path)? {
146            let entry = entry?;
147            let path = entry.path();
148
149            if let Some(ext) = path.extension()
150                && ext == RKYV_EXTENSION
151                && let Some(stem) = path.file_stem()
152                && let Some(stem_str) = stem.to_str()
153                && let Ok(id) = stem_str.parse::<u64>()
154            {
155                entries.push(id);
156            }
157        }
158
159        Ok(entries)
160    }
161
162    /// Lists tenant ids currently present under the root directory.
163    pub fn list_tenants(&self) -> NvmeResult<Vec<u64>> {
164        if !self.storage_path.exists() {
165            return Ok(Vec::new());
166        }
167
168        let mut tenants = Vec::new();
169
170        for entry in fs::read_dir(&self.storage_path)? {
171            let entry = entry?;
172            let path = entry.path();
173
174            if path.is_dir()
175                && let Some(name) = path.file_name()
176                && let Some(name_str) = name.to_str()
177                && let Ok(id) = name_str.parse::<u64>()
178            {
179                tenants.push(id);
180            }
181        }
182
183        Ok(tenants)
184    }
185
186    /// Removes empty tenant directories and returns the count removed.
187    pub fn cleanup_empty_tenant_dirs(&self) -> NvmeResult<usize> {
188        if !self.storage_path.exists() {
189            return Ok(0);
190        }
191
192        let mut removed = 0;
193
194        for entry in fs::read_dir(&self.storage_path)? {
195            let entry = entry?;
196            let path = entry.path();
197
198            if path.is_dir() {
199                let is_empty = fs::read_dir(&path)?.next().is_none();
200
201                if is_empty {
202                    fs::remove_dir(&path)?;
203                    removed += 1;
204                }
205            }
206        }
207
208        Ok(removed)
209    }
210
211    /// Returns basic storage stats by scanning the directory tree.
212    pub fn stats(&self) -> NvmeResult<StorageStats> {
213        let tenants = self.list_tenants()?;
214        let mut total_entries = 0;
215        let mut total_bytes = 0;
216
217        for tenant_id in &tenants {
218            let entries = self.list_entries(*tenant_id)?;
219            total_entries += entries.len();
220
221            for entry_id in entries {
222                let path = self.entry_path(*tenant_id, entry_id);
223                if let Ok(metadata) = fs::metadata(&path) {
224                    total_bytes += metadata.len();
225                }
226            }
227        }
228
229        Ok(StorageStats {
230            tenant_count: tenants.len(),
231            entry_count: total_entries,
232            total_bytes,
233        })
234    }
235}
236
237#[derive(Debug, Clone, Copy, PartialEq, Eq)]
238/// Aggregate stats for the NVMe storage directory.
239pub struct StorageStats {
240    /// Number of tenant directories.
241    pub tenant_count: usize,
242    /// Total number of entry files.
243    pub entry_count: usize,
244    /// Total bytes across all entry files.
245    pub total_bytes: u64,
246}