Skip to main content

infigraph_docs/
store.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use arrow::array::{Int64Array, StringArray};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use kuzu::{Connection, Database, SystemConfig};
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::chunk::Chunk;
14use crate::extract::ExtractedDoc;
15
/// DDL executed, in order, by `DocStore::init_schema` whenever a store is opened.
///
/// Every statement uses `IF NOT EXISTS`, so running the list against an
/// existing database leaves it unchanged. Node tables appear before the rel
/// tables that reference them. Multi-line statements are assembled with
/// `concat!` so their embedded newlines and indentation are explicit.
const CREATE_SCHEMA: &[&str] = &[
    // Documents and the chunks they are split into.
    concat!(
        "CREATE NODE TABLE IF NOT EXISTS Document(\n",
        "        id STRING,\n",
        "        title STRING,\n",
        "        file STRING,\n",
        "        format STRING,\n",
        "        content_hash STRING,\n",
        "        page_count INT64,\n",
        "        chunk_count INT64,\n",
        "        PRIMARY KEY(id)\n",
        "    )"
    ),
    concat!(
        "CREATE NODE TABLE IF NOT EXISTS Chunk(\n",
        "        id STRING,\n",
        "        doc_file STRING,\n",
        "        idx INT64,\n",
        "        heading STRING,\n",
        "        text STRING,\n",
        "        start_offset INT64,\n",
        "        end_offset INT64,\n",
        "        page INT64,\n",
        "        content_hash STRING,\n",
        "        PRIMARY KEY(id)\n",
        "    )"
    ),
    "CREATE REL TABLE IF NOT EXISTS HAS_CHUNK(FROM Document TO Chunk)",
    // Where documents were imported from (see `space_key` / `last_synced`).
    concat!(
        "CREATE NODE TABLE IF NOT EXISTS Source(\n",
        "        id STRING,\n",
        "        source_type STRING,\n",
        "        base_url STRING,\n",
        "        space_key STRING,\n",
        "        last_synced STRING,\n",
        "        PRIMARY KEY(id)\n",
        "    )"
    ),
    "CREATE REL TABLE IF NOT EXISTS FROM_SOURCE(FROM Document TO Source)",
    // Document-to-document hyperlinks.
    "CREATE REL TABLE IF NOT EXISTS LINKS_TO(FROM Document TO Document, url STRING, link_type STRING)",
    // Pipeline metadata shared by all plugins; plugin-specific columns live in
    // per-plugin `Pipeline_<plugin_id>` tables created at runtime.
    "CREATE NODE TABLE IF NOT EXISTS PipelineCore(id STRING, name STRING, doc_id STRING, plugin_id STRING, inputs STRING[], outputs STRING[], PRIMARY KEY(id))",
    // NOTE(review): in Kùzu's multiplicity syntax ONE_MANY appears to allow at
    // most one incoming DEFINED_IN edge per Document (one pipeline per
    // document). Confirm this is intended; MANY_ONE would allow several
    // pipelines to be defined in the same document.
    "CREATE REL TABLE IF NOT EXISTS DEFINED_IN(FROM PipelineCore TO Document, ONE_MANY)",
    "CREATE REL TABLE IF NOT EXISTS DEPENDS_ON(FROM PipelineCore TO PipelineCore, dep_type STRING, MANY_MANY)",
];
54
55pub struct DocStore {
56    db: Database,
57}
58
59impl DocStore {
60    pub fn open(path: &Path) -> Result<Self> {
61        if let Some(parent) = path.parent() {
62            std::fs::create_dir_all(parent)?;
63        }
64        let db = Database::new(path, SystemConfig::default())
65            .map_err(|e| anyhow::anyhow!("failed to open docs kuzu db: {e}"))?;
66        let store = Self { db };
67        store.init_schema()?;
68        Ok(store)
69    }
70
71    fn init_schema(&self) -> Result<()> {
72        let conn = self.connection()?;
73        for ddl in CREATE_SCHEMA {
74            conn.query(ddl)
75                .map_err(|e| anyhow::anyhow!("schema DDL failed: {e}"))?;
76        }
77        Ok(())
78    }
79
80    pub fn connection(&self) -> Result<Connection<'_>> {
81        Connection::new(&self.db).map_err(|e| anyhow::anyhow!("failed to create connection: {e}"))
82    }
83
84    pub fn get_doc_hashes(&self) -> Result<HashMap<String, String>> {
85        let conn = self.connection()?;
86        let result = conn
87            .query("MATCH (d:Document) RETURN d.file, d.content_hash")
88            .map_err(|e| anyhow::anyhow!("query doc hashes: {e}"))?;
89        let mut hashes = HashMap::new();
90        for row in result {
91            if row.len() >= 2 {
92                hashes.insert(row[0].to_string(), row[1].to_string());
93            }
94        }
95        Ok(hashes)
96    }
97
98    pub fn upsert_all_parquet(&self, docs: &[&ExtractedDoc], chunks: &[&Chunk]) -> Result<()> {
99        let conn = self.connection()?;
100
101        // Delete existing data for changed files
102        let file_list: Vec<String> = docs
103            .iter()
104            .map(|d| format!("'{}'", escape_str(&d.file)))
105            .collect();
106        if !file_list.is_empty() {
107            let files_in = file_list.join(", ");
108            let _ = conn.query(&format!(
109                "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
110                files_in
111            ));
112            let _ = conn.query(&format!(
113                "MATCH (d:Document) WHERE d.file IN [{}] DETACH DELETE d",
114                files_in
115            ));
116        }
117
118        let tmp_dir = tempfile::tempdir().context("create temp dir")?;
119
120        // Write Document parquet
121        {
122            let ids: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
123            let titles: Vec<Option<&str>> = docs.iter().map(|d| d.title.as_deref()).collect();
124            let files: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
125            let formats: Vec<&str> = docs.iter().map(|d| d.format.as_str()).collect();
126            let hashes: Vec<&str> = docs.iter().map(|d| d.content_hash.as_str()).collect();
127            let page_counts: Vec<i64> = docs
128                .iter()
129                .map(|d| d.page_count.unwrap_or(0) as i64)
130                .collect();
131            let chunk_counts: Vec<i64> = docs
132                .iter()
133                .map(|d| chunks.iter().filter(|c| c.doc_file == d.file).count() as i64)
134                .collect();
135
136            let schema = Arc::new(Schema::new(vec![
137                Field::new("id", DataType::Utf8, false),
138                Field::new("title", DataType::Utf8, true),
139                Field::new("file", DataType::Utf8, false),
140                Field::new("format", DataType::Utf8, false),
141                Field::new("content_hash", DataType::Utf8, false),
142                Field::new("page_count", DataType::Int64, false),
143                Field::new("chunk_count", DataType::Int64, false),
144            ]));
145
146            let batch = RecordBatch::try_new(
147                schema.clone(),
148                vec![
149                    Arc::new(StringArray::from(ids)),
150                    Arc::new(StringArray::from(titles)),
151                    Arc::new(StringArray::from(files)),
152                    Arc::new(StringArray::from(formats)),
153                    Arc::new(StringArray::from(hashes)),
154                    Arc::new(Int64Array::from(page_counts)),
155                    Arc::new(Int64Array::from(chunk_counts)),
156                ],
157            )?;
158
159            let path = tmp_dir.path().join("documents.parquet");
160            let file = std::fs::File::create(&path)?;
161            let props = WriterProperties::builder().build();
162            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
163            writer.write(&batch)?;
164            writer.close()?;
165
166            conn.query(&format!("COPY Document FROM '{}'", path.to_string_lossy()))
167                .map_err(|e| anyhow::anyhow!("COPY Document: {e}"))?;
168        }
169
170        // Write Chunk parquet
171        if !chunks.is_empty() {
172            let ids: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
173            let doc_files: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
174            let indices: Vec<i64> = chunks.iter().map(|c| c.index as i64).collect();
175            let headings: Vec<Option<&str>> = chunks.iter().map(|c| c.heading.as_deref()).collect();
176            let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
177            let start_offsets: Vec<i64> = chunks.iter().map(|c| c.start_offset as i64).collect();
178            let end_offsets: Vec<i64> = chunks.iter().map(|c| c.end_offset as i64).collect();
179            let pages: Vec<i64> = chunks.iter().map(|c| c.page.unwrap_or(0) as i64).collect();
180            let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
181
182            let schema = Arc::new(Schema::new(vec![
183                Field::new("id", DataType::Utf8, false),
184                Field::new("doc_file", DataType::Utf8, false),
185                Field::new("idx", DataType::Int64, false),
186                Field::new("heading", DataType::Utf8, true),
187                Field::new("text", DataType::Utf8, false),
188                Field::new("start_offset", DataType::Int64, false),
189                Field::new("end_offset", DataType::Int64, false),
190                Field::new("page", DataType::Int64, false),
191                Field::new("content_hash", DataType::Utf8, false),
192            ]));
193
194            let batch = RecordBatch::try_new(
195                schema.clone(),
196                vec![
197                    Arc::new(StringArray::from(ids)),
198                    Arc::new(StringArray::from(doc_files)),
199                    Arc::new(Int64Array::from(indices)),
200                    Arc::new(StringArray::from(headings)),
201                    Arc::new(StringArray::from(texts)),
202                    Arc::new(Int64Array::from(start_offsets)),
203                    Arc::new(Int64Array::from(end_offsets)),
204                    Arc::new(Int64Array::from(pages)),
205                    Arc::new(StringArray::from(hashes)),
206                ],
207            )?;
208
209            let path = tmp_dir.path().join("chunks.parquet");
210            let file = std::fs::File::create(&path)?;
211            let props = WriterProperties::builder().build();
212            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
213            writer.write(&batch)?;
214            writer.close()?;
215
216            conn.query(&format!("COPY Chunk FROM '{}'", path.to_string_lossy()))
217                .map_err(|e| anyhow::anyhow!("COPY Chunk: {e}"))?;
218        }
219
220        // Write HAS_CHUNK edges
221        if !chunks.is_empty() {
222            let froms: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
223            let tos: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
224
225            let schema = Arc::new(Schema::new(vec![
226                Field::new("from", DataType::Utf8, false),
227                Field::new("to", DataType::Utf8, false),
228            ]));
229
230            let batch = RecordBatch::try_new(
231                schema.clone(),
232                vec![
233                    Arc::new(StringArray::from(froms)),
234                    Arc::new(StringArray::from(tos)),
235                ],
236            )?;
237
238            let path = tmp_dir.path().join("has_chunk.parquet");
239            let file = std::fs::File::create(&path)?;
240            let props = WriterProperties::builder().build();
241            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
242            writer.write(&batch)?;
243            writer.close()?;
244
245            conn.query(&format!("COPY HAS_CHUNK FROM '{}'", path.to_string_lossy()))
246                .map_err(|e| anyhow::anyhow!("COPY HAS_CHUNK: {e}"))?;
247        }
248
249        Ok(())
250    }
251
252    pub fn upsert_source(
253        &self,
254        id: &str,
255        source_type: &str,
256        base_url: &str,
257        space_key: &str,
258    ) -> Result<()> {
259        let conn = self.connection()?;
260        let _ = conn.query(&format!(
261            "MATCH (s:Source) WHERE s.id = '{}' DETACH DELETE s",
262            escape_str(id)
263        ));
264        conn.query(&format!(
265            "CREATE (s:Source {{id: '{}', source_type: '{}', base_url: '{}', space_key: '{}', last_synced: '{}'}})",
266            escape_str(id),
267            escape_str(source_type),
268            escape_str(base_url),
269            escape_str(space_key),
270            chrono::Utc::now().to_rfc3339(),
271        ))
272        .map_err(|e| anyhow::anyhow!("create Source: {e}"))?;
273        Ok(())
274    }
275
276    pub fn link_doc_to_source(&self, doc_id: &str, source_id: &str) -> Result<()> {
277        let conn = self.connection()?;
278        conn.query(&format!(
279            "MATCH (d:Document), (s:Source) WHERE d.id = '{}' AND s.id = '{}' CREATE (d)-[:FROM_SOURCE]->(s)",
280            escape_str(doc_id),
281            escape_str(source_id),
282        ))
283        .map_err(|e| anyhow::anyhow!("link FROM_SOURCE: {e}"))?;
284        Ok(())
285    }
286
287    pub fn get_docs_by_source(&self, source_id: &str) -> Result<Vec<String>> {
288        let conn = self.connection()?;
289        let result = conn
290            .query(&format!(
291                "MATCH (d:Document)-[:FROM_SOURCE]->(s:Source) WHERE s.id = '{}' RETURN d.id",
292                escape_str(source_id)
293            ))
294            .map_err(|e| anyhow::anyhow!("query docs by source: {e}"))?;
295        let mut ids = Vec::new();
296        for row in result {
297            if !row.is_empty() {
298                ids.insert(ids.len(), row[0].to_string());
299            }
300        }
301        Ok(ids)
302    }
303
304    pub fn delete_docs_by_ids(&self, doc_ids: &[&str]) -> Result<()> {
305        if doc_ids.is_empty() {
306            return Ok(());
307        }
308        let conn = self.connection()?;
309        let id_list: String = doc_ids
310            .iter()
311            .map(|id| format!("'{}'", escape_str(id)))
312            .collect::<Vec<_>>()
313            .join(", ");
314        let _ = conn.query(&format!(
315            "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
316            id_list
317        ));
318        let _ = conn.query(&format!(
319            "MATCH (d:Document) WHERE d.id IN [{}] DETACH DELETE d",
320            id_list
321        ));
322        Ok(())
323    }
324
325    pub fn create_link(
326        &self,
327        from_doc_id: &str,
328        to_doc_id: &str,
329        url: &str,
330        link_type: &str,
331    ) -> Result<()> {
332        let conn = self.connection()?;
333        conn.query(&format!(
334            "MATCH (a:Document), (b:Document) WHERE a.id = '{}' AND b.id = '{}' \
335             CREATE (a)-[:LINKS_TO {{url: '{}', link_type: '{}'}}]->(b)",
336            escape_str(from_doc_id),
337            escape_str(to_doc_id),
338            escape_str(url),
339            escape_str(link_type),
340        ))
341        .map_err(|e| anyhow::anyhow!("create LINKS_TO: {e}"))?;
342        Ok(())
343    }
344
345    pub fn delete_links_from(&self, doc_id: &str) -> Result<()> {
346        let conn = self.connection()?;
347        let _ = conn.query(&format!(
348            "MATCH (a:Document)-[r:LINKS_TO]->() WHERE a.id = '{}' DELETE r",
349            escape_str(doc_id),
350        ));
351        Ok(())
352    }
353
354    pub fn stats(&self) -> Result<DocStoreStats> {
355        let conn = self.connection()?;
356        let doc_count = count_query(&conn, "MATCH (d:Document) RETURN count(d)");
357        let chunk_count = count_query(&conn, "MATCH (c:Chunk) RETURN count(c)");
358        Ok(DocStoreStats {
359            document_count: doc_count,
360            chunk_count,
361        })
362    }
363
364    // ── PipelineCore methods ──────────────────────────────────────────────
365
366    /// Create a per-plugin node table from schema definition.
367    pub fn ensure_plugin_table(&self, plugin_id: &str, columns: &[(String, String)]) -> Result<()> {
368        let conn = self.connection()?;
369        let mut col_defs = String::from("id STRING");
370        for (name, col_type) in columns {
371            col_defs.push_str(&format!(", {} {}", name, col_type));
372        }
373        let ddl = format!(
374            "CREATE NODE TABLE IF NOT EXISTS Pipeline_{}({}, PRIMARY KEY(id))",
375            plugin_id, col_defs
376        );
377        conn.query(&ddl)
378            .map_err(|e| anyhow::anyhow!("ensure_plugin_table DDL: {e}"))?;
379        Ok(())
380    }
381
382    /// Upsert a pipeline core record.
383    pub fn upsert_pipeline_core(&self, record: &PipelineCoreRecord) -> Result<()> {
384        let conn = self.connection()?;
385        // Delete existing if present
386        let _ = conn.query(&format!(
387            "MATCH (p:PipelineCore) WHERE p.id = '{}' DETACH DELETE p",
388            escape_str(&record.id)
389        ));
390        // Build list literals
391        let inputs_str = record
392            .inputs
393            .iter()
394            .map(|s| format!("'{}'", escape_str(s)))
395            .collect::<Vec<_>>()
396            .join(",");
397        let outputs_str = record
398            .outputs
399            .iter()
400            .map(|s| format!("'{}'", escape_str(s)))
401            .collect::<Vec<_>>()
402            .join(",");
403        conn.query(&format!(
404            "CREATE (p:PipelineCore {{id: '{}', name: '{}', doc_id: '{}', plugin_id: '{}', inputs: [{}], outputs: [{}]}})",
405            escape_str(&record.id),
406            escape_str(&record.name),
407            escape_str(&record.doc_id),
408            escape_str(&record.plugin_id),
409            inputs_str,
410            outputs_str,
411        ))
412        .map_err(|e| anyhow::anyhow!("create PipelineCore: {e}"))?;
413        Ok(())
414    }
415
416    /// Upsert plugin-specific properties into Pipeline_<plugin_id> table.
417    pub fn upsert_plugin_properties(
418        &self,
419        pipeline_id: &str,
420        plugin_id: &str,
421        properties: &serde_json::Map<String, serde_json::Value>,
422        schema: &[(String, String)],
423    ) -> Result<()> {
424        let conn = self.connection()?;
425        let table = format!("Pipeline_{}", plugin_id);
426        let esc_id = escape_str(pipeline_id);
427        // Delete existing
428        let _ = conn.query(&format!(
429            "MATCH (p:{}) WHERE p.id = '{}' DELETE p",
430            table, esc_id
431        ));
432        // Build property assignments
433        let mut props = format!("id: '{}'", esc_id);
434        for (col_name, _col_type) in schema {
435            if let Some(val) = properties.get(col_name.as_str()) {
436                let s = match val {
437                    serde_json::Value::String(s) => escape_str(s),
438                    other => escape_str(&other.to_string()),
439                };
440                props.push_str(&format!(", {}: '{}'", col_name, s));
441            }
442        }
443        conn.query(&format!("CREATE (p:{} {{{}}})", table, props))
444            .map_err(|e| anyhow::anyhow!("upsert plugin properties: {e}"))?;
445        Ok(())
446    }
447
448    /// Link a pipeline core to a document via DEFINED_IN edge.
449    pub fn link_pipeline_core_to_doc(&self, pipeline_id: &str, doc_id: &str) -> Result<()> {
450        let conn = self.connection()?;
451        conn.query(&format!(
452            "MATCH (p:PipelineCore), (d:Document) WHERE p.id = '{}' AND d.id = '{}' CREATE (p)-[:DEFINED_IN]->(d)",
453            escape_str(pipeline_id),
454            escape_str(doc_id),
455        ))
456        .map_err(|e| anyhow::anyhow!("link DEFINED_IN: {e}"))?;
457        Ok(())
458    }
459
460    /// Link pipeline dependencies using PipelineCore inputs/outputs matching.
461    /// Cross-plugin: if plugin A's output matches plugin B's input, creates DEPENDS_ON edge.
462    pub fn link_pipeline_dependencies(&self) -> Result<usize> {
463        let cores = self.get_all_pipeline_cores(None)?;
464        if cores.len() < 2 {
465            return Ok(0);
466        }
467
468        let conn = self.connection()?;
469        // Clear old edges
470        let _ = conn.query("MATCH ()-[r:DEPENDS_ON]->() DELETE r");
471
472        let mut count = 0;
473        for producer in &cores {
474            if producer.outputs.is_empty() {
475                continue;
476            }
477            for consumer in &cores {
478                if consumer.id == producer.id || consumer.inputs.is_empty() {
479                    continue;
480                }
481                // Check if any output of producer matches any input of consumer
482                let has_match = producer
483                    .outputs
484                    .iter()
485                    .any(|out| consumer.inputs.iter().any(|inp| inp == out));
486                if has_match {
487                    conn.query(&format!(
488                        "MATCH (a:PipelineCore), (b:PipelineCore) WHERE a.id = '{}' AND b.id = '{}' CREATE (a)-[:DEPENDS_ON {{dep_type: 'data'}}]->(b)",
489                        escape_str(&consumer.id),
490                        escape_str(&producer.id),
491                    ))
492                    .map_err(|e| anyhow::anyhow!("create DEPENDS_ON: {e}"))?;
493                    count += 1;
494                }
495            }
496        }
497        Ok(count)
498    }
499
500    /// Get all PipelineCore records, optionally filtered by plugin_id.
501    pub fn get_all_pipeline_cores(
502        &self,
503        plugin_id: Option<&str>,
504    ) -> Result<Vec<PipelineCoreRecord>> {
505        let conn = self.connection()?;
506        let query = match plugin_id {
507            Some(pid) => format!(
508                "MATCH (p:PipelineCore) WHERE p.plugin_id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
509                escape_str(pid)
510            ),
511            None => "MATCH (p:PipelineCore) RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs".to_string(),
512        };
513        let mut result = conn
514            .query(&query)
515            .map_err(|e| anyhow::anyhow!("query pipeline cores: {e}"))?;
516        let mut records = Vec::new();
517        while let Some(row) = result.next() {
518            if row.len() >= 6 {
519                records.push(PipelineCoreRecord {
520                    id: row[0].to_string(),
521                    name: row[1].to_string(),
522                    doc_id: row[2].to_string(),
523                    plugin_id: row[3].to_string(),
524                    inputs: parse_string_list(&row[4].to_string()),
525                    outputs: parse_string_list(&row[5].to_string()),
526                });
527            }
528        }
529        Ok(records)
530    }
531
532    /// Get a PipelineCore record by id.
533    pub fn get_pipeline_core(&self, pipeline_id: &str) -> Result<Option<PipelineCoreRecord>> {
534        let conn = self.connection()?;
535        let mut result = conn
536            .query(&format!(
537                "MATCH (p:PipelineCore) WHERE p.id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
538                escape_str(pipeline_id)
539            ))
540            .map_err(|e| anyhow::anyhow!("query pipeline core: {e}"))?;
541        if let Some(row) = result.next() {
542            if row.len() >= 6 {
543                return Ok(Some(PipelineCoreRecord {
544                    id: row[0].to_string(),
545                    name: row[1].to_string(),
546                    doc_id: row[2].to_string(),
547                    plugin_id: row[3].to_string(),
548                    inputs: parse_string_list(&row[4].to_string()),
549                    outputs: parse_string_list(&row[5].to_string()),
550                }));
551            }
552        }
553        Ok(None)
554    }
555
556    /// Impact analysis using PipelineCore inputs/outputs.
557    pub fn impact_analysis(
558        &self,
559        table_name: &str,
560        max_depth: u32,
561    ) -> Result<Vec<ImpactResult>> {
562        let conn = self.connection()?;
563        let esc = escape_str(table_name);
564        let mut results = Vec::new();
565
566        // Direct impact: pipelines that consume this table
567        let mut direct = conn
568            .query(&format!(
569                "MATCH (p:PipelineCore) WHERE list_contains(p.inputs, '{}') RETURN p.id, p.name",
570                esc
571            ))
572            .map_err(|e| anyhow::anyhow!("impact_analysis direct: {e}"))?;
573        let mut affected_ids: std::collections::HashSet<String> =
574            std::collections::HashSet::new();
575        while let Some(row) = direct.next() {
576            if row.len() >= 2 {
577                let id = row[0].to_string();
578                let name = row[1].to_string();
579                affected_ids.insert(id.clone());
580                results.push(ImpactResult {
581                    pipeline_id: id,
582                    pipeline_name: name,
583                    impact_type: "direct".to_string(),
584                    depth: 1,
585                    path: table_name.to_string(),
586                });
587            }
588        }
589
590        // Transitive impact via DEPENDS_ON edges
591        if max_depth > 1 && !affected_ids.is_empty() {
592            for depth in 2..=max_depth {
593                let current_ids: Vec<String> = affected_ids.iter().cloned().collect();
594                let mut new_ids = Vec::new();
595
596                for src_id in &current_ids {
597                    let mut trans = conn
598                        .query(&format!(
599                            "MATCH (a:PipelineCore)-[:DEPENDS_ON]->(b:PipelineCore) WHERE b.id = '{}' RETURN a.id, a.name",
600                            escape_str(src_id)
601                        ))
602                        .map_err(|e| anyhow::anyhow!("impact_analysis transitive: {e}"))?;
603
604                    while let Some(row) = trans.next() {
605                        if row.len() >= 2 {
606                            let id = row[0].to_string();
607                            if !affected_ids.contains(&id) {
608                                results.push(ImpactResult {
609                                    pipeline_id: id.clone(),
610                                    pipeline_name: row[1].to_string(),
611                                    impact_type: "transitive".to_string(),
612                                    depth,
613                                    path: format!("{} → ... (depth {})", table_name, depth),
614                                });
615                                new_ids.push(id);
616                            }
617                        }
618                    }
619                }
620
621                if new_ids.is_empty() {
622                    break;
623                }
624                affected_ids.extend(new_ids);
625            }
626        }
627
628        Ok(results)
629    }
630
631    /// Get all DEPENDS_ON edges as (from_name, to_name, dep_type) tuples.
632    pub fn get_pipeline_deps(&self) -> Result<Vec<(String, String, String)>> {
633        let conn = self.connection()?;
634        let mut result = conn
635            .query(
636                "MATCH (c:PipelineCore)-[r:DEPENDS_ON]->(p:PipelineCore) \
637                 RETURN c.name, p.name, r.dep_type",
638            )
639            .map_err(|e| anyhow::anyhow!("query pipeline deps: {e}"))?;
640        let mut deps = Vec::new();
641        while let Some(row) = result.next() {
642            if row.len() >= 3 {
643                deps.push((row[0].to_string(), row[1].to_string(), row[2].to_string()));
644            }
645        }
646        Ok(deps)
647    }
648
649    /// Query a plugin-specific table by field value.
650    pub fn query_plugin_table(
651        &self,
652        plugin_id: &str,
653        field: &str,
654        value: &str,
655    ) -> Result<Vec<serde_json::Value>> {
656        let conn = self.connection()?;
657        let table = format!("Pipeline_{}", plugin_id);
658        let esc_val = escape_str(value);
659        let mut result = conn
660            .query(&format!(
661                "MATCH (p:{}) WHERE lower(p.{}) CONTAINS lower('{}') RETURN p.*",
662                table, field, esc_val
663            ))
664            .map_err(|e| anyhow::anyhow!("query plugin table: {e}"))?;
665        let mut rows = Vec::new();
666        while let Some(row) = result.next() {
667            let vals: Vec<serde_json::Value> = row
668                .iter()
669                .map(|v| serde_json::Value::String(v.to_string()))
670                .collect();
671            rows.push(serde_json::Value::Array(vals));
672        }
673        Ok(rows)
674    }
675
676    /// Pipeline count for stats (using PipelineCore).
677    pub fn pipeline_core_count(&self) -> Result<usize> {
678        let conn = self.connection()?;
679        Ok(count_query(&conn, "MATCH (p:PipelineCore) RETURN count(p)"))
680    }
681
682    pub fn get_all_chunks(&self) -> Result<Vec<(String, String)>> {
683        let conn = self.connection()?;
684        let result = conn
685            .query("MATCH (c:Chunk) RETURN c.id, c.text")
686            .map_err(|e| anyhow::anyhow!("query chunks: {e}"))?;
687        let mut chunks = Vec::new();
688        for row in result {
689            if row.len() >= 2 {
690                chunks.push((row[0].to_string(), row[1].to_string()));
691            }
692        }
693        Ok(chunks)
694    }
695
696    pub fn get_chunk_ids(&self) -> Result<std::collections::HashSet<String>> {
697        let conn = self.connection()?;
698        let result = conn
699            .query("MATCH (c:Chunk) RETURN c.id")
700            .map_err(|e| anyhow::anyhow!("query chunk ids: {e}"))?;
701        let mut ids = std::collections::HashSet::new();
702        for row in result {
703            if !row.is_empty() {
704                ids.insert(row[0].to_string());
705            }
706        }
707        Ok(ids)
708    }
709
710    pub fn get_chunk_details(&self, chunk_ids: &[&str]) -> Result<Vec<ChunkDetail>> {
711        let conn = self.connection()?;
712        let id_list: String = chunk_ids
713            .iter()
714            .map(|id| format!("'{}'", escape_str(id)))
715            .collect::<Vec<_>>()
716            .join(", ");
717        let query = format!(
718            "MATCH (c:Chunk) WHERE c.id IN [{}] RETURN c.id, c.doc_file, c.idx, c.heading, c.text, c.start_offset, c.end_offset, c.page",
719            id_list
720        );
721        let result = conn
722            .query(&query)
723            .map_err(|e| anyhow::anyhow!("chunk details: {e}"))?;
724        let mut details = Vec::new();
725        for row in result {
726            if row.len() >= 8 {
727                let heading_str = row[3].to_string();
728                let page_val: i64 = row[7].to_string().parse().unwrap_or(0);
729                details.push(ChunkDetail {
730                    id: row[0].to_string(),
731                    doc_file: row[1].to_string(),
732                    index: row[2].to_string().parse().unwrap_or(0),
733                    heading: if heading_str.is_empty() {
734                        None
735                    } else {
736                        Some(heading_str)
737                    },
738                    text: row[4].to_string(),
739                    start_offset: row[5].to_string().parse().unwrap_or(0),
740                    end_offset: row[6].to_string().parse().unwrap_or(0),
741                    page: if page_val > 0 {
742                        Some(page_val as usize)
743                    } else {
744                        None
745                    },
746                });
747            }
748        }
749        Ok(details)
750    }
751}
752
/// Node counts reported by `DocStore::stats`.
#[derive(Debug)]
pub struct DocStoreStats {
    /// Number of `Document` nodes.
    pub document_count: usize,
    /// Number of `Chunk` nodes.
    pub chunk_count: usize,
}
758
/// A stored chunk as returned by `DocStore::get_chunk_details`.
#[derive(Debug, Clone)]
pub struct ChunkDetail {
    /// Chunk primary key.
    pub id: String,
    /// File (document id) the chunk belongs to.
    pub doc_file: String,
    /// Position of the chunk within its document (`idx` column).
    pub index: usize,
    /// Section heading; `None` when the stored heading is empty.
    pub heading: Option<String>,
    /// Chunk text.
    pub text: String,
    /// Start offset as stored in `start_offset`.
    pub start_offset: usize,
    /// End offset as stored in `end_offset`.
    pub end_offset: usize,
    /// Page number; `None` when the stored page is not positive.
    pub page: Option<usize>,
}
770
/// Plugin-independent pipeline metadata stored in the `PipelineCore` table.
#[derive(Debug, Clone)]
pub struct PipelineCoreRecord {
    /// Primary key.
    pub id: String,
    /// Display name.
    pub name: String,
    /// Id of the document the pipeline was extracted from.
    pub doc_id: String,
    /// Plugin that produced the record (selects the `Pipeline_<plugin_id>` table).
    pub plugin_id: String,
    /// Names the pipeline reads; matched against other pipelines' `outputs`.
    pub inputs: Vec<String>,
    /// Names the pipeline writes.
    pub outputs: Vec<String>,
}
780
/// One affected pipeline reported by `DocStore::impact_analysis`.
#[derive(Debug, Clone)]
pub struct ImpactResult {
    /// Id of the affected pipeline.
    pub pipeline_id: String,
    /// Name of the affected pipeline.
    pub pipeline_name: String,
    /// `"direct"` at depth 1, `"transitive"` beyond.
    pub impact_type: String,
    /// Number of hops from the changed table (1 = direct consumer).
    pub depth: u32,
    /// Human-readable description of how the pipeline was reached.
    pub path: String,
}
789
790fn count_query(conn: &Connection<'_>, query: &str) -> usize {
791    conn.query(query)
792        .ok()
793        .and_then(|mut r| r.next().map(|row| row[0].to_string().parse().unwrap_or(0)))
794        .unwrap_or(0)
795}
796
/// Escape `s` for use inside a single-quoted Cypher string literal.
///
/// Both the backslash and the single quote are prefixed with a backslash.
/// The backslash must be escaped as well. Otherwise a value ending in `\`
/// escapes the closing quote, and paths such as `docs\guide.md` are read as
/// (invalid) escape sequences.
fn escape_str(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
800
/// Split a Kùzu `STRING[]` value rendered with `.to_string()` (e.g. `[a,b,c]`).
///
/// Surrounding `[`/`]` are stripped and the rest is split on `,`. Each item is
/// trimmed of whitespace, then of `'` and `"`, and empty items are dropped.
/// Items that themselves contain commas cannot be recovered from this rendering.
fn parse_string_list(s: &str) -> Vec<String> {
    let inner = s.trim_matches(|c: char| matches!(c, '[' | ']'));
    if inner.is_empty() {
        return Vec::new();
    }
    let mut items = Vec::new();
    for piece in inner.split(',') {
        let cleaned = piece.trim().trim_matches('\'').trim_matches('"');
        if !cleaned.is_empty() {
            items.push(cleaned.to_owned());
        }
    }
    items
}