tc_transact/
fs.rs

1use std::fmt;
2use std::marker::PhantomData;
3
4use async_trait::async_trait;
5use futures::future::TryFutureExt;
6use futures::stream::{self, Stream, StreamExt, TryStreamExt};
7use get_size::GetSize;
8use log::debug;
9use safecast::AsType;
10
11use tc_error::*;
12use tcgeneric::{Id, ThreadSafe};
13
14use super::{TCResult, Transact, Transaction, TxnId};
15
16pub use freqfs::{FileLoad, FileSave};
17pub use txfs::{Key, VERSIONS};
18
/// The underlying filesystem directory type backing a [`Dir`]
///
/// This is what [`Dir::into_inner`] and [`File::into_inner`] return,
/// and what [`Dir::load`] and [`File::load`] accept.
pub type Inner<FE> = freqfs::DirLock<FE>;
21
/// A read lock on a block in a [`File`]
///
/// Returned by [`File::read_block`] and yielded by [`File::iter`].
pub type BlockRead<FE, B> = txfs::FileVersionRead<TxnId, FE, B>;
24
/// A write lock on a block in a [`File`]
///
/// Returned by [`File::create_block`] and [`File::write_block`].
pub type BlockWrite<FE, B> = txfs::FileVersionWrite<TxnId, FE, B>;
27
/// An entry in a [`Dir`]: either a sub-[`Dir`] or a [`File`] with blocks of type `B`
pub enum DirEntry<FE, B> {
    /// A sub-directory
    Dir(Dir<FE>),
    /// A file whose blocks have type `B`
    File(File<FE, B>),
}
33
34impl<FE, B> DirEntry<FE, B> {
35    /// Return `true` if this [`DirEntry`] is a [`Dir`].
36    pub fn is_dir(&self) -> bool {
37        match self {
38            Self::Dir(_) => true,
39            Self::File(_) => false,
40        }
41    }
42
43    /// Return `true` if this [`DirEntry`] is a [`File`].
44    pub fn is_file(&self) -> bool {
45        match self {
46            Self::Dir(_) => false,
47            Self::File(_) => true,
48        }
49    }
50}
51
52impl<FE, B> fmt::Debug for DirEntry<FE, B> {
53    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
54        match self {
55            Self::Dir(dir) => dir.fmt(f),
56            Self::File(file) => file.fmt(f),
57        }
58    }
59}
60
/// A transactional directory
///
/// A thin wrapper around a [`txfs::Dir`] keyed by [`TxnId`],
/// whose errors are converted into [`TCError`]s.
pub struct Dir<FE> {
    // the versioned directory which every method of `Dir` delegates to
    inner: txfs::Dir<TxnId, FE>,
}
65
66impl<FE> Clone for Dir<FE> {
67    fn clone(&self) -> Self {
68        Self {
69            inner: self.inner.clone(),
70        }
71    }
72}
73
74// TODO: support a mutable "Entry" type to use instead of calling `contains`
75impl<FE: ThreadSafe + Clone> Dir<FE> {
76    /// Destructure this [`Dir`] into its underlying [`freqfs::DirLock`].
77    pub fn into_inner(self) -> Inner<FE> {
78        self.inner.into_inner()
79    }
80
81    /// Check whether this [`Dir`] has an entry with the given `name` at `txn_id`.
82    pub async fn contains(&self, txn_id: TxnId, name: &Id) -> TCResult<bool> {
83        self.inner
84            .contains(txn_id, name)
85            .map_err(TCError::from)
86            .await
87    }
88
89    /// Delete the given entry from this [`Dir`], if present at `txn_id`.
90    pub async fn delete(&self, txn_id: TxnId, name: Id) -> TCResult<bool> {
91        self.inner.delete(txn_id, name).map_err(TCError::from).await
92    }
93
94    /// Iterate over the names of the entries in this [`Dir`] at `txn_id`.
95    pub async fn entry_names(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
96        self.inner.dir_names(txn_id).map_err(TCError::from).await
97    }
98
99    /// Iterate over each [`DirEntry`] in this [`Dir`] at `txn_id`, assuming it is a [`File`].
100    pub async fn files<B>(
101        &self,
102        txn_id: TxnId,
103    ) -> TCResult<impl Iterator<Item = (Key, File<FE, B>)>> {
104        let entries = self.inner.iter(txn_id).await?;
105        Ok(entries.map(|(name, entry)| {
106            let file = match &*entry {
107                txfs::DirEntry::Dir(blocks_dir) => File::new(blocks_dir.clone()),
108                other => panic!("not a block directory: {:?}", other),
109            };
110
111            (name, file)
112        }))
113    }
114
115    /// Get the sub-[`Dir`] with the given `name` at `txn_id`, or return a "not found" error.
116    pub async fn get_dir(&self, txn_id: TxnId, name: &Id) -> TCResult<Self> {
117        if let Some(dir) = self.inner.get_dir(txn_id, name).await? {
118            Ok(Self { inner: dir.clone() })
119        } else {
120            Err(TCError::not_found(name))
121        }
122    }
123
124    /// Get the [`File`] with the given `name` at `txn_id`, or return a "not found" error.
125    pub async fn get_file<B>(&self, txn_id: TxnId, name: &Id) -> TCResult<File<FE, B>>
126    where
127        B: GetSize + Clone,
128        FE: AsType<B>,
129    {
130        if let Some(blocks) = self.inner.get_dir(txn_id, name).await? {
131            Ok(File::new(blocks.clone()))
132        } else {
133            Err(TCError::not_found(name))
134        }
135    }
136
137    /// Return `true` if this [`Dir`] is empty at `txn_id`.
138    pub async fn is_empty(&self, txn_id: TxnId) -> TCResult<bool> {
139        self.inner.is_empty(txn_id).map_err(TCError::from).await
140    }
141
142    /// Construct a [`Stream`] of the [`DirEntry`]s in this [`Dir`] at `txn_id`.
143    pub async fn entries<B>(
144        &self,
145        txn_id: TxnId,
146    ) -> TCResult<impl Stream<Item = TCResult<(Key, DirEntry<FE, B>)>> + Unpin + Send + '_> {
147        let entries = self.inner.iter(txn_id).await?;
148
149        let entries = stream::iter(entries).then(move |(name, entry)| async move {
150            let entry = match &*entry {
151                txfs::DirEntry::Dir(dir) => {
152                    if dir.is_empty(txn_id).await? {
153                        panic!("an empty filesystem directory is ambiguous");
154                        // return Err(unexpected!("an empty filesystem directory is ambiguous"));
155                    } else if dir.contains_files(txn_id).await? {
156                        DirEntry::File(File::new(dir.clone()))
157                    } else {
158                        DirEntry::Dir(Self { inner: dir.clone() })
159                    }
160                }
161                txfs::DirEntry::File(block) => panic!(
162                    "a transactional Dir should never contain blocks: {:?}",
163                    block
164                ),
165            };
166
167            Ok((name, entry))
168        });
169
170        Ok(Box::pin(entries))
171    }
172
173    /// Delete any empty entries in this [`Dir`] at `txn_id`.
174    pub async fn trim(&self, txn_id: TxnId) -> TCResult<()> {
175        let mut to_delete = Vec::new();
176        let entries = self.inner.iter(txn_id).await?;
177
178        for (name, entry) in entries {
179            if let txfs::DirEntry::Dir(dir) = &*entry {
180                if dir.is_empty(txn_id).await? {
181                    to_delete.push(name);
182                }
183            }
184        }
185
186        for name in to_delete {
187            self.inner.delete(txn_id, (&*name).clone()).await?;
188        }
189
190        Ok(())
191    }
192}
193
194impl<FE> Dir<FE>
195where
196    FE: for<'a> FileSave<'a> + Clone,
197{
198    /// Load a transactional [`Dir`] from the filesystem cache.
199    pub async fn load(txn_id: TxnId, canon: freqfs::DirLock<FE>) -> TCResult<Self> {
200        txfs::Dir::load(txn_id, canon)
201            .map_ok(|inner| Self { inner })
202            .map_err(TCError::from)
203            .await
204    }
205
206    /// Create a new sub-directory with the given `name` at `txn_id`.
207    pub async fn create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
208        self.inner
209            .create_dir(txn_id, name.into())
210            .map_ok(|inner| Self { inner })
211            .map_err(TCError::from)
212            .await
213    }
214
215    /// Create a new [`File`] with the given `name` at `txn_id`.
216    pub async fn create_file<B>(&self, txn_id: TxnId, name: Id) -> TCResult<File<FE, B>>
217    where
218        B: GetSize + Clone,
219        FE: AsType<B>,
220    {
221        self.inner
222            .create_dir(txn_id, name.into())
223            .map_ok(File::new)
224            .map_err(TCError::from)
225            .await
226    }
227
228    /// Get the sub-[`Dir`] with the given `name` at `txn_id`, or create a new one.
229    pub async fn get_or_create_dir(&self, txn_id: TxnId, name: Id) -> TCResult<Self> {
230        if let Some(dir) = self.inner.get_dir(txn_id, &name).await? {
231            Ok(Self { inner: dir.clone() })
232        } else {
233            self.create_dir(txn_id, name).await
234        }
235    }
236}
237
impl<FE: ThreadSafe + Clone + for<'a> FileSave<'a>> Dir<FE> {
    /// Commit this [`Dir`] at `txn_id`.
    ///
    /// Logs the inner directory at debug level, then delegates to its `commit` method,
    /// passing `recursive` through unchanged.
    pub async fn commit(&self, txn_id: TxnId, recursive: bool) {
        debug!("Dir::commit {:?} (recursive: {})", self.inner, recursive);
        self.inner.commit(txn_id, recursive).await
    }

