Skip to main content

sqlite_graphrag/commands/
ingest_opencode.rs

//! OpenCode-curated ingest pipeline (v1.0.90, GAP-OPENCODE-002).
//!
//! Spawns `opencode run` per file to have an LLM extract entities and
//! relationships, then writes them, together with the memory body, directly
//! into the SQLite store. This mirrors `remember --graph-stdin --force-merge`
//! but does not invoke that command.
6
7use crate::commands::ingest::IngestArgs;
8use crate::commands::opencode_runner;
9use crate::errors::AppError;
10use crate::parsers::normalize_entity_name;
11use serde::{Deserialize, Serialize};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
/// Output contract given to the LLM, embedded verbatim in the prompt built by
/// [`extract_with_opencode`] and parsed into [`ExtractionResult`].
///
/// The `entity_type` and `relation` alternatives are only the vocabulary offered
/// to the model. Nothing in this module validates the response against them;
/// relation labels only have `-` rewritten to `_` before they are stored.
const EXTRACTION_SCHEMA: &str = r#"Return ONLY a valid JSON object with this exact structure (no markdown, no explanation):
{
  "entities": [
    {"name": "entity-name-in-kebab-case", "entity_type": "concept|project|tool|person|file|incident|decision|organization|location|date"}
  ],
  "relationships": [
    {"source": "entity-a", "target": "entity-b", "relation": "applies-to|uses|depends-on|causes|fixes|contradicts|supports|follows|related|replaces|tracked-in", "strength": 0.7}
  ]
}"#;
24
25#[derive(Debug, Deserialize, Serialize)]
26pub struct ExtractionResult {
27    #[serde(default)]
28    pub entities: Vec<ExtractedEntity>,
29    #[serde(default)]
30    pub relationships: Vec<ExtractedRelationship>,
31}
32
33#[derive(Debug, Deserialize, Serialize, Clone)]
34pub struct ExtractedEntity {
35    pub name: String,
36    pub entity_type: String,
37}
38
39#[derive(Debug, Deserialize, Serialize, Clone)]
40pub struct ExtractedRelationship {
41    pub source: String,
42    pub target: String,
43    pub relation: String,
44    #[serde(default = "default_strength")]
45    pub strength: f64,
46}
47
48fn default_strength() -> f64 {
49    0.5
50}
51
52pub async fn extract_with_opencode(
53    binary: &Path,
54    model: &str,
55    body: &str,
56    memory_name: &str,
57    timeout_secs: u64,
58) -> Result<(ExtractionResult, f64, u64), AppError> {
59    let prompt = format!(
60        "Analyze the following document and extract domain-specific entities and their relationships.\n\
61         Memory name: {memory_name}\n\n\
62         {EXTRACTION_SCHEMA}\n\n\
63         Document content:\n{body}"
64    );
65
66    opencode_runner::call_opencode::<ExtractionResult>(binary, model, &prompt, timeout_secs).await
67}
68
69fn emit_json(value: &serde_json::Value) {
70    let _ = writeln!(
71        std::io::stdout(),
72        "{}",
73        serde_json::to_string(value).unwrap_or_default()
74    );
75    let _ = std::io::stdout().flush();
76}
77
78pub fn run_opencode_ingest(args: &IngestArgs) -> Result<(), AppError> {
79    let started = std::time::Instant::now();
80
81    if !args.dir.exists() {
82        return Err(AppError::Validation(format!(
83            "directory not found: {}",
84            args.dir.display()
85        )));
86    }
87
88    let binary =
89        opencode_runner::find_opencode_binary_with_override(args.opencode_binary.as_deref())?;
90    let version = opencode_runner::validate_opencode_version(&binary)?;
91    let model = opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
92    let timeout = opencode_runner::resolve_opencode_timeout(if args.opencode_timeout != 300 {
93        Some(args.opencode_timeout)
94    } else {
95        None
96    });
97
98    emit_json(&serde_json::json!({
99        "phase": "validate",
100        "opencode_path": binary.display().to_string(),
101        "version": format!("{}.{}.{}", version.0, version.1, version.2),
102        "model": &model,
103    }));
104
105    let mut files: Vec<PathBuf> = Vec::new();
106    super::ingest::collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
107
108    if files.len() > args.max_files {
109        return Err(AppError::Validation(format!(
110            "found {} files exceeding --max-files cap of {}; aborting (all-or-nothing)",
111            files.len(),
112            args.max_files
113        )));
114    }
115
116    files.sort();
117
118    emit_json(&serde_json::json!({
119        "phase": "scan",
120        "dir": args.dir.display().to_string(),
121        "files_total": files.len(),
122        "files_new": files.len(),
123        "files_existing": 0,
124    }));
125
126    if args.dry_run {
127        for (idx, file) in files.iter().enumerate() {
128            let (name, truncated, orig) =
129                super::ingest::derive_kebab_name(file, args.max_name_length);
130            emit_json(&serde_json::json!({
131                "file": file.display().to_string(),
132                "name": name,
133                "status": "preview",
134                "index": idx + 1,
135                "total": files.len(),
136                "truncated": truncated,
137                "original_name": orig,
138            }));
139        }
140        emit_json(&serde_json::json!({
141            "summary": true,
142            "files_total": files.len(),
143            "completed": 0,
144            "failed": 0,
145            "skipped": 0,
146            "entities_total": 0,
147            "rels_total": 0,
148            "cost_usd": 0.0,
149            "elapsed_ms": started.elapsed().as_millis() as u64,
150        }));
151        return Ok(());
152    }
153
154    let rt = crate::embedder::shared_runtime()?;
155
156    let ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
157    let app_paths = crate::paths::AppPaths::resolve(args.db.as_deref())?;
158
159    let mut completed = 0usize;
160    let mut failed = 0usize;
161    let mut skipped = 0usize;
162    let mut entities_total = 0usize;
163    let mut rels_total = 0usize;
164    let mut cost_total: f64 = 0.0;
165
166    for (idx, file) in files.iter().enumerate() {
167        let (name, truncated, orig) = super::ingest::derive_kebab_name(file, args.max_name_length);
168
169        let body = match std::fs::read_to_string(file) {
170            Ok(b) => b,
171            Err(e) => {
172                emit_json(&serde_json::json!({
173                    "file": file.display().to_string(),
174                    "name": name,
175                    "status": "failed",
176                    "error": format!("read error: {e}"),
177                    "index": idx + 1,
178                    "total": files.len(),
179                }));
180                failed += 1;
181                if args.fail_fast {
182                    break;
183                }
184                continue;
185            }
186        };
187
188        if body.len() > 512_000 {
189            emit_json(&serde_json::json!({
190                "file": file.display().to_string(),
191                "name": name,
192                "status": "skipped",
193                "error": format!("file exceeds 512KB limit ({} bytes)", body.len()),
194                "index": idx + 1,
195                "total": files.len(),
196            }));
197            skipped += 1;
198            continue;
199        }
200
201        let file_started = std::time::Instant::now();
202
203        let extraction = rt.block_on(extract_with_opencode(
204            &binary, &model, &body, &name, timeout,
205        ));
206
207        match extraction {
208            Ok((result, cost, _tokens)) => {
209                let ent_count = result.entities.len();
210                let rel_count = result.relationships.len();
211
212                let graph_payload = serde_json::json!({
213                    "body": body,
214                    "entities": result.entities.iter().map(|e| {
215                        serde_json::json!({"name": e.name, "entity_type": e.entity_type})
216                    }).collect::<Vec<_>>(),
217                    "relationships": result.relationships.iter().map(|r| {
218                        serde_json::json!({
219                            "source": r.source,
220                            "target": r.target,
221                            "relation": r.relation,
222                            "strength": r.strength
223                        })
224                    }).collect::<Vec<_>>(),
225                });
226
227                let remember_result = persist_memory_with_graph(
228                    &app_paths.db,
229                    &ns,
230                    &name,
231                    &format!("{:?}", args.r#type).to_lowercase(),
232                    &format!("ingested from {} via opencode", file.display()),
233                    &graph_payload,
234                );
235
236                match remember_result {
237                    Ok(memory_id) => {
238                        entities_total += ent_count;
239                        rels_total += rel_count;
240                        cost_total += cost;
241                        completed += 1;
242
243                        emit_json(&serde_json::json!({
244                            "file": file.display().to_string(),
245                            "name": name,
246                            "status": "done",
247                            "memory_id": memory_id,
248                            "entities": ent_count,
249                            "rels": rel_count,
250                            "cost_usd": cost,
251                            "elapsed_ms": file_started.elapsed().as_millis() as u64,
252                            "index": idx + 1,
253                            "total": files.len(),
254                            "truncated": truncated,
255                            "original_name": orig,
256                        }));
257                    }
258                    Err(e) => {
259                        failed += 1;
260                        emit_json(&serde_json::json!({
261                            "file": file.display().to_string(),
262                            "name": name,
263                            "status": "failed",
264                            "error": format!("persist error: {e}"),
265                            "elapsed_ms": file_started.elapsed().as_millis() as u64,
266                            "index": idx + 1,
267                            "total": files.len(),
268                        }));
269                        if args.fail_fast {
270                            break;
271                        }
272                    }
273                }
274            }
275            Err(e) => {
276                failed += 1;
277                emit_json(&serde_json::json!({
278                    "file": file.display().to_string(),
279                    "name": name,
280                    "status": "failed",
281                    "error": format!("extraction error: {e}"),
282                    "elapsed_ms": file_started.elapsed().as_millis() as u64,
283                    "index": idx + 1,
284                    "total": files.len(),
285                }));
286                if args.fail_fast {
287                    break;
288                }
289            }
290        }
291    }
292
293    emit_json(&serde_json::json!({
294        "summary": true,
295        "files_total": files.len(),
296        "completed": completed,
297        "failed": failed,
298        "skipped": skipped,
299        "entities_total": entities_total,
300        "rels_total": rels_total,
301        "cost_usd": cost_total,
302        "elapsed_ms": started.elapsed().as_millis() as u64,
303    }));
304
305    Ok(())
306}
307
308fn persist_memory_with_graph(
309    db_path: &Path,
310    namespace: &str,
311    name: &str,
312    memory_type: &str,
313    description: &str,
314    graph_payload: &serde_json::Value,
315) -> Result<i64, AppError> {
316    let conn = crate::storage::connection::open_rw(db_path)?;
317
318    let existing = conn
319        .query_row(
320            "SELECT id FROM memories WHERE name = ?1 AND namespace = ?2",
321            rusqlite::params![name, namespace],
322            |row| row.get::<_, i64>(0),
323        )
324        .ok();
325
326    let body = graph_payload
327        .get("body")
328        .and_then(|b| b.as_str())
329        .unwrap_or("");
330    let body_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
331
332    let memory_id = if let Some(id) = existing {
333        conn.execute(
334            "UPDATE memories SET body = ?1, description = ?2, type = ?3, body_hash = ?4, updated_at = strftime('%s','now') WHERE id = ?5",
335            rusqlite::params![body, description, memory_type, body_hash, id],
336        )
337        .map_err(AppError::Database)?;
338        id
339    } else {
340        conn.execute(
341            "INSERT INTO memories (name, namespace, type, description, body, body_hash, created_at, updated_at) \
342             VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), strftime('%s','now'))",
343            rusqlite::params![name, namespace, memory_type, description, body, body_hash],
344        )
345        .map_err(AppError::Database)?;
346        conn.last_insert_rowid()
347    };
348
349    if let Some(entities) = graph_payload.get("entities").and_then(|e| e.as_array()) {
350        for ent in entities {
351            let ent_name = ent.get("name").and_then(|n| n.as_str()).unwrap_or("");
352            let ent_type = ent
353                .get("entity_type")
354                .and_then(|t| t.as_str())
355                .unwrap_or("concept");
356            if ent_name.len() < 2 {
357                continue;
358            }
359            let normalized = normalize_entity_name(ent_name);
360            conn.execute(
361                "INSERT OR IGNORE INTO entities (name, type, namespace) VALUES (?1, ?2, ?3)",
362                rusqlite::params![normalized, ent_type, namespace],
363            )
364            .map_err(AppError::Database)?;
365
366            let entity_id: i64 = conn
367                .query_row(
368                    "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
369                    rusqlite::params![normalized, namespace],
370                    |row| row.get(0),
371                )
372                .map_err(AppError::Database)?;
373
374            conn.execute(
375                "INSERT OR IGNORE INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
376                rusqlite::params![memory_id, entity_id],
377            )
378            .map_err(AppError::Database)?;
379        }
380    }
381
382    if let Some(rels) = graph_payload
383        .get("relationships")
384        .and_then(|r| r.as_array())
385    {
386        for rel in rels {
387            let source = rel.get("source").and_then(|s| s.as_str()).unwrap_or("");
388            let target = rel.get("target").and_then(|t| t.as_str()).unwrap_or("");
389            let relation = rel
390                .get("relation")
391                .and_then(|r| r.as_str())
392                .unwrap_or("related");
393            let strength = rel.get("strength").and_then(|s| s.as_f64()).unwrap_or(0.5);
394
395            if source.len() < 2 || target.len() < 2 {
396                continue;
397            }
398
399            let src_norm = normalize_entity_name(source);
400            let tgt_norm = normalize_entity_name(target);
401
402            for name_val in [&src_norm, &tgt_norm] {
403                conn.execute(
404                    "INSERT OR IGNORE INTO entities (name, type, namespace) VALUES (?1, 'concept', ?2)",
405                    rusqlite::params![name_val, namespace],
406                )
407                .map_err(AppError::Database)?;
408            }
409
410            let src_id: i64 = conn
411                .query_row(
412                    "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
413                    rusqlite::params![src_norm, namespace],
414                    |row| row.get(0),
415                )
416                .map_err(AppError::Database)?;
417
418            let tgt_id: i64 = conn
419                .query_row(
420                    "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
421                    rusqlite::params![tgt_norm, namespace],
422                    |row| row.get(0),
423                )
424                .map_err(AppError::Database)?;
425
426            let rel_normalized = relation.replace('-', "_");
427            conn.execute(
428                "INSERT OR IGNORE INTO relationships (source_id, target_id, relation, weight, namespace) \
429                 VALUES (?1, ?2, ?3, ?4, ?5)",
430                rusqlite::params![src_id, tgt_id, rel_normalized, strength, namespace],
431            )
432            .map_err(AppError::Database)?;
433        }
434    }
435
436    Ok(memory_id)
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes a fixture, failing the test with context if it does not parse.
    fn parse(json: &str) -> ExtractionResult {
        serde_json::from_str(json).expect("fixture must deserialize into ExtractionResult")
    }

    #[test]
    fn extraction_result_deserializes_empty() {
        let parsed = parse(r#"{"entities":[],"relationships":[]}"#);

        assert!(parsed.entities.is_empty());
        assert!(parsed.relationships.is_empty());
    }

    #[test]
    fn extraction_result_deserializes_with_data() {
        let parsed = parse(
            r#"{
            "entities": [
                {"name": "sqlite-graphrag", "entity_type": "project"},
                {"name": "opencode", "entity_type": "tool"}
            ],
            "relationships": [
                {"source": "sqlite-graphrag", "target": "opencode", "relation": "uses", "strength": 0.8}
            ]
        }"#,
        );

        assert_eq!(parsed.entities.len(), 2);
        assert_eq!(parsed.relationships.len(), 1);
        let only_rel = &parsed.relationships[0];
        assert_eq!(only_rel.strength, 0.8, "explicit strength must be kept");
    }

    #[test]
    fn extraction_result_default_strength() {
        let parsed = parse(
            r#"{
            "entities": [],
            "relationships": [
                {"source": "a", "target": "b", "relation": "related"}
            ]
        }"#,
        );

        let only_rel = &parsed.relationships[0];
        assert_eq!(only_rel.strength, 0.5, "missing strength falls back to 0.5");
    }
}