1use std::sync::Arc;
12
13use async_trait::async_trait;
14use chrono::{DateTime, TimeZone, Utc};
15use uuid::Uuid;
16
17use khive_storage::error::StorageError;
18use khive_storage::types::{
19 BatchWriteSummary, DeleteMode, Edge, EdgeFilter, EdgeSortField, GraphPath, NeighborHit,
20 NeighborQuery, Page, PageRequest, PathNode, SortDirection, SortOrder, TraversalRequest,
21};
22use khive_storage::GraphStore;
23use khive_storage::LinkId;
24use khive_storage::StorageCapability;
25use khive_types::EdgeRelation;
26
27use crate::error::SqliteError;
28use crate::pool::ConnectionPool;
29
30fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
32 StorageError::driver(StorageCapability::Graph, op, e)
33}
34
35fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
36 StorageError::driver(StorageCapability::Graph, op, e)
37}
38
/// SQLite-backed [`GraphStore`] whose reads and writes are confined to a
/// single namespace.
pub struct SqlGraphStore {
    /// Shared connection pool; its config also supplies the database path and
    /// busy timeout used when opening standalone connections.
    pool: Arc<ConnectionPool>,
    /// When `true`, each operation opens its own standalone connection
    /// instead of borrowing one from the pool.
    is_file_backed: bool,
    /// Namespace bound into every query and required on every upserted edge.
    namespace: String,
}
45
46impl SqlGraphStore {
47 pub fn new_scoped(
49 pool: Arc<ConnectionPool>,
50 is_file_backed: bool,
51 namespace: impl Into<String>,
52 ) -> Self {
53 Self {
54 pool,
55 is_file_backed,
56 namespace: namespace.into(),
57 }
58 }
59
60 fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
61 let config = self.pool.config();
62 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
63 operation: "graph_writer".into(),
64 message: "in-memory databases do not support standalone connections".into(),
65 })?;
66
67 let conn = rusqlite::Connection::open_with_flags(
68 path,
69 rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
70 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
71 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
72 )
73 .map_err(|e| map_err(e, "open_graph_writer"))?;
74
75 conn.busy_timeout(config.busy_timeout)
76 .map_err(|e| map_err(e, "open_graph_writer"))?;
77 conn.pragma_update(None, "foreign_keys", "ON")
78 .map_err(|e| map_err(e, "open_graph_writer"))?;
79 conn.pragma_update(None, "synchronous", "NORMAL")
80 .map_err(|e| map_err(e, "open_graph_writer"))?;
81
82 Ok(conn)
83 }
84
85 fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
86 let config = self.pool.config();
87 let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
88 operation: "graph_reader".into(),
89 message: "in-memory databases do not support standalone connections".into(),
90 })?;
91
92 let conn = rusqlite::Connection::open_with_flags(
93 path,
94 rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
95 | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
96 | rusqlite::OpenFlags::SQLITE_OPEN_URI,
97 )
98 .map_err(|e| map_err(e, "open_graph_reader"))?;
99
100 conn.busy_timeout(config.busy_timeout)
101 .map_err(|e| map_err(e, "open_graph_reader"))?;
102 conn.pragma_update(None, "foreign_keys", "ON")
103 .map_err(|e| map_err(e, "open_graph_reader"))?;
104 conn.pragma_update(None, "synchronous", "NORMAL")
105 .map_err(|e| map_err(e, "open_graph_reader"))?;
106
107 Ok(conn)
108 }
109
110 async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
111 where
112 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
113 R: Send + 'static,
114 {
115 if self.is_file_backed {
116 let conn = self.open_standalone_writer()?;
117 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
118 .await
119 .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
120 } else {
121 let pool = Arc::clone(&self.pool);
122 tokio::task::spawn_blocking(move || {
123 let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
124 f(guard.conn()).map_err(|e| map_err(e, op))
125 })
126 .await
127 .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
128 }
129 }
130
131 async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
132 where
133 F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
134 R: Send + 'static,
135 {
136 if self.is_file_backed {
137 let conn = self.open_standalone_reader()?;
138 tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
139 .await
140 .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
141 } else {
142 let pool = Arc::clone(&self.pool);
143 tokio::task::spawn_blocking(move || {
144 let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
145 f(guard.conn()).map_err(|e| map_err(e, op))
146 })
147 .await
148 .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
149 }
150 }
151}
152
153fn read_edge(row: &rusqlite::Row<'_>) -> Result<Edge, rusqlite::Error> {
158 let namespace: String = row.get(0)?;
159 let id_str: String = row.get(1)?;
160 let source_str: String = row.get(2)?;
161 let target_str: String = row.get(3)?;
162 let relation_str: String = row.get(4)?;
163 let weight: f64 = row.get(5)?;
164 let created_micros: i64 = row.get(6)?;
165 let updated_micros: i64 = row.get(7)?;
166 let deleted_micros: Option<i64> = row.get(8)?;
167 let metadata_str: Option<String> = row.get(9)?;
168 let target_backend: Option<String> = row.get(10)?;
169
170 let id = parse_uuid(&id_str)?;
171 let source_id = parse_uuid(&source_str)?;
172 let target_id = parse_uuid(&target_str)?;
173 let created_at = micros_to_datetime(created_micros);
174 let relation = relation_str.parse::<EdgeRelation>().map_err(|e| {
175 rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e))
176 })?;
177 let metadata = match metadata_str {
178 Some(s) => {
179 let v = serde_json::from_str(&s).map_err(|e| {
180 rusqlite::Error::FromSqlConversionFailure(
181 9,
182 rusqlite::types::Type::Text,
183 Box::new(e),
184 )
185 })?;
186 Some(v)
187 }
188 None => None,
189 };
190
191 Ok(Edge {
192 id: id.into(),
193 namespace,
194 source_id,
195 target_id,
196 relation,
197 weight,
198 created_at,
199 updated_at: micros_to_datetime(updated_micros),
200 deleted_at: deleted_micros.map(micros_to_datetime),
201 metadata,
202 target_backend,
203 })
204}
205
206fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
207 Uuid::parse_str(s).map_err(|e| {
208 rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
209 })
210}
211
212fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
213 Utc.timestamp_micros(micros)
214 .single()
215 .unwrap_or_else(Utc::now)
216}
217
218fn build_edge_filter_sql(
219 namespace: &str,
220 filter: &EdgeFilter,
221) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
222 let mut conditions: Vec<String> = vec![
223 "namespace = ?1".to_string(),
224 "deleted_at IS NULL".to_string(),
225 ];
226 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
227
228 if !filter.ids.is_empty() {
229 let placeholders: Vec<String> = filter
230 .ids
231 .iter()
232 .map(|id| {
233 params.push(Box::new(id.to_string()));
234 format!("?{}", params.len())
235 })
236 .collect();
237 conditions.push(format!("id IN ({})", placeholders.join(",")));
238 }
239
240 if !filter.source_ids.is_empty() {
241 let placeholders: Vec<String> = filter
242 .source_ids
243 .iter()
244 .map(|id| {
245 params.push(Box::new(id.to_string()));
246 format!("?{}", params.len())
247 })
248 .collect();
249 conditions.push(format!("source_id IN ({})", placeholders.join(",")));
250 }
251
252 if !filter.target_ids.is_empty() {
253 let placeholders: Vec<String> = filter
254 .target_ids
255 .iter()
256 .map(|id| {
257 params.push(Box::new(id.to_string()));
258 format!("?{}", params.len())
259 })
260 .collect();
261 conditions.push(format!("target_id IN ({})", placeholders.join(",")));
262 }
263
264 if !filter.relations.is_empty() {
265 let placeholders: Vec<String> = filter
266 .relations
267 .iter()
268 .map(|r| {
269 params.push(Box::new(r.to_string()));
270 format!("?{}", params.len())
271 })
272 .collect();
273 conditions.push(format!("relation IN ({})", placeholders.join(",")));
274 }
275
276 if let Some(min_w) = filter.min_weight {
277 params.push(Box::new(min_w));
278 conditions.push(format!("weight >= ?{}", params.len()));
279 }
280
281 if let Some(max_w) = filter.max_weight {
282 params.push(Box::new(max_w));
283 conditions.push(format!("weight <= ?{}", params.len()));
284 }
285
286 if let Some(ref time_range) = filter.created_at {
287 if let Some(start) = time_range.start {
288 params.push(Box::new(start.timestamp_micros()));
289 conditions.push(format!("created_at >= ?{}", params.len()));
290 }
291 if let Some(end) = time_range.end {
292 params.push(Box::new(end.timestamp_micros()));
293 conditions.push(format!("created_at < ?{}", params.len()));
294 }
295 }
296
297 let clause = format!(" WHERE {}", conditions.join(" AND "));
298 (clause, params)
299}
300
301fn edge_sort_col(field: &EdgeSortField) -> &'static str {
302 match field {
303 EdgeSortField::CreatedAt => "created_at",
304 EdgeSortField::Weight => "weight",
305 EdgeSortField::Relation => "relation",
306 }
307}
308
309fn canonical_edge_endpoints(
318 relation: EdgeRelation,
319 source_id: Uuid,
320 target_id: Uuid,
321) -> (Uuid, Uuid) {
322 if relation.is_symmetric() && target_id < source_id {
323 (target_id, source_id)
324 } else {
325 (source_id, target_id)
326 }
327}
328
329#[async_trait]
330impl GraphStore for SqlGraphStore {
331 async fn upsert_edge(&self, edge: Edge) -> Result<(), StorageError> {
332 let namespace = self.namespace.clone();
333 if edge.namespace != namespace {
334 return Err(StorageError::InvalidInput {
335 capability: StorageCapability::Graph,
336 operation: "upsert_edge".into(),
337 message: format!(
338 "edge namespace {:?} does not match store namespace {:?}",
339 edge.namespace, namespace
340 ),
341 });
342 }
343 let id_str = Uuid::from(edge.id).to_string();
344 let (source_id, target_id) =
345 canonical_edge_endpoints(edge.relation, edge.source_id, edge.target_id);
346 let src_str = source_id.to_string();
347 let tgt_str = target_id.to_string();
348 let relation_str = edge.relation.to_string();
349 let metadata_str = edge
350 .metadata
351 .as_ref()
352 .map(serde_json::to_string)
353 .transpose()
354 .map_err(|e| StorageError::driver(StorageCapability::Graph, "upsert_edge", e))?;
355 self.with_writer("upsert_edge", move |conn| {
356 conn.execute(
357 "INSERT INTO graph_edges \
358 (namespace, id, source_id, target_id, relation, weight, \
359 created_at, updated_at, deleted_at, metadata, target_backend) \
360 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
361 ON CONFLICT(namespace, id) DO UPDATE SET \
362 source_id = excluded.source_id, \
363 target_id = excluded.target_id, \
364 relation = excluded.relation, \
365 weight = excluded.weight, \
366 updated_at = excluded.updated_at, \
367 deleted_at = NULL, \
368 metadata = excluded.metadata, \
369 target_backend = excluded.target_backend \
370 ON CONFLICT(namespace, source_id, target_id, relation) DO UPDATE SET \
371 weight = excluded.weight, \
372 updated_at = excluded.updated_at, \
373 deleted_at = NULL, \
374 metadata = excluded.metadata, \
375 target_backend = excluded.target_backend",
376 rusqlite::params![
377 namespace,
378 id_str,
379 src_str,
380 tgt_str,
381 relation_str,
382 edge.weight,
383 edge.created_at.timestamp_micros(),
384 edge.updated_at.timestamp_micros(),
385 edge.deleted_at.map(|t| t.timestamp_micros()),
386 metadata_str,
387 edge.target_backend,
388 ],
389 )?;
390 Ok(())
391 })
392 .await
393 }
394
395 async fn upsert_edges(&self, edges: Vec<Edge>) -> Result<BatchWriteSummary, StorageError> {
396 let attempted = edges.len() as u64;
397 let namespace = self.namespace.clone();
398
399 for edge in &edges {
401 if edge.namespace != namespace {
402 return Err(StorageError::InvalidInput {
403 capability: StorageCapability::Graph,
404 operation: "upsert_edges".into(),
405 message: format!(
406 "edge namespace {:?} does not match store namespace {:?}",
407 edge.namespace, namespace
408 ),
409 });
410 }
411 }
412
413 self.with_writer("upsert_edges", move |conn| {
414 conn.execute_batch("BEGIN IMMEDIATE")?;
415 let mut affected = 0u64;
416
417 for edge in &edges {
418 let id_str = Uuid::from(edge.id).to_string();
419 let (canon_src, canon_tgt) =
420 canonical_edge_endpoints(edge.relation, edge.source_id, edge.target_id);
421 let src_str = canon_src.to_string();
422 let tgt_str = canon_tgt.to_string();
423 let relation_str = edge.relation.to_string();
424 let metadata_str = edge
425 .metadata
426 .as_ref()
427 .map(serde_json::to_string)
428 .transpose()
429 .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
430 if let Err(e) = conn.execute(
431 "INSERT INTO graph_edges \
432 (namespace, id, source_id, target_id, relation, weight, \
433 created_at, updated_at, deleted_at, metadata, target_backend) \
434 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
435 ON CONFLICT(namespace, id) DO UPDATE SET \
436 source_id = excluded.source_id, \
437 target_id = excluded.target_id, \
438 relation = excluded.relation, \
439 weight = excluded.weight, \
440 updated_at = excluded.updated_at, \
441 deleted_at = NULL, \
442 metadata = excluded.metadata, \
443 target_backend = excluded.target_backend \
444 ON CONFLICT(namespace, source_id, target_id, relation) DO UPDATE SET \
445 weight = excluded.weight, \
446 updated_at = excluded.updated_at, \
447 deleted_at = NULL, \
448 metadata = excluded.metadata, \
449 target_backend = excluded.target_backend",
450 rusqlite::params![
451 &namespace,
452 id_str,
453 src_str,
454 tgt_str,
455 relation_str,
456 edge.weight,
457 edge.created_at.timestamp_micros(),
458 edge.updated_at.timestamp_micros(),
459 edge.deleted_at.map(|t| t.timestamp_micros()),
460 metadata_str,
461 edge.target_backend.as_deref(),
462 ],
463 ) {
464 let _ = conn.execute_batch("ROLLBACK");
465 return Err(e);
466 }
467 affected += 1;
468 }
469
470 if let Err(e) = conn.execute_batch("COMMIT") {
471 let _ = conn.execute_batch("ROLLBACK");
472 return Err(e);
473 }
474 Ok(BatchWriteSummary {
475 attempted,
476 affected,
477 failed: 0,
478 first_error: String::new(),
479 })
480 })
481 .await
482 }
483
484 async fn get_edge(&self, id: LinkId) -> Result<Option<Edge>, StorageError> {
485 let namespace = self.namespace.clone();
486 let id_str = Uuid::from(id).to_string();
487
488 self.with_reader("get_edge", move |conn| {
489 let mut stmt = conn.prepare(
490 "SELECT namespace, id, source_id, target_id, relation, weight, \
491 created_at, updated_at, deleted_at, metadata, target_backend \
492 FROM graph_edges WHERE namespace = ?1 AND id = ?2 AND deleted_at IS NULL",
493 )?;
494 let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
495 match rows.next()? {
496 Some(row) => Ok(Some(read_edge(row)?)),
497 None => Ok(None),
498 }
499 })
500 .await
501 }
502
503 async fn delete_edge(&self, id: LinkId, mode: DeleteMode) -> Result<bool, StorageError> {
504 let namespace = self.namespace.clone();
505 let id_str = Uuid::from(id).to_string();
506
507 self.with_writer("delete_edge", move |conn| {
508 let affected = match mode {
509 DeleteMode::Soft => conn.execute(
510 "UPDATE graph_edges SET deleted_at = ?3, updated_at = ?3 \
511 WHERE namespace = ?1 AND id = ?2 AND deleted_at IS NULL",
512 rusqlite::params![namespace, id_str, chrono::Utc::now().timestamp_micros(),],
513 )?,
514 DeleteMode::Hard => conn.execute(
515 "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
516 rusqlite::params![namespace, id_str],
517 )?,
518 };
519 Ok(affected > 0)
520 })
521 .await
522 }
523
524 async fn query_edges(
525 &self,
526 filter: EdgeFilter,
527 sort: Vec<SortOrder<EdgeSortField>>,
528 page: PageRequest,
529 ) -> Result<Page<Edge>, StorageError> {
530 let namespace = self.namespace.clone();
531 self.with_reader("query_edges", move |conn| {
532 let (where_clause, filter_params) = build_edge_filter_sql(&namespace, &filter);
533
534 let count_sql = format!("SELECT COUNT(*) FROM graph_edges{}", where_clause);
535 let total: i64 = {
536 let mut stmt = conn.prepare(&count_sql)?;
537 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
538 filter_params.iter().map(|p| p.as_ref()).collect();
539 stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
540 };
541
542 let order_clause = if sort.is_empty() {
543 " ORDER BY created_at DESC".to_string()
544 } else {
545 let parts: Vec<String> = sort
546 .iter()
547 .map(|s| {
548 let dir = match s.direction {
549 SortDirection::Asc => "ASC",
550 SortDirection::Desc => "DESC",
551 };
552 format!("{} {}", edge_sort_col(&s.field), dir)
553 })
554 .collect();
555 format!(" ORDER BY {}", parts.join(", "))
556 };
557
558 let (_, data_filter_params) = build_edge_filter_sql(&namespace, &filter);
559 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
560 all_params.push(Box::new(page.limit as i64));
561 all_params.push(Box::new(page.offset as i64));
562
563 let limit_idx = all_params.len() - 1;
564 let offset_idx = all_params.len();
565
566 let data_sql = format!(
567 "SELECT namespace, id, source_id, target_id, relation, weight, \
568 created_at, updated_at, deleted_at, metadata, target_backend \
569 FROM graph_edges{}{} LIMIT ?{} OFFSET ?{}",
570 where_clause, order_clause, limit_idx, offset_idx,
571 );
572
573 let mut stmt = conn.prepare(&data_sql)?;
574 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
575 all_params.iter().map(|p| p.as_ref()).collect();
576 let rows = stmt.query_map(param_refs.as_slice(), read_edge)?;
577
578 let mut items = Vec::new();
579 for row in rows {
580 items.push(row?);
581 }
582
583 Ok(Page {
584 items,
585 total: Some(total as u64),
586 })
587 })
588 .await
589 }
590
591 async fn count_edges(&self, filter: EdgeFilter) -> Result<u64, StorageError> {
592 let namespace = self.namespace.clone();
593 self.with_reader("count_edges", move |conn| {
594 let (where_clause, params) = build_edge_filter_sql(&namespace, &filter);
595 let sql = format!("SELECT COUNT(*) FROM graph_edges{}", where_clause);
596 let mut stmt = conn.prepare(&sql)?;
597 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
598 params.iter().map(|p| p.as_ref()).collect();
599 let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
600 Ok(count as u64)
601 })
602 .await
603 }
604
605 async fn neighbors(
606 &self,
607 node_id: Uuid,
608 query: NeighborQuery,
609 ) -> Result<Vec<NeighborHit>, StorageError> {
610 use khive_storage::types::Direction;
611
612 let namespace = self.namespace.clone();
613 let node_str = node_id.to_string();
614
615 self.with_reader("neighbors", move |conn| {
616 let base_out = "SELECT target_id AS node_id, id AS edge_id, relation, weight \
617 FROM graph_edges \
618 WHERE namespace = ?1 AND source_id = ?2 AND deleted_at IS NULL";
619 let base_in = "SELECT source_id AS node_id, id AS edge_id, relation, weight \
620 FROM graph_edges \
621 WHERE namespace = ?1 AND target_id = ?2 AND deleted_at IS NULL";
622
623 let sql = match query.direction {
624 Direction::Out => base_out.to_string(),
625 Direction::In => base_in.to_string(),
626 Direction::Both => format!("{} UNION ALL {}", base_out, base_in),
627 };
628
629 let mut conditions: Vec<String> = Vec::new();
630 let mut extra_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
631 let mut param_idx = 3;
632
633 if let Some(ref rels) = query.relations {
634 if !rels.is_empty() {
635 let placeholders: Vec<String> = rels
636 .iter()
637 .map(|r| {
638 extra_params.push(Box::new(r.to_string()));
639 let p = format!("?{}", param_idx);
640 param_idx += 1;
641 p
642 })
643 .collect();
644 conditions.push(format!("relation IN ({})", placeholders.join(",")));
645 }
646 }
647
648 if let Some(min_w) = query.min_weight {
649 extra_params.push(Box::new(min_w));
650 conditions.push(format!("weight >= ?{}", param_idx));
651 param_idx += 1;
652 }
653
654 let where_extra = if conditions.is_empty() {
655 String::new()
656 } else {
657 format!(" WHERE {}", conditions.join(" AND "))
658 };
659
660 let limit_clause = if let Some(lim) = query.limit {
661 extra_params.push(Box::new(lim as i64));
662 format!(" LIMIT ?{}", param_idx)
663 } else {
664 String::new()
665 };
666
667 let full_sql = format!(
668 "SELECT node_id, edge_id, relation, weight FROM ({}){}{}",
669 sql, where_extra, limit_clause
670 );
671
672 let mut stmt = conn.prepare(&full_sql)?;
673
674 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
675 all_params.push(Box::new(namespace.clone()));
676 all_params.push(Box::new(node_str.clone()));
677 all_params.extend(extra_params);
678
679 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
680 all_params.iter().map(|p| p.as_ref()).collect();
681
682 let rows = stmt.query_map(param_refs.as_slice(), |row| {
683 let nid_str: String = row.get(0)?;
684 let eid_str: String = row.get(1)?;
685 let relation_str: String = row.get(2)?;
686 let weight: f64 = row.get(3)?;
687 Ok((nid_str, eid_str, relation_str, weight))
688 })?;
689
690 let mut hits = Vec::new();
691 for row in rows {
692 let (nid_str, eid_str, relation_str, weight) = row?;
693 let relation = relation_str.parse::<EdgeRelation>().map_err(|e| {
694 rusqlite::Error::FromSqlConversionFailure(
695 2,
696 rusqlite::types::Type::Text,
697 Box::new(e),
698 )
699 })?;
700 hits.push(NeighborHit {
701 node_id: parse_uuid(&nid_str)?,
702 edge_id: parse_uuid(&eid_str)?,
703 relation,
704 weight,
705 name: None,
706 kind: None,
707 });
708 }
709
710 Ok(hits)
711 })
712 .await
713 }
714
715 async fn traverse(&self, request: TraversalRequest) -> Result<Vec<GraphPath>, StorageError> {
716 use khive_storage::types::Direction;
717
718 if request.roots.is_empty() {
719 return Ok(Vec::new());
720 }
721
722 let roots = request.roots.clone();
723 let opts = request.options.clone();
724 let include_roots = request.include_roots;
725 let namespace = self.namespace.clone();
726
727 self.with_reader("traverse", move |conn| {
728 let mut all_paths: Vec<GraphPath> = Vec::new();
729
730 for root_id in &roots {
731 let root_str = root_id.to_string();
732
733 let (join_condition, next_node) = match opts.direction {
734 Direction::Out => ("e.source_id = t.node_id", "e.target_id"),
735 Direction::In => ("e.target_id = t.node_id", "e.source_id"),
736 Direction::Both => (
737 "(e.source_id = t.node_id OR e.target_id = t.node_id)",
738 "CASE WHEN e.source_id = t.node_id THEN e.target_id ELSE e.source_id END",
739 ),
740 };
741
742 let mut relation_cond = String::new();
743 let mut relation_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
744 let mut param_idx = 4;
745
746 if let Some(ref rels) = opts.relations {
747 if !rels.is_empty() {
748 let placeholders: Vec<String> = rels
749 .iter()
750 .map(|r| {
751 relation_params.push(Box::new(r.to_string()));
752 let p = format!("?{}", param_idx);
753 param_idx += 1;
754 p
755 })
756 .collect();
757 relation_cond =
758 format!(" AND e.relation IN ({})", placeholders.join(","));
759 }
760 }
761
762 let mut weight_cond = String::new();
763 if let Some(min_w) = opts.min_weight {
764 relation_params.push(Box::new(min_w));
765 weight_cond = format!(" AND e.weight >= ?{}", param_idx);
766 param_idx += 1;
767 }
768
769 let limit_clause = if let Some(lim) = opts.limit {
770 relation_params.push(Box::new(lim as i64));
771 format!(" LIMIT ?{}", param_idx)
772 } else {
773 String::new()
774 };
775
776 let cte_sql = format!(
777 "WITH RECURSIVE traversal(node_id, edge_id, depth, path, total_weight) AS (\
778 SELECT ?2, NULL, 0, ?2, 0.0 \
779 UNION ALL \
780 SELECT {next_node}, e.id, t.depth + 1, \
781 t.path || ',' || {next_node}, \
782 t.total_weight + e.weight \
783 FROM graph_edges e \
784 JOIN traversal t ON {join_condition} \
785 WHERE e.namespace = ?1 \
786 AND e.deleted_at IS NULL \
787 AND t.depth < ?3 \
788 AND (',' || t.path || ',') NOT LIKE '%,' || {next_node} || ',%'{rel_cond}{wt_cond} \
789 ) \
790 SELECT node_id, edge_id, depth, path, total_weight \
791 FROM traversal WHERE depth > 0 \
792 ORDER BY depth{limit}",
793 next_node = next_node,
794 join_condition = join_condition,
795 rel_cond = relation_cond,
796 wt_cond = weight_cond,
797 limit = limit_clause,
798 );
799
800 let mut stmt = conn.prepare(&cte_sql)?;
801
802 let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
803 all_params.push(Box::new(namespace.clone()));
804 all_params.push(Box::new(root_str.clone()));
805 all_params.push(Box::new(opts.max_depth as i64));
806 all_params.extend(relation_params);
807
808 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
809 all_params.iter().map(|p| p.as_ref()).collect();
810
811 let rows = stmt.query_map(param_refs.as_slice(), |row| {
812 let node_str: String = row.get(0)?;
813 let edge_str: Option<String> = row.get(1)?;
814 let depth: i64 = row.get(2)?;
815 let _path: String = row.get(3)?;
816 let total_weight: f64 = row.get(4)?;
817 Ok((node_str, edge_str, depth, total_weight))
818 })?;
819
820 let mut nodes = Vec::new();
821 let mut max_weight = 0.0f64;
822 let mut seen: std::collections::HashSet<Uuid> =
827 std::collections::HashSet::new();
828
829 if include_roots {
830 seen.insert(*root_id);
831 nodes.push(PathNode {
832 node_id: *root_id,
833 via_edge: None,
834 depth: 0,
835 name: None,
836 kind: None,
837 });
838 }
839
840 for row in rows {
841 let (node_str, edge_str, depth, total_weight) = row?;
842 let node_id = parse_uuid(&node_str)?;
843 if !seen.insert(node_id) {
845 continue;
846 }
847 let via_edge = edge_str.map(|s| parse_uuid(&s)).transpose()?;
848 nodes.push(PathNode {
849 node_id,
850 via_edge,
851 depth: depth as usize,
852 name: None,
853 kind: None,
854 });
855 if total_weight > max_weight {
856 max_weight = total_weight;
857 }
858 }
859
860 if nodes.len() > if include_roots { 1 } else { 0 } || include_roots {
861 all_paths.push(GraphPath {
862 root_id: *root_id,
863 nodes,
864 total_weight: max_weight,
865 });
866 }
867 }
868
869 Ok(all_paths)
870 })
871 .await
872 }
873}
874
/// Schema for the graph backend, written so it can be run repeatedly (every
/// statement uses `IF NOT EXISTS`).
///
/// Defines the `graph_edges` table keyed by `(namespace, id)`, a unique index
/// on `(namespace, source_id, target_id, relation)` — the second conflict
/// target used by the upsert statements — and lookup indexes on source,
/// target, relation, and (partially, where non-null) `target_backend`.
/// Timestamp columns are INTEGER; the code in this file stores microseconds.
const GRAPH_DDL: &str = "\
    CREATE TABLE IF NOT EXISTS graph_edges (\
    namespace TEXT NOT NULL,\
    id TEXT NOT NULL,\
    source_id TEXT NOT NULL,\
    target_id TEXT NOT NULL,\
    relation TEXT NOT NULL,\
    weight REAL NOT NULL DEFAULT 1.0,\
    created_at INTEGER NOT NULL,\
    updated_at INTEGER NOT NULL,\
    deleted_at INTEGER,\
    metadata TEXT,\
    target_backend TEXT,\
    PRIMARY KEY (namespace, id)\
    );\
    CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple ON graph_edges(namespace, source_id, target_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_target_backend ON graph_edges(target_backend) WHERE target_backend IS NOT NULL;\
";
902
903pub(crate) fn ensure_graph_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
904 conn.execute_batch(GRAPH_DDL)
905}
906
907#[cfg(test)]
908mod tests {
909 use super::*;
910 use crate::pool::PoolConfig;
911 use khive_storage::types::{Direction, TraversalOptions};
912
913 fn setup_memory_store() -> SqlGraphStore {
914 let config = PoolConfig {
915 path: None,
916 ..PoolConfig::default()
917 };
918 let pool = Arc::new(ConnectionPool::new(config).unwrap());
919
920 {
921 let writer = pool.writer().unwrap();
922 writer.conn().execute_batch(GRAPH_DDL).unwrap();
923 }
924
925 SqlGraphStore::new_scoped(pool, false, "default")
926 }
927
928 fn make_edge(source: Uuid, target: Uuid, relation: EdgeRelation, weight: f64) -> Edge {
929 let now = Utc::now();
930 Edge {
931 id: Uuid::new_v4().into(),
932 namespace: "default".to_string(),
933 source_id: source,
934 target_id: target,
935 relation,
936 weight,
937 created_at: now,
938 updated_at: now,
939 deleted_at: None,
940 metadata: None,
941 target_backend: None,
942 }
943 }
944
945 #[tokio::test]
946 async fn test_upsert_and_get_edge() {
947 let store = setup_memory_store();
948
949 let src = Uuid::new_v4();
950 let tgt = Uuid::new_v4();
951 let now = Utc::now();
952 let edge = Edge {
953 id: Uuid::new_v4().into(),
954 namespace: "default".to_string(),
955 source_id: src,
956 target_id: tgt,
957 relation: EdgeRelation::Extends,
958 weight: 0.8,
959 created_at: now,
960 updated_at: now,
961 deleted_at: None,
962 metadata: None,
963 target_backend: None,
964 };
965 let edge_id = edge.id;
966
967 store.upsert_edge(edge).await.unwrap();
968
969 let fetched = store.get_edge(edge_id).await.unwrap();
970 assert!(fetched.is_some());
971 let fetched = fetched.unwrap();
972 assert_eq!(fetched.id, edge_id);
973 assert_eq!(fetched.namespace, "default");
974 assert_eq!(fetched.source_id, src);
975 assert_eq!(fetched.target_id, tgt);
976 assert_eq!(fetched.relation, EdgeRelation::Extends);
977 assert!((fetched.weight - 0.8).abs() < 1e-9);
978 }
979
980 #[tokio::test]
981 async fn test_delete_edge() {
982 let store = setup_memory_store();
983
984 let edge = make_edge(Uuid::new_v4(), Uuid::new_v4(), EdgeRelation::Contains, 1.0);
985 let edge_id = edge.id;
986
987 store.upsert_edge(edge).await.unwrap();
988 assert!(store.get_edge(edge_id).await.unwrap().is_some());
989
990 let deleted = store.delete_edge(edge_id, DeleteMode::Hard).await.unwrap();
991 assert!(deleted);
992
993 assert!(store.get_edge(edge_id).await.unwrap().is_none());
994
995 let deleted_again = store.delete_edge(edge_id, DeleteMode::Hard).await.unwrap();
996 assert!(!deleted_again);
997 }
998
999 #[tokio::test]
1000 async fn test_count_edges() {
1001 let store = setup_memory_store();
1002
1003 assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 0);
1004
1005 for _ in 0..5 {
1006 store
1007 .upsert_edge(make_edge(
1008 Uuid::new_v4(),
1009 Uuid::new_v4(),
1010 EdgeRelation::DependsOn,
1011 1.0,
1012 ))
1013 .await
1014 .unwrap();
1015 }
1016
1017 assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 5);
1018 }
1019
1020 #[tokio::test]
1021 async fn test_neighbors_outbound() {
1022 let store = setup_memory_store();
1023
1024 let a = Uuid::new_v4();
1025 let b = Uuid::new_v4();
1026 let c = Uuid::new_v4();
1027 let d = Uuid::new_v4();
1028
1029 store
1030 .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
1031 .await
1032 .unwrap();
1033 store
1034 .upsert_edge(make_edge(a, c, EdgeRelation::DependsOn, 0.7))
1035 .await
1036 .unwrap();
1037 store
1038 .upsert_edge(make_edge(d, a, EdgeRelation::Extends, 0.5))
1039 .await
1040 .unwrap();
1041
1042 let query = NeighborQuery {
1043 direction: Direction::Out,
1044 relations: None,
1045 limit: None,
1046 min_weight: None,
1047 };
1048
1049 let hits = store.neighbors(a, query).await.unwrap();
1050 assert_eq!(hits.len(), 2);
1051
1052 let neighbor_ids: Vec<Uuid> = hits.iter().map(|h| h.node_id).collect();
1053 assert!(neighbor_ids.contains(&b));
1054 assert!(neighbor_ids.contains(&c));
1055 assert!(!neighbor_ids.contains(&d));
1056 }
1057
1058 #[tokio::test]
1059 async fn test_traverse_depth_2() {
1060 let store = setup_memory_store();
1061
1062 let a = Uuid::new_v4();
1063 let b = Uuid::new_v4();
1064 let c = Uuid::new_v4();
1065 let d = Uuid::new_v4();
1066
1067 store
1068 .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
1069 .await
1070 .unwrap();
1071 store
1072 .upsert_edge(make_edge(b, c, EdgeRelation::Extends, 2.0))
1073 .await
1074 .unwrap();
1075 store
1076 .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 3.0))
1077 .await
1078 .unwrap();
1079
1080 let request = TraversalRequest {
1081 roots: vec![a],
1082 options: TraversalOptions::new(2).with_direction(Direction::Out),
1083 include_roots: true,
1084 };
1085
1086 let paths = store.traverse(request).await.unwrap();
1087 assert_eq!(paths.len(), 1);
1088
1089 let path = &paths[0];
1090 let node_ids: Vec<Uuid> = path.nodes.iter().map(|n| n.node_id).collect();
1091 assert!(node_ids.contains(&a));
1092 assert!(node_ids.contains(&b));
1093 assert!(node_ids.contains(&c));
1094 assert!(!node_ids.contains(&d));
1095 }
1096
1097 #[tokio::test]
1101 async fn test_traverse_dedups_multipath_node() {
1102 let store = setup_memory_store();
1103
1104 let a = Uuid::new_v4();
1105 let b = Uuid::new_v4();
1106 let c = Uuid::new_v4();
1107 let d = Uuid::new_v4();
1108
1109 store
1110 .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
1111 .await
1112 .unwrap();
1113 store
1114 .upsert_edge(make_edge(a, c, EdgeRelation::Extends, 1.0))
1115 .await
1116 .unwrap();
1117 store
1118 .upsert_edge(make_edge(b, d, EdgeRelation::Extends, 1.0))
1119 .await
1120 .unwrap();
1121 store
1122 .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 1.0))
1123 .await
1124 .unwrap();
1125
1126 let request = TraversalRequest {
1127 roots: vec![a],
1128 options: TraversalOptions::new(3).with_direction(Direction::Out),
1129 include_roots: false,
1130 };
1131
1132 let paths = store.traverse(request).await.unwrap();
1133 assert_eq!(paths.len(), 1);
1134 let nodes = &paths[0].nodes;
1135
1136 let d_count = nodes.iter().filter(|n| n.node_id == d).count();
1138 assert_eq!(d_count, 1, "D must appear exactly once (dedup multi-path)");
1139
1140 assert_eq!(nodes.iter().filter(|n| n.node_id == b).count(), 1);
1142 assert_eq!(nodes.iter().filter(|n| n.node_id == c).count(), 1);
1143 }
1144
1145 #[tokio::test]
1157 async fn test_traverse_preserves_first_path_metadata() {
1158 let store = setup_memory_store();
1159
1160 let a = Uuid::new_v4();
1161 let b = Uuid::new_v4();
1162 let c = Uuid::new_v4();
1163 let d = Uuid::new_v4();
1164
1165 store
1166 .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
1167 .await
1168 .unwrap();
1169 store
1170 .upsert_edge(make_edge(a, c, EdgeRelation::Extends, 1.0))
1171 .await
1172 .unwrap();
1173 store
1174 .upsert_edge(make_edge(b, d, EdgeRelation::Extends, 1.0))
1175 .await
1176 .unwrap();
1177 store
1178 .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 1.0))
1179 .await
1180 .unwrap();
1181
1182 let make_request = || TraversalRequest {
1183 roots: vec![a],
1184 options: TraversalOptions::new(3).with_direction(Direction::Out),
1185 include_roots: false,
1186 };
1187
1188 let paths1 = store.traverse(make_request()).await.unwrap();
1189 let paths2 = store.traverse(make_request()).await.unwrap();
1190
1191 let count1 = paths1[0].nodes.len();
1193 let count2 = paths2[0].nodes.len();
1194 assert_eq!(
1195 count1, count2,
1196 "traverse result count must be stable across calls"
1197 );
1198
1199 let d_nodes: Vec<_> = paths1[0].nodes.iter().filter(|n| n.node_id == d).collect();
1201 assert_eq!(d_nodes.len(), 1, "D deduped to one entry");
1202 assert!(
1203 d_nodes[0].via_edge.is_some(),
1204 "kept entry must have a via_edge"
1205 );
1206 assert_eq!(d_nodes[0].depth, 2, "D lives at depth 2");
1207 }
1208
1209 #[tokio::test]
1210 async fn test_metadata_roundtrip() {
1211 let store = setup_memory_store();
1212
1213 let src = Uuid::new_v4();
1214 let tgt = Uuid::new_v4();
1215 let meta = serde_json::json!({"note": "important link", "confidence": 0.95});
1216 let now = Utc::now();
1217 let edge = Edge {
1218 id: Uuid::new_v4().into(),
1219 namespace: "default".to_string(),
1220 source_id: src,
1221 target_id: tgt,
1222 relation: EdgeRelation::Implements,
1223 weight: 0.9,
1224 created_at: now,
1225 updated_at: now,
1226 deleted_at: None,
1227 metadata: Some(meta.clone()),
1228 target_backend: None,
1229 };
1230 let edge_id = edge.id;
1231
1232 store.upsert_edge(edge).await.unwrap();
1233
1234 let fetched = store.get_edge(edge_id).await.unwrap().unwrap();
1235 assert_eq!(
1236 fetched.metadata.as_ref(),
1237 Some(&meta),
1238 "metadata must survive a write/read roundtrip via get_edge"
1239 );
1240
1241 let page = store
1243 .query_edges(EdgeFilter::default(), vec![], PageRequest::default())
1244 .await
1245 .unwrap();
1246 let from_query = page
1247 .items
1248 .iter()
1249 .find(|e| e.id == edge_id)
1250 .expect("edge must appear in query_edges result");
1251 assert_eq!(
1252 from_query.metadata.as_ref(),
1253 Some(&meta),
1254 "metadata must survive a write/read roundtrip via query_edges"
1255 );
1256 }
1257
1258 #[tokio::test]
1259 async fn test_upsert_edges_batch() {
1260 let store = setup_memory_store();
1261
1262 let edges: Vec<Edge> = (0..10)
1263 .map(|i| {
1264 make_edge(
1265 Uuid::new_v4(),
1266 Uuid::new_v4(),
1267 EdgeRelation::Implements,
1268 i as f64,
1269 )
1270 })
1271 .collect();
1272
1273 let summary = store.upsert_edges(edges).await.unwrap();
1274 assert_eq!(summary.attempted, 10);
1275 assert_eq!(summary.affected, 10);
1276 assert_eq!(summary.failed, 0);
1277
1278 assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 10);
1279 }
1280
1281 #[tokio::test]
1284 async fn graph_duplicate_edges_ignored() {
1285 let store = setup_memory_store();
1286
1287 let src = Uuid::new_v4();
1288 let tgt = Uuid::new_v4();
1289
1290 let now = Utc::now();
1292 let edge1 = Edge {
1293 id: Uuid::new_v4().into(),
1294 namespace: "default".to_string(),
1295 source_id: src,
1296 target_id: tgt,
1297 relation: EdgeRelation::Extends,
1298 weight: 1.0,
1299 created_at: now,
1300 updated_at: now,
1301 deleted_at: None,
1302 metadata: None,
1303 target_backend: None,
1304 };
1305 let edge2 = Edge {
1306 id: Uuid::new_v4().into(),
1307 namespace: "default".to_string(),
1308 source_id: src,
1309 target_id: tgt,
1310 relation: EdgeRelation::Extends,
1311 weight: 0.5,
1312 created_at: now,
1313 updated_at: now,
1314 deleted_at: None,
1315 metadata: None,
1316 target_backend: None,
1317 };
1318
1319 store.upsert_edge(edge1).await.unwrap();
1320 store.upsert_edge(edge2).await.unwrap();
1321
1322 assert_eq!(
1323 store.count_edges(EdgeFilter::default()).await.unwrap(),
1324 1,
1325 "duplicate (source, target, relation) triple must be ignored; only one edge must exist"
1326 );
1327 }
1328
1329 #[tokio::test]
1332 async fn graph_duplicate_edges_refresh_existing_row() {
1333 let store = setup_memory_store();
1334 let src = Uuid::new_v4();
1335 let tgt = Uuid::new_v4();
1336
1337 let now = Utc::now();
1338 let edge1 = Edge {
1339 id: Uuid::new_v4().into(),
1340 namespace: "default".to_string(),
1341 source_id: src,
1342 target_id: tgt,
1343 relation: EdgeRelation::Extends,
1344 weight: 1.0,
1345 created_at: now,
1346 updated_at: now,
1347 deleted_at: None,
1348 metadata: None,
1349 target_backend: None,
1350 };
1351 let edge2 = Edge {
1352 id: Uuid::new_v4().into(),
1353 namespace: "default".to_string(),
1354 source_id: src,
1355 target_id: tgt,
1356 relation: EdgeRelation::Extends,
1357 weight: 0.5,
1358 created_at: now,
1359 updated_at: now,
1360 deleted_at: None,
1361 metadata: None,
1362 target_backend: None,
1363 };
1364
1365 store.upsert_edge(edge1).await.unwrap();
1366 store.upsert_edge(edge2).await.unwrap();
1367
1368 let edges = store
1369 .query_edges(EdgeFilter::default(), vec![], PageRequest::default())
1370 .await
1371 .unwrap();
1372 assert_eq!(
1373 edges.items.len(),
1374 1,
1375 "duplicate natural key must collapse to one row"
1376 );
1377 assert!(
1378 (edges.items[0].weight - 0.5).abs() < 0.001,
1379 "F053: natural-key conflict must DO UPDATE (weight=0.5 from second upsert); \
1380 current DO NOTHING keeps stale weight={}",
1381 edges.items[0].weight
1382 );
1383 }
1384
1385 #[tokio::test]
1388 async fn upsert_edge_canonicalizes_symmetric_relation() {
1389 let store = setup_memory_store();
1390
1391 let smaller = Uuid::from_bytes([0x00; 16]);
1393 let larger = Uuid::from_bytes([0xff; 16]);
1394 assert!(
1395 larger > smaller,
1396 "test setup: larger must sort after smaller"
1397 );
1398
1399 let edge = make_edge(larger, smaller, EdgeRelation::CompetesWith, 1.0);
1401 let edge_id = edge.id;
1402 store.upsert_edge(edge).await.unwrap();
1403
1404 let stored = store.get_edge(edge_id).await.unwrap().unwrap();
1405 assert_eq!(
1406 stored.source_id, smaller,
1407 "#476: CompetesWith edge must be stored with source_id < target_id"
1408 );
1409 assert_eq!(
1410 stored.target_id, larger,
1411 "#476: CompetesWith edge must be stored with target_id > source_id"
1412 );
1413 }
1414
1415 #[tokio::test]
1416 async fn upsert_edges_batch_canonicalizes_symmetric_relation() {
1417 let store = setup_memory_store();
1418
1419 let smaller = Uuid::from_bytes([0x11; 16]);
1420 let larger = Uuid::from_bytes([0xee; 16]);
1421
1422 let edge = make_edge(larger, smaller, EdgeRelation::ComposedWith, 0.9);
1424 let edge_id = edge.id;
1425 store.upsert_edges(vec![edge]).await.unwrap();
1426
1427 let stored = store.get_edge(edge_id).await.unwrap().unwrap();
1428 assert_eq!(
1429 stored.source_id, smaller,
1430 "#476: ComposedWith edge must be stored with source_id < target_id (batch path)"
1431 );
1432 assert_eq!(
1433 stored.target_id, larger,
1434 "#476: ComposedWith edge must be stored with target_id > source_id (batch path)"
1435 );
1436 }
1437
1438 #[tokio::test]
1439 async fn upsert_edge_non_symmetric_relation_preserves_direction() {
1440 let store = setup_memory_store();
1441
1442 let src = Uuid::from_bytes([0xff; 16]);
1444 let tgt = Uuid::from_bytes([0x00; 16]);
1445 let edge = make_edge(src, tgt, EdgeRelation::DependsOn, 1.0);
1446 let edge_id = edge.id;
1447 store.upsert_edge(edge).await.unwrap();
1448
1449 let stored = store.get_edge(edge_id).await.unwrap().unwrap();
1450 assert_eq!(
1451 stored.source_id, src,
1452 "non-symmetric edge direction must be preserved"
1453 );
1454 assert_eq!(
1455 stored.target_id, tgt,
1456 "non-symmetric edge direction must be preserved"
1457 );
1458 }
1459}