1use crate::commands::ingest::IngestArgs;
8use crate::commands::opencode_runner;
9use crate::errors::AppError;
10use crate::parsers::normalize_entity_name;
11use serde::{Deserialize, Serialize};
12use std::io::Write;
13use std::path::{Path, PathBuf};
14
/// Output-format instructions embedded verbatim in the extraction prompt.
///
/// Tells the model to answer with a single JSON object holding an `entities` array and a
/// `relationships` array; that reply is deserialized into [`ExtractionResult`].
const EXTRACTION_SCHEMA: &str = r#"Return ONLY a valid JSON object with this exact structure (no markdown, no explanation):
{
 "entities": [
 {"name": "entity-name-in-kebab-case", "entity_type": "concept|project|tool|person|file|incident|decision|organization|location|date"}
 ],
 "relationships": [
 {"source": "entity-a", "target": "entity-b", "relation": "applies-to|uses|depends-on|causes|fixes|contradicts|supports|follows|related|replaces|tracked-in", "strength": 0.7}
 ]
}"#;
24
/// Entities and relationships parsed from the model's JSON reply.
///
/// Both lists fall back to empty when the corresponding key is missing from the reply.
#[derive(Debug, Deserialize, Serialize)]
pub struct ExtractionResult {
    /// Entities mentioned in the document.
    #[serde(default)]
    pub entities: Vec<ExtractedEntity>,
    /// Directed relationships between entities.
    #[serde(default)]
    pub relationships: Vec<ExtractedRelationship>,
}
32
/// A single entity reported by the model.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ExtractedEntity {
    /// Entity name as returned by the model. The prompt asks for kebab-case, but any string
    /// is accepted here.
    pub name: String,
    /// Entity category as returned by the model. The prompt lists the expected values, but
    /// they are not validated here.
    pub entity_type: String,
}
38
/// A directed relationship between two entities reported by the model.
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct ExtractedRelationship {
    /// Name of the entity the relationship starts from.
    pub source: String,
    /// Name of the entity the relationship points to.
    pub target: String,
    /// Relationship label as returned by the model (not validated here).
    pub relation: String,
    /// Relationship strength; `default_strength()` is used when the field is omitted.
    #[serde(default = "default_strength")]
    pub strength: f64,
}
47
/// Strength assigned by serde to a relationship whose `strength` field is absent.
fn default_strength() -> f64 {
    const FALLBACK_STRENGTH: f64 = 0.5;
    FALLBACK_STRENGTH
}
51
52pub async fn extract_with_opencode(
53 binary: &Path,
54 model: &str,
55 body: &str,
56 memory_name: &str,
57 timeout_secs: u64,
58) -> Result<(ExtractionResult, f64, u64), AppError> {
59 let prompt = format!(
60 "Analyze the following document and extract domain-specific entities and their relationships.\n\
61 Memory name: {memory_name}\n\n\
62 {EXTRACTION_SCHEMA}\n\n\
63 Document content:\n{body}"
64 );
65
66 opencode_runner::call_opencode::<ExtractionResult>(binary, model, &prompt, timeout_secs).await
67}
68
69fn emit_json(value: &serde_json::Value) {
70 let _ = writeln!(
71 std::io::stdout(),
72 "{}",
73 serde_json::to_string(value).unwrap_or_default()
74 );
75 let _ = std::io::stdout().flush();
76}
77
78pub fn run_opencode_ingest(args: &IngestArgs) -> Result<(), AppError> {
79 let started = std::time::Instant::now();
80
81 if !args.dir.exists() {
82 return Err(AppError::Validation(format!(
83 "directory not found: {}",
84 args.dir.display()
85 )));
86 }
87
88 let binary =
89 opencode_runner::find_opencode_binary_with_override(args.opencode_binary.as_deref())?;
90 let version = opencode_runner::validate_opencode_version(&binary)?;
91 let model = opencode_runner::resolve_opencode_model(args.opencode_model.as_deref());
92 let timeout = opencode_runner::resolve_opencode_timeout(if args.opencode_timeout != 300 {
93 Some(args.opencode_timeout)
94 } else {
95 None
96 });
97
98 emit_json(&serde_json::json!({
99 "phase": "validate",
100 "opencode_path": binary.display().to_string(),
101 "version": format!("{}.{}.{}", version.0, version.1, version.2),
102 "model": &model,
103 }));
104
105 let mut files: Vec<PathBuf> = Vec::new();
106 super::ingest::collect_files(&args.dir, &args.pattern, args.recursive, &mut files)?;
107
108 if files.len() > args.max_files {
109 return Err(AppError::Validation(format!(
110 "found {} files exceeding --max-files cap of {}; aborting (all-or-nothing)",
111 files.len(),
112 args.max_files
113 )));
114 }
115
116 files.sort();
117
118 emit_json(&serde_json::json!({
119 "phase": "scan",
120 "dir": args.dir.display().to_string(),
121 "files_total": files.len(),
122 "files_new": files.len(),
123 "files_existing": 0,
124 }));
125
126 if args.dry_run {
127 for (idx, file) in files.iter().enumerate() {
128 let (name, truncated, orig) =
129 super::ingest::derive_kebab_name(file, args.max_name_length);
130 emit_json(&serde_json::json!({
131 "file": file.display().to_string(),
132 "name": name,
133 "status": "preview",
134 "index": idx + 1,
135 "total": files.len(),
136 "truncated": truncated,
137 "original_name": orig,
138 }));
139 }
140 emit_json(&serde_json::json!({
141 "summary": true,
142 "files_total": files.len(),
143 "completed": 0,
144 "failed": 0,
145 "skipped": 0,
146 "entities_total": 0,
147 "rels_total": 0,
148 "cost_usd": 0.0,
149 "elapsed_ms": started.elapsed().as_millis() as u64,
150 }));
151 return Ok(());
152 }
153
154 let rt = crate::embedder::shared_runtime()?;
155
156 let ns = crate::namespace::resolve_namespace(args.namespace.as_deref())?;
157 let app_paths = crate::paths::AppPaths::resolve(args.db.as_deref())?;
158
159 let mut completed = 0usize;
160 let mut failed = 0usize;
161 let mut skipped = 0usize;
162 let mut entities_total = 0usize;
163 let mut rels_total = 0usize;
164 let mut cost_total: f64 = 0.0;
165
166 for (idx, file) in files.iter().enumerate() {
167 let (name, truncated, orig) = super::ingest::derive_kebab_name(file, args.max_name_length);
168
169 let body = match std::fs::read_to_string(file) {
170 Ok(b) => b,
171 Err(e) => {
172 emit_json(&serde_json::json!({
173 "file": file.display().to_string(),
174 "name": name,
175 "status": "failed",
176 "error": format!("read error: {e}"),
177 "index": idx + 1,
178 "total": files.len(),
179 }));
180 failed += 1;
181 if args.fail_fast {
182 break;
183 }
184 continue;
185 }
186 };
187
188 if body.len() > 512_000 {
189 emit_json(&serde_json::json!({
190 "file": file.display().to_string(),
191 "name": name,
192 "status": "skipped",
193 "error": format!("file exceeds 512KB limit ({} bytes)", body.len()),
194 "index": idx + 1,
195 "total": files.len(),
196 }));
197 skipped += 1;
198 continue;
199 }
200
201 let file_started = std::time::Instant::now();
202
203 let extraction = rt.block_on(extract_with_opencode(
204 &binary, &model, &body, &name, timeout,
205 ));
206
207 match extraction {
208 Ok((result, cost, _tokens)) => {
209 let ent_count = result.entities.len();
210 let rel_count = result.relationships.len();
211
212 let graph_payload = serde_json::json!({
213 "body": body,
214 "entities": result.entities.iter().map(|e| {
215 serde_json::json!({"name": e.name, "entity_type": e.entity_type})
216 }).collect::<Vec<_>>(),
217 "relationships": result.relationships.iter().map(|r| {
218 serde_json::json!({
219 "source": r.source,
220 "target": r.target,
221 "relation": r.relation,
222 "strength": r.strength
223 })
224 }).collect::<Vec<_>>(),
225 });
226
227 let remember_result = persist_memory_with_graph(
228 &app_paths.db,
229 &ns,
230 &name,
231 &format!("{:?}", args.r#type).to_lowercase(),
232 &format!("ingested from {} via opencode", file.display()),
233 &graph_payload,
234 );
235
236 match remember_result {
237 Ok(memory_id) => {
238 entities_total += ent_count;
239 rels_total += rel_count;
240 cost_total += cost;
241 completed += 1;
242
243 emit_json(&serde_json::json!({
244 "file": file.display().to_string(),
245 "name": name,
246 "status": "done",
247 "memory_id": memory_id,
248 "entities": ent_count,
249 "rels": rel_count,
250 "cost_usd": cost,
251 "elapsed_ms": file_started.elapsed().as_millis() as u64,
252 "index": idx + 1,
253 "total": files.len(),
254 "truncated": truncated,
255 "original_name": orig,
256 }));
257 }
258 Err(e) => {
259 failed += 1;
260 emit_json(&serde_json::json!({
261 "file": file.display().to_string(),
262 "name": name,
263 "status": "failed",
264 "error": format!("persist error: {e}"),
265 "elapsed_ms": file_started.elapsed().as_millis() as u64,
266 "index": idx + 1,
267 "total": files.len(),
268 }));
269 if args.fail_fast {
270 break;
271 }
272 }
273 }
274 }
275 Err(e) => {
276 failed += 1;
277 emit_json(&serde_json::json!({
278 "file": file.display().to_string(),
279 "name": name,
280 "status": "failed",
281 "error": format!("extraction error: {e}"),
282 "elapsed_ms": file_started.elapsed().as_millis() as u64,
283 "index": idx + 1,
284 "total": files.len(),
285 }));
286 if args.fail_fast {
287 break;
288 }
289 }
290 }
291 }
292
293 emit_json(&serde_json::json!({
294 "summary": true,
295 "files_total": files.len(),
296 "completed": completed,
297 "failed": failed,
298 "skipped": skipped,
299 "entities_total": entities_total,
300 "rels_total": rels_total,
301 "cost_usd": cost_total,
302 "elapsed_ms": started.elapsed().as_millis() as u64,
303 }));
304
305 Ok(())
306}
307
308fn persist_memory_with_graph(
309 db_path: &Path,
310 namespace: &str,
311 name: &str,
312 memory_type: &str,
313 description: &str,
314 graph_payload: &serde_json::Value,
315) -> Result<i64, AppError> {
316 let conn = crate::storage::connection::open_rw(db_path)?;
317
318 let existing = conn
319 .query_row(
320 "SELECT id FROM memories WHERE name = ?1 AND namespace = ?2",
321 rusqlite::params![name, namespace],
322 |row| row.get::<_, i64>(0),
323 )
324 .ok();
325
326 let body = graph_payload
327 .get("body")
328 .and_then(|b| b.as_str())
329 .unwrap_or("");
330 let body_hash = blake3::hash(body.as_bytes()).to_hex().to_string();
331
332 let memory_id = if let Some(id) = existing {
333 conn.execute(
334 "UPDATE memories SET body = ?1, description = ?2, type = ?3, body_hash = ?4, updated_at = strftime('%s','now') WHERE id = ?5",
335 rusqlite::params![body, description, memory_type, body_hash, id],
336 )
337 .map_err(AppError::Database)?;
338 id
339 } else {
340 conn.execute(
341 "INSERT INTO memories (name, namespace, type, description, body, body_hash, created_at, updated_at) \
342 VALUES (?1, ?2, ?3, ?4, ?5, ?6, strftime('%s','now'), strftime('%s','now'))",
343 rusqlite::params![name, namespace, memory_type, description, body, body_hash],
344 )
345 .map_err(AppError::Database)?;
346 conn.last_insert_rowid()
347 };
348
349 if let Some(entities) = graph_payload.get("entities").and_then(|e| e.as_array()) {
350 for ent in entities {
351 let ent_name = ent.get("name").and_then(|n| n.as_str()).unwrap_or("");
352 let ent_type = ent
353 .get("entity_type")
354 .and_then(|t| t.as_str())
355 .unwrap_or("concept");
356 if ent_name.len() < 2 {
357 continue;
358 }
359 let normalized = normalize_entity_name(ent_name);
360 conn.execute(
361 "INSERT OR IGNORE INTO entities (name, type, namespace) VALUES (?1, ?2, ?3)",
362 rusqlite::params![normalized, ent_type, namespace],
363 )
364 .map_err(AppError::Database)?;
365
366 let entity_id: i64 = conn
367 .query_row(
368 "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
369 rusqlite::params![normalized, namespace],
370 |row| row.get(0),
371 )
372 .map_err(AppError::Database)?;
373
374 conn.execute(
375 "INSERT OR IGNORE INTO memory_entities (memory_id, entity_id) VALUES (?1, ?2)",
376 rusqlite::params![memory_id, entity_id],
377 )
378 .map_err(AppError::Database)?;
379 }
380 }
381
382 if let Some(rels) = graph_payload
383 .get("relationships")
384 .and_then(|r| r.as_array())
385 {
386 for rel in rels {
387 let source = rel.get("source").and_then(|s| s.as_str()).unwrap_or("");
388 let target = rel.get("target").and_then(|t| t.as_str()).unwrap_or("");
389 let relation = rel
390 .get("relation")
391 .and_then(|r| r.as_str())
392 .unwrap_or("related");
393 let strength = rel.get("strength").and_then(|s| s.as_f64()).unwrap_or(0.5);
394
395 if source.len() < 2 || target.len() < 2 {
396 continue;
397 }
398
399 let src_norm = normalize_entity_name(source);
400 let tgt_norm = normalize_entity_name(target);
401
402 for name_val in [&src_norm, &tgt_norm] {
403 conn.execute(
404 "INSERT OR IGNORE INTO entities (name, type, namespace) VALUES (?1, 'concept', ?2)",
405 rusqlite::params![name_val, namespace],
406 )
407 .map_err(AppError::Database)?;
408 }
409
410 let src_id: i64 = conn
411 .query_row(
412 "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
413 rusqlite::params![src_norm, namespace],
414 |row| row.get(0),
415 )
416 .map_err(AppError::Database)?;
417
418 let tgt_id: i64 = conn
419 .query_row(
420 "SELECT id FROM entities WHERE name = ?1 AND namespace = ?2",
421 rusqlite::params![tgt_norm, namespace],
422 |row| row.get(0),
423 )
424 .map_err(AppError::Database)?;
425
426 let rel_normalized = relation.replace('-', "_");
427 conn.execute(
428 "INSERT OR IGNORE INTO relationships (source_id, target_id, relation, weight, namespace) \
429 VALUES (?1, ?2, ?3, ?4, ?5)",
430 rusqlite::params![src_id, tgt_id, rel_normalized, strength, namespace],
431 )
432 .map_err(AppError::Database)?;
433 }
434 }
435
436 Ok(memory_id)
437}
438
#[cfg(test)]
mod tests {
    use super::*;

    /// Deserializes `raw` into an [`ExtractionResult`], panicking if it is not valid.
    fn parse(raw: &str) -> ExtractionResult {
        serde_json::from_str(raw).unwrap()
    }

    // Explicitly empty arrays produce empty lists.
    #[test]
    fn extraction_result_deserializes_empty() {
        let parsed = parse(r#"{"entities":[],"relationships":[]}"#);
        assert!(parsed.entities.is_empty());
        assert!(parsed.relationships.is_empty());
    }

    // A populated reply keeps every entity, every relationship and the explicit strength.
    #[test]
    fn extraction_result_deserializes_with_data() {
        let parsed = parse(
            r#"{
            "entities": [
                {"name": "sqlite-graphrag", "entity_type": "project"},
                {"name": "opencode", "entity_type": "tool"}
            ],
            "relationships": [
                {"source": "sqlite-graphrag", "target": "opencode", "relation": "uses", "strength": 0.8}
            ]
        }"#,
        );
        assert_eq!(parsed.entities.len(), 2);
        assert_eq!(parsed.relationships.len(), 1);
        assert_eq!(parsed.relationships[0].strength, 0.8);
    }

    // A relationship without `strength` falls back to the serde default.
    #[test]
    fn extraction_result_default_strength() {
        let parsed = parse(
            r#"{
            "entities": [],
            "relationships": [
                {"source": "a", "target": "b", "relation": "related"}
            ]
        }"#,
        );
        assert_eq!(parsed.relationships[0].strength, 0.5);
    }
}