Skip to main content

kithara_storage/
unified.rs

1#![forbid(unsafe_code)]
2
3use std::{
4    ops::Range,
5    path::{Path, PathBuf},
6    sync::Arc,
7};
8
9#[cfg(not(target_arch = "wasm32"))]
10use crate::MmapResource;
11use crate::{
12    AtomicChunked, MemResource, StorageResult,
13    resource::{ResourceExt, ResourceStatus, WaitOutcome},
14};
15
16/// Unified resource: disk (mmap) or memory backend.
17///
18/// Every variant wraps its inner in an [`AtomicChunked`] decorator.
19/// Fresh segment writes use the decorator in **atomic** mode (writes
20/// land at `<canonical>.tmp`, atomic-renamed on commit); re-opens of
21/// already-committed files and memory-backed inners use it in
22/// **passthrough** mode (no atomicity, zero overhead beyond the Arc).
23/// Uniform wrapping means every code path that observes a
24/// `StorageResource` sees the same atomic-on-commit guarantees, and
25/// no caller can accidentally bypass the protection.
26///
27/// `Arc` makes the variants cheaply cloneable, matching the original
28/// `Resource<D>` contract — the previous direct-`Resource` enum was
29/// also Clone via internal `Arc<DriverState>`.
30#[derive(Clone, Debug)]
31pub enum StorageResource {
32    /// File-backed mmap resource (atomic or passthrough decorator).
33    #[cfg(not(target_arch = "wasm32"))]
34    Mmap(Arc<AtomicChunked<MmapResource>>),
35    /// In-memory resource (always passthrough — memory has no
36    /// torn-write hazard).
37    Mem(Arc<AtomicChunked<MemResource>>),
38}
39
40#[cfg(not(target_arch = "wasm32"))]
41impl From<MmapResource> for StorageResource {
42    fn from(r: MmapResource) -> Self {
43        let path = r.path().map(Path::to_path_buf).unwrap_or_default();
44        Self::Mmap(Arc::new(AtomicChunked::passthrough(r, path)))
45    }
46}
47
48#[cfg(not(target_arch = "wasm32"))]
49impl From<AtomicChunked<MmapResource>> for StorageResource {
50    fn from(r: AtomicChunked<MmapResource>) -> Self {
51        Self::Mmap(Arc::new(r))
52    }
53}
54
55impl From<MemResource> for StorageResource {
56    fn from(r: MemResource) -> Self {
57        Self::Mem(Arc::new(AtomicChunked::passthrough(r, PathBuf::default())))
58    }
59}
60
61impl ResourceExt for StorageResource {
62    fn commit(&self, final_len: Option<u64>) -> StorageResult<()> {
63        match self {
64            #[cfg(not(target_arch = "wasm32"))]
65            Self::Mmap(r) => r.commit(final_len),
66            Self::Mem(r) => r.commit(final_len),
67        }
68    }
69
70    fn contains_range(&self, range: Range<u64>) -> bool {
71        match self {
72            #[cfg(not(target_arch = "wasm32"))]
73            Self::Mmap(r) => r.contains_range(range),
74            Self::Mem(r) => r.contains_range(range),
75        }
76    }
77
78    fn fail(&self, reason: String) {
79        match self {
80            #[cfg(not(target_arch = "wasm32"))]
81            Self::Mmap(r) => r.fail(reason),
82            Self::Mem(r) => r.fail(reason),
83        }
84    }
85    // ast-grep-ignore: idioms.match-self-conversion
86    fn len(&self) -> Option<u64> {
87        match self {
88            #[cfg(not(target_arch = "wasm32"))]
89            Self::Mmap(r) => r.len(),
90            Self::Mem(r) => r.len(),
91        }
92    }
93
94    fn next_gap(&self, from: u64, limit: u64) -> Option<Range<u64>> {
95        match self {
96            #[cfg(not(target_arch = "wasm32"))]
97            Self::Mmap(r) => r.next_gap(from, limit),
98            Self::Mem(r) => r.next_gap(from, limit),
99        }
100    }
101    // ast-grep-ignore: idioms.match-self-conversion
102    fn path(&self) -> Option<&Path> {
103        match self {
104            #[cfg(not(target_arch = "wasm32"))]
105            Self::Mmap(r) => r.path(),
106            Self::Mem(_) => None,
107        }
108    }
109    // ast-grep-ignore: idioms.match-self-conversion
110    fn reactivate(&self) -> StorageResult<()> {
111        match self {
112            #[cfg(not(target_arch = "wasm32"))]
113            Self::Mmap(r) => r.reactivate(),
114            Self::Mem(r) => r.reactivate(),
115        }
116    }
117
118    fn read_at(&self, offset: u64, buf: &mut [u8]) -> StorageResult<usize> {
119        match self {
120            #[cfg(not(target_arch = "wasm32"))]
121            Self::Mmap(r) => r.read_at(offset, buf),
122            Self::Mem(r) => r.read_at(offset, buf),
123        }
124    }
125    // ast-grep-ignore: idioms.match-self-conversion
126    fn status(&self) -> ResourceStatus {
127        match self {
128            #[cfg(not(target_arch = "wasm32"))]
129            Self::Mmap(r) => r.status(),
130            Self::Mem(r) => r.status(),
131        }
132    }
133
134    fn wait_range(&self, range: Range<u64>) -> StorageResult<WaitOutcome> {
135        match self {
136            #[cfg(not(target_arch = "wasm32"))]
137            Self::Mmap(r) => r.wait_range(range),
138            Self::Mem(r) => r.wait_range(range),
139        }
140    }
141
142    fn write_at(&self, offset: u64, data: &[u8]) -> StorageResult<()> {
143        match self {
144            #[cfg(not(target_arch = "wasm32"))]
145            Self::Mmap(r) => r.write_at(offset, data),
146            Self::Mem(r) => r.write_at(offset, data),
147        }
148    }
149}
150
#[cfg(test)]
mod tests {
    mod kithara {
        pub(crate) use kithara_test_macros::test;
    }

