indradb_sled/
datastore.rs

1use std::path::Path;
2use std::sync::Arc;
3use std::{u64, usize};
4
5use super::errors::map_err;
6use super::managers::*;
7
8use chrono::offset::Utc;
9use indradb::util::next_uuid;
10use indradb::{
11    BulkInsertItem, Datastore, Edge, EdgeDirection, EdgeKey, EdgeProperties, EdgeProperty, EdgePropertyQuery,
12    EdgeQuery, NamedProperty, Result, Transaction, Type, Vertex, VertexProperties, VertexProperty, VertexPropertyQuery,
13    VertexQuery,
14};
15use serde_json::Value as JsonValue;
16use sled::{Config, Db, Tree};
17use uuid::Uuid;
18
19#[derive(Copy, Clone, Default, Debug)]
20pub struct SledConfig {
21    use_compression: bool,
22    compression_factor: Option<i32>,
23}
24
25impl SledConfig {
26    /// Creates a new sled config with zstd compression enabled.
27    ///
28    /// # Arguments
29    /// * `factor`: The zstd compression factor to use. If unspecified, this
30    ///   will default to 5.
31    pub fn with_compression(factor: Option<i32>) -> SledConfig {
32        SledConfig {
33            use_compression: true,
34            compression_factor: factor,
35        }
36    }
37
38    /// Creates a new sled datastore.
39    pub fn open<P: AsRef<Path>>(self, path: P) -> Result<SledDatastore> {
40        Ok(SledDatastore {
41            holder: Arc::new(SledHolder::new(path, self)?),
42        })
43    }
44}
45
46/// The meat of a Sled datastore
47pub struct SledHolder {
48    pub(crate) db: Arc<Db>, // Derefs to Tree, holds the vertices
49    pub(crate) edges: Tree,
50    pub(crate) edge_ranges: Tree,
51    pub(crate) reversed_edge_ranges: Tree,
52    pub(crate) vertex_properties: Tree,
53    pub(crate) edge_properties: Tree,
54}
55
56impl<'ds> SledHolder {
57    /// The meat of a Sled datastore.
58    ///
59    /// # Arguments
60    /// * `path`: The file path to the Sled database.
61    /// * `opts`: Sled options to pass in.
62    pub fn new<P: AsRef<Path>>(path: P, opts: SledConfig) -> Result<SledHolder> {
63        let mut config = Config::default().path(path);
64
65        if opts.use_compression {
66            config = config.use_compression(true);
67        }
68
69        if let Some(compression_factor) = opts.compression_factor {
70            config = config.compression_factor(compression_factor);
71        }
72
73        let db = map_err(config.open())?;
74
75        Ok(SledHolder {
76            edges: map_err(db.open_tree("edges"))?,
77            edge_ranges: map_err(db.open_tree("edge_ranges"))?,
78            reversed_edge_ranges: map_err(db.open_tree("reversed_edge_ranges"))?,
79            vertex_properties: map_err(db.open_tree("vertex_properties"))?,
80            edge_properties: map_err(db.open_tree("edge_properties"))?,
81            db: Arc::new(db),
82        })
83    }
84}
85
86/// A datastore that is backed by Sled.
87pub struct SledDatastore {
88    pub(crate) holder: Arc<SledHolder>,
89}
90
91impl<'ds> SledDatastore {
92    /// Creates a new Sled datastore.
93    ///
94    /// # Arguments
95    /// * `path`: The file path to the Sled database.
96    pub fn new<P: AsRef<Path>>(path: P) -> Result<SledDatastore> {
97        Ok(SledDatastore {
98            holder: Arc::new(SledHolder::new(path, SledConfig::default())?),
99        })
100    }
101}
102
103impl Datastore for SledDatastore {
104    type Trans = SledTransaction;
105
106    fn sync(&self) -> Result<()> {
107        let holder = self.holder.clone();
108        let db = holder.db.clone();
109        map_err(db.flush())?;
110        Ok(())
111    }
112
113    fn transaction(&self) -> Result<Self::Trans> {
114        Ok(SledTransaction::new(self.holder.clone()))
115    }
116
117    fn bulk_insert<I>(&self, items: I) -> Result<()>
118    where
119        I: Iterator<Item = BulkInsertItem>,
120    {
121        let vertex_manager = VertexManager::new(&self.holder);
122        let edge_manager = EdgeManager::new(&self.holder);
123        let vertex_property_manager = VertexPropertyManager::new(&self.holder.vertex_properties);
124        let edge_property_manager = EdgePropertyManager::new(&self.holder.edge_properties);
125
126        for item in items {
127            match item {
128                BulkInsertItem::Vertex(ref vertex) => {
129                    vertex_manager.create(vertex)?;
130                }
131                BulkInsertItem::Edge(ref key) => {
132                    edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?;
133                }
134                BulkInsertItem::VertexProperty(id, ref name, ref value) => {
135                    vertex_property_manager.set(id, name, value)?;
136                }
137                BulkInsertItem::EdgeProperty(ref key, ref name, ref value) => {
138                    edge_property_manager.set(key.outbound_id, &key.t, key.inbound_id, name, value)?;
139                }
140            }
141        }
142
143        map_err(self.holder.db.flush())?;
144        Ok(())
145    }
146}
147
148/// A transaction that is backed by Sled.
149pub struct SledTransaction {
150    holder: Arc<SledHolder>,
151}
152
153impl SledTransaction {
154    fn new(holder: Arc<SledHolder>) -> Self {
155        SledTransaction { holder }
156    }
157
158    #[allow(clippy::needless_collect)]
159    fn vertex_query_to_iterator<'iter, 'trans: 'iter>(
160        &'trans self,
161        q: VertexQuery,
162    ) -> Result<Box<dyn Iterator<Item = Result<VertexItem>> + 'iter>> {
163        match q {
164            VertexQuery::Range(q) => {
165                let vertex_manager = VertexManager::new(&self.holder);
166
167                let next_uuid = match q.start_id {
168                    Some(start_id) => {
169                        match next_uuid(start_id) {
170                            Ok(next_uuid) => next_uuid,
171                            // If we get an error back, it's because
172                            // `start_id` is the maximum possible value. We
173                            // know that no vertices exist whose ID is greater
174                            // than the maximum possible value, so just return
175                            // an empty list.
176                            Err(_) => return Ok(Box::new(vec![].into_iter())),
177                        }
178                    }
179                    None => Uuid::default(),
180                };
181
182                let mut iter: Box<dyn Iterator<Item = Result<VertexItem>>> =
183                    Box::new(vertex_manager.iterate_for_range(next_uuid));
184
185                if let Some(ref t) = q.t {
186                    iter = Box::new(iter.filter(move |item| match item {
187                        Ok((_, v)) => v == t,
188                        Err(_) => true,
189                    }));
190                }
191
192                let results: Vec<Result<VertexItem>> = iter.take(q.limit as usize).collect();
193                Ok(Box::new(results.into_iter()))
194            }
195            VertexQuery::Specific(q) => {
196                let vertex_manager = VertexManager::new(&self.holder);
197
198                let iter = q.ids.into_iter().map(move |id| match vertex_manager.get(id)? {
199                    Some(value) => Ok(Some((id, value))),
200                    None => Ok(None),
201                });
202
203                Ok(Box::new(remove_nones_from_iterator(iter)))
204            }
205            VertexQuery::Pipe(q) => {
206                let vertex_manager = VertexManager::new(&self.holder);
207                let edge_iterator = self.edge_query_to_iterator(*q.inner)?;
208                let direction = q.direction;
209
210                let iter = edge_iterator.map(move |item| {
211                    let (outbound_id, _, _, inbound_id) = item?;
212
213                    let id = match direction {
214                        EdgeDirection::Outbound => outbound_id,
215                        EdgeDirection::Inbound => inbound_id,
216                    };
217
218                    match vertex_manager.get(id)? {
219                        Some(value) => Ok(Some((id, value))),
220                        None => Ok(None),
221                    }
222                });
223
224                let mut iter: Box<dyn Iterator<Item = Result<VertexItem>>> = Box::new(remove_nones_from_iterator(iter));
225
226                if let Some(ref t) = q.t {
227                    iter = Box::new(iter.filter(move |item| match item {
228                        Ok((_, v)) => v == t,
229                        Err(_) => true,
230                    }));
231                }
232
233                let results: Vec<Result<VertexItem>> = iter.take(q.limit as usize).collect();
234                Ok(Box::new(results.into_iter()))
235            }
236        }
237    }
238
239    fn edge_query_to_iterator<'iter, 'trans: 'iter>(
240        &'trans self,
241        q: EdgeQuery,
242    ) -> Result<Box<dyn Iterator<Item = Result<EdgeRangeItem>> + 'iter>> {
243        match q {
244            EdgeQuery::Specific(q) => {
245                let edge_manager = EdgeManager::new(&self.holder);
246
247                let edges = q.keys.into_iter().map(move |key| {
248                    match edge_manager.get(key.outbound_id, &key.t, key.inbound_id)? {
249                        Some(update_datetime) => {
250                            Ok(Some((key.outbound_id, key.t.clone(), update_datetime, key.inbound_id)))
251                        }
252                        None => Ok(None),
253                    }
254                });
255
256                let iterator = remove_nones_from_iterator(edges);
257                Ok(Box::new(iterator))
258            }
259            EdgeQuery::Pipe(q) => {
260                let vertex_iterator = self.vertex_query_to_iterator(*q.inner)?;
261
262                let edge_range_manager = match q.direction {
263                    EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder),
264                    EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder),
265                };
266
267                // Ideally we'd use iterators all the way down, but things
268                // start breaking apart due to conditional expressions not
269                // returning the same type signature, issues with `Result`s
270                // and some of the iterators, etc. So at this point, we'll
271                // just resort to building a vector.
272                let mut edges: Vec<Result<EdgeRangeItem>> = Vec::new();
273
274                for item in vertex_iterator {
275                    let (id, _) = item?;
276                    let edge_iterator = edge_range_manager.iterate_for_range(id, q.t.as_ref(), q.high)?;
277
278                    for item in edge_iterator {
279                        match item {
280                            Ok((
281                                edge_range_first_id,
282                                edge_range_t,
283                                edge_range_update_datetime,
284                                edge_range_second_id,
285                            )) => {
286                                if let Some(low) = q.low {
287                                    if edge_range_update_datetime < low {
288                                        break;
289                                    }
290                                }
291
292                                edges.push(match q.direction {
293                                    EdgeDirection::Outbound => Ok((
294                                        edge_range_first_id,
295                                        edge_range_t,
296                                        edge_range_update_datetime,
297                                        edge_range_second_id,
298                                    )),
299                                    EdgeDirection::Inbound => Ok((
300                                        edge_range_second_id,
301                                        edge_range_t,
302                                        edge_range_update_datetime,
303                                        edge_range_first_id,
304                                    )),
305                                })
306                            }
307                            Err(_) => edges.push(item),
308                        }
309
310                        if edges.len() == q.limit as usize {
311                            break;
312                        }
313                    }
314                }
315
316                Ok(Box::new(edges.into_iter()))
317            }
318        }
319    }
320}
321
322impl Transaction for SledTransaction {
323    fn create_vertex(&self, vertex: &Vertex) -> Result<bool> {
324        let vertex_manager = VertexManager::new(&self.holder);
325
326        if vertex_manager.exists(vertex.id)? {
327            Ok(false)
328        } else {
329            vertex_manager.create(vertex)?;
330            Ok(true)
331        }
332    }
333
334    fn get_vertices<Q: Into<VertexQuery>>(&self, q: Q) -> Result<Vec<Vertex>> {
335        let iterator = self.vertex_query_to_iterator(q.into())?;
336
337        let mapped = iterator.map(move |item| {
338            let (id, t) = item?;
339            let vertex = Vertex::with_id(id, t);
340            Ok(vertex)
341        });
342
343        mapped.collect()
344    }
345
346    fn delete_vertices<Q: Into<VertexQuery>>(&self, q: Q) -> Result<()> {
347        let iterator = self.vertex_query_to_iterator(q.into())?;
348        let vertex_manager = VertexManager::new(&self.holder);
349
350        for item in iterator {
351            let (id, _) = item?;
352            vertex_manager.delete(id)?;
353        }
354
355        Ok(())
356    }
357
358    fn get_vertex_count(&self) -> Result<u64> {
359        let vertex_manager = VertexManager::new(&self.holder);
360        let iterator = vertex_manager.iterate_for_range(Uuid::default());
361        Ok(iterator.count() as u64)
362    }
363
364    fn create_edge(&self, key: &EdgeKey) -> Result<bool> {
365        let vertex_manager = VertexManager::new(&self.holder);
366
367        if !vertex_manager.exists(key.outbound_id)? || !vertex_manager.exists(key.inbound_id)? {
368            Ok(false)
369        } else {
370            let edge_manager = EdgeManager::new(&self.holder);
371            edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?;
372            Ok(true)
373        }
374    }
375
376    fn get_edges<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<Vec<Edge>> {
377        let iterator = self.edge_query_to_iterator(q.into())?;
378
379        let mapped = iterator.map(move |item: Result<EdgeRangeItem>| {
380            let (outbound_id, t, update_datetime, inbound_id) = item?;
381            let key = EdgeKey::new(outbound_id, t, inbound_id);
382            let edge = Edge::new(key, update_datetime);
383            Ok(edge)
384        });
385
386        mapped.collect()
387    }
388
389    fn delete_edges<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<()> {
390        let edge_manager = EdgeManager::new(&self.holder);
391        let vertex_manager = VertexManager::new(&self.holder);
392        let iterator = self.edge_query_to_iterator(q.into())?;
393
394        for item in iterator {
395            let (outbound_id, t, update_datetime, inbound_id) = item?;
396
397            if vertex_manager.get(outbound_id)?.is_some() {
398                edge_manager.delete(outbound_id, &t, inbound_id, update_datetime)?;
399            };
400        }
401        Ok(())
402    }
403
404    fn get_edge_count(&self, id: Uuid, t: Option<&Type>, direction: EdgeDirection) -> Result<u64> {
405        let edge_range_manager = match direction {
406            EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder),
407            EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder),
408        };
409
410        let iter = edge_range_manager.iterate_for_range(id, t, None)?;
411        let count = iter.count();
412
413        Ok(count as u64)
414    }
415
416    fn get_vertex_properties(&self, q: VertexPropertyQuery) -> Result<Vec<VertexProperty>> {
417        let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
418        let mut properties = Vec::new();
419
420        for item in self.vertex_query_to_iterator(q.inner)? {
421            let (id, _) = item?;
422            let value = manager.get(id, &q.name)?;
423
424            if let Some(value) = value {
425                properties.push(VertexProperty::new(id, value));
426            }
427        }
428
429        Ok(properties)
430    }
431
432    fn get_all_vertex_properties<Q: Into<VertexQuery>>(&self, q: Q) -> Result<Vec<VertexProperties>> {
433        let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
434        let iterator = self.vertex_query_to_iterator(q.into())?;
435
436        let iter = iterator.map(move |item| {
437            let (id, t) = item?;
438            let vertex = Vertex::with_id(id, t);
439
440            let it = manager.iterate_for_owner(id)?;
441            let props: Result<Vec<_>> = it.collect();
442            let props_iter = props?.into_iter();
443            let props = props_iter
444                .map(|((_, name), value)| NamedProperty::new(name, value))
445                .collect();
446
447            Ok(VertexProperties::new(vertex, props))
448        });
449
450        iter.collect()
451    }
452
453    fn set_vertex_properties(&self, q: VertexPropertyQuery, value: &JsonValue) -> Result<()> {
454        let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
455
456        for item in self.vertex_query_to_iterator(q.inner)? {
457            let (id, _) = item?;
458            manager.set(id, &q.name, value)?;
459        }
460        Ok(())
461    }
462
463    fn delete_vertex_properties(&self, q: VertexPropertyQuery) -> Result<()> {
464        let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
465
466        for item in self.vertex_query_to_iterator(q.inner)? {
467            let (id, _) = item?;
468            manager.delete(id, &q.name)?;
469        }
470        Ok(())
471    }
472
473    fn get_edge_properties(&self, q: EdgePropertyQuery) -> Result<Vec<EdgeProperty>> {
474        let manager = EdgePropertyManager::new(&self.holder.edge_properties);
475        let mut properties = Vec::new();
476
477        for item in self.edge_query_to_iterator(q.inner)? {
478            let (outbound_id, t, _, inbound_id) = item?;
479            let value = manager.get(outbound_id, &t, inbound_id, &q.name)?;
480
481            if let Some(value) = value {
482                let key = EdgeKey::new(outbound_id, t, inbound_id);
483                properties.push(EdgeProperty::new(key, value));
484            }
485        }
486
487        Ok(properties)
488    }
489
490    fn get_all_edge_properties<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<Vec<EdgeProperties>> {
491        let manager = EdgePropertyManager::new(&self.holder.edge_properties);
492        let iterator = self.edge_query_to_iterator(q.into())?;
493
494        let iter = iterator.map(move |item| {
495            let (out_id, t, time, in_id) = item?;
496            let edge = Edge::new(EdgeKey::new(out_id, t.clone(), in_id), time);
497            let it = manager.iterate_for_owner(out_id, &t, in_id)?;
498            let props: Result<Vec<_>> = it.collect();
499            let props_iter = props?.into_iter();
500            let props = props_iter
501                .map(|((_, _, _, name), value)| NamedProperty::new(name, value))
502                .collect();
503
504            Ok(EdgeProperties::new(edge, props))
505        });
506
507        iter.collect()
508    }
509
510    fn set_edge_properties(&self, q: EdgePropertyQuery, value: &JsonValue) -> Result<()> {
511        let manager = EdgePropertyManager::new(&self.holder.edge_properties);
512
513        for item in self.edge_query_to_iterator(q.inner)? {
514            let (outbound_id, t, _, inbound_id) = item?;
515            manager.set(outbound_id, &t, inbound_id, &q.name, value)?;
516        }
517        Ok(())
518    }
519
520    fn delete_edge_properties(&self, q: EdgePropertyQuery) -> Result<()> {
521        let manager = EdgePropertyManager::new(&self.holder.edge_properties);
522
523        for item in self.edge_query_to_iterator(q.inner)? {
524            let (outbound_id, t, _, inbound_id) = item?;
525            manager.delete(outbound_id, &t, inbound_id, &q.name)?;
526        }
527        Ok(())
528    }
529}
530
531fn remove_nones_from_iterator<I, T>(iter: I) -> impl Iterator<Item = Result<T>>
532where
533    I: Iterator<Item = Result<Option<T>>>,
534{
535    iter.filter_map(|item| match item {
536        Err(err) => Some(Err(err)),
537        Ok(Some(value)) => Some(Ok(value)),
538        _ => None,
539    })
540}