Skip to main content

khive_db/stores/
graph.rs

//! SQL-backed `GraphStore` implementation.
//!
//! `SqlGraphStore` stores graph edges in a regular SQLite table (`graph_edges`).
//! Traversal uses recursive CTEs for multi-hop queries.
//!
//! # Connection strategy
//!
//! In both modes each operation's queries run on Tokio's blocking thread pool
//! via `spawn_blocking`.
//!
//! - **File-backed**: Opens a standalone connection to the database file per operation.
//! - **In-memory**: Borrows a connection from the shared pool per operation.
10
11use std::sync::Arc;
12
13use async_trait::async_trait;
14use chrono::{DateTime, TimeZone, Utc};
15use uuid::Uuid;
16
17use khive_storage::error::StorageError;
18use khive_storage::types::{
19    BatchWriteSummary, DeleteMode, Edge, EdgeFilter, EdgeSortField, GraphPath, NeighborHit,
20    NeighborQuery, Page, PageRequest, PathNode, SortDirection, SortOrder, TraversalRequest,
21};
22use khive_storage::GraphStore;
23use khive_storage::LinkId;
24use khive_storage::StorageCapability;
25use khive_types::EdgeRelation;
26
27use crate::error::SqliteError;
28use crate::pool::ConnectionPool;
29
30/// Map a rusqlite error to `StorageError` with `Graph` capability.
31fn map_err(e: rusqlite::Error, op: &'static str) -> StorageError {
32    StorageError::driver(StorageCapability::Graph, op, e)
33}
34
35fn map_sqlite_err(e: SqliteError, op: &'static str) -> StorageError {
36    StorageError::driver(StorageCapability::Graph, op, e)
37}
38
39/// A GraphStore backed by SQLite tables.
40pub struct SqlGraphStore {
41    pool: Arc<ConnectionPool>,
42    is_file_backed: bool,
43    namespace: String,
44}
45
46impl SqlGraphStore {
47    /// Create a new store scoped to one namespace.
48    pub fn new_scoped(
49        pool: Arc<ConnectionPool>,
50        is_file_backed: bool,
51        namespace: impl Into<String>,
52    ) -> Self {
53        Self {
54            pool,
55            is_file_backed,
56            namespace: namespace.into(),
57        }
58    }
59
60    fn open_standalone_writer(&self) -> Result<rusqlite::Connection, StorageError> {
61        let config = self.pool.config();
62        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
63            operation: "graph_writer".into(),
64            message: "in-memory databases do not support standalone connections".into(),
65        })?;
66
67        let conn = rusqlite::Connection::open_with_flags(
68            path,
69            rusqlite::OpenFlags::SQLITE_OPEN_READ_WRITE
70                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
71                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
72        )
73        .map_err(|e| map_err(e, "open_graph_writer"))?;
74
75        conn.busy_timeout(config.busy_timeout)
76            .map_err(|e| map_err(e, "open_graph_writer"))?;
77        conn.pragma_update(None, "foreign_keys", "ON")
78            .map_err(|e| map_err(e, "open_graph_writer"))?;
79        conn.pragma_update(None, "synchronous", "NORMAL")
80            .map_err(|e| map_err(e, "open_graph_writer"))?;
81
82        Ok(conn)
83    }
84
85    fn open_standalone_reader(&self) -> Result<rusqlite::Connection, StorageError> {
86        let config = self.pool.config();
87        let path = config.path.as_ref().ok_or_else(|| StorageError::Pool {
88            operation: "graph_reader".into(),
89            message: "in-memory databases do not support standalone connections".into(),
90        })?;
91
92        let conn = rusqlite::Connection::open_with_flags(
93            path,
94            rusqlite::OpenFlags::SQLITE_OPEN_READ_ONLY
95                | rusqlite::OpenFlags::SQLITE_OPEN_NO_MUTEX
96                | rusqlite::OpenFlags::SQLITE_OPEN_URI,
97        )
98        .map_err(|e| map_err(e, "open_graph_reader"))?;
99
100        conn.busy_timeout(config.busy_timeout)
101            .map_err(|e| map_err(e, "open_graph_reader"))?;
102        conn.pragma_update(None, "foreign_keys", "ON")
103            .map_err(|e| map_err(e, "open_graph_reader"))?;
104        conn.pragma_update(None, "synchronous", "NORMAL")
105            .map_err(|e| map_err(e, "open_graph_reader"))?;
106
107        Ok(conn)
108    }
109
110    async fn with_writer<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
111    where
112        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
113        R: Send + 'static,
114    {
115        if self.is_file_backed {
116            let conn = self.open_standalone_writer()?;
117            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
118                .await
119                .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
120        } else {
121            let pool = Arc::clone(&self.pool);
122            tokio::task::spawn_blocking(move || {
123                let guard = pool.try_writer().map_err(|e| map_sqlite_err(e, op))?;
124                f(guard.conn()).map_err(|e| map_err(e, op))
125            })
126            .await
127            .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
128        }
129    }
130
131    async fn with_reader<F, R>(&self, op: &'static str, f: F) -> Result<R, StorageError>
132    where
133        F: FnOnce(&rusqlite::Connection) -> Result<R, rusqlite::Error> + Send + 'static,
134        R: Send + 'static,
135    {
136        if self.is_file_backed {
137            let conn = self.open_standalone_reader()?;
138            tokio::task::spawn_blocking(move || f(&conn).map_err(|e| map_err(e, op)))
139                .await
140                .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
141        } else {
142            let pool = Arc::clone(&self.pool);
143            tokio::task::spawn_blocking(move || {
144                let guard = pool.reader().map_err(|e| map_sqlite_err(e, op))?;
145                f(guard.conn()).map_err(|e| map_err(e, op))
146            })
147            .await
148            .map_err(|e| StorageError::driver(StorageCapability::Graph, op, e))?
149        }
150    }
151}
152
153// =============================================================================
154// Helpers
155// =============================================================================
156
157fn read_edge(row: &rusqlite::Row<'_>) -> Result<Edge, rusqlite::Error> {
158    let namespace: String = row.get(0)?;
159    let id_str: String = row.get(1)?;
160    let source_str: String = row.get(2)?;
161    let target_str: String = row.get(3)?;
162    let relation_str: String = row.get(4)?;
163    let weight: f64 = row.get(5)?;
164    let created_micros: i64 = row.get(6)?;
165    let updated_micros: i64 = row.get(7)?;
166    let deleted_micros: Option<i64> = row.get(8)?;
167    let metadata_str: Option<String> = row.get(9)?;
168    let target_backend: Option<String> = row.get(10)?;
169
170    let id = parse_uuid(&id_str)?;
171    let source_id = parse_uuid(&source_str)?;
172    let target_id = parse_uuid(&target_str)?;
173    let created_at = micros_to_datetime(created_micros);
174    let relation = relation_str.parse::<EdgeRelation>().map_err(|e| {
175        rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(e))
176    })?;
177    let metadata = match metadata_str {
178        Some(s) => {
179            let v = serde_json::from_str(&s).map_err(|e| {
180                rusqlite::Error::FromSqlConversionFailure(
181                    9,
182                    rusqlite::types::Type::Text,
183                    Box::new(e),
184                )
185            })?;
186            Some(v)
187        }
188        None => None,
189    };
190
191    Ok(Edge {
192        id: id.into(),
193        namespace,
194        source_id,
195        target_id,
196        relation,
197        weight,
198        created_at,
199        updated_at: micros_to_datetime(updated_micros),
200        deleted_at: deleted_micros.map(micros_to_datetime),
201        metadata,
202        target_backend,
203    })
204}
205
206fn parse_uuid(s: &str) -> Result<Uuid, rusqlite::Error> {
207    Uuid::parse_str(s).map_err(|e| {
208        rusqlite::Error::FromSqlConversionFailure(0, rusqlite::types::Type::Text, Box::new(e))
209    })
210}
211
212fn micros_to_datetime(micros: i64) -> DateTime<Utc> {
213    Utc.timestamp_micros(micros)
214        .single()
215        .unwrap_or_else(Utc::now)
216}
217
218fn build_edge_filter_sql(
219    namespace: &str,
220    filter: &EdgeFilter,
221) -> (String, Vec<Box<dyn rusqlite::types::ToSql>>) {
222    let mut conditions: Vec<String> = vec![
223        "namespace = ?1".to_string(),
224        "deleted_at IS NULL".to_string(),
225    ];
226    let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = vec![Box::new(namespace.to_string())];
227
228    if !filter.ids.is_empty() {
229        let placeholders: Vec<String> = filter
230            .ids
231            .iter()
232            .map(|id| {
233                params.push(Box::new(id.to_string()));
234                format!("?{}", params.len())
235            })
236            .collect();
237        conditions.push(format!("id IN ({})", placeholders.join(",")));
238    }
239
240    if !filter.source_ids.is_empty() {
241        let placeholders: Vec<String> = filter
242            .source_ids
243            .iter()
244            .map(|id| {
245                params.push(Box::new(id.to_string()));
246                format!("?{}", params.len())
247            })
248            .collect();
249        conditions.push(format!("source_id IN ({})", placeholders.join(",")));
250    }
251
252    if !filter.target_ids.is_empty() {
253        let placeholders: Vec<String> = filter
254            .target_ids
255            .iter()
256            .map(|id| {
257                params.push(Box::new(id.to_string()));
258                format!("?{}", params.len())
259            })
260            .collect();
261        conditions.push(format!("target_id IN ({})", placeholders.join(",")));
262    }
263
264    if !filter.relations.is_empty() {
265        let placeholders: Vec<String> = filter
266            .relations
267            .iter()
268            .map(|r| {
269                params.push(Box::new(r.to_string()));
270                format!("?{}", params.len())
271            })
272            .collect();
273        conditions.push(format!("relation IN ({})", placeholders.join(",")));
274    }
275
276    if let Some(min_w) = filter.min_weight {
277        params.push(Box::new(min_w));
278        conditions.push(format!("weight >= ?{}", params.len()));
279    }
280
281    if let Some(max_w) = filter.max_weight {
282        params.push(Box::new(max_w));
283        conditions.push(format!("weight <= ?{}", params.len()));
284    }
285
286    if let Some(ref time_range) = filter.created_at {
287        if let Some(start) = time_range.start {
288            params.push(Box::new(start.timestamp_micros()));
289            conditions.push(format!("created_at >= ?{}", params.len()));
290        }
291        if let Some(end) = time_range.end {
292            params.push(Box::new(end.timestamp_micros()));
293            conditions.push(format!("created_at < ?{}", params.len()));
294        }
295    }
296
297    let clause = format!(" WHERE {}", conditions.join(" AND "));
298    (clause, params)
299}
300
301fn edge_sort_col(field: &EdgeSortField) -> &'static str {
302    match field {
303        EdgeSortField::CreatedAt => "created_at",
304        EdgeSortField::Weight => "weight",
305        EdgeSortField::Relation => "relation",
306    }
307}
308
309// =============================================================================
310// GraphStore implementation
311// =============================================================================
312
313/// Canonical endpoint order for symmetric relations (F012).
314///
315/// For `competes_with` and `composed_with`, ensures `source_uuid < target_uuid`
316/// so A→B and B→A collapse to a single canonical row in storage.
317fn canonical_edge_endpoints(
318    relation: EdgeRelation,
319    source_id: Uuid,
320    target_id: Uuid,
321) -> (Uuid, Uuid) {
322    if relation.is_symmetric() && target_id < source_id {
323        (target_id, source_id)
324    } else {
325        (source_id, target_id)
326    }
327}
328
329#[async_trait]
330impl GraphStore for SqlGraphStore {
331    async fn upsert_edge(&self, edge: Edge) -> Result<(), StorageError> {
332        let namespace = self.namespace.clone();
333        if edge.namespace != namespace {
334            return Err(StorageError::InvalidInput {
335                capability: StorageCapability::Graph,
336                operation: "upsert_edge".into(),
337                message: format!(
338                    "edge namespace {:?} does not match store namespace {:?}",
339                    edge.namespace, namespace
340                ),
341            });
342        }
343        let id_str = Uuid::from(edge.id).to_string();
344        let (source_id, target_id) =
345            canonical_edge_endpoints(edge.relation, edge.source_id, edge.target_id);
346        let src_str = source_id.to_string();
347        let tgt_str = target_id.to_string();
348        let relation_str = edge.relation.to_string();
349        let metadata_str = edge
350            .metadata
351            .as_ref()
352            .map(serde_json::to_string)
353            .transpose()
354            .map_err(|e| StorageError::driver(StorageCapability::Graph, "upsert_edge", e))?;
355        self.with_writer("upsert_edge", move |conn| {
356            conn.execute(
357                "INSERT INTO graph_edges \
358                 (namespace, id, source_id, target_id, relation, weight, \
359                  created_at, updated_at, deleted_at, metadata, target_backend) \
360                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
361                 ON CONFLICT(namespace, id) DO UPDATE SET \
362                     source_id = excluded.source_id, \
363                     target_id = excluded.target_id, \
364                     relation = excluded.relation, \
365                     weight = excluded.weight, \
366                     updated_at = excluded.updated_at, \
367                     deleted_at = NULL, \
368                     metadata = excluded.metadata, \
369                     target_backend = excluded.target_backend \
370                 ON CONFLICT(namespace, source_id, target_id, relation) DO UPDATE SET \
371                     weight = excluded.weight, \
372                     updated_at = excluded.updated_at, \
373                     deleted_at = NULL, \
374                     metadata = excluded.metadata, \
375                     target_backend = excluded.target_backend",
376                rusqlite::params![
377                    namespace,
378                    id_str,
379                    src_str,
380                    tgt_str,
381                    relation_str,
382                    edge.weight,
383                    edge.created_at.timestamp_micros(),
384                    edge.updated_at.timestamp_micros(),
385                    edge.deleted_at.map(|t| t.timestamp_micros()),
386                    metadata_str,
387                    edge.target_backend,
388                ],
389            )?;
390            Ok(())
391        })
392        .await
393    }
394
395    async fn upsert_edges(&self, edges: Vec<Edge>) -> Result<BatchWriteSummary, StorageError> {
396        let attempted = edges.len() as u64;
397        let namespace = self.namespace.clone();
398
399        // Validate namespaces before acquiring writer.
400        for edge in &edges {
401            if edge.namespace != namespace {
402                return Err(StorageError::InvalidInput {
403                    capability: StorageCapability::Graph,
404                    operation: "upsert_edges".into(),
405                    message: format!(
406                        "edge namespace {:?} does not match store namespace {:?}",
407                        edge.namespace, namespace
408                    ),
409                });
410            }
411        }
412
413        self.with_writer("upsert_edges", move |conn| {
414            conn.execute_batch("BEGIN IMMEDIATE")?;
415            let mut affected = 0u64;
416
417            for edge in &edges {
418                let id_str = Uuid::from(edge.id).to_string();
419                let (canon_src, canon_tgt) =
420                    canonical_edge_endpoints(edge.relation, edge.source_id, edge.target_id);
421                let src_str = canon_src.to_string();
422                let tgt_str = canon_tgt.to_string();
423                let relation_str = edge.relation.to_string();
424                let metadata_str = edge
425                    .metadata
426                    .as_ref()
427                    .map(serde_json::to_string)
428                    .transpose()
429                    .map_err(|e| rusqlite::Error::ToSqlConversionFailure(Box::new(e)))?;
430                if let Err(e) = conn.execute(
431                    "INSERT INTO graph_edges \
432                     (namespace, id, source_id, target_id, relation, weight, \
433                      created_at, updated_at, deleted_at, metadata, target_backend) \
434                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11) \
435                     ON CONFLICT(namespace, id) DO UPDATE SET \
436                         source_id = excluded.source_id, \
437                         target_id = excluded.target_id, \
438                         relation = excluded.relation, \
439                         weight = excluded.weight, \
440                         updated_at = excluded.updated_at, \
441                         deleted_at = NULL, \
442                         metadata = excluded.metadata, \
443                         target_backend = excluded.target_backend \
444                     ON CONFLICT(namespace, source_id, target_id, relation) DO UPDATE SET \
445                         weight = excluded.weight, \
446                         updated_at = excluded.updated_at, \
447                         deleted_at = NULL, \
448                         metadata = excluded.metadata, \
449                         target_backend = excluded.target_backend",
450                    rusqlite::params![
451                        &namespace,
452                        id_str,
453                        src_str,
454                        tgt_str,
455                        relation_str,
456                        edge.weight,
457                        edge.created_at.timestamp_micros(),
458                        edge.updated_at.timestamp_micros(),
459                        edge.deleted_at.map(|t| t.timestamp_micros()),
460                        metadata_str,
461                        edge.target_backend.as_deref(),
462                    ],
463                ) {
464                    let _ = conn.execute_batch("ROLLBACK");
465                    return Err(e);
466                }
467                affected += 1;
468            }
469
470            if let Err(e) = conn.execute_batch("COMMIT") {
471                let _ = conn.execute_batch("ROLLBACK");
472                return Err(e);
473            }
474            Ok(BatchWriteSummary {
475                attempted,
476                affected,
477                failed: 0,
478                first_error: String::new(),
479            })
480        })
481        .await
482    }
483
484    async fn get_edge(&self, id: LinkId) -> Result<Option<Edge>, StorageError> {
485        let namespace = self.namespace.clone();
486        let id_str = Uuid::from(id).to_string();
487
488        self.with_reader("get_edge", move |conn| {
489            let mut stmt = conn.prepare(
490                "SELECT namespace, id, source_id, target_id, relation, weight, \
491                        created_at, updated_at, deleted_at, metadata, target_backend \
492                 FROM graph_edges WHERE namespace = ?1 AND id = ?2 AND deleted_at IS NULL",
493            )?;
494            let mut rows = stmt.query(rusqlite::params![namespace, id_str])?;
495            match rows.next()? {
496                Some(row) => Ok(Some(read_edge(row)?)),
497                None => Ok(None),
498            }
499        })
500        .await
501    }
502
503    async fn delete_edge(&self, id: LinkId, mode: DeleteMode) -> Result<bool, StorageError> {
504        let namespace = self.namespace.clone();
505        let id_str = Uuid::from(id).to_string();
506
507        self.with_writer("delete_edge", move |conn| {
508            let affected = match mode {
509                DeleteMode::Soft => conn.execute(
510                    "UPDATE graph_edges SET deleted_at = ?3, updated_at = ?3 \
511                     WHERE namespace = ?1 AND id = ?2 AND deleted_at IS NULL",
512                    rusqlite::params![namespace, id_str, chrono::Utc::now().timestamp_micros(),],
513                )?,
514                DeleteMode::Hard => conn.execute(
515                    "DELETE FROM graph_edges WHERE namespace = ?1 AND id = ?2",
516                    rusqlite::params![namespace, id_str],
517                )?,
518            };
519            Ok(affected > 0)
520        })
521        .await
522    }
523
524    async fn query_edges(
525        &self,
526        filter: EdgeFilter,
527        sort: Vec<SortOrder<EdgeSortField>>,
528        page: PageRequest,
529    ) -> Result<Page<Edge>, StorageError> {
530        let namespace = self.namespace.clone();
531        self.with_reader("query_edges", move |conn| {
532            let (where_clause, filter_params) = build_edge_filter_sql(&namespace, &filter);
533
534            let count_sql = format!("SELECT COUNT(*) FROM graph_edges{}", where_clause);
535            let total: i64 = {
536                let mut stmt = conn.prepare(&count_sql)?;
537                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
538                    filter_params.iter().map(|p| p.as_ref()).collect();
539                stmt.query_row(param_refs.as_slice(), |row| row.get(0))?
540            };
541
542            let order_clause = if sort.is_empty() {
543                " ORDER BY created_at DESC".to_string()
544            } else {
545                let parts: Vec<String> = sort
546                    .iter()
547                    .map(|s| {
548                        let dir = match s.direction {
549                            SortDirection::Asc => "ASC",
550                            SortDirection::Desc => "DESC",
551                        };
552                        format!("{} {}", edge_sort_col(&s.field), dir)
553                    })
554                    .collect();
555                format!(" ORDER BY {}", parts.join(", "))
556            };
557
558            let (_, data_filter_params) = build_edge_filter_sql(&namespace, &filter);
559            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = data_filter_params;
560            all_params.push(Box::new(page.limit as i64));
561            all_params.push(Box::new(page.offset as i64));
562
563            let limit_idx = all_params.len() - 1;
564            let offset_idx = all_params.len();
565
566            let data_sql = format!(
567                "SELECT namespace, id, source_id, target_id, relation, weight, \
568                        created_at, updated_at, deleted_at, metadata, target_backend \
569                 FROM graph_edges{}{} LIMIT ?{} OFFSET ?{}",
570                where_clause, order_clause, limit_idx, offset_idx,
571            );
572
573            let mut stmt = conn.prepare(&data_sql)?;
574            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
575                all_params.iter().map(|p| p.as_ref()).collect();
576            let rows = stmt.query_map(param_refs.as_slice(), read_edge)?;
577
578            let mut items = Vec::new();
579            for row in rows {
580                items.push(row?);
581            }
582
583            Ok(Page {
584                items,
585                total: Some(total as u64),
586            })
587        })
588        .await
589    }
590
591    async fn count_edges(&self, filter: EdgeFilter) -> Result<u64, StorageError> {
592        let namespace = self.namespace.clone();
593        self.with_reader("count_edges", move |conn| {
594            let (where_clause, params) = build_edge_filter_sql(&namespace, &filter);
595            let sql = format!("SELECT COUNT(*) FROM graph_edges{}", where_clause);
596            let mut stmt = conn.prepare(&sql)?;
597            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
598                params.iter().map(|p| p.as_ref()).collect();
599            let count: i64 = stmt.query_row(param_refs.as_slice(), |row| row.get(0))?;
600            Ok(count as u64)
601        })
602        .await
603    }
604
605    async fn neighbors(
606        &self,
607        node_id: Uuid,
608        query: NeighborQuery,
609    ) -> Result<Vec<NeighborHit>, StorageError> {
610        use khive_storage::types::Direction;
611
612        let namespace = self.namespace.clone();
613        let node_str = node_id.to_string();
614
615        self.with_reader("neighbors", move |conn| {
616            let base_out = "SELECT target_id AS node_id, id AS edge_id, relation, weight \
617                            FROM graph_edges \
618                            WHERE namespace = ?1 AND source_id = ?2 AND deleted_at IS NULL";
619            let base_in = "SELECT source_id AS node_id, id AS edge_id, relation, weight \
620                           FROM graph_edges \
621                           WHERE namespace = ?1 AND target_id = ?2 AND deleted_at IS NULL";
622
623            let sql = match query.direction {
624                Direction::Out => base_out.to_string(),
625                Direction::In => base_in.to_string(),
626                Direction::Both => format!("{} UNION ALL {}", base_out, base_in),
627            };
628
629            let mut conditions: Vec<String> = Vec::new();
630            let mut extra_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
631            let mut param_idx = 3;
632
633            if let Some(ref rels) = query.relations {
634                if !rels.is_empty() {
635                    let placeholders: Vec<String> = rels
636                        .iter()
637                        .map(|r| {
638                            extra_params.push(Box::new(r.to_string()));
639                            let p = format!("?{}", param_idx);
640                            param_idx += 1;
641                            p
642                        })
643                        .collect();
644                    conditions.push(format!("relation IN ({})", placeholders.join(",")));
645                }
646            }
647
648            if let Some(min_w) = query.min_weight {
649                extra_params.push(Box::new(min_w));
650                conditions.push(format!("weight >= ?{}", param_idx));
651                param_idx += 1;
652            }
653
654            let where_extra = if conditions.is_empty() {
655                String::new()
656            } else {
657                format!(" WHERE {}", conditions.join(" AND "))
658            };
659
660            let limit_clause = if let Some(lim) = query.limit {
661                extra_params.push(Box::new(lim as i64));
662                format!(" LIMIT ?{}", param_idx)
663            } else {
664                String::new()
665            };
666
667            let full_sql = format!(
668                "SELECT node_id, edge_id, relation, weight FROM ({}){}{}",
669                sql, where_extra, limit_clause
670            );
671
672            let mut stmt = conn.prepare(&full_sql)?;
673
674            let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
675            all_params.push(Box::new(namespace.clone()));
676            all_params.push(Box::new(node_str.clone()));
677            all_params.extend(extra_params);
678
679            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
680                all_params.iter().map(|p| p.as_ref()).collect();
681
682            let rows = stmt.query_map(param_refs.as_slice(), |row| {
683                let nid_str: String = row.get(0)?;
684                let eid_str: String = row.get(1)?;
685                let relation_str: String = row.get(2)?;
686                let weight: f64 = row.get(3)?;
687                Ok((nid_str, eid_str, relation_str, weight))
688            })?;
689
690            let mut hits = Vec::new();
691            for row in rows {
692                let (nid_str, eid_str, relation_str, weight) = row?;
693                let relation = relation_str.parse::<EdgeRelation>().map_err(|e| {
694                    rusqlite::Error::FromSqlConversionFailure(
695                        2,
696                        rusqlite::types::Type::Text,
697                        Box::new(e),
698                    )
699                })?;
700                hits.push(NeighborHit {
701                    node_id: parse_uuid(&nid_str)?,
702                    edge_id: parse_uuid(&eid_str)?,
703                    relation,
704                    weight,
705                    name: None,
706                    kind: None,
707                });
708            }
709
710            Ok(hits)
711        })
712        .await
713    }
714
715    async fn traverse(&self, request: TraversalRequest) -> Result<Vec<GraphPath>, StorageError> {
716        use khive_storage::types::Direction;
717
718        if request.roots.is_empty() {
719            return Ok(Vec::new());
720        }
721
722        let roots = request.roots.clone();
723        let opts = request.options.clone();
724        let include_roots = request.include_roots;
725        let namespace = self.namespace.clone();
726
727        self.with_reader("traverse", move |conn| {
728            let mut all_paths: Vec<GraphPath> = Vec::new();
729
730            for root_id in &roots {
731                let root_str = root_id.to_string();
732
733                let (join_condition, next_node) = match opts.direction {
734                    Direction::Out => ("e.source_id = t.node_id", "e.target_id"),
735                    Direction::In => ("e.target_id = t.node_id", "e.source_id"),
736                    Direction::Both => (
737                        "(e.source_id = t.node_id OR e.target_id = t.node_id)",
738                        "CASE WHEN e.source_id = t.node_id THEN e.target_id ELSE e.source_id END",
739                    ),
740                };
741
742                let mut relation_cond = String::new();
743                let mut relation_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
744                let mut param_idx = 4;
745
746                if let Some(ref rels) = opts.relations {
747                    if !rels.is_empty() {
748                        let placeholders: Vec<String> = rels
749                            .iter()
750                            .map(|r| {
751                                relation_params.push(Box::new(r.to_string()));
752                                let p = format!("?{}", param_idx);
753                                param_idx += 1;
754                                p
755                            })
756                            .collect();
757                        relation_cond =
758                            format!(" AND e.relation IN ({})", placeholders.join(","));
759                    }
760                }
761
762                let mut weight_cond = String::new();
763                if let Some(min_w) = opts.min_weight {
764                    relation_params.push(Box::new(min_w));
765                    weight_cond = format!(" AND e.weight >= ?{}", param_idx);
766                    param_idx += 1;
767                }
768
769                let limit_clause = if let Some(lim) = opts.limit {
770                    relation_params.push(Box::new(lim as i64));
771                    format!(" LIMIT ?{}", param_idx)
772                } else {
773                    String::new()
774                };
775
776                let cte_sql = format!(
777                    "WITH RECURSIVE traversal(node_id, edge_id, depth, path, total_weight) AS (\
778                         SELECT ?2, NULL, 0, ?2, 0.0 \
779                         UNION ALL \
780                         SELECT {next_node}, e.id, t.depth + 1, \
781                                t.path || ',' || {next_node}, \
782                                t.total_weight + e.weight \
783                         FROM graph_edges e \
784                         JOIN traversal t ON {join_condition} \
785                         WHERE e.namespace = ?1 \
786                           AND e.deleted_at IS NULL \
787                           AND t.depth < ?3 \
788                           AND (',' || t.path || ',') NOT LIKE '%,' || {next_node} || ',%'{rel_cond}{wt_cond} \
789                     ) \
790                     SELECT node_id, edge_id, depth, path, total_weight \
791                     FROM traversal WHERE depth > 0 \
792                     ORDER BY depth{limit}",
793                    next_node = next_node,
794                    join_condition = join_condition,
795                    rel_cond = relation_cond,
796                    wt_cond = weight_cond,
797                    limit = limit_clause,
798                );
799
800                let mut stmt = conn.prepare(&cte_sql)?;
801
802                let mut all_params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
803                all_params.push(Box::new(namespace.clone()));
804                all_params.push(Box::new(root_str.clone()));
805                all_params.push(Box::new(opts.max_depth as i64));
806                all_params.extend(relation_params);
807
808                let param_refs: Vec<&dyn rusqlite::types::ToSql> =
809                    all_params.iter().map(|p| p.as_ref()).collect();
810
811                let rows = stmt.query_map(param_refs.as_slice(), |row| {
812                    let node_str: String = row.get(0)?;
813                    let edge_str: Option<String> = row.get(1)?;
814                    let depth: i64 = row.get(2)?;
815                    let _path: String = row.get(3)?;
816                    let total_weight: f64 = row.get(4)?;
817                    Ok((node_str, edge_str, depth, total_weight))
818                })?;
819
820                let mut nodes = Vec::new();
821                let mut max_weight = 0.0f64;
822                // Track visited node IDs to deduplicate multi-path reachability
823                // (#285). Rows are ordered by depth (shallowest first), so the
824                // first occurrence is the BFS-order first-visit — that is the
825                // one we keep.
826                let mut seen: std::collections::HashSet<Uuid> =
827                    std::collections::HashSet::new();
828
829                if include_roots {
830                    seen.insert(*root_id);
831                    nodes.push(PathNode {
832                        node_id: *root_id,
833                        via_edge: None,
834                        depth: 0,
835                        name: None,
836                        kind: None,
837                    });
838                }
839
840                for row in rows {
841                    let (node_str, edge_str, depth, total_weight) = row?;
842                    let node_id = parse_uuid(&node_str)?;
843                    // Skip nodes already seen via an earlier (shallower) path.
844                    if !seen.insert(node_id) {
845                        continue;
846                    }
847                    let via_edge = edge_str.map(|s| parse_uuid(&s)).transpose()?;
848                    nodes.push(PathNode {
849                        node_id,
850                        via_edge,
851                        depth: depth as usize,
852                        name: None,
853                        kind: None,
854                    });
855                    if total_weight > max_weight {
856                        max_weight = total_weight;
857                    }
858                }
859
860                if nodes.len() > if include_roots { 1 } else { 0 } || include_roots {
861                    all_paths.push(GraphPath {
862                        root_id: *root_id,
863                        nodes,
864                        total_weight: max_weight,
865                    });
866                }
867            }
868
869            Ok(all_paths)
870        })
871        .await
872    }
873}
874
875// =============================================================================
876// DDL
877// =============================================================================
878
/// Schema batch for the `graph_edges` table and its indexes, executed via
/// `execute_batch` by [`ensure_graph_schema`] (and directly by the test setup).
///
/// Every statement is `IF NOT EXISTS`, so re-running the batch is a no-op for
/// objects that already exist. `CREATE TABLE IF NOT EXISTS` never alters an
/// existing table, so a column added here will not appear in a database whose
/// `graph_edges` table was created before the change.
///
/// Each line ends in a `\` continuation, which strips the newline *and* the
/// next line's leading indentation. The batch is therefore one line of SQL
/// (e.g. `NOT NULL,id TEXT ...`). Keep a separator (`,`, `;`, or a trailing
/// space) at the end of every line, or adjacent tokens will fuse.
///
/// - Rows are keyed by `(namespace, id)`.
/// - `idx_graph_edges_unique_triple` makes `(namespace, source_id, target_id,
///   relation)` unique. It has no `WHERE` clause, so a soft-deleted row
///   (`deleted_at` set) still occupies its triple.
/// - A NULL `deleted_at` marks a live edge; traversal filters on
///   `deleted_at IS NULL`.
/// - `metadata` is nullable TEXT (presumably serialized JSON — TODO confirm
///   against the write path).
///
/// NOTE(review): `idx_graph_edges_ns_source` is a left-prefix of both
/// `idx_graph_edges_ns_src_rel` and `idx_graph_edges_unique_triple`, and
/// `idx_graph_edges_ns_target` is a left-prefix of `idx_graph_edges_ns_tgt_rel`.
/// SQLite can serve prefix lookups from the wider indexes, so these two may
/// only add write cost. Confirm with `EXPLAIN QUERY PLAN` before dropping them.
const GRAPH_DDL: &str = "\
    CREATE TABLE IF NOT EXISTS graph_edges (\
        namespace TEXT NOT NULL,\
        id TEXT NOT NULL,\
        source_id TEXT NOT NULL,\
        target_id TEXT NOT NULL,\
        relation TEXT NOT NULL,\
        weight REAL NOT NULL DEFAULT 1.0,\
        created_at INTEGER NOT NULL,\
        updated_at INTEGER NOT NULL,\
        deleted_at INTEGER,\
        metadata TEXT,\
        target_backend TEXT,\
        PRIMARY KEY (namespace, id)\
    );\
    CREATE UNIQUE INDEX IF NOT EXISTS idx_graph_edges_unique_triple ON graph_edges(namespace, source_id, target_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_source ON graph_edges(namespace, source_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_target ON graph_edges(namespace, target_id);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_relation ON graph_edges(namespace, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_src_rel ON graph_edges(namespace, source_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_ns_tgt_rel ON graph_edges(namespace, target_id, relation);\
    CREATE INDEX IF NOT EXISTS idx_graph_edges_target_backend ON graph_edges(target_backend) WHERE target_backend IS NOT NULL;\
";
902
903pub(crate) fn ensure_graph_schema(conn: &rusqlite::Connection) -> Result<(), rusqlite::Error> {
904    conn.execute_batch(GRAPH_DDL)
905}
906
#[cfg(test)]
mod tests {
    use super::*;
    use crate::pool::PoolConfig;
    use khive_storage::types::{Direction, TraversalOptions};

