1use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
2use std::fs::File;
3use std::io::{BufReader, BufWriter};
4use std::path::PathBuf;
5use std::result::Result as StdResult;
6use std::sync::{Arc, Mutex, MutexGuard};
7
8use crate::errors::{Error, Result};
9use crate::util;
10use crate::{Database, Datastore, DynIter, Edge, Identifier, Json, Transaction, Vertex};
11
12use rmp_serde::decode::Error as RmpDecodeError;
13use serde::{Deserialize, Serialize};
14use tempfile::NamedTempFile;
15use uuid::Uuid;
16
17#[derive(Eq, PartialEq, Hash, Serialize, Deserialize, Debug)]
18enum IndexedPropertyMember {
19 Vertex(Uuid),
20 Edge(Edge),
21}
22
23#[derive(Debug, Default, Serialize, Deserialize)]
28struct InternalMemory {
29 vertices: BTreeMap<Uuid, Identifier>,
30 edges: BTreeSet<Edge>,
31 reversed_edges: BTreeSet<Edge>,
32 vertex_properties: BTreeMap<(Uuid, Identifier), Json>,
33 edge_properties: BTreeMap<(Edge, Identifier), Json>,
34 property_values: HashMap<Identifier, HashMap<Json, HashSet<IndexedPropertyMember>>>,
35}
36
37pub struct MemoryTransaction<'a> {
38 internal: MutexGuard<'a, InternalMemory>,
39 path: Option<PathBuf>,
40}
41
42impl<'a> Transaction<'a> for MemoryTransaction<'a> {
43 fn vertex_count(&self) -> u64 {
44 self.internal.vertices.len() as u64
45 }
46
47 fn all_vertices(&'a self) -> Result<DynIter<'a, Vertex>> {
48 let iter = self
49 .internal
50 .vertices
51 .iter()
52 .map(|(id, t)| Ok(Vertex::with_id(*id, *t)));
53 Ok(Box::new(iter))
54 }
55
56 fn range_vertices(&'a self, offset: Uuid) -> Result<DynIter<'a, Vertex>> {
57 let iter = self
58 .internal
59 .vertices
60 .range(offset..)
61 .map(|(id, t)| Ok(Vertex::with_id(*id, *t)));
62 Ok(Box::new(iter))
63 }
64
65 fn specific_vertices(&'a self, ids: Vec<Uuid>) -> Result<DynIter<'a, Vertex>> {
66 let iter = ids.into_iter().filter_map(move |id| {
67 self.internal
68 .vertices
69 .get(&id)
70 .map(|value| Ok(Vertex::with_id(id, *value)))
71 });
72 Ok(Box::new(iter))
73 }
74
75 fn vertex_ids_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Uuid>>> {
76 if let Some(container) = self.internal.property_values.get(&name) {
77 let mut vertex_ids = HashSet::<Uuid>::default();
78 for sub_container in container.values() {
79 for member in sub_container {
80 if let IndexedPropertyMember::Vertex(id) = member {
81 vertex_ids.insert(*id);
82 }
83 }
84 }
85 Ok(Some(Box::new(vertex_ids.into_iter().map(Ok))))
86 } else {
87 Ok(None)
88 }
89 }
90
91 fn vertex_ids_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Uuid>>> {
92 if let Some(container) = self.internal.property_values.get(&name) {
93 if let Some(sub_container) = container.get(value) {
94 let iter = Box::new(sub_container.iter().filter_map(move |member| match member {
95 IndexedPropertyMember::Vertex(id) => Some(Ok(*id)),
96 _ => None,
97 }));
98 Ok(Some(Box::new(iter)))
99 } else {
100 let iter = Vec::default().into_iter();
101 Ok(Some(Box::new(iter)))
102 }
103 } else {
104 Ok(None)
105 }
106 }
107
108 fn edge_count(&self) -> u64 {
109 self.internal.edges.len() as u64
110 }
111
112 fn all_edges(&'a self) -> Result<DynIter<'a, Edge>> {
113 let iter = self.internal.edges.iter().map(|e| Ok(e.clone()));
114 Ok(Box::new(iter))
115 }
116
117 fn range_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>> {
118 let iter = self.internal.edges.range(offset..).map(|e| Ok(e.clone()));
119 Ok(Box::new(iter))
120 }
121
122 fn range_reversed_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>> {
123 let iter = self.internal.reversed_edges.range(offset..).map(|e| Ok(e.clone()));
124 Ok(Box::new(iter))
125 }
126
127 fn specific_edges(&'a self, edges: Vec<Edge>) -> Result<DynIter<'a, Edge>> {
128 let iter = edges
129 .into_iter()
130 .filter(move |edge| self.internal.edges.contains(edge))
131 .map(Ok);
132 Ok(Box::new(iter))
133 }
134
135 fn edges_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Edge>>> {
136 if let Some(container) = self.internal.property_values.get(&name) {
137 let mut edges = HashSet::<Edge>::default();
138 for sub_container in container.values() {
139 for member in sub_container {
140 if let IndexedPropertyMember::Edge(edge) = member {
141 edges.insert(edge.clone());
142 }
143 }
144 }
145 Ok(Some(Box::new(edges.into_iter().map(Ok))))
146 } else {
147 Ok(None)
148 }
149 }
150
151 fn edges_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Edge>>> {
152 if let Some(container) = self.internal.property_values.get(&name) {
153 if let Some(sub_container) = container.get(value) {
154 let iter = Box::new(sub_container.iter().filter_map(move |member| match member {
155 IndexedPropertyMember::Edge(edge) if self.internal.edges.contains(edge) => Some(edge),
156 _ => None,
157 }));
158 Ok(Some(Box::new(iter.map(|e| Ok(e.clone())))))
159 } else {
160 let iter = Vec::default().into_iter();
161 Ok(Some(Box::new(iter)))
162 }
163 } else {
164 Ok(None)
165 }
166 }
167
168 fn vertex_property(&self, vertex: &Vertex, name: Identifier) -> Result<Option<Json>> {
169 if let Some(value) = self.internal.vertex_properties.get(&(vertex.id, name)) {
170 Ok(Some(value.clone()))
171 } else {
172 Ok(None)
173 }
174 }
175
176 fn all_vertex_properties_for_vertex(&'a self, vertex: &Vertex) -> Result<DynIter<'a, (Identifier, Json)>> {
177 let mut vertex_properties = Vec::new();
178 let from = &(vertex.id, Identifier::default());
179 let to = &(util::next_uuid(vertex.id).unwrap(), Identifier::default());
180 for ((_prop_vertex_id, prop_name), prop_value) in self.internal.vertex_properties.range(from..to) {
181 vertex_properties.push((*prop_name, prop_value.clone()));
182 }
183 Ok(Box::new(vertex_properties.into_iter().map(Ok)))
184 }
185
186 fn edge_property(&self, edge: &Edge, name: Identifier) -> Result<Option<Json>> {
187 if let Some(value) = self.internal.edge_properties.get(&(edge.clone(), name)) {
188 Ok(Some(value.clone()))
189 } else {
190 Ok(None)
191 }
192 }
193
194 fn all_edge_properties_for_edge(&'a self, edge: &Edge) -> Result<DynIter<'a, (Identifier, Json)>> {
195 let mut edge_properties = Vec::new();
196 let from = &(edge.clone(), Identifier::default());
197 for ((prop_edge, prop_name), prop_value) in self.internal.edge_properties.range(from..) {
198 if prop_edge != edge {
199 break;
200 }
201 edge_properties.push((*prop_name, prop_value.clone()));
202 }
203 Ok(Box::new(edge_properties.into_iter().map(Ok)))
204 }
205
206 fn delete_vertices(&mut self, vertices: Vec<Vertex>) -> Result<()> {
207 for vertex in vertices {
208 self.internal.vertices.remove(&vertex.id);
209
210 let mut deletable_vertex_properties: Vec<(Uuid, Identifier)> = Vec::new();
211 for (property_key, _) in self
212 .internal
213 .vertex_properties
214 .range((vertex.id, Identifier::default())..)
215 {
216 let (property_vertex_id, _) = property_key;
217
218 if &vertex.id != property_vertex_id {
219 break;
220 }
221
222 deletable_vertex_properties.push(*property_key);
223 }
224 self.delete_vertex_properties(deletable_vertex_properties)?;
225
226 let mut deletable_edges: Vec<Edge> = Vec::new();
227 for edge in self.internal.edges.iter() {
228 if edge.outbound_id == vertex.id || edge.inbound_id == vertex.id {
229 deletable_edges.push(edge.clone());
230 }
231 }
232 self.delete_edges(deletable_edges)?;
233 }
234 Ok(())
235 }
236
237 fn delete_edges(&mut self, edges: Vec<Edge>) -> Result<()> {
238 for edge in edges {
239 self.internal.edges.remove(&edge);
240 self.internal.reversed_edges.remove(&edge.reversed());
241
242 let mut deletable_edge_properties: Vec<(Edge, Identifier)> = Vec::new();
243 for (property_key, _) in self
244 .internal
245 .edge_properties
246 .range((edge.clone(), Identifier::default())..)
247 {
248 let (property_edge, _) = property_key;
249
250 if &edge != property_edge {
251 break;
252 }
253
254 deletable_edge_properties.push(property_key.clone());
255 }
256 self.delete_edge_properties(deletable_edge_properties)?;
257 }
258 Ok(())
259 }
260
261 fn delete_vertex_properties(&mut self, props: Vec<(Uuid, Identifier)>) -> Result<()> {
262 for prop in props {
263 if let Some(property_value) = self.internal.vertex_properties.remove(&prop) {
264 let (property_vertex_id, property_name) = prop;
265 if let Some(property_container) = self.internal.property_values.get_mut(&property_name) {
266 debug_assert!(property_container
267 .get_mut(&property_value)
268 .unwrap()
269 .remove(&IndexedPropertyMember::Vertex(property_vertex_id)));
270 }
271 }
272 }
273 Ok(())
274 }
275
276 fn delete_edge_properties(&mut self, props: Vec<(Edge, Identifier)>) -> Result<()> {
277 for prop in props {
278 if let Some(property_value) = self.internal.edge_properties.remove(&prop) {
279 let (property_edge, property_name) = prop;
280 if let Some(property_container) = self.internal.property_values.get_mut(&property_name) {
281 debug_assert!(property_container
282 .get_mut(&property_value)
283 .unwrap()
284 .remove(&IndexedPropertyMember::Edge(property_edge)));
285 }
286 }
287 }
288 Ok(())
289 }
290
291 fn sync(&self) -> Result<()> {
292 if let Some(ref persist_path) = self.path {
293 let temp_path = NamedTempFile::new().map_err(|err| Error::Datastore(Box::new(err)))?;
294 {
295 let mut buf = BufWriter::new(temp_path.as_file());
296 rmp_serde::encode::write(&mut buf, &*self.internal)?;
297 }
298 temp_path
299 .persist(persist_path)
300 .map_err(|err| Error::Datastore(Box::new(err)))?;
301 }
302 Ok(())
303 }
304
305 fn create_vertex(&mut self, vertex: &Vertex) -> Result<bool> {
306 let mut inserted = false;
307
308 self.internal.vertices.entry(vertex.id).or_insert_with(|| {
309 inserted = true;
310 vertex.t
311 });
312
313 Ok(inserted)
314 }
315
316 fn create_edge(&mut self, edge: &Edge) -> Result<bool> {
317 if !self.internal.vertices.contains_key(&edge.outbound_id)
318 || !self.internal.vertices.contains_key(&edge.inbound_id)
319 {
320 return Ok(false);
321 }
322
323 self.internal.edges.insert(edge.clone());
324 self.internal.reversed_edges.insert(edge.reversed());
325 Ok(true)
326 }
327
328 fn index_property(&mut self, name: Identifier) -> Result<()> {
329 let mut property_container: HashMap<Json, HashSet<IndexedPropertyMember>> = HashMap::new();
330 for id in self.internal.vertices.keys() {
331 if let Some(value) = self.internal.vertex_properties.get(&(*id, name)) {
332 property_container
333 .entry(value.clone())
334 .or_default()
335 .insert(IndexedPropertyMember::Vertex(*id));
336 }
337 }
338 for edge in self.internal.edges.iter() {
339 if let Some(value) = self.internal.edge_properties.get(&(edge.clone(), name)) {
340 property_container
341 .entry(value.clone())
342 .or_default()
343 .insert(IndexedPropertyMember::Edge(edge.clone()));
344 }
345 }
346
347 let existing_property_container = self.internal.property_values.entry(name).or_default();
348 for (value, members) in property_container.into_iter() {
349 let existing_members = existing_property_container.entry(value).or_default();
350 for member in members {
351 existing_members.insert(member);
352 }
353 }
354
355 Ok(())
356 }
357
358 fn set_vertex_properties(&mut self, vertex_ids: Vec<Uuid>, name: Identifier, value: &Json) -> Result<()> {
359 let mut deletable_vertex_properties = Vec::new();
360 for vertex_id in &vertex_ids {
361 deletable_vertex_properties.push((*vertex_id, name));
362 }
363 self.delete_vertex_properties(deletable_vertex_properties)?;
364
365 for vertex_id in &vertex_ids {
366 self.internal
367 .vertex_properties
368 .insert((*vertex_id, name), value.clone());
369 }
370
371 if let Some(property_container) = self.internal.property_values.get_mut(&name) {
372 let property_container = property_container.entry(value.clone()).or_insert_with(HashSet::new);
373 for vertex_id in vertex_ids.into_iter() {
374 property_container.insert(IndexedPropertyMember::Vertex(vertex_id));
375 }
376 }
377
378 Ok(())
379 }
380
381 fn set_edge_properties(&mut self, edges: Vec<Edge>, name: Identifier, value: &Json) -> Result<()> {
382 let mut deletable_edge_properties = Vec::new();
383 for edge in &edges {
384 deletable_edge_properties.push((edge.clone(), name));
385 }
386 self.delete_edge_properties(deletable_edge_properties)?;
387
388 for edge in &edges {
389 self.internal
390 .edge_properties
391 .insert((edge.clone(), name), value.clone());
392 }
393
394 if let Some(property_container) = self.internal.property_values.get_mut(&name) {
395 let property_container = property_container.entry(value.clone()).or_insert_with(HashSet::new);
396 for edge in edges.into_iter() {
397 property_container.insert(IndexedPropertyMember::Edge(edge));
398 }
399 }
400
401 Ok(())
402 }
403}
404
405#[derive(Debug, Clone)]
407pub struct MemoryDatastore {
408 internal: Arc<Mutex<InternalMemory>>,
409 path: Option<PathBuf>,
410}
411
412impl MemoryDatastore {
413 pub fn new_db() -> Database<MemoryDatastore> {
415 Database::new(MemoryDatastore {
416 internal: Arc::new(Mutex::new(InternalMemory::default())),
417 path: None,
418 })
419 }
420
421 pub fn read_msgpack_db<P: Into<PathBuf>>(path: P) -> StdResult<Database<MemoryDatastore>, RmpDecodeError> {
427 let path = path.into();
428 let f = File::open(&path).map_err(RmpDecodeError::InvalidDataRead)?;
429 let buf = BufReader::new(f);
430 let internal: InternalMemory = rmp_serde::from_read(buf)?;
431 Ok(Database::new(MemoryDatastore {
432 internal: Arc::new(Mutex::new(internal)),
433 path: Some(path),
434 }))
435 }
436
437 pub fn create_msgpack_db<P: Into<PathBuf>>(path: P) -> Database<MemoryDatastore> {
444 Database::new(MemoryDatastore {
445 internal: Arc::new(Mutex::new(InternalMemory::default())),
446 path: Some(path.into()),
447 })
448 }
449}
450
451impl Datastore for MemoryDatastore {
452 type Transaction<'a> = MemoryTransaction<'a>;
453 fn transaction(&'_ self) -> Self::Transaction<'_> {
454 MemoryTransaction {
455 internal: self.internal.lock().unwrap(),
456 path: self.path.clone(),
457 }
458 }
459}