1use crate::errors::AppError;
11use crate::i18n::errors_msg;
12use crate::output::{self, OutputFormat};
13use crate::paths::AppPaths;
14use crate::storage::connection::open_rw;
15use crate::storage::entities;
16use rusqlite::params;
17use serde::Serialize;
18
// CLI arguments for `merge-entities`.
//
// Plain `//` comments are used in this struct on purpose so that they do not
// become clap help text.
//
// Source selection: exactly one of `--names` / `--ids` (enforced by the
// `required_unless_present` + `conflicts_with` pair on `names`).
// Target selection: exactly one of `--into` / `--into-id` (same pair on `into`).
// Sources and target may use different selector kinds (e.g. names + id).
#[derive(clap::Args)]
#[command(after_long_help = "EXAMPLES:\n \
    # Merge two source entities into a target\n \
    sqlite-graphrag merge-entities --names auth,authentication --into auth-service\n\n \
    # Merge three sources into one target across a namespace\n \
    sqlite-graphrag merge-entities --names svc-a,svc-b,old-svc --into canonical-service --namespace my-project\n\n \
    # Merge by ID (unambiguous when homonyms exist across namespaces)\n \
    sqlite-graphrag merge-entities --ids 12,17 --into-id 3\n\n\
NOTE:\n \
    --names is a comma-separated list of source entity names.\n \
    --into is the target entity name and must already exist.\n \
    --ids / --into-id select entities by ID; IDs are globally unique so they\n \
    disambiguate homonyms. They conflict with --names / --into respectively\n \
    and must belong to the resolved namespace.\n \
    Source entities are deleted after the merge; the target is preserved.\n \
    Duplicate relationships (same endpoints + relation) are removed automatically.\n \
    Run `sqlite-graphrag cleanup-orphans` afterwards if sources had no other links.")]