    use kithara_platform::time::Duration;
    use tokio_util::sync::CancellationToken;

    use super::*;
    #[cfg(not(target_arch = "wasm32"))]
    use crate::{MmapOptions, OpenMode, Resource};

    /// Builds a fresh memory-backed `StorageResource` through
    /// `From<MemResource>`.
    fn mem_backed() -> StorageResource {
        StorageResource::from(MemResource::new(CancellationToken::new()))
    }

    /// Opens an mmap resource at `path` with `OpenMode::Auto` and no
    /// initial length.
    #[cfg(not(target_arch = "wasm32"))]
    fn open_mmap(path: PathBuf) -> MmapResource {
        let options = MmapOptions {
            path,
            initial_len: None,
            mode: OpenMode::Auto,
        };
        Resource::open(CancellationToken::new(), options).unwrap()
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn mmap_variant_roundtrip() {
        // Keep the temp dir alive for the whole test so the file persists.
        let scratch = tempfile::tempdir().unwrap();
        let storage = StorageResource::from(open_mmap(scratch.path().join("test.bin")));

        storage.write_at(0, b"hello mmap").unwrap();
        storage.commit(Some(10)).unwrap();

        let mut readback = [0u8; 10];
        let read = storage.read_at(0, &mut readback).unwrap();
        assert_eq!(read, 10);
        assert_eq!(&readback, b"hello mmap");
        assert!(storage.path().is_some());
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn mem_variant_roundtrip() {
        let storage = mem_backed();

        storage.write_at(0, b"hello mem").unwrap();
        storage.commit(Some(9)).unwrap();

        let mut readback = [0u8; 9];
        let read = storage.read_at(0, &mut readback).unwrap();
        assert_eq!(read, 9);
        assert_eq!(&readback, b"hello mem");
        assert!(storage.path().is_none());
    }

    #[cfg(not(target_arch = "wasm32"))]
    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn from_mmap_resource() {
        let scratch = tempfile::tempdir().unwrap();
        let mmap = open_mmap(scratch.path().join("conv.bin"));

        // Exercise the `Into` side of the conversion explicitly.
        let storage: StorageResource = mmap.into();
        assert!(matches!(storage, StorageResource::Mmap(_)));
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn from_mem_resource() {
        let mem = MemResource::new(CancellationToken::new());

        // Exercise the `Into` side of the conversion explicitly.
        let storage: StorageResource = mem.into();
        assert!(matches!(storage, StorageResource::Mem(_)));
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn status_delegation() {
        let storage = mem_backed();
        assert_eq!(storage.status(), ResourceStatus::Active);

        storage.write_at(0, b"data").unwrap();
        storage.commit(Some(4)).unwrap();
        assert!(matches!(storage.status(), ResourceStatus::Committed { .. }));
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn wait_range_delegation() {
        let storage = mem_backed();

        storage.write_at(0, b"data").unwrap();
        let outcome = storage.wait_range(0..4).unwrap();
        assert_eq!(outcome, WaitOutcome::Ready);
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn fail_delegation() {
        let storage = mem_backed();

        storage.fail("boom".to_string());
        assert_eq!(storage.status(), ResourceStatus::Failed("boom".to_string()));
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn reactivate_delegation() {
        let storage = mem_backed();

        storage.write_at(0, b"data").unwrap();
        storage.commit(Some(4)).unwrap();
        assert!(matches!(storage.status(), ResourceStatus::Committed { .. }));

        storage.reactivate().unwrap();
        assert_eq!(storage.status(), ResourceStatus::Active);
    }

    #[kithara::test(timeout(Duration::from_secs(5)))]
    fn reactivate_clears_failed_for_refetch() {
        let storage = mem_backed();

        // A partial write followed by a failure leaves the resource failed.
        storage.write_at(0, b"par").unwrap();
        storage.fail("fetch cancelled before completion".to_string());
        assert!(matches!(storage.status(), ResourceStatus::Failed(_)));

        storage
            .reactivate()
            .expect("reactivate must clear a prior failure so the key can be re-fetched");
        assert_eq!(storage.status(), ResourceStatus::Active);

        // After reactivation the resource accepts a full write and commit.
        storage.write_at(0, b"data").unwrap();
        storage.commit(Some(4)).unwrap();
        assert!(matches!(storage.status(), ResourceStatus::Committed { .. }));
    }
}