1use std::path::PathBuf;
12
13use crate::collection::graph::{GraphEdge, GraphSchema, TraversalConfig, TraversalResult};
14use crate::collection::types::Collection;
15use crate::distance::DistanceMetric;
16use crate::error::Result;
17use crate::point::{Point, SearchResult};
18
19#[derive(Clone)]
41pub struct GraphCollection {
42 pub(crate) inner: Collection,
44}
45
46impl GraphCollection {
47 pub fn create(
57 path: PathBuf,
58 name: &str,
59 dimension: Option<usize>,
60 metric: DistanceMetric,
61 schema: GraphSchema,
62 ) -> Result<Self> {
63 Ok(Self {
64 inner: Collection::create_graph_collection(path, name, schema, dimension, metric)?,
65 })
66 }
67
68 pub fn open(path: PathBuf) -> Result<Self> {
74 Ok(Self {
75 inner: Collection::open(path)?,
76 })
77 }
78
79 pub fn flush(&self) -> Result<()> {
88 self.inner.flush()
89 }
90
91 pub fn flush_full(&self) -> Result<()> {
100 self.inner.flush_full()
101 }
102
103 #[must_use]
109 pub fn name(&self) -> String {
110 self.inner.config().name
111 }
112
113 #[must_use]
117 pub fn schema(&self) -> GraphSchema {
118 self.inner
119 .graph_schema()
120 .unwrap_or_else(GraphSchema::schemaless)
121 }
122
123 #[must_use]
125 pub fn has_embeddings(&self) -> bool {
126 self.inner.has_embeddings()
127 }
128
129 pub fn add_edge(&self, edge: GraphEdge) -> Result<()> {
149 self.inner.add_edge(edge)
150 }
151
152 pub fn add_edges_batch(&self, edges: Vec<GraphEdge>) -> Result<usize> {
166 self.inner.add_edges_batch(edges)
167 }
168
169 #[must_use]
171 pub fn get_edges(&self, label: Option<&str>) -> Vec<GraphEdge> {
172 match label {
173 Some(lbl) => self.inner.get_edges_by_label(lbl),
174 None => self.inner.get_all_edges(),
175 }
176 }
177
178 #[must_use]
180 pub fn get_outgoing(&self, node_id: u64) -> Vec<GraphEdge> {
181 self.inner.get_outgoing_edges(node_id)
182 }
183
184 #[must_use]
186 pub fn get_incoming(&self, node_id: u64) -> Vec<GraphEdge> {
187 self.inner.get_incoming_edges(node_id)
188 }
189
190 #[must_use]
192 pub fn edge_count(&self) -> usize {
193 self.inner.edge_count()
194 }
195
196 #[must_use]
198 pub fn node_degree(&self, node_id: u64) -> (usize, usize) {
199 self.inner.get_node_degree(node_id)
200 }
201
202 #[must_use]
208 pub fn all_node_ids(&self) -> Vec<u64> {
209 self.inner.all_ids()
210 }
211
212 pub fn scroll_batch(
221 &self,
222 cursor: Option<u64>,
223 batch_size: usize,
224 filter: Option<&crate::filter::Filter>,
225 ) -> Result<crate::collection::ScrollBatch> {
226 self.inner.scroll_batch(cursor, batch_size, filter)
227 }
228
229 #[must_use]
231 pub fn len(&self) -> usize {
232 self.inner.len()
233 }
234
235 #[must_use]
237 pub fn is_empty(&self) -> bool {
238 self.inner.is_empty()
239 }
240
241 #[must_use]
243 pub fn get(&self, ids: &[u64]) -> Vec<Option<Point>> {
244 self.inner.get(ids)
245 }
246
247 pub fn delete(&self, ids: &[u64]) -> Result<()> {
255 self.inner.delete(ids)
256 }
257
258 #[must_use]
262 pub fn remove_edge(&self, edge_id: u64) -> bool {
263 self.inner.remove_edge(edge_id)
264 }
265
266 #[must_use]
268 pub fn has_edge(&self, edge_id: u64) -> bool {
269 self.inner.edge_exists(edge_id)
270 }
271
272 #[must_use]
288 pub fn traverse_bfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
289 self.inner.traverse_bfs_config(source_id, config)
290 }
291
292 #[must_use]
294 pub fn traverse_dfs(&self, source_id: u64, config: &TraversalConfig) -> Vec<TraversalResult> {
295 self.inner.traverse_dfs_config(source_id, config)
296 }
297
298 #[must_use]
318 pub fn traverse_bfs_parallel(
319 &self,
320 start_nodes: &[u64],
321 config: &TraversalConfig,
322 ) -> Vec<TraversalResult> {
323 self.inner.traverse_bfs_parallel(start_nodes, config)
324 }
325
326 pub fn upsert_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
336 self.inner.store_node_payload(node_id, payload)
337 }
338
339 pub fn upsert_node(
346 &self,
347 node_id: u64,
348 payload: &serde_json::Value,
349 vector: Option<Vec<f32>>,
350 ) -> Result<()> {
351 match vector {
352 Some(vector) => self
353 .inner
354 .upsert([Point::new(node_id, vector, Some(payload.clone()))]),
355 None => self.upsert_node_payload(node_id, payload),
356 }
357 }
358
359 #[deprecated(since = "1.6.0", note = "Use upsert_node_payload() instead")]
365 pub fn store_node_payload(&self, node_id: u64, payload: &serde_json::Value) -> Result<()> {
366 self.upsert_node_payload(node_id, payload)
367 }
368
369 pub fn get_node_payload(&self, node_id: u64) -> Result<Option<serde_json::Value>> {
375 self.inner.get_node_payload(node_id)
376 }
377
378 pub fn search_by_embedding(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
389 self.inner.search_by_embedding(query, k)
390 }
391
392 pub fn search(&self, query: &[f32], k: usize) -> Result<Vec<SearchResult>> {
401 self.search_by_embedding(query, k)
402 }
403}
404
405#[cfg(test)]
406mod tests {
407 use super::*;
408 use crate::collection::graph::GraphSchema;
409 use crate::distance::DistanceMetric;
410 use std::collections::HashMap;
411 use tempfile::{tempdir, TempDir};
412
413 fn make_test_collection(dimension: Option<usize>) -> (TempDir, GraphCollection) {
419 let dir = tempdir().unwrap();
420 let col = GraphCollection::create(
421 dir.path().to_path_buf(),
422 "kg",
423 dimension,
424 DistanceMetric::Cosine,
425 GraphSchema::schemaless(),
426 )
427 .unwrap();
428 (dir, col)
429 }
430
431 #[test]
432 fn test_all_node_ids_returns_ids_with_payload() {
433 let (_dir, col) = make_test_collection(None);
434
435 col.upsert_node_payload(10, &serde_json::json!({"name": "Alice"}))
437 .unwrap();
438 col.upsert_node_payload(20, &serde_json::json!({"name": "Bob"}))
439 .unwrap();
440
441 let ids = col.all_node_ids();
442 assert!(ids.contains(&10), "node 10 should be present");
443 assert!(ids.contains(&20), "node 20 should be present");
444 assert_eq!(ids.len(), 2);
445 }
446
447 #[test]
448 fn test_upsert_node_with_embedding_is_searchable() {
449 let (_dir, col) = make_test_collection(Some(4));
450
451 col.upsert_node(
452 10,
453 &serde_json::json!({"name": "Alice"}),
454 Some(vec![1.0, 0.0, 0.0, 0.0]),
455 )
456 .unwrap();
457
458 assert_eq!(
459 col.get_node_payload(10).unwrap(),
460 Some(serde_json::json!({"name": "Alice"}))
461 );
462 let results = col.search_by_embedding(&[1.0, 0.0, 0.0, 0.0], 1).unwrap();
463 assert_eq!(results[0].point.id, 10);
464 }
465
466 #[test]
467 fn test_edge_count_returns_correct_count() {
468 let (_dir, col) = make_test_collection(None);
469
470 assert_eq!(col.edge_count(), 0);
471
472 let edge1 = crate::collection::graph::GraphEdge::new(1, 10, 20, "knows").unwrap();
473 col.add_edge(edge1).unwrap();
474 assert_eq!(col.edge_count(), 1);
475
476 let edge2 = crate::collection::graph::GraphEdge::new(2, 20, 30, "likes").unwrap();
477 col.add_edge(edge2).unwrap();
478 assert_eq!(col.edge_count(), 2);
479 }
480
481 #[test]
482 fn test_traverse_bfs_parallel_through_graph_collection() {
483 let (_dir, col) = make_test_collection(None);
484
485 col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
487 .unwrap();
488 col.add_edge(GraphEdge::new(2, 2, 3, "NEXT").unwrap())
489 .unwrap();
490
491 let config = TraversalConfig {
492 max_depth: 3,
493 min_depth: 1,
494 ..TraversalConfig::default()
495 };
496 let results = col.traverse_bfs_parallel(&[1], &config);
497 let target_ids: std::collections::HashSet<u64> =
498 results.iter().map(|r| r.target_id).collect();
499 assert!(target_ids.contains(&2), "should reach node 2");
500 assert!(target_ids.contains(&3), "should reach node 3");
501 }
502
503 #[test]
504 fn test_execute_match_finds_edges() {
505 let (_dir, col) = make_test_collection(None);
506
507 col.upsert_node_payload(
509 10,
510 &serde_json::json!({"_labels": ["Person"], "name": "Alice"}),
511 )
512 .unwrap();
513 col.upsert_node_payload(
514 20,
515 &serde_json::json!({"_labels": ["Person"], "name": "Bob"}),
516 )
517 .unwrap();
518
519 let edge = crate::collection::graph::GraphEdge::new(1, 10, 20, "KNOWS").unwrap();
521 col.add_edge(edge).unwrap();
522
523 let match_clause = crate::velesql::MatchClause {
525 patterns: vec![crate::velesql::GraphPattern {
526 name: None,
527 nodes: vec![
528 crate::velesql::NodePattern::new().with_alias("a"),
529 crate::velesql::NodePattern::new().with_alias("b"),
530 ],
531 relationships: vec![crate::velesql::RelationshipPattern::new(
532 crate::velesql::Direction::Outgoing,
533 )],
534 }],
535 where_clause: None,
536 return_clause: crate::velesql::ReturnClause {
537 items: vec![],
538 order_by: None,
539 limit: Some(10),
540 },
541 };
542
543 let params = HashMap::new();
544 let results = col.execute_match(&match_clause, ¶ms).unwrap();
545 assert!(
546 !results.is_empty(),
547 "execute_match should find the KNOWS edge"
548 );
549 assert_eq!(results[0].node_id, 20, "target should be Bob (id=20)");
550 }
551
552 #[test]
553 fn test_has_edge_and_remove_edge() {
554 let (_dir, col) = make_test_collection(None);
555 assert!(!col.has_edge(7), "unknown edge id is absent");
556
557 col.add_edge(GraphEdge::new(7, 10, 20, "KNOWS").unwrap())
558 .unwrap();
559 assert!(col.has_edge(7), "edge present after add");
560
561 assert!(col.remove_edge(7), "removing an existing edge returns true");
562 assert!(!col.has_edge(7), "edge gone after remove");
563 assert!(!col.remove_edge(7), "removing a missing edge returns false");
564 }
565
566 #[test]
567 fn test_upsert_node_without_vector_stores_payload_only() {
568 let (_dir, col) = make_test_collection(None);
570 col.upsert_node(42, &serde_json::json!({"name": "Carol"}), None)
571 .unwrap();
572 assert_eq!(
573 col.get_node_payload(42).unwrap(),
574 Some(serde_json::json!({"name": "Carol"}))
575 );
576 assert!(col.all_node_ids().contains(&42));
577 assert!(!col.has_embeddings(), "no embeddings without a dimension");
578 }
579
580 #[test]
581 fn test_get_edges_filtered_by_label() {
582 let (_dir, col) = make_test_collection(None);
583 col.add_edge(GraphEdge::new(1, 10, 20, "KNOWS").unwrap())
584 .unwrap();
585 col.add_edge(GraphEdge::new(2, 20, 30, "LIKES").unwrap())
586 .unwrap();
587 col.add_edge(GraphEdge::new(3, 30, 40, "KNOWS").unwrap())
588 .unwrap();
589
590 let knows = col.get_edges(Some("KNOWS"));
591 assert_eq!(knows.len(), 2, "two KNOWS edges");
592 assert!(knows.iter().all(|e| e.label() == "KNOWS"));
593
594 let all = col.get_edges(None);
595 assert_eq!(all.len(), 3, "three edges total");
596 }
597
598 #[test]
599 fn test_node_degree_and_directional_edges() {
600 let (_dir, col) = make_test_collection(None);
601 col.add_edge(GraphEdge::new(1, 10, 20, "NEXT").unwrap())
602 .unwrap();
603 col.add_edge(GraphEdge::new(2, 30, 20, "NEXT").unwrap())
604 .unwrap();
605 col.add_edge(GraphEdge::new(3, 20, 40, "NEXT").unwrap())
606 .unwrap();
607
608 assert_eq!(col.node_degree(20), (2, 1));
610 assert_eq!(col.get_incoming(20).len(), 2);
611 let outgoing = col.get_outgoing(20);
612 assert_eq!(outgoing.len(), 1);
613 assert_eq!(outgoing[0].target(), 40);
614 }
615
616 #[test]
617 fn test_delete_removes_node_payload() {
618 let (_dir, col) = make_test_collection(None);
619 col.upsert_node_payload(10, &serde_json::json!({"k": 1}))
620 .unwrap();
621 col.upsert_node_payload(20, &serde_json::json!({"k": 2}))
622 .unwrap();
623 assert_eq!(col.all_node_ids().len(), 2);
624
625 col.delete(&[10]).unwrap();
626 assert!(col.get(&[10])[0].is_none(), "deleted node is gone");
627 assert!(
628 !col.all_node_ids().contains(&10),
629 "deleted node leaves the id set"
630 );
631 assert!(col.get_node_payload(20).unwrap().is_some(), "node 20 stays");
632 }
633
634 #[test]
635 fn test_scroll_batch_paginates_embedded_nodes() {
636 let (_dir, col) = make_test_collection(Some(2));
638 for id in [1u64, 2, 3] {
639 col.upsert_node(id, &serde_json::json!({"id": id}), Some(vec![1.0, 0.0]))
640 .unwrap();
641 }
642 assert!(!col.is_empty());
643 assert_eq!(col.len(), 3);
644
645 let first = col.scroll_batch(None, 2, None).unwrap();
646 assert_eq!(first.points.len(), 2, "first page has 2 of 3 nodes");
647 let cursor = first.next_cursor.expect("non-empty page yields a cursor");
648 let second = col.scroll_batch(Some(cursor), 2, None).unwrap();
649 assert_eq!(second.points.len(), 1, "second page has the last node");
650 let tail_cursor = second.next_cursor.expect("page yields a cursor");
652 let third = col.scroll_batch(Some(tail_cursor), 2, None).unwrap();
653 assert!(third.points.is_empty(), "no points past the end");
654 assert!(third.next_cursor.is_none(), "empty page yields no cursor");
655
656 assert!(col.scroll_batch(None, 0, None).is_err());
658 }
659
660 #[test]
661 fn test_flush_and_flush_full_succeed() {
662 let (_dir, col) = make_test_collection(None);
663 col.upsert_node_payload(1, &serde_json::json!({"k": 1}))
664 .unwrap();
665 col.add_edge(GraphEdge::new(1, 1, 2, "NEXT").unwrap())
666 .unwrap();
667 col.flush().expect("fast-path flush succeeds");
668 col.flush_full().expect("full durability flush succeeds");
669 }
670
671 #[test]
672 fn test_reopen_recovers_edges_and_payloads() {
673 let dir = tempdir().unwrap();
674 let path = dir.path().to_path_buf();
675 {
676 let col = GraphCollection::create(
677 path.clone(),
678 "kg",
679 None,
680 DistanceMetric::Cosine,
681 GraphSchema::schemaless(),
682 )
683 .unwrap();
684 col.upsert_node_payload(1, &serde_json::json!({"name": "A"}))
685 .unwrap();
686 col.add_edge(GraphEdge::new(5, 1, 2, "NEXT").unwrap())
687 .unwrap();
688 col.flush_full().unwrap();
689 }
690 let reopened = GraphCollection::open(path).unwrap();
691 assert_eq!(reopened.name(), "kg");
692 assert!(reopened.has_edge(5), "edge survives reopen");
693 assert_eq!(
694 reopened.get_node_payload(1).unwrap(),
695 Some(serde_json::json!({"name": "A"}))
696 );
697 }
698}