Skip to main content

sqlite_graphrag/commands/
merge_entities.rs

//! Handler for the `merge-entities` CLI subcommand (GAP-19).
//!
//! Merges one or more source entities into a single, pre-existing target entity by:
//!   1. Retargeting every relationship whose source or target is a source entity
//!      so that it points at the target instead.
//!   2. Dropping self-loops created by the merge and deduplicating relationships
//!      that become identical after it (same source_id + target_id + relation).
//!   3. Retargeting (and deduplicating) memory_entities bindings.
//!   4. Deleting the now-empty source entity rows.
//!   5. Recalculating the degree of the target and of every entity adjacent to it.
9
10use crate::errors::AppError;
11use crate::i18n::errors_msg;
12use crate::output::{self, OutputFormat};
13use crate::paths::AppPaths;
14use crate::storage::connection::open_rw;
15use crate::storage::entities;
16use rusqlite::params;
17use serde::Serialize;
18
// CLI arguments for `merge-entities`.
//
// NOTE: with clap derive, `///` doc comments on fields become `--help` text, so
// the reviewer annotations in this struct use plain `//` comments to avoid
// changing CLI output.
//
// Mutual exclusion is declared on one side only (`names` ↔ `ids`,
// `into` ↔ `into_id`). clap applies `conflicts_with` in both directions, which
// the `clap_rejects_*` tests in this file assert.
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n  \
    # Merge two source entities into a target\n  \
    sqlite-graphrag merge-entities --names auth,authentication --into auth-service\n\n  \
    # Merge three sources into one target across a namespace\n  \
    sqlite-graphrag merge-entities --names svc-a,svc-b,old-svc --into canonical-service --namespace my-project\n\n  \
    # Merge by ID (unambiguous when homonyms exist across namespaces)\n  \
    sqlite-graphrag merge-entities --ids 12,17 --into-id 3\n\n\
NOTE:\n  \
    --names is a comma-separated list of source entity names.\n  \
    --into is the target entity name and must already exist.\n  \
    --ids / --into-id select entities by ID; IDs are globally unique so they\n  \
    disambiguate homonyms. They conflict with --names / --into respectively\n  \
    and must belong to the resolved namespace.\n  \
    Source entities are deleted after the merge; the target is preserved.\n  \
    Duplicate relationships (same endpoints + relation) are removed automatically.\n  \
    Run `sqlite-graphrag cleanup-orphans` afterwards if sources had no other links.")]
pub struct MergeEntitiesArgs {
    // Name-based source selector: required unless `--ids` is given; values are
    // split on ','.
    /// Comma-separated list of source entity names to merge into the target.
    #[arg(
        long,
        value_delimiter = ',',
        value_name = "NAMES",
        required_unless_present = "ids",
        conflicts_with = "ids"
    )]
    pub names: Vec<String>,
    // ID-based source selector; empty when `--names` is used.
    /// v1.1.1 (P5): comma-separated list of source entity IDs. IDs are
    /// globally unique, so they disambiguate homonyms across namespaces.
    /// Conflicts with --names; every ID must belong to the resolved namespace.
    #[arg(long, value_delimiter = ',', value_name = "IDS")]
    pub ids: Vec<i64>,
    // Name-based target selector: required unless `--into-id` is given.
    /// Target entity name. Must already exist. All source relationships are redirected here.
    #[arg(
        long,
        value_name = "TARGET",
        required_unless_present = "into_id",
        conflicts_with = "into_id"
    )]
    pub into: Option<String>,
    // ID-based target selector; may be combined with `--names`.
    /// v1.1.1 (P5): target entity ID. Unambiguous alternative to --into.
    #[arg(long, value_name = "TARGET_ID")]
    pub into_id: Option<i64>,
    // Namespace override; `run` passes it to `crate::namespace::resolve_namespace`.
    #[arg(long)]
    pub namespace: Option<String>,
    // Output format. Defaults to JSON; Text and Markdown both print a one-line summary.
    #[arg(long, value_enum, default_value = "json")]
    pub format: OutputFormat,
    // Hidden flag accepted for compatibility; `run` never reads it.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,
    // Database path override (also settable via SQLITE_GRAPHRAG_DB_PATH);
    // `run` passes it to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
