indradb/memory/
datastore.rs

1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
2use std::fs::File;
3use std::io::{BufReader, BufWriter};
4use std::path::PathBuf;
5use std::result::Result as StdResult;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8use crate::errors::{Error, Result};
9use crate::util;
10use crate::{Database, Datastore, DynIter, Edge, Identifier, Json, Transaction, Vertex};
11
12use rmp_serde::decode::Error as RmpDecodeError;
13use serde::{Deserialize, Serialize};
14use tempfile::NamedTempFile;
15use uuid::Uuid;
16
// An entry in the property index (`InternalMemory::property_values`): the
// owner of an indexed property value, which is either a vertex or an edge.
#[derive(Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
enum IndexedPropertyMember {
    // A vertex, referenced by its id.
    Vertex(Uuid),
    // An edge, stored by value.
    Edge(Edge),
}
22
// All of the data is actually stored in this struct, which is stored
// internally to the datastore itself. This way, we can wrap a mutex around
// the entire datastore, rather than on a per-data structure basis, as the
// latter approach would risk deadlocking without extreme care.
//
// The whole struct is what gets serialized when a transaction syncs to disk
// and what gets deserialized when an image is read back.
#[derive(Debug, Default, Serialize, Deserialize)]
struct InternalMemory {
    // Vertex id -> vertex type identifier.
    vertices: BTreeMap<Uuid, Identifier>,
    // Every edge in the datastore.
    edges: BTreeSet<Edge>,
    // `edge.reversed()` for every edge in `edges`; kept in step with `edges`
    // on creation and deletion.
    reversed_edges: BTreeSet<Edge>,
    // (vertex id, property name) -> property value.
    vertex_properties: BTreeMap<(Uuid, Identifier), Json>,
    // (edge, property name) -> property value.
    edge_properties: BTreeMap<(Edge, Identifier), Json>,
    // Property index: property name -> property value -> owners of that
    // value. A name only has an entry here once it has been indexed.
    property_values: HashMap<Identifier, HashMap<Json, HashSet<IndexedPropertyMember>>>,
}
36
/// A transaction on a `MemoryDatastore`.
///
/// The transaction owns the guard for the datastore's mutex, so the lock is
/// held for as long as the transaction is alive.
pub struct MemoryTransaction<'a> {
    // Locked view of the datastore's data.
    internal: MutexGuard<'a, InternalMemory>,
    // Where `sync` persists the data; `None` means `sync` writes nothing.
    path: Option<PathBuf>,
}
41
42impl<'a> Transaction<'a> for MemoryTransaction<'a> {
43    fn vertex_count(&self) -> u64 {
44        self.internal.vertices.len() as u64
45    }
46
47    fn all_vertices(&'a self) -> Result<DynIter<'a, Vertex>> {
48        let iter = self
49            .internal
50            .vertices
51            .iter()
52            .map(|(id, t)| Ok(Vertex::with_id(*id, *t)));
53        Ok(Box::new(iter))
54    }
55
56    fn range_vertices(&'a self, offset: Uuid) -> Result<DynIter<'a, Vertex>> {
57        let iter = self
58            .internal
59            .vertices
60            .range(offset..)
61            .map(|(id, t)| Ok(Vertex::with_id(*id, *t)));
62        Ok(Box::new(iter))
63    }
64
65    fn specific_vertices(&'a self, ids: Vec<Uuid>) -> Result<DynIter<'a, Vertex>> {
66        let iter = ids.into_iter().filter_map(move |id| {
67            self.internal
68                .vertices
69                .get(&id)
70                .map(|value| Ok(Vertex::with_id(id, *value)))
71        });
72        Ok(Box::new(iter))
73    }
74
75    fn vertex_ids_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Uuid>>> {
76        if let Some(container) = self.internal.property_values.get(&name) {
77            let mut vertex_ids = HashSet::<Uuid>::default();
78            for sub_container in container.values() {
79                for member in sub_container {
80                    if let IndexedPropertyMember::Vertex(id) = member {
81                        vertex_ids.insert(*id);
82                    }
83                }
84            }
85            Ok(Some(Box::new(vertex_ids.into_iter().map(Ok))))
86        } else {
87            Ok(None)
88        }
89    }
90
91    fn vertex_ids_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Uuid>>> {
92        if let Some(container) = self.internal.property_values.get(&name) {
93            if let Some(sub_container) = container.get(value) {
94                let iter = Box::new(sub_container.iter().filter_map(move |member| match member {
95                    IndexedPropertyMember::Vertex(id) => Some(Ok(*id)),
96                    _ => None,
97                }));
98                Ok(Some(Box::new(iter)))
99            } else {
100                let iter = Vec::default().into_iter();
101                Ok(Some(Box::new(iter)))
102            }
103        } else {
104            Ok(None)
105        }
106    }
107
108    fn edge_count(&self) -> u64 {
109        self.internal.edges.len() as u64
110    }
111
112    fn all_edges(&'a self) -> Result<DynIter<'a, Edge>> {
113        let iter = self.internal.edges.iter().map(|e| Ok(e.clone()));
114        Ok(Box::new(iter))
115    }
116
117    fn range_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>> {
118        let iter = self.internal.edges.range(offset..).map(|e| Ok(e.clone()));
119        Ok(Box::new(iter))
120    }
121
122    fn range_reversed_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>> {
123        let iter = self.internal.reversed_edges.range(offset..).map(|e| Ok(e.clone()));
124        Ok(Box::new(iter))
125    }
126
127    fn specific_edges(&'a self, edges: Vec<Edge>) -> Result<DynIter<'a, Edge>> {
128        let iter = edges
129            .into_iter()
130            .filter(move |edge| self.internal.edges.contains(edge))
131            .map(Ok);
132        Ok(Box::new(iter))
133    }
134
135    fn edges_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Edge>>> {
136        if let Some(container) = self.internal.property_values.get(&name) {
137            let mut edges = HashSet::<Edge>::default();
138            for sub_container in container.values() {
139                for member in sub_container {
140                    if let IndexedPropertyMember::Edge(edge) = member {
141                        edges.insert(edge.clone());
142                    }
143                }
144            }
145            Ok(Some(Box::new(edges.into_iter().map(Ok))))
146        } else {
147            Ok(None)
148        }
149    }
150
151    fn edges_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Edge>>> {
152        if let Some(container) = self.internal.property_values.get(&name) {
153            if let Some(sub_container) = container.get(value) {
154                let iter = Box::new(sub_container.iter().filter_map(move |member| match member {
155                    IndexedPropertyMember::Edge(edge) if self.internal.edges.contains(edge) => Some(edge),
156                    _ => None,
157                }));
158                Ok(Some(Box::new(iter.map(|e| Ok(e.clone())))))
159            } else {
160                let iter = Vec::default().into_iter();
161                Ok(Some(Box::new(iter)))
162            }
163        } else {
164            Ok(None)
165        }
166    }
167
168    fn vertex_property(&self, vertex: &Vertex, name: Identifier) -> Result<Option<Json>> {
169        if let Some(value) = self.internal.vertex_properties.get(&(vertex.id, name)) {
170            Ok(Some(value.clone()))
171        } else {
172            Ok(None)
173        }
174    }
175
176    fn all_vertex_properties_for_vertex(&'a self, vertex: &Vertex) -> Result<DynIter<'a, (Identifier, Json)>> {
177        let mut vertex_properties = Vec::new();
178        let from = &(vertex.id, Identifier::default());
179        let to = &(util::next_uuid(vertex.id).unwrap(), Identifier::default());
180        for ((_prop_vertex_id, prop_name), prop_value) in self.internal.vertex_properties.range(from..to) {
181            vertex_properties.push((*prop_name, prop_value.clone()));
182        }
183        Ok(Box::new(vertex_properties.into_iter().map(Ok)))
184    }
185
186    fn edge_property(&self, edge: &Edge, name: Identifier) -> Result<Option<Json>> {
187        if let Some(value) = self.internal.edge_properties.get(&(edge.clone(), name)) {
188            Ok(Some(value.clone()))
189        } else {
190            Ok(None)
191        }
192    }
193
194    fn all_edge_properties_for_edge(&'a self, edge: &Edge) -> Result<DynIter<'a, (Identifier, Json)>> {
195        let mut edge_properties = Vec::new();
196        let from = &(edge.clone(), Identifier::default());
197        for ((prop_edge, prop_name), prop_value) in self.internal.edge_properties.range(from..) {
198            if prop_edge != edge {
199                break;
200            }
201            edge_properties.push((*prop_name, prop_value.clone()));
202        }
203        Ok(Box::new(edge_properties.into_iter().map(Ok)))
204    }
205
206    fn delete_vertices(&mut self, vertices: Vec<Vertex>) -> Result<()> {
207        for vertex in vertices {
208            self.internal.vertices.remove(&vertex.id);
209
210            let mut deletable_vertex_properties: Vec<(Uuid, Identifier)> = Vec::new();
211            for (property_key, _) in self
212                .internal
213                .vertex_properties
214                .range((vertex.id, Identifier::default())..)
215            {
216                let (property_vertex_id, _) = property_key;
217
218                if &vertex.id != property_vertex_id {
219                    break;
220                }
221
222                deletable_vertex_properties.push(*property_key);
223            }
224            self.delete_vertex_properties(deletable_vertex_properties)?;
225
226            let mut deletable_edges: Vec<Edge> = Vec::new();
227            for edge in self.internal.edges.iter() {
228                if edge.outbound_id == vertex.id || edge.inbound_id == vertex.id {
229                    deletable_edges.push(edge.clone());
230                }
231            }
232            self.delete_edges(deletable_edges)?;
233        }
234        Ok(())
235    }
236
237    fn delete_edges(&mut self, edges: Vec<Edge>) -> Result<()> {
238        for edge in edges {
239            self.internal.edges.remove(&edge);
240            self.internal.reversed_edges.remove(&edge.reversed());
241
242            let mut deletable_edge_properties: Vec<(Edge, Identifier)> = Vec::new();
243            for (property_key, _) in self
244                .internal
245                .edge_properties
246                .range((edge.clone(), Identifier::default())..)
247            {
248                let (property_edge, _) = property_key;
249
250                if &edge != property_edge {
251                    break;
252                }
253
254                deletable_edge_properties.push(property_key.clone());
255            }
256            self.delete_edge_properties(deletable_edge_properties)?;
257        }
258        Ok(())
259    }
260
261    fn delete_vertex_properties(&mut self, props: Vec<(Uuid, Identifier)>) -> Result<()> {
262        for prop in props {
263            if let Some(property_value) = self.internal.vertex_properties.remove(&prop) {
264                let (property_vertex_id, property_name) = prop;
265                if let Some(property_container) = self.internal.property_values.get_mut(&property_name) {
266                    debug_assert!(property_container
267                        .get_mut(&property_value)
268                        .unwrap()
269                        .remove(&IndexedPropertyMember::Vertex(property_vertex_id)));
270                }
271            }
272        }
273        Ok(())
274    }
275
276    fn delete_edge_properties(&mut self, props: Vec<(Edge, Identifier)>) -> Result<()> {
277        for prop in props {
278            if let Some(property_value) = self.internal.edge_properties.remove(&prop) {
279                let (property_edge, property_name) = prop;
280                if let Some(property_container) = self.internal.property_values.get_mut(&property_name) {
281                    debug_assert!(property_container
282                        .get_mut(&property_value)
283                        .unwrap()
284                        .remove(&IndexedPropertyMember::Edge(property_edge)));
285                }
286            }
287        }
288        Ok(())
289    }
290
291    fn sync(&self) -> Result<()> {
292        if let Some(ref persist_path) = self.path {
293            let temp_path = NamedTempFile::new().map_err(|err| Error::Datastore(Box::new(err)))?;
294            {
295                let mut buf = BufWriter::new(temp_path.as_file());
296                rmp_serde::encode::write(&mut buf, &*self.internal)?;
297            }
298            temp_path
299                .persist(persist_path)
300                .map_err(|err| Error::Datastore(Box::new(err)))?;
301        }
302        Ok(())
303    }
304
305    fn create_vertex(&mut self, vertex: &Vertex) -> Result<bool> {
306        let mut inserted = false;
307
308        self.internal.vertices.entry(vertex.id).or_insert_with(|| {
309            inserted = true;
310            vertex.t
311        });
312
313        Ok(inserted)
314    }
315
316    fn create_edge(&mut self, edge: &Edge) -> Result<bool> {
317        if !self.internal.vertices.contains_key(&edge.outbound_id)
318            || !self.internal.vertices.contains_key(&edge.inbound_id)
319        {
320            return Ok(false);
321        }
322
323        self.internal.edges.insert(edge.clone());
324        self.internal.reversed_edges.insert(edge.reversed());
325        Ok(true)
326    }
327
328    fn index_property(&mut self, name: Identifier) -> Result<()> {
329        let mut property_container: HashMap<Json, HashSet<IndexedPropertyMember>> = HashMap::new();
330        for id in self.internal.vertices.keys() {
331            if let Some(value) = self.internal.vertex_properties.get(&(*id, name)) {
332                property_container
333                    .entry(value.clone())
334                    .or_default()
335                    .insert(IndexedPropertyMember::Vertex(*id));
336            }
337        }
338        for edge in self.internal.edges.iter() {
339            if let Some(value) = self.internal.edge_properties.get(&(edge.clone(), name)) {
340                property_container
341                    .entry(value.clone())
342                    .or_default()
343                    .insert(IndexedPropertyMember::Edge(edge.clone()));
344            }
345        }
346
347        let existing_property_container = self.internal.property_values.entry(name).or_default();
348        for (value, members) in property_container.into_iter() {
349            let existing_members = existing_property_container.entry(value).or_default();
350            for member in members {
351                existing_members.insert(member);
352            }
353        }
354
355        Ok(())
356    }
357
358    fn set_vertex_properties(&mut self, vertex_ids: Vec<Uuid>, name: Identifier, value: &Json) -> Result<()> {
359        let mut deletable_vertex_properties = Vec::new();
360        for vertex_id in &vertex_ids {
361            deletable_vertex_properties.push((*vertex_id, name));
362        }
363        self.delete_vertex_properties(deletable_vertex_properties)?;
364
365        for vertex_id in &vertex_ids {
366            self.internal
367                .vertex_properties
368                .insert((*vertex_id, name), value.clone());
369        }
370
371        if let Some(property_container) = self.internal.property_values.get_mut(&name) {
372            let property_container = property_container.entry(value.clone()).or_insert_with(HashSet::new);
373            for vertex_id in vertex_ids.into_iter() {
374                property_container.insert(IndexedPropertyMember::Vertex(vertex_id));
375            }
376        }
377
378        Ok(())
379    }
380
381    fn set_edge_properties(&mut self, edges: Vec<Edge>, name: Identifier, value: &Json) -> Result<()> {
382        let mut deletable_edge_properties = Vec::new();
383        for edge in &edges {
384            deletable_edge_properties.push((edge.clone(), name));
385        }
386        self.delete_edge_properties(deletable_edge_properties)?;
387
388        for edge in &edges {
389            self.internal
390                .edge_properties
391                .insert((edge.clone(), name), value.clone());
392        }
393
394        if let Some(property_container) = self.internal.property_values.get_mut(&name) {
395            let property_container = property_container.entry(value.clone()).or_insert_with(HashSet::new);
396            for edge in edges.into_iter() {
397                property_container.insert(IndexedPropertyMember::Edge(edge));
398            }
399        }
400
401        Ok(())
402    }
403}
404
/// An in-memory datastore.
///
/// All data lives behind a single mutex. Cloning the datastore clones the
/// `Arc`, so clones share the same underlying data.
#[derive(Debug, Clone)]
pub struct MemoryDatastore {
    // The shared data, guarded by one mutex.
    internal: Arc<Mutex<InternalMemory>>,
    // Where transactions persist the data on sync; `None` disables
    // persistence.
    path: Option<PathBuf>,
}
411
412impl MemoryDatastore {
413    /// Creates a new in-memory database with no persistence.
414    pub fn new_db() -> Database<MemoryDatastore> {
415        Database::new(MemoryDatastore {
416            internal: Arc::new(Mutex::new(InternalMemory::default())),
417            path: None,
418        })
419    }
420
421    /// Reads a persisted image from disk. Calls to sync will overwrite the
422    /// file at the specified path.
423    ///
424    /// # Arguments
425    /// * `path`: The path to the persisted image.
426    pub fn read_msgpack_db<P: Into<PathBuf>>(path: P) -> StdResult<Database<MemoryDatastore>, RmpDecodeError> {
427        let path = path.into();
428        let f = File::open(&path).map_err(RmpDecodeError::InvalidDataRead)?;
429        let buf = BufReader::new(f);
430        let internal: InternalMemory = rmp_serde::from_read(buf)?;
431        Ok(Database::new(MemoryDatastore {
432            internal: Arc::new(Mutex::new(internal)),
433            path: Some(path),
434        }))
435    }
436
437    /// Creates a new datastore. Calls to sync will overwrite the file at the
438    /// specified path, but as opposed to `read`, this will not read the file
439    /// first.
440    ///
441    /// # Arguments
442    /// * `path`: The path to the persisted image.
443    pub fn create_msgpack_db<P: Into<PathBuf>>(path: P) -> Database<MemoryDatastore> {
444        Database::new(MemoryDatastore {
445            internal: Arc::new(Mutex::new(InternalMemory::default())),
446            path: Some(path.into()),
447        })
448    }
449}
450
451impl Datastore for MemoryDatastore {
452    type Transaction<'a> = MemoryTransaction<'a>;
453    fn transaction(&'_ self) -> Self::Transaction<'_> {
454        MemoryTransaction {
455            internal: self.internal.lock().unwrap(),
456            path: self.path.clone(),
457        }
458    }
459}