pub struct MergeEntitiesArgs {
    // Source entity names, split on ','. Required unless `--ids` is given;
    // cannot be combined with it.
    #[arg(
        long,
        value_delimiter = ',',
        value_name = "NAMES",
        required_unless_present = "ids",
        conflicts_with = "ids"
    )]
    pub names: Vec<String>,
    // Source entity IDs, split on ','. The requirement/conflict with `--names`
    // is declared on the `names` field above.
    #[arg(long, value_delimiter = ',', value_name = "IDS")]
    pub ids: Vec<i64>,
    // Target entity name. Required unless `--into-id` is given; cannot be
    // combined with it.
    #[arg(
        long,
        value_name = "TARGET",
        required_unless_present = "into_id",
        conflicts_with = "into_id"
    )]
    pub into: Option<String>,
    // Target entity ID; the conflict with `--into` is declared on `into`.
    #[arg(long, value_name = "TARGET_ID")]
    pub into_id: Option<i64>,
    // Namespace override; `run` hands it to `resolve_namespace`, which decides
    // the value used when this is absent.
    #[arg(long)]
    pub namespace: Option<String>,
    // Output format for the summary; JSON unless overridden.
    #[arg(long, value_enum, default_value = "json")]
    pub format: OutputFormat,
    // Hidden flag accepted on the command line but not read by `run`.
    #[arg(long, hide = true, help = "No-op; JSON is always emitted on stdout")]
    pub json: bool,
    // Database path override, also settable via SQLITE_GRAPHRAG_DB_PATH;
    // passed to `AppPaths::resolve`.
    #[arg(long, env = "SQLITE_GRAPHRAG_DB_PATH")]
    pub db: Option<String>,
}
71
/// Summary emitted by `merge-entities` after a successful merge.
///
/// Field names and declaration order define the serialized JSON, so they are
/// part of the command's output contract.
#[derive(Serialize)]
struct MergeEntitiesResponse {
    /// Outcome label; `run` always sets `"merged"`.
    action: String,
    /// Names of the merged source entities, deduplicated, in the order given.
    sources: Vec<String>,
    /// Name of the surviving target entity.
    target: String,
    /// Namespace the merge ran in.
    namespace: String,
    /// ID of the surviving target entity.
    target_id: i64,
    /// Number of relationship endpoints rewritten from a source to the target.
    /// Counted before self-loop/duplicate cleanup, so some of these rows may
    /// have been removed afterwards.
    relationships_moved: usize,
    /// Number of source entity rows deleted.
    entities_removed: usize,
    /// Time elapsed since `run` started, in milliseconds.
    elapsed_ms: u64,
}
85
86fn find_entity_name_by_id(
90 conn: &rusqlite::Connection,
91 namespace: &str,
92 id: i64,
93) -> Result<String, AppError> {
94 let mut stmt =
95 conn.prepare_cached("SELECT name FROM entities WHERE id = ?1 AND namespace = ?2")?;
96 match stmt.query_row(params![id, namespace], |r| r.get::<_, String>(0)) {
97 Ok(name) => Ok(name),
98 Err(rusqlite::Error::QueryReturnedNoRows) => Err(AppError::NotFound(format!(
99 "entity id={id} not found in namespace '{namespace}'"
100 ))),
101 Err(e) => Err(AppError::Database(e)),
102 }
103}
104
105pub fn run(args: MergeEntitiesArgs) -> Result<(), AppError> {
106 let inicio = std::time::Instant::now();
107
108 if args.names.is_empty() && args.ids.is_empty() {
109 return Err(AppError::Validation(
110 "--names or --ids must contain at least one source entity".to_string(),
111 ));
112 }
113
114 let namespace = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
115 let paths = AppPaths::resolve(args.db.as_deref())?;
116
117 crate::storage::connection::ensure_db_ready(&paths)?;
118
119 let mut conn = open_rw(&paths.db)?;
120
121 let (target_id, target_name) = match args.into_id {
124 Some(id) => {
125 let name = find_entity_name_by_id(&conn, &namespace, id)?;
126 (id, name)
127 }
128 None => {
129 let Some(name) = args.into.clone() else {
130 return Err(AppError::Validation(
131 "--into or --into-id is required".to_string(),
132 ));
133 };
134 let id = entities::find_entity_id(&conn, &namespace, &name)?.ok_or_else(|| {
135 AppError::NotFound(errors_msg::entity_not_found(&name, &namespace))
136 })?;
137 (id, name)
138 }
139 };
140
141 let mut source_ids: Vec<i64> = Vec::with_capacity(args.names.len() + args.ids.len());
144 let mut source_names: Vec<String> = Vec::with_capacity(source_ids.capacity());
145 if !args.ids.is_empty() {
146 for &id in &args.ids {
147 if id == target_id {
148 return Err(AppError::Validation(format!(
149 "source entity id={id} equals target id={target_id} — \
150 self-referential merge is not allowed"
151 )));
152 }
153 let name = find_entity_name_by_id(&conn, &namespace, id)?;
154 if !source_ids.contains(&id) {
155 source_ids.push(id);
156 source_names.push(name);
157 }
158 }
159 } else {
160 for name in &args.names {
161 if name == &target_name {
162 return Err(AppError::Validation(format!(
163 "source entity '{name}' equals target '{target_name}' — \
164 self-referential merge is not allowed"
165 )));
166 }
167 let id = entities::find_entity_id(&conn, &namespace, name)?.ok_or_else(|| {
168 AppError::NotFound(errors_msg::entity_not_found(name, &namespace))
169 })?;
170 if id == target_id {
171 return Err(AppError::Validation(format!(
172 "source entity '{name}' resolves to the target (id={target_id}) — \
173 self-referential merge is not allowed"
174 )));
175 }
176 if !source_ids.contains(&id) {
177 source_ids.push(id);
178 source_names.push(name.clone());
179 }
180 }
181 }
182
183 if source_ids.is_empty() {
184 return Err(AppError::Validation(
185 "no valid source entities to merge (all names equal the target or were duplicates)"
186 .to_string(),
187 ));
188 }
189
190 let tx = conn.transaction_with_behavior(rusqlite::TransactionBehavior::Immediate)?;
191
192 let mut relationships_moved: usize = 0;
193
194 for &src_id in &source_ids {
195 let moved_src = tx.execute(
197 "UPDATE OR IGNORE relationships SET source_id = ?1 WHERE source_id = ?2",
198 params![target_id, src_id],
199 )?;
200 tx.execute(
201 "DELETE FROM relationships WHERE source_id = ?1",
202 params![src_id],
203 )?;
204 let moved_tgt = tx.execute(
206 "UPDATE OR IGNORE relationships SET target_id = ?1 WHERE target_id = ?2",
207 params![target_id, src_id],
208 )?;
209 tx.execute(
210 "DELETE FROM relationships WHERE target_id = ?1",
211 params![src_id],
212 )?;
213 relationships_moved += moved_src + moved_tgt;
214 }
215
216 tx.execute("DELETE FROM relationships WHERE source_id = target_id", [])?;
218
219 tx.execute(
222 "DELETE FROM relationships
223 WHERE id NOT IN (
224 SELECT MIN(id)
225 FROM relationships
226 GROUP BY source_id, target_id, relation
227 )",
228 [],
229 )?;
230
231 for &src_id in &source_ids {
236 tx.execute(
237 "UPDATE OR IGNORE memory_entities SET entity_id = ?1 WHERE entity_id = ?2",
238 params![target_id, src_id],
239 )?;
240 tx.execute(
241 "DELETE FROM memory_entities WHERE entity_id = ?1",
242 params![src_id],
243 )?;
244 }
245
246 tx.execute(
248 "DELETE FROM memory_entities
249 WHERE rowid NOT IN (
250 SELECT MIN(rowid)
251 FROM memory_entities
252 GROUP BY memory_id, entity_id
253 )",
254 [],
255 )?;
256
257 let mut entities_removed: usize = 0;
260 for &src_id in &source_ids {
261 let removed = tx.execute("DELETE FROM entities WHERE id = ?1", params![src_id])?;
262 entities_removed += removed;
263 }
264
265 let adjacent_ids: Vec<i64> = {
267 let mut stmt = tx.prepare(
268 "SELECT DISTINCT CASE WHEN source_id = ?1 THEN target_id ELSE source_id END
269 FROM relationships WHERE source_id = ?1 OR target_id = ?1",
270 )?;
271 let ids: Vec<i64> = stmt
272 .query_map(params![target_id], |r| r.get(0))?
273 .collect::<Result<Vec<_>, _>>()?;
274 ids
275 };
276 entities::recalculate_degree(&tx, target_id)?;
277 for &adj_id in &adjacent_ids {
278 entities::recalculate_degree(&tx, adj_id)?;
279 }
280
281 tx.commit()?;
282
283 conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;
284
285 let response = MergeEntitiesResponse {
286 action: "merged".to_string(),
287 sources: source_names,
288 target: target_name,
289 namespace: namespace.clone(),
290 target_id,
291 relationships_moved,
292 entities_removed,
293 elapsed_ms: inicio.elapsed().as_millis() as u64,
294 };
295
296 match args.format {
297 OutputFormat::Json => output::emit_json(&response)?,
298 OutputFormat::Text | OutputFormat::Markdown => {
299 output::emit_text(&format!(
300 "merged: {} sources into '{}' (relationships_moved={}, entities_removed={}) [{}]",
301 response.sources.len(),
302 response.target,
303 response.relationships_moved,
304 response.entities_removed,
305 response.namespace
306 ));
307 }
308 }
309
310 Ok(())
311}
312
#[cfg(test)]
mod tests {
    use super::*;