71
/// Result payload printed by `merge-entities` after a successful merge.
///
/// Serialized with serde; fields are emitted in declaration order, so
/// reordering them changes the JSON layout consumers see.
#[derive(Serialize)]
struct MergeEntitiesResponse {
    /// Outcome tag; the handler always sets this to `"merged"`.
    action: String,
    /// Names of the source entities that were merged, de-duplicated, in the
    /// order they were supplied (looked up from the database in `--ids` mode).
    sources: Vec<String>,
    /// Name of the target entity that absorbed the sources.
    target: String,
    /// Namespace the merge ran in, after resolution.
    namespace: String,
    /// v1.1.1 (P5): resolved target entity ID, echoed for unambiguous auditing.
    target_id: i64,
    /// Sum of rows rewritten by the per-source `UPDATE OR IGNORE` statements on
    /// `relationships`. An edge between two merged entities is counted once per
    /// endpoint, and edges later dropped as self-loops are still counted.
    relationships_moved: usize,
    /// Number of rows deleted from `entities` (the source entities).
    entities_removed: usize,
    /// Total execution time in milliseconds from handler start to serialisation.
    elapsed_ms: u64,
}
85
86/// v1.1.1 (P5): resolves an entity ID to its name, enforcing that the entity
87/// exists AND belongs to the namespace — IDs are global, so a bare existence
88/// check could silently cross namespaces.
89fn find_entity_name_by_id(
90    conn: &rusqlite::Connection,
91    namespace: &str,
92    id: i64,
93) -> Result<String, AppError> {
94    let mut stmt =
95        conn.prepare_cached("SELECT name FROM entities WHERE id = ?1 AND namespace = ?2")?;
96    match stmt.query_row(params![id, namespace], |r| r.get::<_, String>(0)) {
97        Ok(name) => Ok(name),
98        Err(rusqlite::Error::QueryReturnedNoRows) => Err(AppError::NotFound(format!(
99            "entity id={id} not found in namespace '{namespace}'"
100        ))),
101        Err(e) => Err(AppError::Database(e)),
102    }
103}
104
105pub fn run(args: MergeEntitiesArgs) -> Result<(), AppError> {
106    let inicio = std::time::Instant::now();
107
108    if args.names.is_empty() && args.ids.is_empty() {
109        return Err(AppError::Validation(
110            "--names or --ids must contain at least one source entity".to_string(),
111        ));
112    }
113
114    let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
115    let paths = AppPaths::resolve(args.db.as_deref())?;
116
117    crate::storage::connection::ensure_db_ready(&paths)?;
118
119    let mut conn = open_rw(&paths.db)?;
120
121    // Resolve target entity — by ID (v1.1.1 P5, unambiguous) or by name.
122    // Existence is validated here, BEFORE any mutation.
123    let (target_id, target_name) = match args.into_id {
124        Some(id) => {
125            let name = find_entity_name_by_id(&conn, &namespace, id)?;
126            (id, name)
127        }
128        None => {
129            let Some(name) = args.into.clone() else {
130                return Err(AppError::Validation(
131                    "--into or --into-id is required".to_string(),
132                ));
133            };
134            let id = entities::find_entity_id(&conn, &namespace, &name)?.ok_or_else(|| {
135                AppError::NotFound(errors_msg::entity_not_found(&name, &namespace))
136            })?;
137            (id, name)
138        }
139    };
140
141    // Resolve source entity IDs — reject self-referential merge (G21),
142    // by ID (v1.1.1 P5) or by name. All lookups happen BEFORE the transaction.
143    let mut source_ids: Vec<i64> = Vec::with_capacity(args.names.len() + args.ids.len());
144    let mut source_names: Vec<String> = Vec::with_capacity(source_ids.capacity());
145    if !args.ids.is_empty() {
146        for &id in &args.ids {
147            if id == target_id {
148                return Err(AppError::Validation(format!(
149                    "source entity id={id} equals target id={target_id} — \
150                     self-referential merge is not allowed"
151                )));
152            }
153            let name = find_entity_name_by_id(&conn, &namespace, id)?;
154            if !source_ids.contains(&id) {
155                source_ids.push(id);
156                source_names.push(name);
157            }
158        }
159    } else {
160        for name in &args.names {
161            if name == &target_name {
162                return Err(AppError::Validation(format!(
163                    "source entity '{name}' equals target '{target_name}' — \
164                     self-referential merge is not allowed"
165                )));
166            }
167            let id = entities::find_entity_id(&conn, &namespace, name)?.ok_or_else(|| {
168                AppError::NotFound(errors_msg::entity_not_found(name, &namespace))
169            })?;
170            if id == target_id {
171                return Err(AppError::Validation(format!(
172                    "source entity '{name}' resolves to the target (id={target_id}) — \
173                     self-referential merge is not allowed"
174                )));
175            }
176            if !source_ids.contains(&id) {
177                source_ids.push(id);
178                source_names.push(name.clone());
179            }
180        }
181    }
182
183    if source_ids.is_empty() {
184        return Err(AppError::Validation(
185            "no valid source entities to merge (all names equal the target or were duplicates)"
186                .to_string(),
187        ));
188    }
189
190    let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
191
192    let mut relationships_moved: usize = 0;
193
194    for &src_id in &source_ids {
195        // Step 1a: redirect source_id, ignoring UNIQUE conflicts.
196        let moved_src = tx.execute(
197            "UPDATE OR IGNORE relationships SET source_id = ?1 WHERE source_id = ?2",
198            params![target_id, src_id],
199        )?;
200        tx.execute(
201            "DELETE FROM relationships WHERE source_id = ?1",
202            params![src_id],
203        )?;
204        // Step 1b: redirect target_id, ignoring UNIQUE conflicts.
205        let moved_tgt = tx.execute(
206            "UPDATE OR IGNORE relationships SET target_id = ?1 WHERE target_id = ?2",
207            params![target_id, src_id],
208        )?;
209        tx.execute(
210            "DELETE FROM relationships WHERE target_id = ?1",
211            params![src_id],
212        )?;
213        relationships_moved += moved_src + moved_tgt;
214    }
215
216    // Step 2: remove self-loops introduced by the redirect (target → target).
217    tx.execute("DELETE FROM relationships WHERE source_id = target_id", [])?;
218
219    // Step 3: deduplicate relationships that now share (source, target, relation).
220    // Safety net — UPDATE OR IGNORE should have handled most duplicates above.
221    tx.execute(
222        "DELETE FROM relationships
223         WHERE id NOT IN (
224             SELECT MIN(id)
225             FROM relationships
226             GROUP BY source_id, target_id, relation
227         )",
228        [],
229    )?;
230
231    // Step 4: retarget memory_entities bindings.
232    // Use UPDATE OR IGNORE to skip conflicts when memory is already bound to
233    // target entity. Then DELETE remaining source rows (the conflicting ones
234    // that UPDATE OR IGNORE skipped). Same pattern as relationships (Step 1).
235    for &src_id in &source_ids {
236        tx.execute(
237            "UPDATE OR IGNORE memory_entities SET entity_id = ?1 WHERE entity_id = ?2",
238            params![target_id, src_id],
239        )?;
240        tx.execute(
241            "DELETE FROM memory_entities WHERE entity_id = ?1",
242            params![src_id],
243        )?;
244    }
245
246    // Step 5: deduplicate memory_entities bindings (same memory + entity).
247    tx.execute(
248        "DELETE FROM memory_entities
249         WHERE rowid NOT IN (
250             SELECT MIN(rowid)
251             FROM memory_entities
252             GROUP BY memory_id, entity_id
253         )",
254        [],
255    )?;
256
257    // Step 6: delete source entities. v1.0.76: FK ON DELETE CASCADE on
258    // entity_embeddings handles the vector row automatically.
259    let mut entities_removed: usize = 0;
260    for &src_id in &source_ids {
261        let removed = tx.execute("DELETE FROM entities WHERE id = ?1", params![src_id])?;
262        entities_removed += removed;
263    }
264
265    // Step 7: recalculate degree for target and all adjacent entities.
266    let adjacent_ids: Vec<i64> = {
267        let mut stmt = tx.prepare(
268            "SELECT DISTINCT CASE WHEN source_id = ?1 THEN target_id ELSE source_id END
269             FROM relationships WHERE source_id = ?1 OR target_id = ?1",
270        )?;
271        let ids: Vec<i64> = stmt
272            .query_map(params![target_id], |r| r.get(0))?
273            .collect::<Result<Vec<_>, _>>()?;
274        ids
275    };
276    entities::recalculate_degree(&tx, target_id)?;
277    for &adj_id in &adjacent_ids {
278        entities::recalculate_degree(&tx, adj_id)?;
279    }
280
281    tx.commit()?;
282
283    conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
284
285    let response = MergeEntitiesResponse {
286        action: "merged".to_string(),
287        sources: source_names,
288        target: target_name,
289        namespace: namespace.clone(),
290        target_id,
291        relationships_moved,
292        entities_removed,
293        elapsed_ms: inicio.elapsed().as_millis() as u64,
294    };
295
296    match args.format {
297        OutputFormat::Json => output::emit_json(&response)?,
298        OutputFormat::Text | OutputFormat::Markdown => {
299            output::emit_text(&format!(
300                "merged: {} sources into '{}' (relationships_moved={}, entities_removed={}) [{}]",
301                response.sources.len(),
302                response.target,
303                response.relationships_moved,
304                response.entities_removed,
305                response.namespace
306            ));
307        }
308    }
309
310    Ok(())
311}
312
#[cfg(test)]
mod tests {
    use super::*;
    use clap::Parser;

