append_db/
db.rs

1pub use crate::backend::class::{SnapshotedUpdate, State, StateBackend};
2use std::marker::Sync;
3use stm::{atomically, TVar};
4use thiserror::Error;
5
6/// We can fail either due state update logic or storage backend failure
7///
8/// Note: we cannot use associated types here as it will require 'Debug' impl for
9/// storages.
10#[derive(Error, Debug)]
11pub enum AppendErr<BackErr, UpdErr> {
12    #[error("Update state: {0}")]
13    Update(UpdErr),
14    #[error("Backend: {0}")]
15    Backend(BackErr),
16}
17
18pub struct AppendDb<T: StateBackend> {
19    pub backend: T,
20    pub last_state: TVar<T::State>,
21}
22
23impl<St: Clone + State + Sync + Send + 'static, Backend: StateBackend<State = St>>
24    AppendDb<Backend>
25{
26    /// Initialize with given backend and strarting in memory state
27    pub fn new(backend: Backend, initial_state: St) -> Self {
28        AppendDb {
29            backend,
30            last_state: TVar::new(initial_state),
31        }
32    }
33
34    /// Access current state
35    pub fn get(&self) -> St {
36        self.last_state.read_atomic()
37    }
38
39    /// Access part of state
40    pub fn get_with<F, T: Clone>(&self, getter: F) -> T
41    where
42        F: FnOnce(&St) -> T,
43    {
44        let st_arc = self.last_state.read_ref_atomic();
45        let st: &St = st_arc
46            .downcast_ref()
47            .expect("Cast to state is always valid here");
48        getter(st).clone()
49    }
50
51    /// Write down to storage new update and update in memory version
52    pub async fn update(&self, upd: St::Update) -> Result<(), AppendErr<Backend::Err, St::Err>> {
53        atomically(|trans| {
54            let mut state = self.last_state.read(trans)?;
55            let upd = state.update(upd.clone()).map_err(AppendErr::Update);
56            match upd {
57                Ok(_) => {
58                    self.last_state.write(trans, state)?;
59                    Ok(Ok(()))
60                }
61                Err(e) => Ok(Err(e)),
62            }
63        })?;
64        self.backend
65            .write(SnapshotedUpdate::Incremental(upd))
66            .await
67            .map_err(AppendErr::Backend)?;
68        Ok(())
69    }
70
71    /// Write down snapshot for current state
72    pub async fn snapshot(&self) -> Result<(), AppendErr<Backend::Err, St::Err>> {
73        let state = atomically(|trans| self.last_state.read(trans));
74        self.backend
75            .write(SnapshotedUpdate::Snapshot(state))
76            .await
77            .map_err(AppendErr::Backend)?;
78        Ok(())
79    }
80
81    /// Load state from storage
82    pub async fn load(&self) -> Result<(), AppendErr<Backend::Err, St::Err>> {
83        let updates = self.backend.updates().await.map_err(AppendErr::Backend)?;
84
85        let (mut state, start_index) = match updates.first() {
86            Some(SnapshotedUpdate::Snapshot(s)) => (s.clone(), 1),
87            _ => {
88                let state = atomically(|trans| self.last_state.read(trans));
89                (state, 0)
90            }
91        };
92
93        for upd in &updates[start_index..] {
94            match upd {
95                SnapshotedUpdate::Snapshot(s) => state = s.clone(),
96                SnapshotedUpdate::Incremental(upd) => {
97                    state.update(upd.clone()).map_err(AppendErr::Update)?
98                }
99            }
100        }
101        atomically(|trans| self.last_state.write(trans, state.clone()));
102
103        Ok(())
104    }
105
106    /// Load state from storage using provided function to patch starting and snapshot states. That
107    /// is helpful if you add some runtime info into state that is not rendered in updates.
108    ///
109    /// The second parameter in the closure indicates whether the patching occurs at start or not.
110    /// For later snapshots it will be called with false.
111    pub async fn load_patched<F>(
112        &self,
113        patch_state: F,
114    ) -> Result<(), AppendErr<Backend::Err, St::Err>>
115    where
116        F: Copy + FnOnce(St, bool) -> St,
117    {
118        let updates = self.backend.updates().await.map_err(AppendErr::Backend)?;
119
120        let (mut state, start_index) = match updates.first() {
121            Some(SnapshotedUpdate::Snapshot(s)) => (patch_state(s.clone(), true), 1),
122            _ => {
123                let state = atomically(|trans| self.last_state.read(trans));
124                (patch_state(state, true), 0)
125            }
126        };
127
128        for upd in &updates[start_index..] {
129            match upd {
130                SnapshotedUpdate::Snapshot(s) => state = patch_state(s.clone(), false),
131                SnapshotedUpdate::Incremental(upd) => {
132                    state.update(upd.clone()).map_err(AppendErr::Update)?
133                }
134            }
135        }
136        atomically(|trans| self.last_state.write(trans, state.clone()));
137
138        Ok(())
139    }
140}