    /// Roll back this [`Dir`] at `txn_id`.
    ///
    /// Delegates to the inner directory's `rollback` method,
    /// passing `recursive` through unchanged.
    pub async fn rollback(&self, txn_id: TxnId, recursive: bool) {
        self.inner.rollback(txn_id, recursive).await
    }

    /// Finalize this [`Dir`] at `txn_id`.
    ///
    /// Delegates to the inner directory's `finalize` method.
    pub async fn finalize(&self, txn_id: TxnId) {
        self.inner.finalize(txn_id).await
    }
}
255
256impl<FE> fmt::Debug for Dir<FE> {
257    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258        self.inner.fmt(f)
259    }
260}
261
/// A transactional file
///
/// A [`File`] is backed by a [`txfs::Dir`] whose entries are the blocks of the file.
pub struct File<FE, B> {
    // the versioned directory which holds the blocks of this file
    inner: txfs::Dir<TxnId, FE>,
    // marks the block type `B` without storing a value of that type
    block: PhantomData<B>,
}
267
268impl<FE, B> Clone for File<FE, B> {
269    fn clone(&self) -> Self {
270        Self::new(self.inner.clone())
271    }
272}
273
274impl<FE, B> File<FE, B> {
275    fn new(inner: txfs::Dir<TxnId, FE>) -> Self {
276        Self {
277            inner,
278            block: PhantomData,
279        }
280    }
281
282    /// Destructure this [`File`] into its underlying [`freqfs::DirLock`].
283    pub fn into_inner(self) -> Inner<FE>
284    where
285        FE: Send + Sync,
286    {
287        self.inner.into_inner()
288    }
289}
290
291impl<FE, B> File<FE, B>
292where
293    FE: for<'a> FileSave<'a> + Clone,
294{
295    /// Load a [`File`] from its [`Inner`] cache representation.
296    pub async fn load(inner: Inner<FE>, txn_id: TxnId) -> TCResult<Self> {
297        let inner = txfs::Dir::load(txn_id, inner).await?;
298
299        let contents = inner.iter(txn_id).await?;
300        for (name, entry) in contents {
301            if entry.is_dir() {
302                return Err(internal!(
303                    "cache dir entry {name} is a subdirectory (not a block)"
304                ));
305            }
306        }
307
308        Ok(Self::new(inner))
309    }
310}
311
312impl<FE, B> File<FE, B>
313where
314    FE: for<'a> FileSave<'a> + AsType<B> + Clone,
315    B: FileLoad + GetSize + Clone,
316{
317    /// Construct an iterator over the name of each block in this [`File`] at `txn_id`.
318    pub async fn block_ids(&self, txn_id: TxnId) -> TCResult<impl Iterator<Item = Key>> {
319        self.inner.file_names(txn_id).map_err(TCError::from).await
320    }
321
322    /// Create a new block at `txn_id` with the given `name` and `contents`.
323    pub async fn create_block(
324        &self,
325        txn_id: TxnId,
326        name: Id,
327        contents: B,
328    ) -> TCResult<BlockWrite<FE, B>> {
329        let block = self
330            .inner
331            .create_file(txn_id, name.into(), contents)
332            .await?;
333
334        block.into_write(txn_id).map_err(TCError::from).await
335    }
336
337    /// Delete the block with the given `name` at `txn_id` and return `true` if it was present.
338    pub async fn delete_block(&self, txn_id: TxnId, name: Id) -> TCResult<bool> {
339        self.inner.delete(txn_id, name).map_err(TCError::from).await
340    }
341
342    /// Return `true` if this [`File`] contains a block with the given `name` at `txn_id`.
343    pub async fn contains_block(&self, txn_id: TxnId, name: &Id) -> TCResult<bool> {
344        self.inner
345            .contains(txn_id, name)
346            .map_err(TCError::from)
347            .await
348    }
349
350    /// Iterate over the blocks in this [`File`].
351    pub async fn iter(
352        &self,
353        txn_id: TxnId,
354    ) -> TCResult<impl Stream<Item = TCResult<(Key, BlockRead<FE, B>)>> + Send + Unpin + '_> {
355        self.inner
356            .files(txn_id)
357            .map_ok(|blocks| blocks.map_err(TCError::from))
358            .map_err(TCError::from)
359            .await
360    }
361
362    /// Lock the block at `name` for reading at `txn_id`.
363    pub async fn read_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockRead<FE, B>> {
364        self.inner
365            .read_file(txn_id, name)
366            .map_err(TCError::from)
367            .await
368    }
369
370    /// Lock the block at `name` for writing at `txn_id`.
371    pub async fn write_block(&self, txn_id: TxnId, name: &Id) -> TCResult<BlockWrite<FE, B>> {
372        self.inner
373            .write_file(txn_id, name)
374            .map_err(TCError::from)
375            .await
376    }
377}
378
/// Transaction lifecycle hooks for a [`File`], each delegating to the inner directory of blocks.
#[async_trait]
impl<FE, B> Transact for File<FE, B>
where
    FE: for<'a> FileSave<'a> + AsType<B> + Clone + Send + Sync,
    B: FileLoad + GetSize + Clone,
    Self: Send + Sync,
{
    // committing a `File` produces no value
    type Commit = ();

    /// Commit this [`File`] at `txn_id`.
    async fn commit(&self, txn_id: TxnId) -> Self::Commit {
        // `true` is passed in the position which `Dir::commit` uses for `recursive`
        self.inner.commit(txn_id, true).await
    }

    /// Roll back this [`File`] at `txn_id`.
    async fn rollback(&self, txn_id: &TxnId) {
        // `true` is passed in the position which `Dir::rollback` uses for `recursive`
        self.inner.rollback(*txn_id, true).await
    }

    /// Finalize this [`File`] at `txn_id`.
    async fn finalize(&self, txn_id: &TxnId) {
        self.inner.finalize(*txn_id).await
    }
}
400
401impl<FE, B> fmt::Debug for File<FE, B> {
402    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
403        write!(
404            f,
405            "a transactional File with blocks of type {}",
406            std::any::type_name::<B>()
407        )
408    }
409}
410
411/// Defines how to load a persistent data structure from the filesystem.
412#[async_trait]
413pub trait Persist<FE: ThreadSafe + Clone>: Sized {
414    type Txn: Transaction<FE>;
415    type Schema: Clone + Send + Sync;
416
417    /// Create a new instance of [`Self`] from an empty `Store`.
418    async fn create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;
419
420    /// Load a saved instance of [`Self`] from persistent storage.
421    /// Should only be invoked at startup time.
422    async fn load(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self>;
423
424    /// Load a saved instance of [`Self`] from persistent storage if present, or create a new one.
425    async fn load_or_create(txn_id: TxnId, schema: Self::Schema, store: Dir<FE>) -> TCResult<Self> {
426        if store.is_empty(txn_id).await? {
427            Self::create(txn_id, schema, store).await
428        } else {
429            Self::load(txn_id, schema, store).await
430        }
431    }
432
433    /// Access the filesystem directory backing this persistent data structure.
434    fn dir(&self) -> Inner<FE>;
435}
436
/// Copy a base state from another instance, possibly a view.
///
/// `I` is the type of the instance to copy from.
#[async_trait]
pub trait CopyFrom<FE: ThreadSafe + Clone, I>: Persist<FE> {
    /// Copy a new instance of `Self` from an existing instance.
    ///
    /// `txn` is the transaction to copy within, `store` is the [`Dir`] given to the new
    /// instance, and `instance` is the source to copy from.
    async fn copy_from(
        txn: &<Self as Persist<FE>>::Txn,
        store: Dir<FE>,
        instance: I,
    ) -> TCResult<Self>;
}
447
/// Restore a persistent state from a backup.
#[async_trait]
pub trait Restore<FE: ThreadSafe + Clone>: Persist<FE> {
    /// Restore this persistent state from a backup.
    ///
    /// `backup` is another instance of the same type, borrowed for the duration of the call.
    async fn restore(&self, txn_id: TxnId, backup: &Self) -> TCResult<()>;
}