1use std::path::Path;
2use std::sync::Arc;
3use std::{u64, usize};
4
5use super::errors::map_err;
6use super::managers::*;
7
8use chrono::offset::Utc;
9use indradb::util::next_uuid;
10use indradb::{
11 BulkInsertItem, Datastore, Edge, EdgeDirection, EdgeKey, EdgeProperties, EdgeProperty, EdgePropertyQuery,
12 EdgeQuery, NamedProperty, Result, Transaction, Type, Vertex, VertexProperties, VertexProperty, VertexPropertyQuery,
13 VertexQuery,
14};
15use serde_json::Value as JsonValue;
16use sled::{Config, Db, Tree};
17use uuid::Uuid;
18
19#[derive(Copy, Clone, Default, Debug)]
20pub struct SledConfig {
21 use_compression: bool,
22 compression_factor: Option<i32>,
23}
24
25impl SledConfig {
26 pub fn with_compression(factor: Option<i32>) -> SledConfig {
32 SledConfig {
33 use_compression: true,
34 compression_factor: factor,
35 }
36 }
37
38 pub fn open<P: AsRef<Path>>(self, path: P) -> Result<SledDatastore> {
40 Ok(SledDatastore {
41 holder: Arc::new(SledHolder::new(path, self)?),
42 })
43 }
44}
45
46pub struct SledHolder {
48 pub(crate) db: Arc<Db>, pub(crate) edges: Tree,
50 pub(crate) edge_ranges: Tree,
51 pub(crate) reversed_edge_ranges: Tree,
52 pub(crate) vertex_properties: Tree,
53 pub(crate) edge_properties: Tree,
54}
55
56impl<'ds> SledHolder {
57 pub fn new<P: AsRef<Path>>(path: P, opts: SledConfig) -> Result<SledHolder> {
63 let mut config = Config::default().path(path);
64
65 if opts.use_compression {
66 config = config.use_compression(true);
67 }
68
69 if let Some(compression_factor) = opts.compression_factor {
70 config = config.compression_factor(compression_factor);
71 }
72
73 let db = map_err(config.open())?;
74
75 Ok(SledHolder {
76 edges: map_err(db.open_tree("edges"))?,
77 edge_ranges: map_err(db.open_tree("edge_ranges"))?,
78 reversed_edge_ranges: map_err(db.open_tree("reversed_edge_ranges"))?,
79 vertex_properties: map_err(db.open_tree("vertex_properties"))?,
80 edge_properties: map_err(db.open_tree("edge_properties"))?,
81 db: Arc::new(db),
82 })
83 }
84}
85
86pub struct SledDatastore {
88 pub(crate) holder: Arc<SledHolder>,
89}
90
91impl<'ds> SledDatastore {
92 pub fn new<P: AsRef<Path>>(path: P) -> Result<SledDatastore> {
97 Ok(SledDatastore {
98 holder: Arc::new(SledHolder::new(path, SledConfig::default())?),
99 })
100 }
101}
102
103impl Datastore for SledDatastore {
104 type Trans = SledTransaction;
105
106 fn sync(&self) -> Result<()> {
107 let holder = self.holder.clone();
108 let db = holder.db.clone();
109 map_err(db.flush())?;
110 Ok(())
111 }
112
113 fn transaction(&self) -> Result<Self::Trans> {
114 Ok(SledTransaction::new(self.holder.clone()))
115 }
116
117 fn bulk_insert<I>(&self, items: I) -> Result<()>
118 where
119 I: Iterator<Item = BulkInsertItem>,
120 {
121 let vertex_manager = VertexManager::new(&self.holder);
122 let edge_manager = EdgeManager::new(&self.holder);
123 let vertex_property_manager = VertexPropertyManager::new(&self.holder.vertex_properties);
124 let edge_property_manager = EdgePropertyManager::new(&self.holder.edge_properties);
125
126 for item in items {
127 match item {
128 BulkInsertItem::Vertex(ref vertex) => {
129 vertex_manager.create(vertex)?;
130 }
131 BulkInsertItem::Edge(ref key) => {
132 edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?;
133 }
134 BulkInsertItem::VertexProperty(id, ref name, ref value) => {
135 vertex_property_manager.set(id, name, value)?;
136 }
137 BulkInsertItem::EdgeProperty(ref key, ref name, ref value) => {
138 edge_property_manager.set(key.outbound_id, &key.t, key.inbound_id, name, value)?;
139 }
140 }
141 }
142
143 map_err(self.holder.db.flush())?;
144 Ok(())
145 }
146}
147
148pub struct SledTransaction {
150 holder: Arc<SledHolder>,
151}
152
153impl SledTransaction {
154 fn new(holder: Arc<SledHolder>) -> Self {
155 SledTransaction { holder }
156 }
157
158 #[allow(clippy::needless_collect)]
159 fn vertex_query_to_iterator<'iter, 'trans: 'iter>(
160 &'trans self,
161 q: VertexQuery,
162 ) -> Result<Box<dyn Iterator<Item = Result<VertexItem>> + 'iter>> {
163 match q {
164 VertexQuery::Range(q) => {
165 let vertex_manager = VertexManager::new(&self.holder);
166
167 let next_uuid = match q.start_id {
168 Some(start_id) => {
169 match next_uuid(start_id) {
170 Ok(next_uuid) => next_uuid,
171 Err(_) => return Ok(Box::new(vec![].into_iter())),
177 }
178 }
179 None => Uuid::default(),
180 };
181
182 let mut iter: Box<dyn Iterator<Item = Result<VertexItem>>> =
183 Box::new(vertex_manager.iterate_for_range(next_uuid));
184
185 if let Some(ref t) = q.t {
186 iter = Box::new(iter.filter(move |item| match item {
187 Ok((_, v)) => v == t,
188 Err(_) => true,
189 }));
190 }
191
192 let results: Vec<Result<VertexItem>> = iter.take(q.limit as usize).collect();
193 Ok(Box::new(results.into_iter()))
194 }
195 VertexQuery::Specific(q) => {
196 let vertex_manager = VertexManager::new(&self.holder);
197
198 let iter = q.ids.into_iter().map(move |id| match vertex_manager.get(id)? {
199 Some(value) => Ok(Some((id, value))),
200 None => Ok(None),
201 });
202
203 Ok(Box::new(remove_nones_from_iterator(iter)))
204 }
205 VertexQuery::Pipe(q) => {
206 let vertex_manager = VertexManager::new(&self.holder);
207 let edge_iterator = self.edge_query_to_iterator(*q.inner)?;
208 let direction = q.direction;
209
210 let iter = edge_iterator.map(move |item| {
211 let (outbound_id, _, _, inbound_id) = item?;
212
213 let id = match direction {
214 EdgeDirection::Outbound => outbound_id,
215 EdgeDirection::Inbound => inbound_id,
216 };
217
218 match vertex_manager.get(id)? {
219 Some(value) => Ok(Some((id, value))),
220 None => Ok(None),
221 }
222 });
223
224 let mut iter: Box<dyn Iterator<Item = Result<VertexItem>>> = Box::new(remove_nones_from_iterator(iter));
225
226 if let Some(ref t) = q.t {
227 iter = Box::new(iter.filter(move |item| match item {
228 Ok((_, v)) => v == t,
229 Err(_) => true,
230 }));
231 }
232
233 let results: Vec<Result<VertexItem>> = iter.take(q.limit as usize).collect();
234 Ok(Box::new(results.into_iter()))
235 }
236 }
237 }
238
239 fn edge_query_to_iterator<'iter, 'trans: 'iter>(
240 &'trans self,
241 q: EdgeQuery,
242 ) -> Result<Box<dyn Iterator<Item = Result<EdgeRangeItem>> + 'iter>> {
243 match q {
244 EdgeQuery::Specific(q) => {
245 let edge_manager = EdgeManager::new(&self.holder);
246
247 let edges = q.keys.into_iter().map(move |key| {
248 match edge_manager.get(key.outbound_id, &key.t, key.inbound_id)? {
249 Some(update_datetime) => {
250 Ok(Some((key.outbound_id, key.t.clone(), update_datetime, key.inbound_id)))
251 }
252 None => Ok(None),
253 }
254 });
255
256 let iterator = remove_nones_from_iterator(edges);
257 Ok(Box::new(iterator))
258 }
259 EdgeQuery::Pipe(q) => {
260 let vertex_iterator = self.vertex_query_to_iterator(*q.inner)?;
261
262 let edge_range_manager = match q.direction {
263 EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder),
264 EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder),
265 };
266
267 let mut edges: Vec<Result<EdgeRangeItem>> = Vec::new();
273
274 for item in vertex_iterator {
275 let (id, _) = item?;
276 let edge_iterator = edge_range_manager.iterate_for_range(id, q.t.as_ref(), q.high)?;
277
278 for item in edge_iterator {
279 match item {
280 Ok((
281 edge_range_first_id,
282 edge_range_t,
283 edge_range_update_datetime,
284 edge_range_second_id,
285 )) => {
286 if let Some(low) = q.low {
287 if edge_range_update_datetime < low {
288 break;
289 }
290 }
291
292 edges.push(match q.direction {
293 EdgeDirection::Outbound => Ok((
294 edge_range_first_id,
295 edge_range_t,
296 edge_range_update_datetime,
297 edge_range_second_id,
298 )),
299 EdgeDirection::Inbound => Ok((
300 edge_range_second_id,
301 edge_range_t,
302 edge_range_update_datetime,
303 edge_range_first_id,
304 )),
305 })
306 }
307 Err(_) => edges.push(item),
308 }
309
310 if edges.len() == q.limit as usize {
311 break;
312 }
313 }
314 }
315
316 Ok(Box::new(edges.into_iter()))
317 }
318 }
319 }
320}
321
322impl Transaction for SledTransaction {
323 fn create_vertex(&self, vertex: &Vertex) -> Result<bool> {
324 let vertex_manager = VertexManager::new(&self.holder);
325
326 if vertex_manager.exists(vertex.id)? {
327 Ok(false)
328 } else {
329 vertex_manager.create(vertex)?;
330 Ok(true)
331 }
332 }
333
334 fn get_vertices<Q: Into<VertexQuery>>(&self, q: Q) -> Result<Vec<Vertex>> {
335 let iterator = self.vertex_query_to_iterator(q.into())?;
336
337 let mapped = iterator.map(move |item| {
338 let (id, t) = item?;
339 let vertex = Vertex::with_id(id, t);
340 Ok(vertex)
341 });
342
343 mapped.collect()
344 }
345
346 fn delete_vertices<Q: Into<VertexQuery>>(&self, q: Q) -> Result<()> {
347 let iterator = self.vertex_query_to_iterator(q.into())?;
348 let vertex_manager = VertexManager::new(&self.holder);
349
350 for item in iterator {
351 let (id, _) = item?;
352 vertex_manager.delete(id)?;
353 }
354
355 Ok(())
356 }
357
358 fn get_vertex_count(&self) -> Result<u64> {
359 let vertex_manager = VertexManager::new(&self.holder);
360 let iterator = vertex_manager.iterate_for_range(Uuid::default());
361 Ok(iterator.count() as u64)
362 }
363
364 fn create_edge(&self, key: &EdgeKey) -> Result<bool> {
365 let vertex_manager = VertexManager::new(&self.holder);
366
367 if !vertex_manager.exists(key.outbound_id)? || !vertex_manager.exists(key.inbound_id)? {
368 Ok(false)
369 } else {
370 let edge_manager = EdgeManager::new(&self.holder);
371 edge_manager.set(key.outbound_id, &key.t, key.inbound_id, Utc::now())?;
372 Ok(true)
373 }
374 }
375
376 fn get_edges<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<Vec<Edge>> {
377 let iterator = self.edge_query_to_iterator(q.into())?;
378
379 let mapped = iterator.map(move |item: Result<EdgeRangeItem>| {
380 let (outbound_id, t, update_datetime, inbound_id) = item?;
381 let key = EdgeKey::new(outbound_id, t, inbound_id);
382 let edge = Edge::new(key, update_datetime);
383 Ok(edge)
384 });
385
386 mapped.collect()
387 }
388
389 fn delete_edges<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<()> {
390 let edge_manager = EdgeManager::new(&self.holder);
391 let vertex_manager = VertexManager::new(&self.holder);
392 let iterator = self.edge_query_to_iterator(q.into())?;
393
394 for item in iterator {
395 let (outbound_id, t, update_datetime, inbound_id) = item?;
396
397 if vertex_manager.get(outbound_id)?.is_some() {
398 edge_manager.delete(outbound_id, &t, inbound_id, update_datetime)?;
399 };
400 }
401 Ok(())
402 }
403
404 fn get_edge_count(&self, id: Uuid, t: Option<&Type>, direction: EdgeDirection) -> Result<u64> {
405 let edge_range_manager = match direction {
406 EdgeDirection::Outbound => EdgeRangeManager::new(&self.holder),
407 EdgeDirection::Inbound => EdgeRangeManager::new_reversed(&self.holder),
408 };
409
410 let iter = edge_range_manager.iterate_for_range(id, t, None)?;
411 let count = iter.count();
412
413 Ok(count as u64)
414 }
415
416 fn get_vertex_properties(&self, q: VertexPropertyQuery) -> Result<Vec<VertexProperty>> {
417 let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
418 let mut properties = Vec::new();
419
420 for item in self.vertex_query_to_iterator(q.inner)? {
421 let (id, _) = item?;
422 let value = manager.get(id, &q.name)?;
423
424 if let Some(value) = value {
425 properties.push(VertexProperty::new(id, value));
426 }
427 }
428
429 Ok(properties)
430 }
431
432 fn get_all_vertex_properties<Q: Into<VertexQuery>>(&self, q: Q) -> Result<Vec<VertexProperties>> {
433 let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
434 let iterator = self.vertex_query_to_iterator(q.into())?;
435
436 let iter = iterator.map(move |item| {
437 let (id, t) = item?;
438 let vertex = Vertex::with_id(id, t);
439
440 let it = manager.iterate_for_owner(id)?;
441 let props: Result<Vec<_>> = it.collect();
442 let props_iter = props?.into_iter();
443 let props = props_iter
444 .map(|((_, name), value)| NamedProperty::new(name, value))
445 .collect();
446
447 Ok(VertexProperties::new(vertex, props))
448 });
449
450 iter.collect()
451 }
452
453 fn set_vertex_properties(&self, q: VertexPropertyQuery, value: &JsonValue) -> Result<()> {
454 let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
455
456 for item in self.vertex_query_to_iterator(q.inner)? {
457 let (id, _) = item?;
458 manager.set(id, &q.name, value)?;
459 }
460 Ok(())
461 }
462
463 fn delete_vertex_properties(&self, q: VertexPropertyQuery) -> Result<()> {
464 let manager = VertexPropertyManager::new(&self.holder.vertex_properties);
465
466 for item in self.vertex_query_to_iterator(q.inner)? {
467 let (id, _) = item?;
468 manager.delete(id, &q.name)?;
469 }
470 Ok(())
471 }
472
473 fn get_edge_properties(&self, q: EdgePropertyQuery) -> Result<Vec<EdgeProperty>> {
474 let manager = EdgePropertyManager::new(&self.holder.edge_properties);
475 let mut properties = Vec::new();
476
477 for item in self.edge_query_to_iterator(q.inner)? {
478 let (outbound_id, t, _, inbound_id) = item?;
479 let value = manager.get(outbound_id, &t, inbound_id, &q.name)?;
480
481 if let Some(value) = value {
482 let key = EdgeKey::new(outbound_id, t, inbound_id);
483 properties.push(EdgeProperty::new(key, value));
484 }
485 }
486
487 Ok(properties)
488 }
489
490 fn get_all_edge_properties<Q: Into<EdgeQuery>>(&self, q: Q) -> Result<Vec<EdgeProperties>> {
491 let manager = EdgePropertyManager::new(&self.holder.edge_properties);
492 let iterator = self.edge_query_to_iterator(q.into())?;
493
494 let iter = iterator.map(move |item| {
495 let (out_id, t, time, in_id) = item?;
496 let edge = Edge::new(EdgeKey::new(out_id, t.clone(), in_id), time);
497 let it = manager.iterate_for_owner(out_id, &t, in_id)?;
498 let props: Result<Vec<_>> = it.collect();
499 let props_iter = props?.into_iter();
500 let props = props_iter
501 .map(|((_, _, _, name), value)| NamedProperty::new(name, value))
502 .collect();
503
504 Ok(EdgeProperties::new(edge, props))
505 });
506
507 iter.collect()
508 }
509
510 fn set_edge_properties(&self, q: EdgePropertyQuery, value: &JsonValue) -> Result<()> {
511 let manager = EdgePropertyManager::new(&self.holder.edge_properties);
512
513 for item in self.edge_query_to_iterator(q.inner)? {
514 let (outbound_id, t, _, inbound_id) = item?;
515 manager.set(outbound_id, &t, inbound_id, &q.name, value)?;
516 }
517 Ok(())
518 }
519
520 fn delete_edge_properties(&self, q: EdgePropertyQuery) -> Result<()> {
521 let manager = EdgePropertyManager::new(&self.holder.edge_properties);
522
523 for item in self.edge_query_to_iterator(q.inner)? {
524 let (outbound_id, t, _, inbound_id) = item?;
525 manager.delete(outbound_id, &t, inbound_id, &q.name)?;
526 }
527 Ok(())
528 }
529}
530
531fn remove_nones_from_iterator<I, T>(iter: I) -> impl Iterator<Item = Result<T>>
532where
533 I: Iterator<Item = Result<Option<T>>>,
534{
535 iter.filter_map(|item| match item {
536 Err(err) => Some(Err(err)),
537 Ok(Some(value)) => Some(Ok(value)),
538 _ => None,
539 })
540}