automerge_persistent/
lib.rs

1// #![warn(missing_docs)]
2#![warn(missing_crate_level_docs)]
3#![warn(missing_doc_code_examples)]
4// #![warn(clippy::pedantic)]
5#![warn(clippy::nursery)]
6
7//! A library for constructing efficient persistent automerge documents.
8//!
//! A [`PersistentAutomerge`] wraps an [`automerge::Automerge`] and makes the changes applied to it
//! durable. It does this by persisting each change as soon as it has been applied to the document.
//! The user should occasionally call [`PersistentAutomerge::compact`] to save the whole document in
//! a more compact format and clean up the individually stored changes. This strategy aims to be
//! fast while also being space efficient, to a degree controlled by how often the user compacts.
14//!
15//! ```rust
16//! # use automerge_persistent::MemoryPersister;
17//! # use automerge_persistent::PersistentAutomerge;
18//! let persister = MemoryPersister::default();
19//! let doc = PersistentAutomerge::load(persister).unwrap();
20//! ```
21
22mod autocommit;
23mod mem;
24mod persister;
25
26use std::{collections::HashMap, fmt::Debug};
27
28pub use autocommit::PersistentAutoCommit;
29use automerge::{
30    op_observer::BranchableObserver,
31    sync::{self, DecodeStateError, SyncDoc},
32    transaction::{CommitOptions, Failure, Observed, Success, Transaction, UnObserved},
33    Automerge, AutomergeError, Change, LoadChangeError, OpObserver,
34};
35pub use mem::MemoryPersister;
36pub use persister::Persister;
37
/// Bytes stored for each kind of persisted data: individual changes, the compacted document and
/// peer sync states.
///
/// Derives `PartialEq`/`Eq` so sizes can be compared directly (e.g. in tests or when checking
/// whether a compaction actually reduced storage).
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct StoredSizes {
    /// Total bytes stored for all individual changes.
    pub changes: u64,
    /// Total bytes stored for the saved document.
    pub document: u64,
    /// Total bytes stored for all sync states.
    pub sync_states: u64,
}
48
49/// Errors that persistent documents can return.
50#[derive(Debug, thiserror::Error)]
51pub enum Error<E> {
52    /// An automerge error.
53    #[error(transparent)]
54    AutomergeError(#[from] AutomergeError),
55    #[error(transparent)]
56    AutomergeDecodeError(#[from] DecodeStateError),
57    #[error(transparent)]
58    AutomergeLoadChangeError(#[from] LoadChangeError),
59    /// A persister error.
60    #[error(transparent)]
61    PersisterError(E),
62}
63
64/// Errors that persistent documents can return after a transaction.
65#[derive(Debug, thiserror::Error)]
66pub enum TransactionError<PE, E> {
67    /// A persister error.
68    #[error(transparent)]
69    PersisterError(PE),
70    /// A transaction error
71    #[error(transparent)]
72    TransactionError(#[from] Failure<E>),
73}
74
75pub type TransactionResult<O, Obs, E, PE> = Result<Success<O, Obs>, TransactionError<PE, E>>;
76
77type PeerId = Vec<u8>;
78
79/// A wrapper for a persister and an automerge document.
80#[derive(Debug)]
81pub struct PersistentAutomerge<P> {
82    document: Automerge,
83    sync_states: HashMap<PeerId, sync::State>,
84    persister: P,
85}
86
87impl<P> PersistentAutomerge<P>
88where
89    P: Persister + 'static,
90{
91    pub const fn document(&self) -> &Automerge {
92        &self.document
93    }
94
95    pub fn document_mut(&mut self) -> &mut Automerge {
96        &mut self.document
97    }
98
99    pub fn transact<F, O, E>(&mut self, f: F) -> TransactionResult<O, (), E, P::Error>
100    where
101        F: FnOnce(&mut Transaction<UnObserved>) -> Result<O, E>,
102    {
103        let result = self.document.transact(f)?;
104        if let Err(e) = self.after_transaction() {
105            return Err(TransactionError::PersisterError(e));
106        }
107        Ok(result)
108    }
109
110    fn after_transaction(&mut self) -> Result<(), P::Error> {
111        if let Some(change) = self.document.get_last_local_change() {
112            self.persister.insert_changes(vec![(
113                change.actor_id().clone(),
114                change.seq(),
115                change.raw_bytes().to_vec(),
116            )])?;
117        }
118        Ok(())
119    }
120
121    pub fn transact_with<F, O, E, C, Obs>(
122        &mut self,
123        c: C,
124        f: F,
125    ) -> TransactionResult<O, Obs, E, P::Error>
126    where
127        F: FnOnce(&mut Transaction<'_, Observed<Obs>>) -> Result<O, E>,
128        C: FnOnce(&O) -> CommitOptions,
129        Obs: OpObserver + BranchableObserver + Default,
130    {
131        let result = self.document.transact_observed_with(c, f)?;
132        if let Err(e) = self.after_transaction() {
133            return Err(TransactionError::PersisterError(e));
134        }
135        Ok(result)
136    }
137
138    /// Apply changes to this document.
139    pub fn apply_changes(
140        &mut self,
141        changes: impl IntoIterator<Item = Change>,
142    ) -> Result<(), Error<P::Error>> {
143        self.apply_changes_with::<_, ()>(changes, None)
144    }
145
146    pub fn apply_changes_with<I: IntoIterator<Item = Change>, Obs: OpObserver>(
147        &mut self,
148        changes: I,
149        op_observer: Option<&mut Obs>,
150    ) -> Result<(), Error<P::Error>> {
151        let mut to_persist = vec![];
152        self.document.apply_changes_with(
153            changes.into_iter().map(|change| {
154                to_persist.push((
155                    change.actor_id().clone(),
156                    change.seq(),
157                    change.raw_bytes().to_vec(),
158                ));
159                change
160            }),
161            op_observer,
162        )?;
163        self.persister
164            .insert_changes(to_persist)
165            .map_err(Error::PersisterError)?;
166        Ok(())
167    }
168
169    /// Load the persisted changes (both individual changes and a document) from storage and
170    /// rebuild the Document.
171    ///
172    /// ```rust
173    /// # use automerge_persistent::MemoryPersister;
174    /// # use automerge_persistent::PersistentAutomerge;
175    /// let persister = MemoryPersister::default();
176    /// let doc = PersistentAutomerge::load(persister).unwrap();
177    /// ```
178    pub fn load(persister: P) -> Result<Self, Error<P::Error>> {
179        let document = persister.get_document().map_err(Error::PersisterError)?;
180        let mut doc = if let Some(document) = document {
181            Automerge::load(&document).map_err(Error::AutomergeError)?
182        } else {
183            Automerge::default()
184        };
185
186        let change_bytes = persister.get_changes().map_err(Error::PersisterError)?;
187
188        let mut changes = Vec::new();
189        for change_bytes in change_bytes {
190            changes.push(Change::from_bytes(change_bytes).map_err(Error::AutomergeLoadChangeError)?)
191        }
192
193        doc.apply_changes(changes).map_err(Error::AutomergeError)?;
194        Ok(Self {
195            document: doc,
196            sync_states: HashMap::new(),
197            persister,
198        })
199    }
200
201    /// Compact the storage.
202    ///
203    /// This first obtains the changes currently in the document, saves the document and persists the
204    /// saved document. We then can remove the previously obtained changes one by one.
205    ///
206    /// It also clears out the storage used up by old sync states for peers by removing those given
207    /// in `old_peers`.
208    ///
209    /// ```rust
210    /// # use automerge_persistent::MemoryPersister;
211    /// # use automerge_persistent::PersistentAutomerge;
212    /// # let persister = MemoryPersister::default();
213    /// # let mut document = PersistentAutomerge::load(persister).unwrap();
214    /// document.compact(&[]).unwrap();
215    /// ```
216    pub fn compact(&mut self, old_peer_ids: &[&[u8]]) -> Result<(), Error<P::Error>> {
217        let saved_document = self.document.save();
218        let changes = self.document.get_changes(&[])?;
219        self.persister
220            .set_document(saved_document)
221            .map_err(Error::PersisterError)?;
222        self.persister
223            .remove_changes(
224                changes
225                    .into_iter()
226                    .map(|c| (c.actor_id(), c.seq()))
227                    .collect(),
228            )
229            .map_err(Error::PersisterError)?;
230        self.persister
231            .remove_sync_states(old_peer_ids)
232            .map_err(Error::PersisterError)?;
233        Ok(())
234    }
235
236    /// Generate a sync message to be sent to a peer document.
237    ///
238    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
239    /// something else.
240    ///
241    /// This internally retrieves the previous sync state from storage and saves the new one
242    /// afterwards.
243    ///
244    /// ```rust
245    /// # use automerge_persistent::MemoryPersister;
246    /// # use automerge_persistent::PersistentAutomerge;
247    /// # let persister = MemoryPersister::default();
248    /// # let mut document = PersistentAutomerge::load(persister).unwrap();
249    /// let message = document.generate_sync_message(vec![]).unwrap();
250    /// ```
251    pub fn generate_sync_message(
252        &mut self,
253        peer_id: PeerId,
254    ) -> Result<Option<sync::Message>, Error<P::Error>> {
255        if !self.sync_states.contains_key(&peer_id) {
256            if let Some(sync_state) = self
257                .persister
258                .get_sync_state(&peer_id)
259                .map_err(Error::PersisterError)?
260            {
261                let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
262                self.sync_states.insert(peer_id.clone(), s);
263            }
264        }
265        let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
266        let message = self.document.generate_sync_message(sync_state);
267        self.persister
268            .set_sync_state(peer_id, sync_state.encode())
269            .map_err(Error::PersisterError)?;
270        Ok(message)
271    }
272
273    /// Receive a sync message from a peer document.
274    ///
275    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
276    /// something else.
277    ///
278    /// This internally retrieves the previous sync state from storage and saves the new one
279    /// afterwards.
280    pub fn receive_sync_message(
281        &mut self,
282        peer_id: PeerId,
283        message: sync::Message,
284    ) -> Result<(), Error<P::Error>> {
285        self.receive_sync_message_with(peer_id, message, &mut ())
286    }
287
288    /// Receive a sync message from a peer document.
289    ///
290    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
291    /// something else.
292    ///
293    /// This internally retrieves the previous sync state from storage and saves the new one
294    /// afterwards.
295    pub fn receive_sync_message_with<Obs: OpObserver>(
296        &mut self,
297        peer_id: PeerId,
298        message: sync::Message,
299        op_observer: &mut Obs,
300    ) -> Result<(), Error<P::Error>> {
301        if !self.sync_states.contains_key(&peer_id) {
302            if let Some(sync_state) = self
303                .persister
304                .get_sync_state(&peer_id)
305                .map_err(Error::PersisterError)?
306            {
307                let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
308                self.sync_states.insert(peer_id.clone(), s);
309            }
310        }
311        let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
312
313        let heads = self.document.get_heads();
314        self.document
315            .receive_sync_message_with(sync_state, message, op_observer)
316            .map_err(Error::AutomergeError)?;
317        let changes = self.document.get_changes(&heads)?;
318        self.persister
319            .insert_changes(
320                changes
321                    .into_iter()
322                    .map(|c| (c.actor_id().clone(), c.seq(), c.raw_bytes().to_vec()))
323                    .collect(),
324            )
325            .map_err(Error::PersisterError)?;
326
327        self.persister
328            .set_sync_state(peer_id, sync_state.encode())
329            .map_err(Error::PersisterError)?;
330        Ok(())
331    }
332
333    /// Flush any data out to storage returning the number of bytes flushed.
334    ///
335    /// # Errors
336    ///
337    /// Returns the error returned by the persister during flushing.
338    pub fn flush(&mut self) -> Result<usize, P::Error> {
339        self.persister.flush()
340    }
341
342    /// Close the document.
343    ///
344    /// This calls flush on the persister and returns it for potential use in other documents.
345    ///
346    /// # Errors
347    ///
348    /// Returns the error from flushing.
349    pub fn close(mut self) -> Result<P, P::Error> {
350        self.flush()?;
351        Ok(self.persister)
352    }
353
354    /// Obtain a reference to the persister.
355    pub const fn persister(&self) -> &P {
356        &self.persister
357    }
358
359    /// Obtain a mut reference to the persister.
360    pub fn persister_mut(&mut self) -> &mut P {
361        &mut self.persister
362    }
363
364    /// Reset the sync state for a peer.
365    ///
366    /// This is typically used when a peer disconnects, we need to reset the sync state for them as
367    /// they may come back up with different state.
368    pub fn reset_sync_state(&mut self, peer_id: &[u8]) -> Result<(), P::Error> {
369        self.sync_states.remove(peer_id);
370        self.persister.remove_sync_states(&[peer_id])
371    }
372}