1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use futures::future::TryFutureExt;
6use futures::stream::{self, Stream, StreamExt, TryStreamExt};
7use get_size::GetSize;
8use log::debug;
9use safecast::AsType;
10
11use tc_error::*;
12use tcgeneric::{Id, ThreadSafe};
13
14use super::{TCResult, Transact, Transaction, TxnId};
15
16pub use freqfs::{FileLoad, FileSave};
17pub use txfs::{Key, VERSIONS};
18
19pub type Inner<FE> = freqfs::DirLock<FE>;
21
22pub type BlockRead<FE, B> = txfs::FileVersionRead<TxnId, FE, B>;
24
25pub type BlockWrite<FE, B> = txfs::FileVersionWrite<TxnId, FE, B>;
27
28pub enum DirEntry<FE, B> {
30 Dir(Dir<FE>),
31 File(File<FE, B>),
32}
33
34impl<FE, B> DirEntry<FE, B> {
35 pub fn is_dir(&self) -> bool {
37 match self {
38 Self::Dir(_) => true,
39 Self::File(_) => false,
40 }
41 }
42
43 pub fn is_file(&self) -> bool {
45 match self {
46 Self::Dir(_) => false,
47 Self::File(_) => true,
48 }
49 }
50}
51
52impl<FE, B> fmt::Debug for DirEntry<FE, B> {
53 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54 match self {
55 Self::Dir(dir) => dir.fmt(f),
56 Self::File(file) => file.fmt(f),
57 }
58 }
59}
60
61pub struct Dir<FE> {
63 inner: txfs::Dir<TxnId, FE>,
64}
65
66impl<FE> Clone for Dir<FE> {
67 fn clone(&self) -> Self {
68 Self {
69 inner: self.inner.clone(),
70 }
71 }
72}
73
74impl<FE: ThreadSafe + Clone> Dir<FE> {
76 pub fn into_inner(self) -> Inner<FE> {
78 self.inner.into_inner()
79 }
80
81 pub async fn contains(&self, txn_id: TxnId, name: &Id) -> TCResult<bool> {
83 self.inner
84 .contains(txn_id, name)
85 .map_err(TCError::from)
86 .await
87 }
88
89 pub async fn delete(&self, txn_id: TxnId, name: Id) -> TCResult<bool> {
91 self.inner.delete(txn_id, name).map_err(TCError::from).await
92 }
93
94 pub async fn entry_names(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
96 self.inner.dir_names(txn_id).map_err(TCError::from).await
97 }
98
99 pub async fn files<B>(
101 &self,
102 txn_id: TxnId,
103 ) -> TCResult<impl Iterator<Item = (Key, File<FE, B>)>> {
104 let entries = self.inner.iter(txn_id).await?;
105 Ok(entries.map(|(name, entry)| {
106 let file = match &*entry {
107 txfs::DirEntry::Dir(blocks_dir) => File::new(blocks_dir.clone()),
108 other => panic!("not a block directory: {:?}", other),
109 };
110
111 (name, file)
112 }))
113 }
114
115 pub async fn get_dir(&self, txn_id: TxnId, name: &Id) -> TCResult<Self> {
117 if let Some(dir) = self.inner.get_dir(txn_id, name).await? {
118 Ok(Self { inner: dir.clone() })
119 } else {
120 Err(TCError::not_found(name))
121 }
122 }
123
124 pub async fn get_file<B>(&self, txn_id: TxnId, name: &Id) -> TCResult<File<FE, B>>
126 where
127 B: GetSize + Clone,
128 FE: AsType<B>,
129 {
130 if let Some(blocks) = self.inner.get_dir(txn_id, name).await? {
131 Ok(File::new(blocks.clone()))
132 } else {
133 Err(TCError::not_found(name))
134 }
135 }
136
137 pub async fn is_empty(&self, txn_id: TxnId) -> TCResult<bool> {
139 self.inner.is_empty(txn_id).map_err(TCError::from).await
140 }
141
142 pub async fn entries<B>(
144 &self,
145 txn_id: TxnId,
146 ) -> TCResult<impl Stream<Item = TCResult<(Key, DirEntry<FE, B>)>> + Unpin + Send + '_> {
147 let entries = self.inner.iter(txn_id).await?;
148
149 let entries = stream::iter(entries).then(move |(name, entry)| async move {
150 let entry = match &*entry {
151 txfs::DirEntry::Dir(dir) => {
152 if dir.is_empty(txn_id).await? {
153 panic!("an empty filesystem directory is ambiguous");
154 } else if dir.contains_files(txn_id).await? {
156 DirEntry::File(File::new(dir.clone()))
157 } else {
158 DirEntry::Dir(Self { inner: dir.clone() })
159 }
160 }
161 txfs::DirEntry::File(block) => panic!(
162 "a transactional Dir should never contain blocks: {:?}",
163 block
164 ),
165 };
166
167 Ok((name, entry))
168 });
169
170 Ok(Box::pin(entries))
171 }
172
173 pub async fn trim(&self, txn_id: TxnId) -> TCResult<()> {
175 let mut to_delete = Vec::new();
176 let entries = self.inner.iter(txn_id).await?;
177
178 for (name, entry) in entries {
179 if let txfs::DirEntry::Dir(dir) = &*entry {
180 if dir.is_empty(txn_id).await? {
181 to_delete.push(name);
182 }
183 }
184 }
185
186 for name in to_delete {
187 self.inner.delete(txn_id, (&*name).clone()).await?;
188 }
189
190 Ok(())
191 }
192}
193
194impl<FE> Dir<FE>
195where
196 FE: for<'a> FileSave<'a> + Clone,
197{
198 pub async fn load(txn_id: TxnId, canon: freqfs::DirLock<FE>) -> TCResult<Self> {
200 txfs::Dir::load(txn_id, canon)
201 .map_ok(|inner| Self { inner })
202 .map_err(TCError::from)
203 .await
204 }
205
206 pub async fn create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
208 self.inner
209 .create_dir(txn_id, name.into())
210 .map_ok(|inner| Self { inner })
211 .map_err(TCError::from)
212 .await
213 }
214
215 pub async fn create_file<B>(&self, txn_id: TxnId, name: Id) -> TCResult<File<FE, B>>
217 where
218 B: GetSize + Clone,
219 FE: AsType<B>,
220 {
221 self.inner
222 .create_dir(txn_id, name.into())
223 .map_ok(File::new)
224 .map_err(TCError::from)
225 .await
226 }
227
228 pub async fn get_or_create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
230 if let Some(dir) = self.inner.get_dir(txn_id, &name).await? {
231 Ok(Self { inner: dir.clone() })
232 } else {
233 self.create_dir(txn_id, name).await
234 }
235 }
236}
237
238impl<FE: ThreadSafe + Clone + for<'a> FileSave<'a>> Dir<FE> {
239 pub async fn commit(&self, txn_id: TxnId, recursive: bool) {
241 debug!("Dir::commit {:?} (recursive: {})", self.inner, recursive);
242 self.inner.commit(txn_id, recursive).await
243 }
244
245 pub async fn rollback(&self, txn_id: TxnId, recursive: bool) {
247 self.inner.rollback(txn_id, recursive).await
248 }
249
250 pub async fn finalize(&self, txn_id: TxnId) {
252 self.inner.finalize(txn_id).await
253 }
254}
255
256impl<FE> fmt::Debug for Dir<FE> {
257 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258 self.inner.fmt(f)
259 }
260}
261
262pub struct File<FE, B> {
264 inner: txfs::Dir<TxnId, FE>,
265 block: PhantomData<B>,
266}
267
268impl<FE, B> Clone for File<FE, B> {
269 fn clone(&self) -> Self {
270 Self::new(self.inner.clone())
271 }
272}
273
274impl<FE, B> File<FE, B> {
275 fn new(inner: txfs::Dir<TxnId, FE>) -> Self {
276 Self {
277 inner,
278 block: PhantomData,
279 }
280 }
281
282 pub fn into_inner(self) -> Inner<FE>
284 where
285 FE: Send + Sync,
286 {
287 self.inner.into_inner()
288 }
289}
290
291impl<FE, B> File<FE, B>
292where
293 FE: for<'a> FileSave<'a> + Clone,
294{
295 pub async fn load(inner: Inner<FE>, txn_id: TxnId) -> TCResult<Self> {
297 let inner = txfs::Dir::load(txn_id, inner).await?;
298
299 let contents = inner.iter(txn_id).await?;
300 for (name, entry) in contents {
301 if entry.is_dir() {
302 return Err(internal!(
303 "cache dir entry {name} is a subdirectory (not a block)"
304 ));
305 }
306 }
307
308 Ok(Self::new(inner))
309 }
310}
311
312impl<FE, B> File<FE, B>
313where
314 FE: for<'a> FileSave<'a> + AsType<B> + Clone,
315 B: FileLoad + GetSize + Clone,
316{
317 pub async fn block_ids(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
319 self.inner.file_names(txn_id).map_err(TCError::from).await
320 }
321
322 pub async fn create_block(
324 &self,
325 txn_id: TxnId,
326 name: Id,
327 contents: B,
328 ) -> TCResult<BlockWrite<FE, B>> {
329 let block = self
330 .inner
331 .create_file(txn_id, name.into(), contents)
332 .await?;
333
334 block.into_write(txn_id).map_err(TCError::from).await
335 }
336
337 pub async fn delete_block(&self, txn_id: TxnId, name: Id) -> TCResult<bool> {
339 self.inner.delete(txn_id, name).map_err(TCError::from).await
340 }
341
342 pub async fn contains_block(&self, txn_id: TxnId, name: &Id) -> TCResult<bool> {
344 self.inner
345 .contains(txn_id, name)
346 .map_err(TCError::from)
347 .await
348 }
349
350 pub async fn iter(
352 &self,
353 txn_id: TxnId,
354 ) -> TCResult<impl Stream<Item = TCResult<(Key, BlockRead<FE, B>)>> + Send + Unpin + '_> {
355 self.inner
356 .files(txn_id)
357 .map_ok(|blocks| blocks.map_err(TCError::from))
358 .map_err(TCError::from)
359 .await
360 }
361
362 pub async fn read_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockRead<FE, B>> {
364 self.inner
365 .read_file(txn_id, name)
366 .map_err(TCError::from)
367 .await
368 }
369
370 pub async fn write_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockWrite<FE, B>> {
372 self.inner
373 .write_file(txn_id, name)
374 .map_err(TCError::from)
375 .await
376 }
377}
378
379#[async_trait]
380impl<FE, B> Transact for File<FE, B>
381where
382 FE: for<'a> FileSave<'a> + AsType<B> + Clone + Send + Sync,
383 B: FileLoad + GetSize + Clone,
384 Self: Send + Sync,
385{
386 type Commit = ();
387
388 async fn commit(&self, txn_id: TxnId) -> Self::Commit {
389 self.inner.commit(txn_id, true).await
390 }
391
392 async fn rollback(&self, txn_id: &TxnId) {
393 self.inner.rollback(*txn_id, true).await
394 }
395
396 async fn finalize(&self, txn_id: &TxnId) {
397 self.inner.finalize(*txn_id).await
398 }
399}
400
401impl<FE, B> fmt::Debug for File<FE, B> {
402 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403 write!(
404 f,
405 "a transactional File with blocks of type {}",
406 std::any::type_name::<B>()
407 )
408 }
409}
410
411#[async_trait]
413pub trait Persist<FE: ThreadSafe + Clone>: Sized {
414 type Txn: Transaction<FE>;
415 type Schema: Clone + Send + Sync;
416
417 async fn create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;
419
420 async fn load(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;
423
424 async fn load_or_create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self> {
426 if store.is_empty(txn_id).await? {
427 Self::create(txn_id, schema, store).await
428 } else {
429 Self::load(txn_id, schema, store).await
430 }
431 }
432
433 fn dir(&self) -> Inner<FE>;
435}
436
437#[async_trait]
439pub trait CopyFrom<FE: ThreadSafe + Clone, I>: Persist<FE> {
440 async fn copy_from(
442 txn: &<Self as Persist<FE>>::Txn,
443 store: Dir<FE>,
444 instance: I,
445 ) -> TCResult<Self>;
446}
447
448#[async_trait]
450pub trait Restore<FE: ThreadSafe + Clone>: Persist<FE> {
451 async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()>;
453}