    /// Build a `SqlGraphStore` over a fresh in-memory pool with the graph
    /// schema applied, scoped to the `"default"` namespace.
    fn setup_memory_store() -> SqlGraphStore {
        let config = PoolConfig {
            path: None,
            ..PoolConfig::default()
        };
        let pool = Arc::new(ConnectionPool::new(config).unwrap());

        {
            let writer = pool.writer().unwrap();
            writer.conn().execute_batch(GRAPH_DDL).unwrap();
        }

        SqlGraphStore::new_scoped(pool, false, "default")
    }

    /// Build a live (not deleted) edge in the `"default"` namespace with a
    /// fresh random id, stamped with the given `created_at`/`updated_at`.
    ///
    /// Use this when several edges in one test must share a timestamp.
    fn make_edge_at(
        source: Uuid,
        target: Uuid,
        relation: EdgeRelation,
        weight: f64,
        now: DateTime<Utc>,
    ) -> Edge {
        Edge {
            id: Uuid::new_v4().into(),
            namespace: "default".to_string(),
            source_id: source,
            target_id: target,
            relation,
            weight,
            created_at: now,
            updated_at: now,
            deleted_at: None,
            metadata: None,
            target_backend: None,
        }
    }

    /// Same as [`make_edge_at`], stamped with the current time.
    fn make_edge(source: Uuid, target: Uuid, relation: EdgeRelation, weight: f64) -> Edge {
        make_edge_at(source, target, relation, weight, Utc::now())
    }

