indradb/
database.rs

1use crate::errors::{Error, Result};
2use crate::models::{
3    BulkInsertItem, Edge, EdgeDirection, EdgeProperties, Identifier, Json, NamedProperty, Query, QueryOutputValue,
4    Vertex, VertexProperties,
5};
6use std::collections::HashSet;
7use std::vec::Vec;
8use uuid::Uuid;
9
10/// A dynamic iterator over results, which are commonly employed as
11/// transaction return types.
12pub type DynIter<'a, T> = Box<dyn Iterator<Item = Result<T>> + 'a>;
13
14/// Specifies a datastore transaction, which contains nearly all of the
15/// datastore implementation-specific logic.
16///
17/// Note that this trait and its members purposefully do not employ any
18/// generic arguments. While that would improve ergonomics, it would remove
19/// object safety, which we need for plugins.
20///
21/// # Errors
22/// Nearly all methods may return an error if something unexpected happens -
23/// e.g. if there was a problem connecting to the underlying database.
24pub trait Transaction<'a> {
25    /// Gets the number of vertices.
26    fn vertex_count(&self) -> u64;
27    /// Returns all vertices.
28    fn all_vertices(&'a self) -> Result<DynIter<'a, Vertex>>;
29    /// Returns all vertices with `id >= offset`.
30    ///
31    /// # Arguments
32    /// * `offset` - Only fetch vertices with an offset greater than or equal
33    ///   to this value.
34    fn range_vertices(&'a self, offset: Uuid) -> Result<DynIter<'a, Vertex>>;
35    /// Gets a specific set of vertices with the given IDs.
36    fn specific_vertices(&'a self, ids: Vec<Uuid>) -> Result<DynIter<'a, Vertex>>;
37    /// Get all vertices with a given property.
38    ///
39    /// # Arguments
40    /// * `name` - The property name.
41    fn vertex_ids_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Uuid>>>;
42    /// Get all vertices with a given property value.
43    ///
44    /// # Arguments
45    /// * `name` - The property name.
46    /// * `value` - The property value.
47    fn vertex_ids_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Uuid>>>;
48
49    /// Gets the number of edges.
50    fn edge_count(&self) -> u64;
51    /// Returns all edges.
52    fn all_edges(&'a self) -> Result<DynIter<'a, Edge>>;
53    /// Returns all edges with that are greater than or equal to `offset`.
54    ///
55    /// # Arguments
56    /// * `offset` - Only fetch edges greater than or equal to this value.
57    fn range_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>>;
58    /// Returns all reversed edges (where the outbound and inbound IDs are
59    /// reversed from their actual values) that are greater than or equal
60    /// to `offset`.
61    ///
62    /// # Arguments
63    /// * `offset` - Only fetch edges greater than or equal to this value.
64    fn range_reversed_edges(&'a self, offset: Edge) -> Result<DynIter<'a, Edge>>;
65    /// Gets a specific set of edges.
66    ///
67    /// # Arguments
68    /// * `edges` - The edges to get.
69    fn specific_edges(&'a self, edges: Vec<Edge>) -> Result<DynIter<'a, Edge>>;
70    /// Get all edges with a given property.
71    ///
72    /// # Arguments
73    /// * `name` - The property name.
74    fn edges_with_property(&'a self, name: Identifier) -> Result<Option<DynIter<'a, Edge>>>;
75    /// Get all edges with a given property value.
76    ///
77    /// # Arguments
78    /// * `name` - The property name.
79    /// * `value` - The property value.
80    fn edges_with_property_value(&'a self, name: Identifier, value: &Json) -> Result<Option<DynIter<'a, Edge>>>;
81
82    /// Gets the value of a vertex property if it exists, or `None` otherwise.
83    ///
84    /// # Arguments
85    /// * `vertex` - The vertex.
86    /// * `name` - The property name.
87    fn vertex_property(&self, vertex: &Vertex, name: Identifier) -> Result<Option<Json>>;
88    /// Gets all vertex properties for a given vertex.
89    ///
90    /// # Arguments
91    /// * `vertex` - The vertex.
92    fn all_vertex_properties_for_vertex(&'a self, vertex: &Vertex) -> Result<DynIter<'a, (Identifier, Json)>>;
93
94    /// Gets the value of an edge property if it exists, or `None` otherwise.
95    ///
96    /// # Arguments
97    /// * `edge` - The edge.
98    /// * `name` - The property name.
99    fn edge_property(&self, edge: &Edge, name: Identifier) -> Result<Option<Json>>;
100    /// Gets all edge properties for a given edges.
101    ///
102    /// # Arguments
103    /// * `edge` - The edge.
104    fn all_edge_properties_for_edge(&'a self, edge: &Edge) -> Result<DynIter<'a, (Identifier, Json)>>;
105
106    /// Deletes the given vertices.
107    ///
108    /// # Arguments
109    /// * `vertices` - The vertices to delete.
110    fn delete_vertices(&mut self, vertices: Vec<Vertex>) -> Result<()>;
111    /// Deletes the given edges.
112    ///
113    /// # Arguments
114    /// * `edges` - The edges to delete.
115    fn delete_edges(&mut self, edges: Vec<Edge>) -> Result<()>;
116    /// Deletes the given vertex properties.
117    ///
118    /// # Arguments
119    /// * `props` - The vertex properties to delete.
120    fn delete_vertex_properties(&mut self, props: Vec<(Uuid, Identifier)>) -> Result<()>;
121    /// Deletes the given edge properties.
122    ///
123    /// # Arguments
124    /// * `props` - The edge properties to delete.
125    fn delete_edge_properties(&mut self, props: Vec<(Edge, Identifier)>) -> Result<()>;
126
127    /// Syncs persisted content. By default, this errors out, but this can be
128    /// overridden in datastores that support syncing.
129    fn sync(&self) -> Result<()> {
130        Err(Error::Unsupported)
131    }
132
133    /// Creates a new vertex. Returns whether the vertex was successfully
134    /// created - if this is false, it's because a vertex with the same UUID
135    /// already exists.
136    ///
137    /// # Arguments
138    /// * `vertex`: The vertex to create.
139    fn create_vertex(&mut self, vertex: &Vertex) -> Result<bool>;
140    /// Creates a new edge. Returns whether the edge was successfully
141    /// created - if this is false, it's because one of the specified vertices
142    /// is missing.
143    ///
144    /// # Arguments
145    /// * `edge`: The edge to create.
146    fn create_edge(&mut self, edge: &Edge) -> Result<bool>;
147
148    /// Bulk inserts many vertices, edges, and/or properties. By default, this
149    /// makes the underlying calls to insert the values, but can be overridden
150    /// to offer a more efficient implementation.
151    ///
152    /// # Arguments
153    /// * `items`: The items to insert.
154    fn bulk_insert(&mut self, items: Vec<BulkInsertItem>) -> Result<()> {
155        for item in items {
156            match item {
157                BulkInsertItem::Vertex(vertex) => {
158                    self.create_vertex(&vertex)?;
159                }
160                BulkInsertItem::Edge(edge) => {
161                    self.create_edge(&edge)?;
162                }
163                BulkInsertItem::VertexProperty(id, name, value) => {
164                    self.set_vertex_properties(vec![id], name, &value)?;
165                }
166                BulkInsertItem::EdgeProperty(edge, name, value) => {
167                    self.set_edge_properties(vec![edge], name, &value)?;
168                }
169            }
170        }
171
172        Ok(())
173    }
174
175    /// Enables indexing on a specified property. When indexing is enabled on a
176    /// property, it's possible to query on its presence and values.
177    ///
178    /// # Arguments
179    /// * `name`: The name of the property to index.
180    fn index_property(&mut self, name: Identifier) -> Result<()>;
181
182    /// Sets vertex properties.
183    ///
184    /// # Arguments
185    /// * `vertices`: The vertices to set the properties on.
186    /// * `name`: The property name.
187    /// * `value`: The property value.
188    fn set_vertex_properties(&mut self, vertices: Vec<Uuid>, name: Identifier, value: &Json) -> Result<()>;
189    /// Sets edge properties.
190    ///
191    /// # Arguments
192    /// * `edges`: The edges to set the properties on.
193    /// * `name`: The property name.
194    /// * `value`: The property value.
195    fn set_edge_properties(&mut self, edges: Vec<Edge>, name: Identifier, value: &Json) -> Result<()>;
196}
197
198/// Specifies a datastore, which provides datastore transaction
199/// implementations to the database.
200pub trait Datastore {
201    /// The datastore transaction type.
202    type Transaction<'a>: Transaction<'a>
203    where
204        Self: 'a;
205    /// Creates a new transaction.
206    fn transaction(&self) -> Self::Transaction<'_>;
207}
208
209/// The IndraDB database.
210///
211/// This contains all of the logic shared across implementations, e.g. query
212/// handling. Underlying it (as a generic argument) are datastores, which
213/// contain implementation-specific logic.
214///
215/// As an IndraDB end-user, you should interact with this rather than
216/// datastores.
217pub struct Database<D: Datastore> {
218    pub datastore: D,
219}
220
221impl<D: Datastore> Database<D> {
222    /// Creates a new database.
223    ///
224    /// # Arguments
225    /// * `datastore`: The underlying datastore to use.
226    pub fn new(datastore: D) -> Database<D> {
227        Self { datastore }
228    }
229
230    /// Syncs persisted content. Depending on the datastore implementation,
231    /// this has different meanings - including potentially being a no-op.
232    pub fn sync(&self) -> Result<()> {
233        let txn = self.datastore.transaction();
234        txn.sync()
235    }
236
237    /// Creates a new vertex. Returns whether the vertex was successfully
238    /// created - if this is false, it's because a vertex with the same UUID
239    /// already exists.
240    ///
241    /// # Arguments
242    /// * `vertex`: The vertex to create.
243    pub fn create_vertex(&self, vertex: &Vertex) -> Result<bool> {
244        let mut txn = self.datastore.transaction();
245        txn.create_vertex(vertex)
246    }
247
248    /// Creates a new vertex with just a type specification. As opposed to
249    /// `create_vertex`, this is used when you do not want to manually specify
250    /// the vertex's UUID. Returns the new vertex's UUID.
251    ///
252    /// # Arguments
253    /// * `t`: The type of the vertex to create.
254    pub fn create_vertex_from_type(&self, t: Identifier) -> Result<Uuid> {
255        let v = Vertex::new(t);
256
257        if !self.create_vertex(&v)? {
258            Err(Error::UuidTaken)
259        } else {
260            Ok(v.id)
261        }
262    }
263
264    /// Creates a new edge. Returns whether the edge was successfully
265    /// created - if this is false, it's because one of the specified vertices
266    /// is missing.
267    ///
268    /// # Arguments
269    /// * `edge`: The edge to create.
270    pub fn create_edge(&self, edge: &Edge) -> Result<bool> {
271        let mut txn = self.datastore.transaction();
272        txn.create_edge(edge)
273    }
274
275    /// Gets values specified by a query.
276    ///
277    /// # Arguments
278    /// * `q`: The query to run.
279    pub fn get<Q: Into<Query>>(&self, q: Q) -> Result<Vec<QueryOutputValue>> {
280        let q = q.into();
281        let txn = self.datastore.transaction();
282        let mut output = Vec::with_capacity(q.output_len());
283        unsafe {
284            query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
285        }
286        Ok(output)
287    }
288
289    /// Deletes values specified by a query.
290    ///
291    /// # Arguments
292    /// * `q`: The query to run.
293    pub fn delete<Q: Into<Query>>(&self, q: Q) -> Result<()> {
294        let q = q.into();
295        let mut txn = self.datastore.transaction();
296        let mut output = Vec::with_capacity(q.output_len());
297        unsafe {
298            query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
299        }
300        match output.pop().unwrap() {
301            QueryOutputValue::Vertices(vertices) => {
302                txn.delete_vertices(vertices)?;
303            }
304            QueryOutputValue::Edges(edges) => {
305                txn.delete_edges(edges)?;
306            }
307            QueryOutputValue::VertexProperties(vertex_properties) => {
308                txn.delete_vertex_properties(
309                    vertex_properties
310                        .into_iter()
311                        .flat_map(|vps| {
312                            let iter = vps.props.iter().map(move |vp| (vps.vertex.id, vp.name));
313                            iter.collect::<Vec<(Uuid, Identifier)>>()
314                        })
315                        .collect(),
316                )?;
317            }
318            QueryOutputValue::EdgeProperties(edge_properties) => {
319                txn.delete_edge_properties(
320                    edge_properties
321                        .into_iter()
322                        .flat_map(|eps| {
323                            let iter = eps.props.iter().map(move |ep| (eps.edge.clone(), ep.name));
324                            iter.collect::<Vec<(Edge, Identifier)>>()
325                        })
326                        .collect(),
327                )?;
328            }
329            QueryOutputValue::Count(_) => return Err(Error::OperationOnQuery),
330        }
331        Ok(())
332    }
333
334    /// Sets properties.
335    ///
336    /// # Arguments
337    /// * `q`: The query to run.
338    /// * `name`: The property name.
339    /// * `value`: The property value.
340    pub fn set_properties<Q: Into<Query>>(&self, q: Q, name: Identifier, value: &Json) -> Result<()> {
341        let q = q.into();
342        let mut txn = self.datastore.transaction();
343        let mut output = Vec::with_capacity(q.output_len());
344        unsafe {
345            query(&txn as *const D::Transaction<'_>, &q, &mut output)?;
346        }
347
348        match output.pop().unwrap() {
349            QueryOutputValue::Vertices(vertices) => {
350                txn.set_vertex_properties(vertices.into_iter().map(|v| v.id).collect(), name, value)?;
351            }
352            QueryOutputValue::Edges(edges) => {
353                txn.set_edge_properties(edges, name, value)?;
354            }
355            _ => return Err(Error::OperationOnQuery),
356        }
357        Ok(())
358    }
359
360    /// Bulk inserts many vertices, edges, and/or properties.
361    ///
362    /// # Arguments
363    /// * `items`: The items to insert.
364    pub fn bulk_insert(&self, items: Vec<BulkInsertItem>) -> Result<()> {
365        let mut txn = self.datastore.transaction();
366        txn.bulk_insert(items)
367    }
368
369    /// Enables indexing on a specified property. When indexing is enabled on a
370    /// property, it's possible to query on its presence and values.
371    ///
372    /// # Arguments
373    /// * `name`: The name of the property to index.
374    pub fn index_property(&self, name: Identifier) -> Result<()> {
375        let mut txn = self.datastore.transaction();
376        txn.index_property(name)
377    }
378}
379
380unsafe fn query<'a, T: Transaction<'a> + 'a>(
381    txn: *const T,
382    q: &Query,
383    output: &mut Vec<QueryOutputValue>,
384) -> Result<()> {
385    let value = match q {
386        Query::AllVertex => {
387            let iter = (*txn).all_vertices()?;
388            QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
389        }
390        Query::RangeVertex(ref q) => {
391            let mut iter: DynIter<Vertex> = if let Some(start_id) = q.start_id {
392                (*txn).range_vertices(start_id)?
393            } else {
394                (*txn).all_vertices()?
395            };
396
397            if let Some(ref t) = q.t {
398                iter = Box::new(iter.filter(move |r| match r {
399                    Ok(v) => &v.t == t,
400                    Err(_) => true,
401                }));
402            }
403
404            iter = Box::new(iter.take(q.limit as usize));
405            QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
406        }
407        Query::SpecificVertex(ref q) => {
408            let iter = (*txn).specific_vertices(q.ids.clone())?;
409            QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
410        }
411        Query::Pipe(ref q) => {
412            query(txn, &q.inner, output)?;
413            let piped_values = output.pop().unwrap();
414
415            let values = match piped_values {
416                QueryOutputValue::Edges(ref piped_edges) => {
417                    let iter: Box<dyn Iterator<Item = Uuid>> = match q.direction {
418                        EdgeDirection::Outbound => Box::new(piped_edges.iter().map(|e| e.outbound_id)),
419                        EdgeDirection::Inbound => Box::new(piped_edges.iter().map(|e| e.inbound_id)),
420                    };
421
422                    let mut iter: DynIter<Vertex> = (*txn).specific_vertices(iter.collect())?;
423
424                    if let Some(ref t) = q.t {
425                        iter = Box::new(iter.filter(move |r| match r {
426                            Ok(v) => &v.t == t,
427                            Err(_) => true,
428                        }));
429                    }
430
431                    iter = Box::new(iter.take(q.limit as usize));
432
433                    QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
434                }
435                QueryOutputValue::Vertices(ref piped_vertices) => {
436                    let mut edges = Vec::new();
437
438                    for vertex in piped_vertices {
439                        let lower_bound = match &q.t {
440                            Some(t) => Edge::new(vertex.id, *t, Uuid::default()),
441                            None => Edge::new(vertex.id, Identifier::default(), Uuid::default()),
442                        };
443
444                        let mut iter = if q.direction == EdgeDirection::Outbound {
445                            (*txn).range_edges(lower_bound)?
446                        } else {
447                            (*txn).range_reversed_edges(lower_bound)?
448                        };
449
450                        iter = Box::new(iter.take_while(move |r| match r {
451                            Ok(e) => e.outbound_id == vertex.id,
452                            Err(_) => true,
453                        }));
454
455                        if let Some(ref t) = q.t {
456                            iter = Box::new(iter.filter(move |r| match r {
457                                Ok(e) => &e.t == t,
458                                Err(_) => true,
459                            }));
460                        }
461
462                        if q.direction == EdgeDirection::Inbound {
463                            iter = Box::new(iter.map(move |r| Ok(r?.reversed())));
464                        }
465
466                        iter = Box::new(iter.take((q.limit as usize) - edges.len()));
467
468                        for result in iter {
469                            edges.push(result?);
470                        }
471
472                        if edges.len() >= (q.limit as usize) {
473                            break;
474                        }
475                    }
476
477                    QueryOutputValue::Edges(edges)
478                }
479                _ => {
480                    return Err(Error::OperationOnQuery);
481                }
482            };
483
484            if let Query::Include(_) = *q.inner {
485                // keep the value exported
486                output.push(piped_values);
487            }
488
489            values
490        }
491        Query::PipeProperty(ref q) => {
492            query(txn, &q.inner, output)?;
493            let piped_values = output.pop().unwrap();
494
495            let values = match piped_values {
496                QueryOutputValue::Edges(ref piped_edges) => {
497                    let mut edge_properties = Vec::with_capacity(piped_edges.len());
498                    for edge in piped_edges {
499                        let mut props = Vec::new();
500                        if let Some(name) = &q.name {
501                            if let Some(value) = (*txn).edge_property(edge, *name)? {
502                                props.push(NamedProperty::new(*name, value.clone()));
503                            }
504                        } else {
505                            for result in (*txn).all_edge_properties_for_edge(edge)? {
506                                let (name, value) = result?;
507                                props.push(NamedProperty::new(name, value.clone()));
508                            }
509                        }
510                        if !props.is_empty() {
511                            edge_properties.push(EdgeProperties::new(edge.clone(), props));
512                        }
513                    }
514
515                    QueryOutputValue::EdgeProperties(edge_properties)
516                }
517                QueryOutputValue::Vertices(ref piped_vertices) => {
518                    let mut vertex_properties = Vec::with_capacity(piped_vertices.len());
519                    for vertex in piped_vertices {
520                        let mut props = Vec::new();
521                        if let Some(name) = &q.name {
522                            if let Some(value) = (*txn).vertex_property(vertex, *name)? {
523                                props.push(NamedProperty::new(*name, value.clone()));
524                            }
525                        } else {
526                            for result in (*txn).all_vertex_properties_for_vertex(vertex)? {
527                                let (name, value) = result?;
528                                props.push(NamedProperty::new(name, value.clone()));
529                            }
530                        }
531                        if !props.is_empty() {
532                            vertex_properties.push(VertexProperties::new(vertex.clone(), props));
533                        }
534                    }
535
536                    QueryOutputValue::VertexProperties(vertex_properties)
537                }
538                _ => {
539                    return Err(Error::OperationOnQuery);
540                }
541            };
542
543            if let Query::Include(_) = *q.inner {
544                // keep the value exported
545                output.push(piped_values);
546            }
547
548            values
549        }
550        Query::VertexWithPropertyPresence(ref q) => {
551            if let Some(iter) = (*txn).vertex_ids_with_property(q.name)? {
552                let iter = (*txn).specific_vertices(iter.collect::<Result<Vec<Uuid>>>()?)?;
553                QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
554            } else {
555                return Err(Error::NotIndexed);
556            }
557        }
558        Query::VertexWithPropertyValue(ref q) => {
559            if let Some(iter) = (*txn).vertex_ids_with_property_value(q.name, &q.value)? {
560                let iter = (*txn).specific_vertices(iter.collect::<Result<Vec<Uuid>>>()?)?;
561                QueryOutputValue::Vertices(iter.collect::<Result<Vec<Vertex>>>()?)
562            } else {
563                return Err(Error::NotIndexed);
564            }
565        }
566        Query::EdgeWithPropertyPresence(ref q) => {
567            if let Some(iter) = (*txn).edges_with_property(q.name)? {
568                QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
569            } else {
570                return Err(Error::NotIndexed);
571            }
572        }
573        Query::EdgeWithPropertyValue(ref q) => {
574            if let Some(iter) = (*txn).edges_with_property_value(q.name, &q.value)? {
575                QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
576            } else {
577                return Err(Error::NotIndexed);
578            }
579        }
580        Query::PipeWithPropertyPresence(ref q) => {
581            query(txn, &q.inner, output)?;
582            let piped_values = output.pop().unwrap();
583
584            let values = match piped_values {
585                QueryOutputValue::Edges(ref piped_edges) => {
586                    let edges_with_property = match (*txn).edges_with_property(q.name)? {
587                        Some(iter) => iter.collect::<Result<HashSet<Edge>>>()?,
588                        None => return Err(Error::NotIndexed),
589                    };
590                    let iter = piped_edges.iter().filter(move |e| {
591                        let contains = edges_with_property.contains(e);
592                        (q.exists && contains) || (!q.exists && !contains)
593                    });
594                    QueryOutputValue::Edges(iter.cloned().collect())
595                }
596                QueryOutputValue::Vertices(ref piped_vertices) => {
597                    let vertices_with_property = match (*txn).vertex_ids_with_property(q.name)? {
598                        Some(iter) => iter.collect::<Result<HashSet<Uuid>>>()?,
599                        None => return Err(Error::NotIndexed),
600                    };
601                    let iter = piped_vertices.iter().filter(move |v| {
602                        let contains = vertices_with_property.contains(&v.id);
603                        (q.exists && contains) || (!q.exists && !contains)
604                    });
605                    QueryOutputValue::Vertices(iter.cloned().collect())
606                }
607                _ => {
608                    return Err(Error::OperationOnQuery);
609                }
610            };
611
612            if let Query::Include(_) = *q.inner {
613                // keep the value exported
614                output.push(piped_values);
615            }
616
617            values
618        }
619        Query::PipeWithPropertyValue(ref q) => {
620            query(txn, &q.inner, output)?;
621            let piped_values = output.pop().unwrap();
622
623            let values = match piped_values {
624                QueryOutputValue::Edges(ref piped_edges) => {
625                    let edges = match (*txn).edges_with_property_value(q.name, &q.value)? {
626                        Some(iter) => iter.collect::<Result<HashSet<Edge>>>()?,
627                        None => return Err(Error::NotIndexed),
628                    };
629                    let iter = piped_edges.iter().filter(move |e| {
630                        let contains = edges.contains(e);
631                        (q.equal && contains) || (!q.equal && !contains)
632                    });
633                    QueryOutputValue::Edges(iter.cloned().collect())
634                }
635                QueryOutputValue::Vertices(ref piped_vertices) => {
636                    let vertex_ids = match (*txn).vertex_ids_with_property_value(q.name, &q.value)? {
637                        Some(iter) => iter.collect::<Result<HashSet<Uuid>>>()?,
638                        None => return Err(Error::NotIndexed),
639                    };
640                    let iter = piped_vertices.iter().filter(move |v| {
641                        let contains = vertex_ids.contains(&v.id);
642                        (q.equal && contains) || (!q.equal && !contains)
643                    });
644                    QueryOutputValue::Vertices(iter.cloned().collect())
645                }
646                _ => {
647                    return Err(Error::OperationOnQuery);
648                }
649            };
650
651            if let Query::Include(_) = *q.inner {
652                // keep the value exported
653                output.push(piped_values);
654            }
655
656            values
657        }
658        Query::AllEdge => {
659            let iter = (*txn).all_edges()?;
660            QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
661        }
662        Query::SpecificEdge(ref q) => {
663            let iter = (*txn).specific_edges(q.edges.clone())?;
664            QueryOutputValue::Edges(iter.collect::<Result<Vec<Edge>>>()?)
665        }
666        Query::Include(ref q) => {
667            query(txn, &q.inner, output)?;
668            output.pop().unwrap()
669        }
670        Query::Count(ref q) => {
671            let count = match &*q.inner {
672                // These paths are optimized
673                Query::AllVertex => (*txn).vertex_count(),
674                Query::AllEdge => (*txn).edge_count(),
675                q => {
676                    query(txn, q, output)?;
677                    let piped_values = output.pop().unwrap();
678                    let len = match piped_values {
679                        QueryOutputValue::Vertices(ref v) => v.len(),
680                        QueryOutputValue::Edges(ref e) => e.len(),
681                        QueryOutputValue::VertexProperties(ref p) => p.len(),
682                        QueryOutputValue::EdgeProperties(ref p) => p.len(),
683                        _ => return Err(Error::OperationOnQuery),
684                    };
685                    if let Query::Include(_) = q {
686                        // keep the value exported
687                        output.push(piped_values);
688                    }
689                    len as u64
690                }
691            };
692            QueryOutputValue::Count(count)
693        }
694    };
695
696    output.push(value);
697    Ok(())
698}