automerge_persistent_sled/
lib.rs

1#![warn(missing_docs)]
2#![warn(missing_crate_level_docs)]
3#![warn(missing_doc_code_examples)]
4#![warn(clippy::pedantic)]
5#![warn(clippy::nursery)]
6
7//! A persister targetting [Sled](https://github.com/spacejam/sled).
8//!
9//! # Single persister
10//!
11//! ```rust
12//! # use automerge_persistent::PersistentAutomerge;
13//! # use automerge_persistent_sled::SledPersister;
14//! # use automerge_persistent_sled::SledPersisterError;
15//! # fn main() -> Result<(), SledPersisterError> {
16//! let db = sled::Config::new().temporary(true).open()?;
17//! let changes_tree = db.open_tree("changes")?;
18//! let documents_tree = db.open_tree("documents")?;
19//! let sync_states_tree = db.open_tree("sync-states")?;
20//!
21//! let persister = SledPersister::new(changes_tree, documents_tree, sync_states_tree, "")?;
22//! let doc = PersistentAutomerge::load(persister);
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! # Multiple persisters sharing the same trees
28//!
29//! ```rust
30//! # use automerge_persistent::PersistentAutomerge;
31//! # use automerge_persistent_sled::SledPersister;
32//! # use automerge_persistent_sled::SledPersisterError;
33//! # fn main() -> Result<(), SledPersisterError> {
34//! let db = sled::Config::new().temporary(true).open()?;
35//! let changes_tree = db.open_tree("changes")?;
36//! let documents_tree = db.open_tree("documents")?;
37//! let sync_states_tree = db.open_tree("sync-states")?;
38//!
39//! let persister1 = SledPersister::new(
40//!     changes_tree.clone(),
41//!     documents_tree.clone(),
42//!     sync_states_tree.clone(),
43//!     "1",
44//! )?;
45//! let doc1 = PersistentAutomerge::load(persister1);
46//!
47//! let persister2 = SledPersister::new(changes_tree, documents_tree, sync_states_tree, "2")?;
48//! let doc2 = PersistentAutomerge::load(persister2);
49//! # Ok(())
50//! # }
51//! ```
52
53use automerge::ActorId;
54use automerge_persistent::{Persister, StoredSizes};
55
56/// The persister that stores changes and documents in sled trees.
57///
58/// Changes and documents are kept in separate trees.
59///
60/// An optional prefix can be used in case multiple persisters may share the same trees.
61#[derive(Debug)]
62pub struct SledPersister {
63    changes_tree: sled::Tree,
64    document_tree: sled::Tree,
65    sync_states_tree: sled::Tree,
66    prefix: String,
67    sizes: StoredSizes,
68}
69
70/// Possible errors from persisting.
71#[derive(Debug, thiserror::Error)]
72pub enum SledPersisterError {
73    /// Internal errors from sled.
74    #[error(transparent)]
75    SledError(#[from] sled::Error),
76}
77
78impl SledPersister {
79    /// Construct a new persister.
80    pub fn new<S>(
81        changes_tree: sled::Tree,
82        document_tree: sled::Tree,
83        sync_states_tree: sled::Tree,
84        prefix: S,
85    ) -> Result<Self, SledPersisterError>
86    where
87        S: Into<String>,
88    {
89        let prefix = prefix.into();
90
91        let mut s = Self {
92            changes_tree,
93            document_tree,
94            sync_states_tree,
95            prefix,
96            sizes: StoredSizes::default(),
97        };
98        s.sizes.changes = s.get_changes()?.iter().map(Vec::len).sum::<usize>() as u64;
99        s.sizes.document = s.get_document()?.unwrap_or_default().len() as u64;
100        s.sizes.sync_states = s
101            .get_peer_ids()?
102            .iter()
103            .map(|id| s.get_sync_state(id).map(|o| o.unwrap_or_default().len()))
104            .collect::<Result<Vec<usize>, _>>()?
105            .iter()
106            .sum::<usize>() as u64;
107        Ok(s)
108    }
109
110    /// Make a key from the prefix, `actor_id` and `sequence_number`.
111    ///
112    /// Converts the `actor_id` to bytes and appends the `sequence_number` in big endian form.
113    fn make_key(&self, actor_id: &ActorId, seq: u64) -> Vec<u8> {
114        let mut key = self.prefix.as_bytes().to_vec();
115        key.extend(actor_id.to_bytes());
116        key.extend(&seq.to_be_bytes());
117        key
118    }
119
120    /// Make a key just from the prefix.
121    /// Since each document only has one thing to store in this tree we can just use the prefix.
122    fn make_document_key(&self) -> Vec<u8> {
123        self.prefix.as_bytes().to_vec()
124    }
125
126    fn make_peer_key(&self, peer_id: &[u8]) -> Vec<u8> {
127        let mut key = self.prefix.as_bytes().to_vec();
128        key.extend(peer_id);
129        key
130    }
131}
132
133impl Persister for SledPersister {
134    type Error = SledPersisterError;
135
136    /// Get all of the current changes.
137    fn get_changes(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
138        self.changes_tree
139            .scan_prefix(&self.prefix)
140            .values()
141            .map(|v| v.map(|v| v.to_vec()).map_err(Self::Error::SledError))
142            .collect()
143    }
144
145    /// Insert all of the given changes into the tree.
146    fn insert_changes(&mut self, changes: Vec<(ActorId, u64, Vec<u8>)>) -> Result<(), Self::Error> {
147        for (a, s, c) in changes {
148            let key = self.make_key(&a, s);
149            self.sizes.changes += c.len() as u64;
150            if let Some(old) = self.changes_tree.insert(key, c)? {
151                self.sizes.changes -= old.len() as u64;
152            }
153        }
154        Ok(())
155    }
156
157    /// Remove all of the given changes from the tree.
158    fn remove_changes(&mut self, changes: Vec<(&ActorId, u64)>) -> Result<(), Self::Error> {
159        for (a, s) in changes {
160            let key = self.make_key(a, s);
161            if let Some(old) = self.changes_tree.remove(key)? {
162                self.sizes.changes -= old.len() as u64;
163            }
164        }
165        Ok(())
166    }
167
168    /// Retrieve the document from the tree.
169    fn get_document(&self) -> Result<Option<Vec<u8>>, Self::Error> {
170        Ok(self
171            .document_tree
172            .get(self.make_document_key())?
173            .map(|v| v.to_vec()))
174    }
175
176    /// Set the document in the tree.
177    fn set_document(&mut self, data: Vec<u8>) -> Result<(), Self::Error> {
178        self.sizes.document = data.len() as u64;
179        self.document_tree.insert(self.make_document_key(), data)?;
180        Ok(())
181    }
182
183    fn get_sync_state(&self, peer_id: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
184        let sync_state_key = self.make_peer_key(peer_id);
185        Ok(self
186            .sync_states_tree
187            .get(sync_state_key)?
188            .map(|v| v.to_vec()))
189    }
190
191    fn set_sync_state(&mut self, peer_id: Vec<u8>, sync_state: Vec<u8>) -> Result<(), Self::Error> {
192        let sync_state_key = self.make_peer_key(&peer_id);
193        self.sizes.sync_states += sync_state.len() as u64;
194        if let Some(old) = self.sync_states_tree.insert(sync_state_key, sync_state)? {
195            self.sizes.sync_states -= old.len() as u64;
196        }
197        Ok(())
198    }
199
200    fn remove_sync_states(&mut self, peer_ids: &[&[u8]]) -> Result<(), Self::Error> {
201        for id in peer_ids {
202            let key = self.make_peer_key(id);
203            if let Some(old) = self.sync_states_tree.remove(key)? {
204                self.sizes.sync_states -= old.len() as u64;
205            }
206        }
207        Ok(())
208    }
209
210    fn get_peer_ids(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
211        self.sync_states_tree
212            .scan_prefix(&self.prefix)
213            .keys()
214            .map(|v| v.map(|v| v.to_vec()).map_err(Self::Error::SledError))
215            .collect()
216    }
217
218    fn sizes(&self) -> StoredSizes {
219        self.sizes.clone()
220    }
221
222    fn flush(&mut self) -> Result<usize, Self::Error> {
223        let mut flushed = 0;
224        flushed += self.changes_tree.flush()?;
225        flushed += self.document_tree.flush()?;
226        flushed += self.sync_states_tree.flush()?;
227        Ok(flushed)
228    }
229}