1#![forbid(unsafe_code)]
2
3use std::{
4 ops::Range,
5 path::{Path, PathBuf},
6 sync::Arc,
7};
8
9#[cfg(not(target_arch = "wasm32"))]
10use crate::MmapResource;
11use crate::{
12 AtomicChunked, MemResource, StorageResult,
13 resource::{ResourceExt, ResourceStatus, WaitOutcome},
14};
15
16#[derive(Clone, Debug)]
31pub enum StorageResource {
32 #[cfg(not(target_arch = "wasm32"))]
34 Mmap(Arc<AtomicChunked<MmapResource>>),
35 Mem(Arc<AtomicChunked<MemResource>>),
38}
39
40#[cfg(not(target_arch = "wasm32"))]
41impl From<MmapResource> for StorageResource {
42 fn from(r: MmapResource) -> Self {
43 let path = r.path().map(Path::to_path_buf).unwrap_or_default();
44 Self::Mmap(Arc::new(AtomicChunked::passthrough(r, path)))
45 }
46}
47
48#[cfg(not(target_arch = "wasm32"))]
49impl From<AtomicChunked<MmapResource>> for StorageResource {
50 fn from(r: AtomicChunked<MmapResource>) -> Self {
51 Self::Mmap(Arc::new(r))
52 }
53}
54
55impl From<MemResource> for StorageResource {
56 fn from(r: MemResource) -> Self {
57 Self::Mem(Arc::new(AtomicChunked::passthrough(r, PathBuf::default())))
58 }
59}
60
61impl ResourceExt for StorageResource {
62 fn commit(&self, final_len: Option<u64>) -> StorageResult<()> {
63 match self {
64 #[cfg(not(target_arch = "wasm32"))]
65 Self::Mmap(r) => r.commit(final_len),
66 Self::Mem(r) => r.commit(final_len),
67 }
68 }
69
70 fn contains_range(&self, range: Range<u64>) -> bool {
71 match self {
72 #[cfg(not(target_arch = "wasm32"))]
73 Self::Mmap(r) => r.contains_range(range),
74 Self::Mem(r) => r.contains_range(range),
75 }
76 }
77
78 fn fail(&self, reason: String) {
79 match self {
80 #[cfg(not(target_arch = "wasm32"))]
81 Self::Mmap(r) => r.fail(reason),
82 Self::Mem(r) => r.fail(reason),
83 }
84 }
85 fn len(&self) -> Option<u64> {
87 match self {
88 #[cfg(not(target_arch = "wasm32"))]
89 Self::Mmap(r) => r.len(),
90 Self::Mem(r) => r.len(),
91 }
92 }
93
94 fn next_gap(&self, from: u64, limit: u64) -> Option<Range<u64>> {
95 match self {
96 #[cfg(not(target_arch = "wasm32"))]
97 Self::Mmap(r) => r.next_gap(from, limit),
98 Self::Mem(r) => r.next_gap(from, limit),
99 }
100 }
101 fn path(&self) -> Option<&Path> {
103 match self {
104 #[cfg(not(target_arch = "wasm32"))]
105 Self::Mmap(r) => r.path(),
106 Self::Mem(_) => None,
107 }
108 }
109 fn reactivate(&self) -> StorageResult<()> {
111 match self {
112 #[cfg(not(target_arch = "wasm32"))]
113 Self::Mmap(r) => r.reactivate(),
114 Self::Mem(r) => r.reactivate(),
115 }
116 }
117
118 fn read_at(&self, offset: u64, buf: &mut [u8]) -> StorageResult<usize> {
119 match self {
120 #[cfg(not(target_arch = "wasm32"))]
121 Self::Mmap(r) => r.read_at(offset, buf),
122 Self::Mem(r) => r.read_at(offset, buf),
123 }
124 }
125 fn status(&self) -> ResourceStatus {
127 match self {
128 #[cfg(not(target_arch = "wasm32"))]
129 Self::Mmap(r) => r.status(),
130 Self::Mem(r) => r.status(),
131 }
132 }
133
134 fn wait_range(&self, range: Range<u64>) -> StorageResult<WaitOutcome> {
135 match self {
136 #[cfg(not(target_arch = "wasm32"))]
137 Self::Mmap(r) => r.wait_range(range),
138 Self::Mem(r) => r.wait_range(range),
139 }
140 }
141
142 fn write_at(&self, offset: u64, data: &[u8]) -> StorageResult<()> {
143 match self {
144 #[cfg(not(target_arch = "wasm32"))]
145 Self::Mmap(r) => r.write_at(offset, data),
146 Self::Mem(r) => r.write_at(offset, data),
147 }
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 mod kithara {
154 pub(crate) use kithara_test_macros::test;
155 }
156
157 use kithara_platform::time::Duration;
158 use tokio_util::sync::CancellationToken;
159
160 use super::*;
161 #[cfg(not(target_arch = "wasm32"))]
162 use crate::{MmapOptions, OpenMode, Resource};
163
164 #[cfg(not(target_arch = "wasm32"))]
165 #[kithara::test(timeout(Duration::from_secs(5)))]
166 fn mmap_variant_roundtrip() {
167 let dir = tempfile::tempdir().unwrap();
168 let path = dir.path().join("test.bin");
169 let mmap: MmapResource = Resource::open(
170 CancellationToken::new(),
171 MmapOptions {
172 path,
173 initial_len: None,
174 mode: OpenMode::Auto,
175 },
176 )
177 .unwrap();
178
179 let res = StorageResource::from(mmap);
180 res.write_at(0, b"hello mmap").unwrap();
181 res.commit(Some(10)).unwrap();
182
183 let mut buf = [0u8; 10];
184 let n = res.read_at(0, &mut buf).unwrap();
185 assert_eq!(n, 10);
186 assert_eq!(&buf, b"hello mmap");
187 assert!(res.path().is_some());
188 }
189
190 #[kithara::test(timeout(Duration::from_secs(5)))]
191 fn mem_variant_roundtrip() {
192 let mem = MemResource::new(CancellationToken::new());
193 let res = StorageResource::from(mem);
194
195 res.write_at(0, b"hello mem").unwrap();
196 res.commit(Some(9)).unwrap();
197
198 let mut buf = [0u8; 9];
199 let n = res.read_at(0, &mut buf).unwrap();
200 assert_eq!(n, 9);
201 assert_eq!(&buf, b"hello mem");
202 assert!(res.path().is_none());
203 }
204
205 #[cfg(not(target_arch = "wasm32"))]
206 #[kithara::test(timeout(Duration::from_secs(5)))]
207 fn from_mmap_resource() {
208 let dir = tempfile::tempdir().unwrap();
209 let path = dir.path().join("conv.bin");
210 let mmap: MmapResource = Resource::open(
211 CancellationToken::new(),
212 MmapOptions {
213 path,
214 initial_len: None,
215 mode: OpenMode::Auto,
216 },
217 )
218 .unwrap();
219 let res: StorageResource = mmap.into();
220 assert!(matches!(res, StorageResource::Mmap(_)));
221 }
222
223 #[kithara::test(timeout(Duration::from_secs(5)))]
224 fn from_mem_resource() {
225 let mem = MemResource::new(CancellationToken::new());
226 let res: StorageResource = mem.into();
227 assert!(matches!(res, StorageResource::Mem(_)));
228 }
229
230 #[kithara::test(timeout(Duration::from_secs(5)))]
231 fn status_delegation() {
232 let mem = MemResource::new(CancellationToken::new());
233 let res = StorageResource::from(mem);
234
235 assert_eq!(res.status(), ResourceStatus::Active);
236 res.write_at(0, b"data").unwrap();
237 res.commit(Some(4)).unwrap();
238 assert!(matches!(res.status(), ResourceStatus::Committed { .. }));
239 }
240
241 #[kithara::test(timeout(Duration::from_secs(5)))]
242 fn wait_range_delegation() {
243 let mem = MemResource::new(CancellationToken::new());
244 let res = StorageResource::from(mem);
245
246 res.write_at(0, b"data").unwrap();
247 let outcome = res.wait_range(0..4).unwrap();
248 assert_eq!(outcome, WaitOutcome::Ready);
249 }
250
251 #[kithara::test(timeout(Duration::from_secs(5)))]
252 fn fail_delegation() {
253 let mem = MemResource::new(CancellationToken::new());
254 let res = StorageResource::from(mem);
255
256 res.fail("boom".to_string());
257 assert_eq!(res.status(), ResourceStatus::Failed("boom".to_string()));
258 }
259
260 #[kithara::test(timeout(Duration::from_secs(5)))]
261 fn reactivate_delegation() {
262 let mem = MemResource::new(CancellationToken::new());
263 let res = StorageResource::from(mem);
264
265 res.write_at(0, b"data").unwrap();
266 res.commit(Some(4)).unwrap();
267 assert!(matches!(res.status(), ResourceStatus::Committed { .. }));
268
269 res.reactivate().unwrap();
270 assert_eq!(res.status(), ResourceStatus::Active);
271 }
272
273 #[kithara::test(timeout(Duration::from_secs(5)))]
274 fn reactivate_clears_failed_for_refetch() {
275 let mem = MemResource::new(CancellationToken::new());
276 let res = StorageResource::from(mem);
277
278 res.write_at(0, b"par").unwrap();
279 res.fail("fetch cancelled before completion".to_string());
280 assert!(matches!(res.status(), ResourceStatus::Failed(_)));
281
282 res.reactivate()
283 .expect("reactivate must clear a prior failure so the key can be re-fetched");
284 assert_eq!(res.status(), ResourceStatus::Active);
285
286 res.write_at(0, b"data").unwrap();
287 res.commit(Some(4)).unwrap();
288 assert!(matches!(res.status(), ResourceStatus::Committed { .. }));
289 }
290}