automerge_persistent/
autocommit.rs

1use std::collections::HashMap;
2
3use crate::{Error, PeerId, Persister};
4use automerge::{
5    sync::{self, SyncDoc},
6    AutoCommit, Change, ChangeHash, OpObserver,
7};
8
9/// A wrapper for a persister and an automerge document.
10#[derive(Debug)]
11pub struct PersistentAutoCommit<P> {
12    document: AutoCommit,
13    sync_states: HashMap<PeerId, sync::State>,
14    persister: P,
15    saved_heads: Vec<ChangeHash>,
16}
17
18impl<P> PersistentAutoCommit<P>
19where
20    P: Persister + 'static,
21{
22    pub const fn document(&self) -> &AutoCommit {
23        &self.document
24    }
25
26    /// UNSAFE: this may lead to changes not being immediately persisted
27    pub fn document_mut(&mut self) -> &mut AutoCommit {
28        &mut self.document
29    }
30
31    /// Make changes to the document but don't immediately persist changes.
32    pub fn transact<F: FnOnce(&mut AutoCommit) -> Result<O, E>, O, E>(
33        &mut self,
34        f: F,
35    ) -> Result<O, E> {
36        let result = f(&mut self.document)?;
37        // don't get the changes or anything as that will close the transaction, instead delay that
38        // until another operation such as save or receive_sync_message etc.
39        Ok(result)
40    }
41
42    /// Load the persisted changes (both individual changes and a document) from storage and
43    /// rebuild the document.
44    ///
45    /// ```rust
46    /// # use automerge_persistent::MemoryPersister;
47    /// # use automerge_persistent::PersistentAutoCommit;
48    /// let persister = MemoryPersister::default();
49    /// let doc = PersistentAutoCommit::load(persister).unwrap();
50    /// ```
51    pub fn load(persister: P) -> Result<Self, Error<P::Error>> {
52        let document = persister.get_document().map_err(Error::PersisterError)?;
53        let mut doc = if let Some(document) = document {
54            AutoCommit::load(&document).map_err(Error::AutomergeError)?
55        } else {
56            AutoCommit::new()
57        };
58
59        let change_bytes = persister.get_changes().map_err(Error::PersisterError)?;
60
61        let mut changes = Vec::new();
62        for change_bytes in change_bytes {
63            changes.push(Change::from_bytes(change_bytes).map_err(Error::AutomergeLoadChangeError)?)
64        }
65
66        doc.apply_changes(changes).map_err(Error::AutomergeError)?;
67
68        let saved_heads = doc.get_heads();
69        Ok(Self {
70            document: doc,
71            sync_states: HashMap::new(),
72            persister,
73            saved_heads,
74        })
75    }
76
77    /// Compact the storage.
78    ///
79    /// This first obtains the changes currently in the document, saves the document and persists the
80    /// saved document. We then can remove the previously obtained changes one by one.
81    ///
82    /// It also clears out the storage used up by old sync states for peers by removing those given
83    /// in `old_peers`.
84    ///
85    /// ```rust
86    /// # use automerge_persistent::MemoryPersister;
87    /// # use automerge_persistent::PersistentAutoCommit;
88    /// # let persister = MemoryPersister::default();
89    /// # let mut doc = PersistentAutoCommit::load(persister).unwrap();
90    /// doc.compact(&[]).unwrap();
91    /// ```
92    pub fn compact(&mut self, old_peer_ids: &[&[u8]]) -> Result<(), Error<P::Error>> {
93        let saved_document = self.document.save();
94        self.saved_heads = self.document.get_heads();
95        let changes = self.document.get_changes(&[])?;
96        self.persister
97            .set_document(saved_document)
98            .map_err(Error::PersisterError)?;
99        self.persister
100            .remove_changes(
101                changes
102                    .into_iter()
103                    .map(|c| (c.actor_id(), c.seq()))
104                    .collect(),
105            )
106            .map_err(Error::PersisterError)?;
107        self.persister
108            .remove_sync_states(old_peer_ids)
109            .map_err(Error::PersisterError)?;
110        Ok(())
111    }
112
113    /// Generate a sync message to be sent to a peer document.
114    ///
115    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
116    /// something else.
117    ///
118    /// This internally retrieves the previous sync state from storage and saves the new one
119    /// afterwards.
120    ///
121    /// ```rust
122    /// # use automerge_persistent::MemoryPersister;
123    /// # use automerge_persistent::PersistentAutoCommit;
124    /// # let persister = MemoryPersister::default();
125    /// # let mut doc = PersistentAutoCommit::load(persister).unwrap();
126    /// let message = doc.generate_sync_message(vec![]).unwrap();
127    /// ```
128    pub fn generate_sync_message(
129        &mut self,
130        peer_id: PeerId,
131    ) -> Result<Option<sync::Message>, Error<P::Error>> {
132        self.close_transaction()?;
133
134        if !self.sync_states.contains_key(&peer_id) {
135            if let Some(sync_state) = self
136                .persister
137                .get_sync_state(&peer_id)
138                .map_err(Error::PersisterError)?
139            {
140                let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
141                self.sync_states.insert(peer_id.clone(), s);
142            }
143        }
144        let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
145        let message = self.document.sync().generate_sync_message(sync_state);
146        self.persister
147            .set_sync_state(peer_id, sync_state.encode())
148            .map_err(Error::PersisterError)?;
149        Ok(message)
150    }
151
152    /// Receive a sync message from a peer document.
153    ///
154    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
155    /// something else.
156    ///
157    /// This internally retrieves the previous sync state from storage and saves the new one
158    /// afterwards.
159    pub fn receive_sync_message(
160        &mut self,
161        peer_id: PeerId,
162        message: sync::Message,
163    ) -> Result<(), Error<P::Error>> {
164        self.receive_sync_message_with(peer_id, message, &mut ())
165    }
166
167    /// Receive a sync message from a peer document.
168    ///
169    /// Peer id is intentionally low level and up to the user as it can be a DNS name, IP address or
170    /// something else.
171    ///
172    /// This internally retrieves the previous sync state from storage and saves the new one
173    /// afterwards.
174    pub fn receive_sync_message_with<Obs: OpObserver>(
175        &mut self,
176        peer_id: PeerId,
177        message: sync::Message,
178        op_observer: &mut Obs,
179    ) -> Result<(), Error<P::Error>> {
180        self.close_transaction()?;
181
182        if !self.sync_states.contains_key(&peer_id) {
183            if let Some(sync_state) = self
184                .persister
185                .get_sync_state(&peer_id)
186                .map_err(Error::PersisterError)?
187            {
188                let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
189                self.sync_states.insert(peer_id.clone(), s);
190            }
191        }
192        let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
193
194        let heads = self.document.get_heads();
195        self.document
196            .sync()
197            .receive_sync_message_with(sync_state, message, op_observer)
198            .map_err(Error::AutomergeError)?;
199        let changes = self.document.get_changes(&heads)?;
200        self.persister
201            .insert_changes(
202                changes
203                    .into_iter()
204                    .map(|c| (c.actor_id().clone(), c.seq(), c.raw_bytes().to_vec()))
205                    .collect(),
206            )
207            .map_err(Error::PersisterError)?;
208
209        self.persister
210            .set_sync_state(peer_id, sync_state.encode())
211            .map_err(Error::PersisterError)?;
212        Ok(())
213    }
214
215    /// Flush any data out to storage returning the number of bytes flushed.
216    ///
217    /// # Errors
218    ///
219    /// Returns the error returned by the persister during flushing.
220    pub fn flush(&mut self) -> Result<usize, Error<P::Error>> {
221        self.close_transaction()?;
222        let bytes = self.persister.flush().map_err(Error::PersisterError)?;
223        Ok(bytes)
224    }
225
226    /// Close any current transaction and write out the changes to disk.
227    pub fn close_transaction(&mut self) -> Result<(), Error<P::Error>> {
228        for change in self.document.get_changes(&self.saved_heads)? {
229            self.persister
230                .insert_changes(vec![(
231                    change.actor_id().clone(),
232                    change.seq(),
233                    change.raw_bytes().to_vec(),
234                )])
235                .map_err(Error::PersisterError)?
236        }
237        self.saved_heads = self.document.get_heads();
238        Ok(())
239    }
240
241    /// Close the document.
242    ///
243    /// This calls flush on the persister and returns it for potential use in other documents.
244    ///
245    /// # Errors
246    ///
247    /// Returns the error from flushing.
248    pub fn close(mut self) -> Result<P, Error<P::Error>> {
249        self.flush()?;
250        Ok(self.persister)
251    }
252
253    /// Obtain a reference to the persister.
254    pub const fn persister(&self) -> &P {
255        &self.persister
256    }
257
258    /// Reset the sync state for a peer.
259    ///
260    /// This is typically used when a peer disconnects, we need to reset the sync state for them as
261    /// they may come back up with different state.
262    pub fn reset_sync_state(&mut self, peer_id: &[u8]) -> Result<(), P::Error> {
263        self.sync_states.remove(peer_id);
264        self.persister.remove_sync_states(&[peer_id])
265    }
266}