    #[tokio::test]
    async fn test_upsert_and_get_edge() {
        let store = setup_memory_store();

        let src = Uuid::new_v4();
        let tgt = Uuid::new_v4();
        let edge = make_edge(src, tgt, EdgeRelation::Extends, 0.8);
        let edge_id = edge.id;

        store.upsert_edge(edge).await.unwrap();

        let fetched = store
            .get_edge(edge_id)
            .await
            .unwrap()
            .expect("edge must be readable right after upsert");
        assert_eq!(fetched.id, edge_id);
        assert_eq!(fetched.namespace, "default");
        assert_eq!(fetched.source_id, src);
        assert_eq!(fetched.target_id, tgt);
        assert_eq!(fetched.relation, EdgeRelation::Extends);
        assert!((fetched.weight - 0.8).abs() < 1e-9);
    }

    #[tokio::test]
    async fn test_delete_edge() {
        let store = setup_memory_store();

        let edge = make_edge(Uuid::new_v4(), Uuid::new_v4(), EdgeRelation::Contains, 1.0);
        let edge_id = edge.id;

        store.upsert_edge(edge).await.unwrap();
        assert!(store.get_edge(edge_id).await.unwrap().is_some());

        let deleted = store.delete_edge(edge_id, DeleteMode::Hard).await.unwrap();
        assert!(deleted);

        assert!(store.get_edge(edge_id).await.unwrap().is_none());

        // A second hard delete of the same id must report that nothing was removed.
        let deleted_again = store.delete_edge(edge_id, DeleteMode::Hard).await.unwrap();
        assert!(!deleted_again);
    }