    // The namespace filter must make an ID that exists only in another
    // namespace behave like a missing one (NotFound, exit code 4).
    #[test]
    fn find_entity_name_by_id_disambiguates_homonyms_across_namespaces() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (
                id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                name TEXT NOT NULL,
                UNIQUE(namespace, name)
            );",
        )
        .unwrap();
        conn.execute(
            "INSERT INTO entities (id, namespace, name)
             VALUES (1, 'ns-a', 'auth'), (2, 'ns-b', 'auth')",
            [],
        )
        .unwrap();

        assert_eq!(find_entity_name_by_id(&conn, "ns-a", 1).unwrap(), "auth");
        assert_eq!(find_entity_name_by_id(&conn, "ns-b", 2).unwrap(), "auth");
        let err = find_entity_name_by_id(&conn, "ns-a", 2).unwrap_err();
        assert_eq!(err.exit_code(), 4, "cross-namespace ID must be NotFound");
        assert!(err.to_string().contains("id=2"), "obtido: {err}");
    }

    #[test]
    fn find_entity_name_by_id_missing_id_is_not_found() {
        let conn = rusqlite::Connection::open_in_memory().unwrap();
        conn.execute_batch(
            "CREATE TABLE entities (
                id INTEGER PRIMARY KEY,
                namespace TEXT NOT NULL,
                name TEXT NOT NULL
            );",
        )
        .unwrap();
        let err = find_entity_name_by_id(&conn, "global", 99).unwrap_err();
        assert_eq!(err.exit_code(), 4);
    }

    // Minimal parser that flattens the real argument struct so the derive's
    // requirement/conflict rules can be exercised in isolation.
    #[derive(clap::Parser)]
    struct TestCli {
        #[command(flatten)]
        args: MergeEntitiesArgs,
    }

    #[test]
    fn clap_rejects_names_combined_with_ids() {
        use clap::Parser;
        let err =
            match TestCli::try_parse_from(["t", "--names", "a,b", "--ids", "1,2", "--into", "tgt"])
            {
                Ok(_) => panic!("expected argument conflict"),
                Err(e) => e,
            };
        assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
    }

    #[test]
    fn clap_rejects_into_combined_with_into_id() {
        use clap::Parser;
        let err =
            match TestCli::try_parse_from(["t", "--names", "a", "--into", "tgt", "--into-id", "3"])
            {
                Ok(_) => panic!("expected argument conflict"),
                Err(e) => e,
            };
        assert_eq!(err.kind(), clap::error::ErrorKind::ArgumentConflict);
    }

    #[test]
    fn clap_requires_a_source_and_a_target_selector() {
        use clap::Parser;
        assert!(TestCli::try_parse_from(["t", "--into", "tgt"]).is_err());
        assert!(TestCli::try_parse_from(["t", "--names", "a"]).is_err());
        let ok = match TestCli::try_parse_from(["t", "--ids", "1,2", "--into-id", "3"]) {
            Ok(cli) => cli,
            Err(e) => panic!("expected successful parse: {e}"),
        };
        assert_eq!(ok.args.ids, vec![1, 2]);
        assert_eq!(ok.args.into_id, Some(3));
        assert!(ok.args.names.is_empty());
        assert!(ok.args.into.is_none());
    }

    // Selector kinds are independent: name-based sources with an ID target
    // must parse, and `--names` must be split on commas.
    #[test]
    fn clap_accepts_name_sources_with_id_target() {
        use clap::Parser;
        let cli = match TestCli::try_parse_from(["t", "--names", "a,b", "--into-id", "3"]) {
            Ok(cli) => cli,
            Err(e) => panic!("expected successful parse: {e}"),
        };
        assert_eq!(cli.args.names, vec!["a".to_string(), "b".to_string()]);
        assert!(cli.args.ids.is_empty());
        assert_eq!(cli.args.into_id, Some(3));
        assert!(cli.args.into.is_none());
    }

    #[test]
    fn merge_entities_response_serializes_all_fields() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["auth".to_string(), "authentication".to_string()],
            target: "auth-service".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 7,
            entities_removed: 2,
            elapsed_ms: 15,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["action"], "merged");
        assert_eq!(json["target"], "auth-service");
        assert_eq!(json["namespace"], "global");
        assert_eq!(json["target_id"], 1);
        assert_eq!(json["relationships_moved"], 7);
        assert_eq!(json["entities_removed"], 2);
        let sources = json["sources"].as_array().expect("must be array");
        assert_eq!(sources.len(), 2);
        assert!(json["elapsed_ms"].is_number());
    }

    // Checks the serialized key, not just the struct field that was just set.
    #[test]
    fn merge_entities_response_action_is_merged() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["src".to_string()],
            target: "tgt".to_string(),
            namespace: "ns".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 1,
            elapsed_ms: 0,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["action"], "merged");
    }

    #[test]
    fn merge_entities_response_empty_sources_serializes() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec![],
            target: "target".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 0,
            elapsed_ms: 1,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        let sources = json["sources"].as_array().expect("must be array");
        assert_eq!(sources.len(), 0);
    }

    #[test]
    fn merge_entities_response_with_zero_relationships_moved() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["src-a".to_string()],
            target: "tgt".to_string(),
            namespace: "global".to_string(),
            target_id: 1,
            relationships_moved: 0,
            entities_removed: 1,
            elapsed_ms: 5,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["relationships_moved"], 0);
        assert_eq!(json["entities_removed"], 1);
    }

    #[test]
    fn merge_entities_response_multiple_sources() {
        let resp = MergeEntitiesResponse {
            action: "merged".to_string(),
            sources: vec!["a".into(), "b".into(), "c".into()],
            target: "canonical".to_string(),
            namespace: "proj".to_string(),
            target_id: 1,
            relationships_moved: 12,
            entities_removed: 3,
            elapsed_ms: 42,
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["entities_removed"], 3);
        let sources = json["sources"].as_array().unwrap();
        assert_eq!(sources.len(), 3);
    }
}