    // v1.1.1 (P5): ID resolution is namespace-scoped — a homonym in another
    // namespace must NOT be reachable through its ID from the wrong namespace.
    #[test]
    fn find_entity_name_by_id_disambiguates_homonyms_across_namespaces() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (
                id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                name TEXT NOT NULL,
                UNIQUE(namespace, name)
            );",
        )
        .unwrap();
        conn.execute(
            "INSERT INTO entities (id, namespace, name)
             VALUES (1, 'ns-a', 'auth'), (2, 'ns-b', 'auth')",
            [],
        )
        .unwrap();

        // Same name in two namespaces: each ID resolves only in its own.
        assert_eq!(find_entity_name_by_id(&conn, "ns-a", 1).unwrap(), "auth");
        assert_eq!(find_entity_name_by_id(&conn, "ns-b", 2).unwrap(), "auth");
        let err = find_entity_name_by_id(&conn, "ns-a", 2).unwrap_err();
        assert_eq!(err.exit_code(), 4, "cross-namespace ID must be NotFound");
        assert!(err.to_string().contains("id=2"), "obtido: {err}");
    }

    #[test]
    fn find_entity_name_by_id_missing_id_is_not_found() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (
                id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                name TEXT NOT NULL
            );",
        )
        .unwrap();
        let err = find_entity_name_by_id(&conn, "global", 99).unwrap_err();
        assert_eq!(err.exit_code(), 4);
    }

    // v1.1.1 (P5): clap-level exclusivity between name-based and ID-based
    // selectors, and requiredness of at least one selector per side.
    #[derive(clap::Parser)]
    struct TestCli {
        #[command(flatten)]
        args: MergeEntitiesArgs,
    }

    /// Parses `argv`, panicking with `argv` in the message if parsing fails.
    fn parse_ok(argv: &[&str]) -> TestCli {
        match TestCli::try_parse_from(argv.iter().copied()) {
            Ok(cli) => cli,
            Err(e) => panic!("expected successful parse of {argv:?}: {e}"),
        }
    }

    /// Parses `argv` and returns the clap error, panicking if parsing succeeds.
    fn parse_err(argv: &[&str]) -> clap::Error {
        match TestCli::try_parse_from(argv.iter().copied()) {
            Ok(_) => panic!("expected a parse error for {argv:?}"),
            Err(e) => e,
        }
    }

    #[test]
    fn clap_rejects_names_combined_with_ids() {
        let err = parse_err(&["t", "--names", "a,b", "--ids", "1,2", "--into", "tgt"]);
        assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
    }

    #[test]
    fn clap_rejects_into_combined_with_into_id() {
        let err = parse_err(&["t", "--names", "a", "--into", "tgt", "--into-id", "3"]);
        assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
    }

    #[test]
    fn clap_requires_a_source_and_a_target_selector() {
        assert!(TestCli::try_parse_from(["t", "--into", "tgt"]).is_err());
        assert!(TestCli::try_parse_from(["t", "--names", "a"]).is_err());
        let ok = parse_ok(&["t", "--ids", "1,2", "--into-id", "3"]);
        assert_eq!(ok.args.ids, vec![1, 2]);
        assert_eq!(ok.args.into_id, Some(3));
        assert!(ok.args.names.is_empty());
        assert!(ok.args.into.is_none());
    }

    // Only --names/--ids and --into/--into-id are mutually exclusive, so
    // name-based sources may be merged into an ID-selected target.
    #[test]
    fn clap_accepts_names_with_into_id() {
        let cli = parse_ok(&["t", "--names", "a,b", "--into-id", "3"]);
        assert_eq!(cli.args.names, vec!["a".to_string(), "b".to_string()]);
        assert_eq!(cli.args.into_id, Some(3));
        assert!(cli.args.ids.is_empty());
        assert!(cli.args.into.is_none());
    }

    #[test]
    fn merge_entities_response_serializes_all_fields() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["auth".to_string(), "authentication".to_string()],
            target: "auth-service".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 7,
            entities_removed: 2,
            elapsed_ms: 15,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        let object = json.as_object().expect("response must serialize to an object");
        assert_eq!(object.len(), 8, "unexpected field set: {json}");
        assert_eq!(json["action"], "merged");
        assert_eq!(json["target"], "auth-service");
        assert_eq!(json["namespace"], "global");
        assert_eq!(json["target_id"], 1);
        assert_eq!(json["relationships_moved"], 7);
        assert_eq!(json["entities_removed"], 2);
        assert_eq!(json["sources"], serde_json::json!(["auth", "authentication"]));
        assert_eq!(json["elapsed_ms"], 15);
    }

    #[test]
    fn merge_entities_response_action_is_merged() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["src".to_string()],
            target: "tgt".to_string(),
            namespace: "ns".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 1,
            elapsed_ms: 0,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["action"], "merged");
    }

    #[test]
    fn merge_entities_response_empty_sources_serializes() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec![],
            target: "target".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 0,
            elapsed_ms: 1,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        let sources = json["sources"].as_array().expect("must be array");
        assert_eq!(sources.len(), 0);
    }

    #[test]
    fn merge_entities_response_with_zero_relationships_moved() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["src-a".to_string()],
            target: "tgt".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 1,
            elapsed_ms: 5,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["relationships_moved"], 0);
        assert_eq!(json["entities_removed"], 1);
    }

    #[test]
    fn merge_entities_response_multiple_sources() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["a".into(), "b".into(), "c".into()],
            target: "canonical".to_string(),
            namespace: "proj".to_string(),
            target_id: 1,
            relationships_moved: 12,
            entities_removed: 3,
            elapsed_ms: 42,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["entities_removed"], 3);
        let sources = json["sources"].as_array().unwrap();
        assert_eq!(sources.len(), 3);
    }
}