automerge_persistent/
autocommit.rs1use std::collections::HashMap;
2
3use crate::{Error, PeerId, Persister};
4use automerge::{
5 sync::{self, SyncDoc},
6 AutoCommit, Change, ChangeHash, OpObserver,
7};
8
9#[derive(Debug)]
11pub struct PersistentAutoCommit<P> {
12 document: AutoCommit,
13 sync_states: HashMap<PeerId, sync::State>,
14 persister: P,
15 saved_heads: Vec<ChangeHash>,
16}
17
18impl<P> PersistentAutoCommit<P>
19where
20 P: Persister + 'static,
21{
22 pub const fn document(&self) -> &AutoCommit {
23 &self.document
24 }
25
26 pub fn document_mut(&mut self) -> &mut AutoCommit {
28 &mut self.document
29 }
30
31 pub fn transact<F: FnOnce(&mut AutoCommit) -> Result<O, E>, O, E>(
33 &mut self,
34 f: F,
35 ) -> Result<O, E> {
36 let result = f(&mut self.document)?;
37 Ok(result)
40 }
41
42 pub fn load(persister: P) -> Result<Self, Error<P::Error>> {
52 let document = persister.get_document().map_err(Error::PersisterError)?;
53 let mut doc = if let Some(document) = document {
54 AutoCommit::load(&document).map_err(Error::AutomergeError)?
55 } else {
56 AutoCommit::new()
57 };
58
59 let change_bytes = persister.get_changes().map_err(Error::PersisterError)?;
60
61 let mut changes = Vec::new();
62 for change_bytes in change_bytes {
63 changes.push(Change::from_bytes(change_bytes).map_err(Error::AutomergeLoadChangeError)?)
64 }
65
66 doc.apply_changes(changes).map_err(Error::AutomergeError)?;
67
68 let saved_heads = doc.get_heads();
69 Ok(Self {
70 document: doc,
71 sync_states: HashMap::new(),
72 persister,
73 saved_heads,
74 })
75 }
76
77 pub fn compact(&mut self, old_peer_ids: &[&[u8]]) -> Result<(), Error<P::Error>> {
93 let saved_document = self.document.save();
94 self.saved_heads = self.document.get_heads();
95 let changes = self.document.get_changes(&[])?;
96 self.persister
97 .set_document(saved_document)
98 .map_err(Error::PersisterError)?;
99 self.persister
100 .remove_changes(
101 changes
102 .into_iter()
103 .map(|c| (c.actor_id(), c.seq()))
104 .collect(),
105 )
106 .map_err(Error::PersisterError)?;
107 self.persister
108 .remove_sync_states(old_peer_ids)
109 .map_err(Error::PersisterError)?;
110 Ok(())
111 }
112
113 pub fn generate_sync_message(
129 &mut self,
130 peer_id: PeerId,
131 ) -> Result<Option<sync::Message>, Error<P::Error>> {
132 self.close_transaction()?;
133
134 if !self.sync_states.contains_key(&peer_id) {
135 if let Some(sync_state) = self
136 .persister
137 .get_sync_state(&peer_id)
138 .map_err(Error::PersisterError)?
139 {
140 let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
141 self.sync_states.insert(peer_id.clone(), s);
142 }
143 }
144 let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
145 let message = self.document.sync().generate_sync_message(sync_state);
146 self.persister
147 .set_sync_state(peer_id, sync_state.encode())
148 .map_err(Error::PersisterError)?;
149 Ok(message)
150 }
151
152 pub fn receive_sync_message(
160 &mut self,
161 peer_id: PeerId,
162 message: sync::Message,
163 ) -> Result<(), Error<P::Error>> {
164 self.receive_sync_message_with(peer_id, message, &mut ())
165 }
166
167 pub fn receive_sync_message_with<Obs: OpObserver>(
175 &mut self,
176 peer_id: PeerId,
177 message: sync::Message,
178 op_observer: &mut Obs,
179 ) -> Result<(), Error<P::Error>> {
180 self.close_transaction()?;
181
182 if !self.sync_states.contains_key(&peer_id) {
183 if let Some(sync_state) = self
184 .persister
185 .get_sync_state(&peer_id)
186 .map_err(Error::PersisterError)?
187 {
188 let s = sync::State::decode(&sync_state).map_err(Error::AutomergeDecodeError)?;
189 self.sync_states.insert(peer_id.clone(), s);
190 }
191 }
192 let sync_state = self.sync_states.entry(peer_id.clone()).or_default();
193
194 let heads = self.document.get_heads();
195 self.document
196 .sync()
197 .receive_sync_message_with(sync_state, message, op_observer)
198 .map_err(Error::AutomergeError)?;
199 let changes = self.document.get_changes(&heads)?;
200 self.persister
201 .insert_changes(
202 changes
203 .into_iter()
204 .map(|c| (c.actor_id().clone(), c.seq(), c.raw_bytes().to_vec()))
205 .collect(),
206 )
207 .map_err(Error::PersisterError)?;
208
209 self.persister
210 .set_sync_state(peer_id, sync_state.encode())
211 .map_err(Error::PersisterError)?;
212 Ok(())
213 }
214
215 pub fn flush(&mut self) -> Result<usize, Error<P::Error>> {
221 self.close_transaction()?;
222 let bytes = self.persister.flush().map_err(Error::PersisterError)?;
223 Ok(bytes)
224 }
225
226 pub fn close_transaction(&mut self) -> Result<(), Error<P::Error>> {
228 for change in self.document.get_changes(&self.saved_heads)? {
229 self.persister
230 .insert_changes(vec![(
231 change.actor_id().clone(),
232 change.seq(),
233 change.raw_bytes().to_vec(),
234 )])
235 .map_err(Error::PersisterError)?
236 }
237 self.saved_heads = self.document.get_heads();
238 Ok(())
239 }
240
241 pub fn close(mut self) -> Result<P, Error<P::Error>> {
249 self.flush()?;
250 Ok(self.persister)
251 }
252
253 pub const fn persister(&self) -> &P {
255 &self.persister
256 }
257
258 pub fn reset_sync_state(&mut self, peer_id: &[u8]) -> Result<(), P::Error> {
263 self.sync_states.remove(peer_id);
264 self.persister.remove_sync_states(&[peer_id])
265 }
266}