automerge_persistent_sled/
lib.rs1#![warn(missing_docs)]
2#![warn(missing_crate_level_docs)]
3#![warn(missing_doc_code_examples)]
4#![warn(clippy::pedantic)]
5#![warn(clippy::nursery)]
6
7use automerge::ActorId;
54use automerge_persistent::{Persister, StoredSizes};
55
56#[derive(Debug)]
62pub struct SledPersister {
63 changes_tree: sled::Tree,
64 document_tree: sled::Tree,
65 sync_states_tree: sled::Tree,
66 prefix: String,
67 sizes: StoredSizes,
68}
69
70#[derive(Debug, thiserror::Error)]
72pub enum SledPersisterError {
73 #[error(transparent)]
75 SledError(#[from] sled::Error),
76}
77
78impl SledPersister {
79 pub fn new<S>(
81 changes_tree: sled::Tree,
82 document_tree: sled::Tree,
83 sync_states_tree: sled::Tree,
84 prefix: S,
85 ) -> Result<Self, SledPersisterError>
86 where
87 S: Into<String>,
88 {
89 let prefix = prefix.into();
90
91 let mut s = Self {
92 changes_tree,
93 document_tree,
94 sync_states_tree,
95 prefix,
96 sizes: StoredSizes::default(),
97 };
98 s.sizes.changes = s.get_changes()?.iter().map(Vec::len).sum::<usize>() as u64;
99 s.sizes.document = s.get_document()?.unwrap_or_default().len() as u64;
100 s.sizes.sync_states = s
101 .get_peer_ids()?
102 .iter()
103 .map(|id| s.get_sync_state(id).map(|o| o.unwrap_or_default().len()))
104 .collect::<Result<Vec<usize>, _>>()?
105 .iter()
106 .sum::<usize>() as u64;
107 Ok(s)
108 }
109
110 fn make_key(&self, actor_id: &ActorId, seq: u64) -> Vec<u8> {
114 let mut key = self.prefix.as_bytes().to_vec();
115 key.extend(actor_id.to_bytes());
116 key.extend(&seq.to_be_bytes());
117 key
118 }
119
120 fn make_document_key(&self) -> Vec<u8> {
123 self.prefix.as_bytes().to_vec()
124 }
125
126 fn make_peer_key(&self, peer_id: &[u8]) -> Vec<u8> {
127 let mut key = self.prefix.as_bytes().to_vec();
128 key.extend(peer_id);
129 key
130 }
131}
132
133impl Persister for SledPersister {
134 type Error = SledPersisterError;
135
136 fn get_changes(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
138 self.changes_tree
139 .scan_prefix(&self.prefix)
140 .values()
141 .map(|v| v.map(|v| v.to_vec()).map_err(Self::Error::SledError))
142 .collect()
143 }
144
145 fn insert_changes(&mut self, changes: Vec<(ActorId, u64, Vec<u8>)>) -> Result<(), Self::Error> {
147 for (a, s, c) in changes {
148 let key = self.make_key(&a, s);
149 self.sizes.changes += c.len() as u64;
150 if let Some(old) = self.changes_tree.insert(key, c)? {
151 self.sizes.changes -= old.len() as u64;
152 }
153 }
154 Ok(())
155 }
156
157 fn remove_changes(&mut self, changes: Vec<(&ActorId, u64)>) -> Result<(), Self::Error> {
159 for (a, s) in changes {
160 let key = self.make_key(a, s);
161 if let Some(old) = self.changes_tree.remove(key)? {
162 self.sizes.changes -= old.len() as u64;
163 }
164 }
165 Ok(())
166 }
167
168 fn get_document(&self) -> Result<Option<Vec<u8>>, Self::Error> {
170 Ok(self
171 .document_tree
172 .get(self.make_document_key())?
173 .map(|v| v.to_vec()))
174 }
175
176 fn set_document(&mut self, data: Vec<u8>) -> Result<(), Self::Error> {
178 self.sizes.document = data.len() as u64;
179 self.document_tree.insert(self.make_document_key(), data)?;
180 Ok(())
181 }
182
183 fn get_sync_state(&self, peer_id: &[u8]) -> Result<Option<Vec<u8>>, Self::Error> {
184 let sync_state_key = self.make_peer_key(peer_id);
185 Ok(self
186 .sync_states_tree
187 .get(sync_state_key)?
188 .map(|v| v.to_vec()))
189 }
190
191 fn set_sync_state(&mut self, peer_id: Vec<u8>, sync_state: Vec<u8>) -> Result<(), Self::Error> {
192 let sync_state_key = self.make_peer_key(&peer_id);
193 self.sizes.sync_states += sync_state.len() as u64;
194 if let Some(old) = self.sync_states_tree.insert(sync_state_key, sync_state)? {
195 self.sizes.sync_states -= old.len() as u64;
196 }
197 Ok(())
198 }
199
200 fn remove_sync_states(&mut self, peer_ids: &[&[u8]]) -> Result<(), Self::Error> {
201 for id in peer_ids {
202 let key = self.make_peer_key(id);
203 if let Some(old) = self.sync_states_tree.remove(key)? {
204 self.sizes.sync_states -= old.len() as u64;
205 }
206 }
207 Ok(())
208 }
209
210 fn get_peer_ids(&self) -> Result<Vec<Vec<u8>>, Self::Error> {
211 self.sync_states_tree
212 .scan_prefix(&self.prefix)
213 .keys()
214 .map(|v| v.map(|v| v.to_vec()).map_err(Self::Error::SledError))
215 .collect()
216 }
217
218 fn sizes(&self) -> StoredSizes {
219 self.sizes.clone()
220 }
221
222 fn flush(&mut self) -> Result<usize, Self::Error> {
223 let mut flushed = 0;
224 flushed += self.changes_tree.flush()?;
225 flushed += self.document_tree.flush()?;
226 flushed += self.sync_states_tree.flush()?;
227 Ok(flushed)
228 }
229}