1#![warn(missing_crate_level_docs)]
3#![warn(missing_doc_code_examples)]
4#![warn(clippy::nursery)]
6
7mod autocommit;
23mod mem;
24mod persister;
25
26use std::{collections::HashMap, fmt::Debug};
27
28pub use autocommit::PersistentAutoCommit;
29use automerge::{
30 op_observer::BranchableObserver,
31 sync::{self, DecodeStateError, SyncDoc},
32 transaction::{CommitOptions, Failure, Observed, Success, Transaction, UnObserved},
33 Automerge, AutomergeError, Change, LoadChangeError, OpObserver,
34};
35pub use mem::MemoryPersister;
36pub use persister::Persister;
37
/// Sizes of the different kinds of data held by a persister.
///
/// The three counters are reported independently so callers can decide, for
/// example, when compacting is worthwhile.
// NOTE(review): units are presumably bytes — confirm against the `Persister`
// implementations that populate this struct.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StoredSizes {
    /// Size of the individually stored changes.
    pub changes: u64,
    /// Size of the stored (saved) document.
    pub document: u64,
    /// Size of the stored sync states.
    pub sync_states: u64,
}
48
49#[derive(Debug, thiserror::Error)]
51pub enum Error<E> {
52 #[error(transparent)]
54 AutomergeError(#[from] AutomergeError),
55 #[error(transparent)]
56 AutomergeDecodeError(#[from] DecodeStateError),
57 #[error(transparent)]
58 AutomergeLoadChangeError(#[from] LoadChangeError),
59 #[error(transparent)]
61 PersisterError(E),
62}
63
64#[derive(Debug, thiserror::Error)]
66pub enum TransactionError<PE, E> {
67 #[error(transparent)]
69 PersisterError(PE),
70 #[error(transparent)]
72 TransactionError(#[from] Failure<E>),
73}
74
75pub type TransactionResult<O, Obs, E, PE> = Result<Success<O, Obs>, TransactionError<PE, E>>;
76
/// Opaque byte identifier of a sync peer; used as the key for sync states.
type PeerId = Vec<u8>;
78
79#[derive(Debug)]
81pub struct PersistentAutomerge<P> {
82 document: Automerge,
83 sync_states: HashMap<PeerId, sync::State>,
84 persister: P,
85}
86
87impl<P> PersistentAutomerge<P>
88where
89 P: Persister + 'static,
90{
91 pub const fn document(&self) -> &Automerge {
92 &self.document
93 }
94
95 pub fn document_mut(&mut self) -> &mut Automerge {
96 &mut self.document
97 }
98
99 pub fn transact<F, O, E>(&mut self, f: F) -> TransactionResult<O, (), E, P::Error>
100 where
101 F: FnOnce(&mut Transaction<UnObserved>) -> Result<O, E>,
102 {
103 let result = self.document.transact(f)?;
104 if let Err(e) = self.after_transaction() {
105 return Err(TransactionError::PersisterError(e));
106 }
107 Ok(result)
108 }
109
110 fn after_transaction(&mut self) -> Result<(), P::Error> {
111 if let Some(change) = self.document.get_last_local_change() {
112 self.persister.insert_changes(vec![(
113 change.actor_id().clone(),
114 change.seq(),
115 change.raw_bytes().to_vec(),
116 )])?;
117 }
118 Ok(())
119 }
120
121 pub fn transact_with<F, O, E, C, Obs>(
122 &mut self,
123 c: C,
124 f: F,
125 ) -> TransactionResult<O, Obs, E, P::Error>
126 where
127 F: FnOnce(&mut Transaction<'_, Observed<Obs>>) -> Result<O, E>,
128 C: FnOnce(&O) -> CommitOptions,
129 Obs: OpObserver + BranchableObserver + Default,
130 {
131 let result = self.document.transact_observed_with(c, f)?;
132 if let Err(e) = self.after_transaction() {
133 return Err(TransactionError::PersisterError(e));
134 }
135 Ok(result)
136 }
137
138 pub fn apply_changes(
140 &mut self,
141 changes: impl IntoIterator<Item = Change>,
142 ) -> Result<(), Error<P::Error>> {
143 self.apply_changes_with::<_, ()>(changes, None)
144 }
145
146 pub fn apply_changes_with<I: IntoIterator<Item = Change>, Obs: OpObserver>(
147 &mut self,
148 changes: I,
149 op_observer: Option<&mut Obs>,
150 ) -> Result<(), Error<P::Error>> {
151 let mut to_persist = vec![];
152 self.document.apply_changes_with(
153 changes.into_iter().map(|change| {
154 to_persist.push((
155 change.actor_id().clone(),
156 change.seq(),
157 change.raw_bytes().to_vec(),
158 ));
159 change
160 }),
161 op_observer,
162 )?;
163 self.persister
164 .insert_changes(to_persist)
165 .map_err(Error::PersisterError)?;
166 Ok(())
167 }
168
169 pub fn load(persister: P) -> Result<Self, Error<P::Error>> {
179 let document = persister.get_document().map_err(Error::PersisterError)?;
180 let mut doc = if let Some(document) = document {
181 Automerge::load(&document).map_err(Error::AutomergeError)?
182 } else {
183 Automerge::default()
184 };
185
186 let change_bytes = persister.get_changes().map_err(Error::PersisterError)?;
187
188 let mut changes = Vec::new();
189 for change_bytes in change_bytes {
190 changes.push(Change::from_bytes(change_bytes).map_err(Error::AutomergeLoadChangeError)?)
191 }
192
193 doc.apply_changes(changes).map_err(Error::AutomergeError)?;
194 Ok(Self {
195 document: doc,
196 sync_states: HashMap::new(),
197 persister,
198 })
199 }
200
201 pub fn compact(&mut self, old_peer_ids: &[&[u8]]) -> Result<(), Error<P::Error>> {
217 let saved_document = self.document.save();
218 let changes = self.document.get_changes(&[])?;
219 self.persister
220 .set_document(saved_document)
221 .map_err(Error::PersisterError)?;
222 self.persister
223 .remove_changes(
224 changes
225 .into_iter()
226 .map(|c| (c.actor_id(), c.seq()))
227 .collect(),
228 )
229 .map_err(Error::PersisterError)?;
230 self.persister
231 .remove_sync_states(old_peer_ids)
232 .map_err(Error::PersisterError)?;
233 Ok(())
234 }
235
236 pub fn generate_sync_message(
252 &mut self,
253 peer_id: PeerId,
254 ) -> Result<Option<sync::Message>, Error<P::Error>> {
255 if !self.sync_states.contains_key(&peer_id) {
256 if let Some(sync_state) = self
257 .persister
258 .get_sync_state(&peer_id)
259 .map_err(Error::PersisterError)?
260 {
261 let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
262 self.sync_states.insert(peer_id.clone(), s);
263 }
264 }
265 let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
266 let message = self.document.generate_sync_message(sync_state);
267 self.persister
268 .set_sync_state(peer_id, sync_state.encode())
269 .map_err(Error::PersisterError)?;
270 Ok(message)
271 }
272
273 pub fn receive_sync_message(
281 &mut self,
282 peer_id: PeerId,
283 message: sync::Message,
284 ) -> Result<(), Error<P::Error>> {
285 self.receive_sync_message_with(peer_id, message, &mut ())
286 }
287
288 pub fn receive_sync_message_with<Obs: OpObserver>(
296 &mut self,
297 peer_id: PeerId,
298 message: sync::Message,
299 op_observer: &mut Obs,
300 ) -> Result<(), Error<P::Error>> {
301 if !self.sync_states.contains_key(&peer_id) {
302 if let Some(sync_state) = self
303 .persister
304 .get_sync_state(&peer_id)
305 .map_err(Error::PersisterError)?
306 {
307 let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
308 self.sync_states.insert(peer_id.clone(), s);
309 }
310 }
311 let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
312
313 let heads = self.document.get_heads();
314 self.document
315 .receive_sync_message_with(sync_state, message, op_observer)
316 .map_err(Error::AutomergeError)?;
317 let changes = self.document.get_changes(&heads)?;
318 self.persister
319 .insert_changes(
320 changes
321 .into_iter()
322 .map(|c| (c.actor_id().clone(), c.seq(), c.raw_bytes().to_vec()))
323 .collect(),
324 )
325 .map_err(Error::PersisterError)?;
326
327 self.persister
328 .set_sync_state(peer_id, sync_state.encode())
329 .map_err(Error::PersisterError)?;
330 Ok(())
331 }
332
333 pub fn flush(&mut self) -> Result<usize, P::Error> {
339 self.persister.flush()
340 }
341
342 pub fn close(mut self) -> Result<P, P::Error> {
350 self.flush()?;
351 Ok(self.persister)
352 }
353
354 pub const fn persister(&self) -> &P {
356 &self.persister
357 }
358
359 pub fn persister_mut(&mut self) -> &mut P {
361 &mut self.persister
362 }
363
364 pub fn reset_sync_state(&mut self, peer_id: &[u8]) -> Result<(), P::Error> {
369 self.sync_states.remove(peer_id);
370 self.persister.remove_sync_states(&[peer_id])
371 }
372}