1pub use crate::backend::class::{SnapshotedUpdate, State, StateBackend};
2use std::marker::Sync;
3use stm::{atomically, TVar};
4use thiserror::Error;
5
6#[derive(Error, Debug)]
11pub enum AppendErr<BackErr, UpdErr> {
12 #[error("Update state: {0}")]
13 Update(UpdErr),
14 #[error("Backend: {0}")]
15 Backend(BackErr),
16}
17
18pub struct AppendDb<T: StateBackend> {
19 pub backend: T,
20 pub last_state: TVar<T::State>,
21}
22
23impl<St: Clone + State + Sync + Send + 'static, Backend: StateBackend<State = St>>
24 AppendDb<Backend>
25{
26 pub fn new(backend: Backend, initial_state: St) -> Self {
28 AppendDb {
29 backend,
30 last_state: TVar::new(initial_state),
31 }
32 }
33
34 pub fn get(&self) -> St {
36 self.last_state.read_atomic()
37 }
38
39 pub fn get_with<F, T: Clone>(&self, getter: F) -> T
41 where
42 F: FnOnce(&St) -> T,
43 {
44 let st_arc = self.last_state.read_ref_atomic();
45 let st: &St = st_arc
46 .downcast_ref()
47 .expect("Cast to state is always valid here");
48 getter(st).clone()
49 }
50
51 pub async fn update(&self, upd: St::Update) -> Result<(), AppendErr<Backend::Err, St::Err>> {
53 atomically(|trans| {
54 let mut state = self.last_state.read(trans)?;
55 let upd = state.update(upd.clone()).map_err(AppendErr::Update);
56 match upd {
57 Ok(_) => {
58 self.last_state.write(trans, state)?;
59 Ok(Ok(()))
60 }
61 Err(e) => Ok(Err(e)),
62 }
63 })?;
64 self.backend
65 .write(SnapshotedUpdate::Incremental(upd))
66 .await
67 .map_err(AppendErr::Backend)?;
68 Ok(())
69 }
70
71 pub async fn snapshot(&self) -> Result<(), AppendErr<Backend::Err, St::Err>> {
73 let state = atomically(|trans| self.last_state.read(trans));
74 self.backend
75 .write(SnapshotedUpdate::Snapshot(state))
76 .await
77 .map_err(AppendErr::Backend)?;
78 Ok(())
79 }
80
81 pub async fn load(&self) -> Result<(), AppendErr<Backend::Err, St::Err>> {
83 let updates = self.backend.updates().await.map_err(AppendErr::Backend)?;
84
85 let (mut state, start_index) = match updates.first() {
86 Some(SnapshotedUpdate::Snapshot(s)) => (s.clone(), 1),
87 _ => {
88 let state = atomically(|trans| self.last_state.read(trans));
89 (state, 0)
90 }
91 };
92
93 for upd in &updates[start_index..] {
94 match upd {
95 SnapshotedUpdate::Snapshot(s) => state = s.clone(),
96 SnapshotedUpdate::Incremental(upd) => {
97 state.update(upd.clone()).map_err(AppendErr::Update)?
98 }
99 }
100 }
101 atomically(|trans| self.last_state.write(trans, state.clone()));
102
103 Ok(())
104 }
105
106 pub async fn load_patched<F>(
112 &self,
113 patch_state: F,
114 ) -> Result<(), AppendErr<Backend::Err, St::Err>>
115 where
116 F: Copy + FnOnce(St, bool) -> St,
117 {
118 let updates = self.backend.updates().await.map_err(AppendErr::Backend)?;
119
120 let (mut state, start_index) = match updates.first() {
121 Some(SnapshotedUpdate::Snapshot(s)) => (patch_state(s.clone(), true), 1),
122 _ => {
123 let state = atomically(|trans| self.last_state.read(trans));
124 (patch_state(state, true), 0)
125 }
126 };
127
128 for upd in &updates[start_index..] {
129 match upd {
130 SnapshotedUpdate::Snapshot(s) => state = patch_state(s.clone(), false),
131 SnapshotedUpdate::Incremental(upd) => {
132 state.update(upd.clone()).map_err(AppendErr::Update)?
133 }
134 }
135 }
136 atomically(|trans| self.last_state.write(trans, state.clone()));
137
138 Ok(())
139 }
140}