1use crate::errors::{Error, Result};
2use crate::models::{
3 BulkInsertItem, Edge, EdgeDirection, EdgeProperties, Identifier, Json, NamedProperty, Query, QueryOutputValue,
4 Vertex, VertexProperties,
5};
6use std::collections::HashSet;
7use std::vec::Vec;
8use uuid::Uuid;
9
10pub type DynIter<'a, T> = Box<dyn Iterator<Item = Result<T>> + 'a>;
13
14pub trait Transaction<'a> {
25 fn vertex_count(&self) -> u64;
27 fn all_vertices(&'a self) -> Result<DynIter<'a, Vertex>>;
29 fn range_vertices(&'a self, offset: Uuid) -> Result<DynIter<'a, Vertex>>;
35 fn specific_vertices(&'a self, ids: Vec<Uuid>) -> Result<DynIter<'a, Vertex>>;
37 fn vertex_ids_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Uuid>>>;
42 fn vertex_ids_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Uuid>>>;
48
49 fn edge_count(&self) -> u64;
51 fn all_edges(&'a self) -> Result<DynIter<'a, Edge>>;
53 fn range_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>>;
58 fn range_reversed_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>>;
65 fn specific_edges(&'a self, edges: Vec<Edge>) -> Result<DynIter<'a, Edge>>;
70 fn edges_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Edge>>>;
75 fn edges_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Edge>>>;
81
82 fn vertex_property(&self, vertex: &Vertex, name: Identifier) -> Result<Option<Json>>;
88 fn all_vertex_properties_for_vertex(&'a self, vertex: &Vertex) -> Result<DynIter<'a, (Identifier, Json)>>;
93
94 fn edge_property(&self, edge: &Edge, name: Identifier) -> Result<Option<Json>>;
100 fn all_edge_properties_for_edge(&'a self, edge: &Edge) -> Result<DynIter<'a, (Identifier, Json)>>;
105
106 fn delete_vertices(&mut self, vertices: Vec<Vertex>) -> Result<()>;
111 fn delete_edges(&mut self, edges: Vec<Edge>) -> Result<()>;
116 fn delete_vertex_properties(&mut self, props: Vec<(Uuid, Identifier)>) -> Result<()>;
121 fn delete_edge_properties(&mut self, props: Vec<(Edge, Identifier)>) -> Result<()>;
126
127 fn sync(&self) -> Result<()> {
130 Err(Error::Unsupported)
131 }
132
133 fn create_vertex(&mut self, vertex: &Vertex) -> Result<bool>;
140 fn create_edge(&mut self, edge: &Edge) -> Result<bool>;
147
148 fn bulk_insert(&mut self, items: Vec<BulkInsertItem>) -> Result<()> {
155 for item in items {
156 match item {
157 BulkInsertItem::Vertex(vertex) => {
158 self.create_vertex(&vertex)?;
159 }
160 BulkInsertItem::Edge(edge) => {
161 self.create_edge(&edge)?;
162 }
163 BulkInsertItem::VertexProperty(id, name, value) => {
164 self.set_vertex_properties(vec![id], name, &value)?;
165 }
166 BulkInsertItem::EdgeProperty(edge, name, value) => {
167 self.set_edge_properties(vec![edge], name, &value)?;
168 }
169 }
170 }
171
172 Ok(())
173 }
174
175 fn index_property(&mut self, name: Identifier) -> Result<()>;
181
182 fn set_vertex_properties(&mut self, vertices: Vec<Uuid>, name: Identifier, value: &Json) -> Result<()>;
189 fn set_edge_properties(&mut self, edges: Vec<Edge>, name: Identifier, value: &Json) -> Result<()>;
196}
197
198pub trait Datastore {
201 type Transaction<'a>: Transaction<'a>
203 where
204 Self: 'a;
205 fn transaction(&self) -> Self::Transaction<'_>;
207}
208
209pub struct Database<D: Datastore> {
218 pub datastore: D,
219}
220
221impl<D: Datastore> Database<D> {
222 pub fn new(datastore: D) -> Database<D> {
227 Self { datastore }
228 }
229
230 pub fn sync(&self) -> Result<()> {
233 let txn = self.datastore.transaction();
234 txn.sync()
235 }
236
237 pub fn create_vertex(&self, vertex: &Vertex) -> Result<bool> {
244 let mut txn = self.datastore.transaction();
245 txn.create_vertex(vertex)
246 }
247
248 pub fn create_vertex_from_type(&self, t: Identifier) -> Result<Uuid> {
255 let v = Vertex::new(t);
256
257 if !self.create_vertex(&v)? {
258 Err(Error::UuidTaken)
259 } else {
260 Ok(v.id)
261 }
262 }
263
264 pub fn create_edge(&self, edge: &Edge) -> Result<bool> {
271 let mut txn = self.datastore.transaction();
272 txn.create_edge(edge)
273 }
274
275 pub fn get<Q: Into<Query>>(&self, q: Q) -> Result<Vec<QueryOutputValue>> {
280 let q = q.into();
281 let txn = self.datastore.transaction();
282 let mut output = Vec::with_capacity(q.output_len());
283 unsafe {
284 query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
285 }
286 Ok(output)
287 }
288
289 pub fn delete<Q: Into<Query>>(&self, q: Q) -> Result<()> {
294 let q = q.into();
295 let mut txn = self.datastore.transaction();
296 let mut output = Vec::with_capacity(q.output_len());
297 unsafe {
298 query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
299 }
300 match output.pop().unwrap() {
301 QueryOutputValue::Vertices(vertices) => {
302 txn.delete_vertices(vertices)?;
303 }
304 QueryOutputValue::Edges(edges) => {
305 txn.delete_edges(edges)?;
306 }
307 QueryOutputValue::VertexProperties(vertex_properties) => {
308 txn.delete_vertex_properties(
309 vertex_properties
310 .into_iter()
311 .flat_map(|vps| {
312 let iter = vps.props.iter().map(move |vp| (vps.vertex.id, vp.name));
313 iter.collect::<Vec<(Uuid, Identifier)>>()
314 })
315 .collect(),
316 )?;
317 }
318 QueryOutputValue::EdgeProperties(edge_properties) => {
319 txn.delete_edge_properties(
320 edge_properties
321 .into_iter()
322 .flat_map(|eps| {
323 let iter = eps.props.iter().map(move |ep| (eps.edge.clone(), ep.name));
324 iter.collect::<Vec<(Edge, Identifier)>>()
325 })
326 .collect(),
327 )?;
328 }
329 QueryOutputValue::Count(_) => return Err(Error::OperationOnQuery),
330 }
331 Ok(())
332 }
333
334 pub fn set_properties<Q: Into<Query>>(&self, q: Q, name: Identifier, value: &Json) -> Result<()> {
341 let q = q.into();
342 let mut txn = self.datastore.transaction();
343 let mut output = Vec::with_capacity(q.output_len());
344 unsafe {
345 query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
346 }
347
348 match output.pop().unwrap() {
349 QueryOutputValue::Vertices(vertices) => {
350 txn.set_vertex_properties(vertices.into_iter().map(|v| v.id).collect(), name, value)?;
351 }
352 QueryOutputValue::Edges(edges) => {
353 txn.set_edge_properties(edges, name, value)?;
354 }
355 _ => return Err(Error::OperationOnQuery),
356 }
357 Ok(())
358 }
359
360 pub fn bulk_insert(&self, items: Vec<BulkInsertItem>) -> Result<()> {
365 let mut txn = self.datastore.transaction();
366 txn.bulk_insert(items)
367 }
368
369 pub fn index_property(&self, name: Identifier) -> Result<()> {
375 let mut txn = self.datastore.transaction();
376 txn.index_property(name)
377 }
378}
379
380unsafe fn query<'a, T: Transaction<'a> + 'a>(
381 txn: *const T,
382 q: &Query,
383 output: &mut Vec<QueryOutputValue>,
384) -> Result<()> {
385 let value = match q {
386 Query::AllVertex => {
387 let iter = (*txn).all_vertices()?;
388 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
389 }
390 Query::RangeVertex(ref q) => {
391 let mut iter: DynIter<Vertex> = if let Some(start_id) = q.start_id {
392 (*txn).range_vertices(start_id)?
393 } else {
394 (*txn).all_vertices()?
395 };
396
397 if let Some(ref t) = q.t {
398 iter = Box::new(iter.filter(move |r| match r {
399 Ok(v) => &v.t == t,
400 Err(_) => true,
401 }));
402 }
403
404 iter = Box::new(iter.take(q.limit as usize));
405 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
406 }
407 Query::SpecificVertex(ref q) => {
408 let iter = (*txn).specific_vertices(q.ids.clone())?;
409 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
410 }
411 Query::Pipe(ref q) => {
412 query(txn, &q.inner, output)?;
413 let piped_values = output.pop().unwrap();
414
415 let values = match piped_values {
416 QueryOutputValue::Edges(ref piped_edges) => {
417 let iter: Box<dyn Iterator<Item = Uuid>> = match q.direction {
418 EdgeDirection::Outbound => Box::new(piped_edges.iter().map(|e| e.outbound_id)),
419 EdgeDirection::Inbound => Box::new(piped_edges.iter().map(|e| e.inbound_id)),
420 };
421
422 let mut iter: DynIter<Vertex> = (*txn).specific_vertices(iter.collect())?;
423
424 if let Some(ref t) = q.t {
425 iter = Box::new(iter.filter(move |r| match r {
426 Ok(v) => &v.t == t,
427 Err(_) => true,
428 }));
429 }
430
431 iter = Box::new(iter.take(q.limit as usize));
432
433 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
434 }
435 QueryOutputValue::Vertices(ref piped_vertices) => {
436 let mut edges = Vec::new();
437
438 for vertex in piped_vertices {
439 let lower_bound = match &q.t {
440 Some(t) => Edge::new(vertex.id, *t, Uuid::default()),
441 None => Edge::new(vertex.id, Identifier::default(), Uuid::default()),
442 };
443
444 let mut iter = if q.direction == EdgeDirection::Outbound {
445 (*txn).range_edges(lower_bound)?
446 } else {
447 (*txn).range_reversed_edges(lower_bound)?
448 };
449
450 iter = Box::new(iter.take_while(move |r| match r {
451 Ok(e) => e.outbound_id == vertex.id,
452 Err(_) => true,
453 }));
454
455 if let Some(ref t) = q.t {
456 iter = Box::new(iter.filter(move |r| match r {
457 Ok(e) => &e.t == t,
458 Err(_) => true,
459 }));
460 }
461
462 if q.direction == EdgeDirection::Inbound {
463 iter = Box::new(iter.map(move |r| Ok(r?.reversed())));
464 }
465
466 iter = Box::new(iter.take((q.limit as usize) - edges.len()));
467
468 for result in iter {
469 edges.push(result?);
470 }
471
472 if edges.len() >= (q.limit as usize) {
473 break;
474 }
475 }
476
477 QueryOutputValue::Edges(edges)
478 }
479 _ => {
480 return Err(Error::OperationOnQuery);
481 }
482 };
483
484 if let Query::Include(_) = *q.inner {
485 output.push(piped_values);
487 }
488
489 values
490 }
491 Query::PipeProperty(ref q) => {
492 query(txn, &q.inner, output)?;
493 let piped_values = output.pop().unwrap();
494
495 let values = match piped_values {
496 QueryOutputValue::Edges(ref piped_edges) => {
497 let mut edge_properties = Vec::with_capacity(piped_edges.len());
498 for edge in piped_edges {
499 let mut props = Vec::new();
500 if let Some(name) = &q.name {
501 if let Some(value) = (*txn).edge_property(edge, *name)? {
502 props.push(NamedProperty::new(*name, value.clone()));
503 }
504 } else {
505 for result in (*txn).all_edge_properties_for_edge(edge)? {
506 let (name, value) = result?;
507 props.push(NamedProperty::new(name, value.clone()));
508 }
509 }
510 if !props.is_empty() {
511 edge_properties.push(EdgeProperties::new(edge.clone(), props));
512 }
513 }
514
515 QueryOutputValue::EdgeProperties(edge_properties)
516 }
517 QueryOutputValue::Vertices(ref piped_vertices) => {
518 let mut vertex_properties = Vec::with_capacity(piped_vertices.len());
519 for vertex in piped_vertices {
520 let mut props = Vec::new();
521 if let Some(name) = &q.name {
522 if let Some(value) = (*txn).vertex_property(vertex, *name)? {
523 props.push(NamedProperty::new(*name, value.clone()));
524 }
525 } else {
526 for result in (*txn).all_vertex_properties_for_vertex(vertex)? {
527 let (name, value) = result?;
528 props.push(NamedProperty::new(name, value.clone()));
529 }
530 }
531 if !props.is_empty() {
532 vertex_properties.push(VertexProperties::new(vertex.clone(), props));
533 }
534 }
535
536 QueryOutputValue::VertexProperties(vertex_properties)
537 }
538 _ => {
539 return Err(Error::OperationOnQuery);
540 }
541 };
542
543 if let Query::Include(_) = *q.inner {
544 output.push(piped_values);
546 }
547
548 values
549 }
550 Query::VertexWithPropertyPresence(ref q) => {
551 if let Some(iter) = (*txn).vertex_ids_with_property(q.name)? {
552 let iter = (*txn).specific_vertices(iter.collect::<Result<Vec<Uuid>>>()?)?;
553 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
554 } else {
555 return Err(Error::NotIndexed);
556 }
557 }
558 Query::VertexWithPropertyValue(ref q) => {
559 if let Some(iter) = (*txn).vertex_ids_with_property_value(q.name, &q.value)? {
560 let iter = (*txn).specific_vertices(iter.collect::<Result<Vec<Uuid>>>()?)?;
561 QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
562 } else {
563 return Err(Error::NotIndexed);
564 }
565 }
566 Query::EdgeWithPropertyPresence(ref q) => {
567 if let Some(iter) = (*txn).edges_with_property(q.name)? {
568 QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
569 } else {
570 return Err(Error::NotIndexed);
571 }
572 }
573 Query::EdgeWithPropertyValue(ref q) => {
574 if let Some(iter) = (*txn).edges_with_property_value(q.name, &q.value)? {
575 QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
576 } else {
577 return Err(Error::NotIndexed);
578 }
579 }
580 Query::PipeWithPropertyPresence(ref q) => {
581 query(txn, &q.inner, output)?;
582 let piped_values = output.pop().unwrap();
583
584 let values = match piped_values {
585 QueryOutputValue::Edges(ref piped_edges) => {
586 let edges_with_property = match (*txn).edges_with_property(q.name)? {
587 Some(iter) => iter.collect::<Result<HashSet<Edge>>>()?,
588 None => return Err(Error::NotIndexed),
589 };
590 let iter = piped_edges.iter().filter(move |e| {
591 let contains = edges_with_property.contains(e);
592 (q.exists && contains) || (!q.exists && !contains)
593 });
594 QueryOutputValue::Edges(iter.cloned().collect())
595 }
596 QueryOutputValue::Vertices(ref piped_vertices) => {
597 let vertices_with_property = match (*txn).vertex_ids_with_property(q.name)? {
598 Some(iter) => iter.collect::<Result<HashSet<Uuid>>>()?,
599 None => return Err(Error::NotIndexed),
600 };
601 let iter = piped_vertices.iter().filter(move |v| {
602 let contains = vertices_with_property.contains(&v.id);
603 (q.exists && contains) || (!q.exists && !contains)
604 });
605 QueryOutputValue::Vertices(iter.cloned().collect())
606 }
607 _ => {
608 return Err(Error::OperationOnQuery);
609 }
610 };
611
612 if let Query::Include(_) = *q.inner {
613 output.push(piped_values);
615 }
616
617 values
618 }
619 Query::PipeWithPropertyValue(ref q) => {
620 query(txn, &q.inner, output)?;
621 let piped_values = output.pop().unwrap();
622
623 let values = match piped_values {
624 QueryOutputValue::Edges(ref piped_edges) => {
625 let edges = match (*txn).edges_with_property_value(q.name, &q.value)? {
626 Some(iter) => iter.collect::<Result<HashSet<Edge>>>()?,
627 None => return Err(Error::NotIndexed),
628 };
629 let iter = piped_edges.iter().filter(move |e| {
630 let contains = edges.contains(e);
631 (q.equal && contains) || (!q.equal && !contains)
632 });
633 QueryOutputValue::Edges(iter.cloned().collect())
634 }
635 QueryOutputValue::Vertices(ref piped_vertices) => {
636 let vertex_ids = match (*txn).vertex_ids_with_property_value(q.name, &q.value)? {
637 Some(iter) => iter.collect::<Result<HashSet<Uuid>>>()?,
638 None => return Err(Error::NotIndexed),
639 };
640 let iter = piped_vertices.iter().filter(move |v| {
641 let contains = vertex_ids.contains(&v.id);
642 (q.equal && contains) || (!q.equal && !contains)
643 });
644 QueryOutputValue::Vertices(iter.cloned().collect())
645 }
646 _ => {
647 return Err(Error::OperationOnQuery);
648 }
649 };
650
651 if let Query::Include(_) = *q.inner {
652 output.push(piped_values);
654 }
655
656 values
657 }
658 Query::AllEdge => {
659 let iter = (*txn).all_edges()?;
660 QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
661 }
662 Query::SpecificEdge(ref q) => {
663 let iter = (*txn).specific_edges(q.edges.clone())?;
664 QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
665 }
666 Query::Include(ref q) => {
667 query(txn, &q.inner, output)?;
668 output.pop().unwrap()
669 }
670 Query::Count(ref q) => {
671 let count = match &*q.inner {
672 Query::AllVertex => (*txn).vertex_count(),
674 Query::AllEdge => (*txn).edge_count(),
675 q => {
676 query(txn, q, output)?;
677 let piped_values = output.pop().unwrap();
678 let len = match piped_values {
679 QueryOutputValue::Vertices(ref v) => v.len(),
680 QueryOutputValue::Edges(ref e) => e.len(),
681 QueryOutputValue::VertexProperties(ref p) => p.len(),
682 QueryOutputValue::EdgeProperties(ref p) => p.len(),
683 _ => return Err(Error::OperationOnQuery),
684 };
685 if let Query::Include(_) = q {
686 output.push(piped_values);
688 }
689 len as u64
690 }
691 };
692 QueryOutputValue::Count(count)
693 }
694 };
695
696 output.push(value);
697 Ok(())
698}