    #[tokio::test]
    async fn test_count_edges() {
        let store = setup_memory_store();

        assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 0);

        for _ in 0..5 {
            store
                .upsert_edge(make_edge(
                    Uuid::new_v4(),
                    Uuid::new_v4(),
                    EdgeRelation::DependsOn,
                    1.0,
                ))
                .await
                .unwrap();
        }

        assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 5);
    }

    #[tokio::test]
    async fn test_neighbors_outbound() {
        let store = setup_memory_store();

        let a = Uuid::new_v4();
        let b = Uuid::new_v4();
        let c = Uuid::new_v4();
        let d = Uuid::new_v4();

        store
            .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(a, c, EdgeRelation::DependsOn, 0.7))
            .await
            .unwrap();
        // Inbound edge into `a`: must not show up in an outbound query.
        store
            .upsert_edge(make_edge(d, a, EdgeRelation::Extends, 0.5))
            .await
            .unwrap();

        let query = NeighborQuery {
            direction: Direction::Out,
            relations: None,
            limit: None,
            min_weight: None,
        };

        let hits = store.neighbors(a, query).await.unwrap();
        assert_eq!(hits.len(), 2);

        let neighbor_ids: Vec<Uuid> = hits.iter().map(|h| h.node_id).collect();
        assert!(neighbor_ids.contains(&b));
        assert!(neighbor_ids.contains(&c));
        assert!(!neighbor_ids.contains(&d));
    }

    /// Chain A→B→C→D traversed with `max_depth = 2`: A (root) at depth 0,
    /// B at depth 1, C at depth 2, and D (three hops away) excluded.
    #[tokio::test]
    async fn test_traverse_depth_2() {
        let store = setup_memory_store();

        let a = Uuid::new_v4();
        let b = Uuid::new_v4();
        let c = Uuid::new_v4();
        let d = Uuid::new_v4();

        store
            .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(b, c, EdgeRelation::Extends, 2.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 3.0))
            .await
            .unwrap();

        let request = TraversalRequest {
            roots: vec![a],
            options: TraversalOptions::new(2).with_direction(Direction::Out),
            include_roots: true,
        };

        let paths = store.traverse(request).await.unwrap();
        assert_eq!(paths.len(), 1);

        let path = &paths[0];
        assert_eq!(path.root_id, a);

        let depth_of = |id: Uuid| {
            path.nodes
                .iter()
                .find(|n| n.node_id == id)
                .map(|n| n.depth)
        };
        assert_eq!(depth_of(a), Some(0), "root is reported at depth 0");
        assert_eq!(depth_of(b), Some(1), "b is one hop from the root");
        assert_eq!(depth_of(c), Some(2), "c is two hops from the root");
        assert_eq!(depth_of(d), None, "d is three hops away, beyond max_depth=2");
    }

    /// Diamond graph: A→B, A→C, B→D, C→D.
    /// D is reachable via two paths at depth 2 and must appear exactly once in
    /// the result (#285).
    #[tokio::test]
    async fn test_traverse_dedups_multipath_node() {
        let store = setup_memory_store();

        let a = Uuid::new_v4();
        let b = Uuid::new_v4();
        let c = Uuid::new_v4();
        let d = Uuid::new_v4();

        store
            .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(a, c, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(b, d, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();

        let request = TraversalRequest {
            roots: vec![a],
            options: TraversalOptions::new(3).with_direction(Direction::Out),
            include_roots: false,
        };

        let paths = store.traverse(request).await.unwrap();
        assert_eq!(paths.len(), 1);
        let nodes = &paths[0].nodes;

        // D must appear exactly once despite being reachable via both B and C.
        let d_count = nodes.iter().filter(|n| n.node_id == d).count();
        assert_eq!(d_count, 1, "D must appear exactly once (dedup multi-path)");

        // B and C must each appear once as well.
        assert_eq!(nodes.iter().filter(|n| n.node_id == b).count(), 1);
        assert_eq!(nodes.iter().filter(|n| n.node_id == c).count(), 1);
    }

    /// First-visit (BFS) ordering is deterministic: the node seen at the
    /// shallowest depth wins, and the `via_edge` recorded for it is the one
    /// from that first-visited path.
    ///
    /// Graph: A→B (depth 1), A→C (depth 1), B→D (depth 2), C→D (depth 2).
    /// D appears at depth 2 via B or C. Rows are ordered by depth; whichever
    /// path SQLite enumerates first for depth 2 is the keeper. The test
    /// asserts that D has exactly one entry with a non-None `via_edge`. It does
    /// NOT assert *which* edge wins, because SQLite row order within the same
    /// depth level is not guaranteed. It DOES assert stability: running twice
    /// gives the same count.
    #[tokio::test]
    async fn test_traverse_preserves_first_path_metadata() {
        let store = setup_memory_store();

        let a = Uuid::new_v4();
        let b = Uuid::new_v4();
        let c = Uuid::new_v4();
        let d = Uuid::new_v4();

        store
            .upsert_edge(make_edge(a, b, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(a, c, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(b, d, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();
        store
            .upsert_edge(make_edge(c, d, EdgeRelation::Extends, 1.0))
            .await
            .unwrap();

        let make_request = || TraversalRequest {
            roots: vec![a],
            options: TraversalOptions::new(3).with_direction(Direction::Out),
            include_roots: false,
        };

        let paths1 = store.traverse(make_request()).await.unwrap();
        let paths2 = store.traverse(make_request()).await.unwrap();

        // Guard the indexing below so a regression fails with a clear message
        // instead of an index-out-of-bounds panic.
        assert_eq!(paths1.len(), 1, "one root must yield exactly one path");
        assert_eq!(paths2.len(), 1, "one root must yield exactly one path");

        // Both runs must return the same total node count (dedup is stable).
        let count1 = paths1[0].nodes.len();
        let count2 = paths2[0].nodes.len();
        assert_eq!(
            count1, count2,
            "traverse result count must be stable across calls"
        );

        // D must appear exactly once and carry a via_edge (it was not a root).
        let d_nodes: Vec<_> = paths1[0].nodes.iter().filter(|n| n.node_id == d).collect();
        assert_eq!(d_nodes.len(), 1, "D deduped to one entry");
        assert!(
            d_nodes[0].via_edge.is_some(),
            "kept entry must have a via_edge"
        );
        assert_eq!(d_nodes[0].depth, 2, "D lives at depth 2");
    }

    #[tokio::test]
    async fn test_metadata_roundtrip() {
        let store = setup_memory_store();

        let src = Uuid::new_v4();
        let tgt = Uuid::new_v4();
        let meta = serde_json::json!({"note": "important link", "confidence": 0.95});
        let edge = Edge {
            metadata: Some(meta.clone()),
            ..make_edge(src, tgt, EdgeRelation::Implements, 0.9)
        };
        let edge_id = edge.id;

        store.upsert_edge(edge).await.unwrap();

        let fetched = store.get_edge(edge_id).await.unwrap().unwrap();
        assert_eq!(
            fetched.metadata.as_ref(),
            Some(&meta),
            "metadata must survive a write/read roundtrip via get_edge"
        );

        // Also verify via query_edges.
        let page = store
            .query_edges(EdgeFilter::default(), vec![], PageRequest::default())
            .await
            .unwrap();
        let from_query = page
            .items
            .iter()
            .find(|e| e.id == edge_id)
            .expect("edge must appear in query_edges result");
        assert_eq!(
            from_query.metadata.as_ref(),
            Some(&meta),
            "metadata must survive a write/read roundtrip via query_edges"
        );
    }

    #[tokio::test]
    async fn test_upsert_edges_batch() {
        let store = setup_memory_store();

        let edges: Vec<Edge> = (0..10)
            .map(|i| {
                make_edge(
                    Uuid::new_v4(),
                    Uuid::new_v4(),
                    EdgeRelation::Implements,
                    i as f64,
                )
            })
            .collect();

        let summary = store.upsert_edges(edges).await.unwrap();
        assert_eq!(summary.attempted, 10);
        assert_eq!(summary.affected, 10);
        assert_eq!(summary.failed, 0);

        assert_eq!(store.count_edges(EdgeFilter::default()).await.unwrap(), 10);
    }

    // ---- #229 deduplication test ----

    /// Two edges with the same `(source_id, target_id, relation)` triple but
    /// different ids must collapse to a single stored edge.
    #[tokio::test]
    async fn graph_duplicate_edges_ignored() {
        let store = setup_memory_store();

        let src = Uuid::new_v4();
        let tgt = Uuid::new_v4();

        // Same triple, different ids and weights, shared timestamp.
        let now = Utc::now();
        let edge1 = make_edge_at(src, tgt, EdgeRelation::Extends, 1.0, now);
        let edge2 = make_edge_at(src, tgt, EdgeRelation::Extends, 0.5, now);

        store.upsert_edge(edge1).await.unwrap();
        store.upsert_edge(edge2).await.unwrap();

        assert_eq!(
            store.count_edges(EdgeFilter::default()).await.unwrap(),
            1,
            "duplicate (source, target, relation) triple must collapse; only one edge must exist"
        );
    }

    // F053 (CRIT): a natural-key conflict must DO UPDATE (refresh weight/metadata),
    // not DO NOTHING. Per ADR-009 the second upsert's weight=0.5 must overwrite the
    // first upsert's weight=1.0. A DO NOTHING conflict clause would leave 1.0 in place.
    #[tokio::test]
    async fn graph_duplicate_edges_refresh_existing_row() {
        let store = setup_memory_store();
        let src = Uuid::new_v4();
        let tgt = Uuid::new_v4();

        let now = Utc::now();
        let edge1 = make_edge_at(src, tgt, EdgeRelation::Extends, 1.0, now);
        let edge2 = make_edge_at(src, tgt, EdgeRelation::Extends, 0.5, now);

        store.upsert_edge(edge1).await.unwrap();
        store.upsert_edge(edge2).await.unwrap();

        let edges = store
            .query_edges(EdgeFilter::default(), vec![], PageRequest::default())
            .await
            .unwrap();
        assert_eq!(
            edges.items.len(),
            1,
            "duplicate natural key must collapse to one row"
        );
        assert!(
            (edges.items[0].weight - 0.5).abs() < 0.001,
            "F053: natural-key conflict must DO UPDATE (expected weight=0.5 from second upsert); \
             got stale weight={}",
            edges.items[0].weight
        );
    }

    // Regression test for #476: symmetric edges stored via upsert_edge must
    // always have source_id < target_id (lexicographic on UUID bytes).
    #[tokio::test]
    async fn upsert_edge_canonicalizes_symmetric_relation() {
        let store = setup_memory_store();

        // Construct two UUIDs where larger > smaller lexicographically.
        let smaller = Uuid::from_bytes([0x00; 16]);
        let larger = Uuid::from_bytes([0xff; 16]);
        assert!(
            larger > smaller,
            "test setup: larger must sort after smaller"
        );

        // Insert with source > target — the invariant-violating order.
        let edge = make_edge(larger, smaller, EdgeRelation::CompetesWith, 1.0);
        let edge_id = edge.id;
        store.upsert_edge(edge).await.unwrap();

        let stored = store.get_edge(edge_id).await.unwrap().unwrap();
        assert_eq!(
            stored.source_id, smaller,
            "#476: CompetesWith edge must be stored with source_id < target_id"
        );
        assert_eq!(
            stored.target_id, larger,
            "#476: CompetesWith edge must be stored with target_id > source_id"
        );
    }

    #[tokio::test]
    async fn upsert_edges_batch_canonicalizes_symmetric_relation() {
        let store = setup_memory_store();

        let smaller = Uuid::from_bytes([0x11; 16]);
        let larger = Uuid::from_bytes([0xee; 16]);

        // ComposedWith is the other symmetric relation — insert reversed.
        let edge = make_edge(larger, smaller, EdgeRelation::ComposedWith, 0.9);
        let edge_id = edge.id;
        store.upsert_edges(vec![edge]).await.unwrap();

        let stored = store.get_edge(edge_id).await.unwrap().unwrap();
        assert_eq!(
            stored.source_id, smaller,
            "#476: ComposedWith edge must be stored with source_id < target_id (batch path)"
        );
        assert_eq!(
            stored.target_id, larger,
            "#476: ComposedWith edge must be stored with target_id > source_id (batch path)"
        );
    }

    #[tokio::test]
    async fn upsert_edge_non_symmetric_relation_preserves_direction() {
        let store = setup_memory_store();

        // DependsOn is not symmetric — direction must NOT be swapped.
        let src = Uuid::from_bytes([0xff; 16]);
        let tgt = Uuid::from_bytes([0x00; 16]);
        let edge = make_edge(src, tgt, EdgeRelation::DependsOn, 1.0);
        let edge_id = edge.id;
        store.upsert_edge(edge).await.unwrap();

        let stored = store.get_edge(edge_id).await.unwrap().unwrap();
        assert_eq!(
            stored.source_id, src,
            "non-symmetric edge direction must be preserved"
        );
        assert_eq!(
            stored.target_id, tgt,
            "non-symmetric edge direction must be preserved"
        );
    }
}