Skip to main content

infigraph_docs/
store.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use arrow::array::{Int64Array, StringArray};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use kuzu::{Connection, Database, SystemConfig};
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::chunk::Chunk;
14use crate::extract::ExtractedDoc;
15
/// Renders `p` as a string in which every `\` separator has been turned into `/`.
///
/// Used when a filesystem path has to be embedded in a Kuzu query string literal
/// (e.g. `COPY ... FROM '<path>'`); presumably the forward-slash form is chosen so Windows
/// separators are not mistaken for escape sequences. Non-UTF-8 bytes are replaced lossily.
fn fwd_slash_path(p: &Path) -> String {
    let lossy = p.to_string_lossy();
    lossy
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
19
/// DDL statements executed, in order, each time the store is opened.
///
/// Every statement uses `IF NOT EXISTS`, so running them against an existing database is
/// harmless; it also means edits here do not alter tables that already exist on disk.
const CREATE_SCHEMA: &[&str] = &[
    // One node per extracted document. Elsewhere in this file `id` is filled with the
    // same value as `file`.
    "CREATE NODE TABLE IF NOT EXISTS Document(
        id STRING,
        title STRING,
        file STRING,
        format STRING,
        content_hash STRING,
        page_count INT64,
        chunk_count INT64,
        PRIMARY KEY(id)
    )",
    // One node per text chunk; `doc_file` repeats the owning Document's file so chunks can
    // be filtered without traversing HAS_CHUNK.
    "CREATE NODE TABLE IF NOT EXISTS Chunk(
        id STRING,
        doc_file STRING,
        idx INT64,
        heading STRING,
        text STRING,
        start_offset INT64,
        end_offset INT64,
        page INT64,
        content_hash STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS HAS_CHUNK(FROM Document TO Chunk)",
    // External origin of synced documents (fields suggest e.g. a wiki space).
    "CREATE NODE TABLE IF NOT EXISTS Source(
        id STRING,
        source_type STRING,
        base_url STRING,
        space_key STRING,
        last_synced STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS FROM_SOURCE(FROM Document TO Source)",
    "CREATE REL TABLE IF NOT EXISTS LINKS_TO(FROM Document TO Document, url STRING, link_type STRING)",
    // Plugin-agnostic pipeline record; `inputs`/`outputs` are matched by name to derive
    // DEPENDS_ON edges.
    "CREATE NODE TABLE IF NOT EXISTS PipelineCore(id STRING, name STRING, doc_id STRING, plugin_id STRING, inputs STRING[], outputs STRING[], PRIMARY KEY(id))",
    // NOTE(review): under Kuzu's multiplicity rules ONE_MANY appears to allow at most one
    // incoming DEFINED_IN edge per Document, i.e. one pipeline per document. If a document
    // can define several pipelines, MANY_ONE may have been intended — confirm.
    "CREATE REL TABLE IF NOT EXISTS DEFINED_IN(FROM PipelineCore TO Document, ONE_MANY)",
    // Edges are created from the consuming pipeline to the producing pipeline.
    "CREATE REL TABLE IF NOT EXISTS DEPENDS_ON(FROM PipelineCore TO PipelineCore, dep_type STRING, MANY_MANY)",
];
58
59pub struct DocStore {
60    db: Database,
61}
62
63impl DocStore {
64    pub fn open(path: &Path) -> Result<Self> {
65        if let Some(parent) = path.parent() {
66            std::fs::create_dir_all(parent)?;
67        }
68        let db = Database::new(path, SystemConfig::default())
69            .map_err(|e| anyhow::anyhow!("failed to open docs kuzu db: {e}"))?;
70        let store = Self { db };
71        store.init_schema()?;
72        Ok(store)
73    }
74
75    fn init_schema(&self) -> Result<()> {
76        let conn = self.connection()?;
77        for ddl in CREATE_SCHEMA {
78            conn.query(ddl)
79                .map_err(|e| anyhow::anyhow!("schema DDL failed: {e}"))?;
80        }
81        Ok(())
82    }
83
84    pub fn connection(&self) -> Result<Connection<'_>> {
85        Connection::new(&self.db).map_err(|e| anyhow::anyhow!("failed to create connection: {e}"))
86    }
87
88    pub fn get_doc_hashes(&self) -> Result<HashMap<String, String>> {
89        let conn = self.connection()?;
90        let result = conn
91            .query("MATCH (d:Document) RETURN d.file, d.content_hash")
92            .map_err(|e| anyhow::anyhow!("query doc hashes: {e}"))?;
93        let mut hashes = HashMap::new();
94        for row in result {
95            if row.len() >= 2 {
96                hashes.insert(row[0].to_string(), row[1].to_string());
97            }
98        }
99        Ok(hashes)
100    }
101
102    pub fn upsert_all_parquet(&self, docs: &[&ExtractedDoc], chunks: &[&Chunk]) -> Result<()> {
103        let conn = self.connection()?;
104
105        // Delete existing data for changed files
106        let file_list: Vec<String> = docs
107            .iter()
108            .map(|d| format!("'{}'", escape_str(&d.file)))
109            .collect();
110        if !file_list.is_empty() {
111            let files_in = file_list.join(", ");
112            let _ = conn.query(&format!(
113                "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
114                files_in
115            ));
116            let _ = conn.query(&format!(
117                "MATCH (d:Document) WHERE d.file IN [{}] DETACH DELETE d",
118                files_in
119            ));
120        }
121
122        let tmp_dir = tempfile::tempdir().context("create temp dir")?;
123
124        // Write Document parquet
125        {
126            let ids: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
127            let titles: Vec<Option<&str>> = docs.iter().map(|d| d.title.as_deref()).collect();
128            let files: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
129            let formats: Vec<&str> = docs.iter().map(|d| d.format.as_str()).collect();
130            let hashes: Vec<&str> = docs.iter().map(|d| d.content_hash.as_str()).collect();
131            let page_counts: Vec<i64> = docs
132                .iter()
133                .map(|d| d.page_count.unwrap_or(0) as i64)
134                .collect();
135            let chunk_counts: Vec<i64> = docs
136                .iter()
137                .map(|d| chunks.iter().filter(|c| c.doc_file == d.file).count() as i64)
138                .collect();
139
140            let schema = Arc::new(Schema::new(vec![
141                Field::new("id", DataType::Utf8, false),
142                Field::new("title", DataType::Utf8, true),
143                Field::new("file", DataType::Utf8, false),
144                Field::new("format", DataType::Utf8, false),
145                Field::new("content_hash", DataType::Utf8, false),
146                Field::new("page_count", DataType::Int64, false),
147                Field::new("chunk_count", DataType::Int64, false),
148            ]));
149
150            let batch = RecordBatch::try_new(
151                schema.clone(),
152                vec![
153                    Arc::new(StringArray::from(ids)),
154                    Arc::new(StringArray::from(titles)),
155                    Arc::new(StringArray::from(files)),
156                    Arc::new(StringArray::from(formats)),
157                    Arc::new(StringArray::from(hashes)),
158                    Arc::new(Int64Array::from(page_counts)),
159                    Arc::new(Int64Array::from(chunk_counts)),
160                ],
161            )?;
162
163            let path = tmp_dir.path().join("documents.parquet");
164            let file = std::fs::File::create(&path)?;
165            let props = WriterProperties::builder().build();
166            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
167            writer.write(&batch)?;
168            writer.close()?;
169
170            conn.query(&format!("COPY Document FROM '{}'", fwd_slash_path(&path)))
171                .map_err(|e| anyhow::anyhow!("COPY Document: {e}"))?;
172        }
173
174        // Write Chunk parquet
175        if !chunks.is_empty() {
176            let ids: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
177            let doc_files: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
178            let indices: Vec<i64> = chunks.iter().map(|c| c.index as i64).collect();
179            let headings: Vec<Option<&str>> = chunks.iter().map(|c| c.heading.as_deref()).collect();
180            let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
181            let start_offsets: Vec<i64> = chunks.iter().map(|c| c.start_offset as i64).collect();
182            let end_offsets: Vec<i64> = chunks.iter().map(|c| c.end_offset as i64).collect();
183            let pages: Vec<i64> = chunks.iter().map(|c| c.page.unwrap_or(0) as i64).collect();
184            let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
185
186            let schema = Arc::new(Schema::new(vec![
187                Field::new("id", DataType::Utf8, false),
188                Field::new("doc_file", DataType::Utf8, false),
189                Field::new("idx", DataType::Int64, false),
190                Field::new("heading", DataType::Utf8, true),
191                Field::new("text", DataType::Utf8, false),
192                Field::new("start_offset", DataType::Int64, false),
193                Field::new("end_offset", DataType::Int64, false),
194                Field::new("page", DataType::Int64, false),
195                Field::new("content_hash", DataType::Utf8, false),
196            ]));
197
198            let batch = RecordBatch::try_new(
199                schema.clone(),
200                vec![
201                    Arc::new(StringArray::from(ids)),
202                    Arc::new(StringArray::from(doc_files)),
203                    Arc::new(Int64Array::from(indices)),
204                    Arc::new(StringArray::from(headings)),
205                    Arc::new(StringArray::from(texts)),
206                    Arc::new(Int64Array::from(start_offsets)),
207                    Arc::new(Int64Array::from(end_offsets)),
208                    Arc::new(Int64Array::from(pages)),
209                    Arc::new(StringArray::from(hashes)),
210                ],
211            )?;
212
213            let path = tmp_dir.path().join("chunks.parquet");
214            let file = std::fs::File::create(&path)?;
215            let props = WriterProperties::builder().build();
216            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
217            writer.write(&batch)?;
218            writer.close()?;
219
220            conn.query(&format!("COPY Chunk FROM '{}'", fwd_slash_path(&path)))
221                .map_err(|e| anyhow::anyhow!("COPY Chunk: {e}"))?;
222        }
223
224        // Write HAS_CHUNK edges
225        if !chunks.is_empty() {
226            let froms: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
227            let tos: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
228
229            let schema = Arc::new(Schema::new(vec![
230                Field::new("from", DataType::Utf8, false),
231                Field::new("to", DataType::Utf8, false),
232            ]));
233
234            let batch = RecordBatch::try_new(
235                schema.clone(),
236                vec![
237                    Arc::new(StringArray::from(froms)),
238                    Arc::new(StringArray::from(tos)),
239                ],
240            )?;
241
242            let path = tmp_dir.path().join("has_chunk.parquet");
243            let file = std::fs::File::create(&path)?;
244            let props = WriterProperties::builder().build();
245            let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
246            writer.write(&batch)?;
247            writer.close()?;
248
249            conn.query(&format!("COPY HAS_CHUNK FROM '{}'", fwd_slash_path(&path)))
250                .map_err(|e| anyhow::anyhow!("COPY HAS_CHUNK: {e}"))?;
251        }
252
253        Ok(())
254    }
255
256    pub fn upsert_source(
257        &self,
258        id: &str,
259        source_type: &str,
260        base_url: &str,
261        space_key: &str,
262    ) -> Result<()> {
263        let conn = self.connection()?;
264        let _ = conn.query(&format!(
265            "MATCH (s:Source) WHERE s.id = '{}' DETACH DELETE s",
266            escape_str(id)
267        ));
268        conn.query(&format!(
269            "CREATE (s:Source {{id: '{}', source_type: '{}', base_url: '{}', space_key: '{}', last_synced: '{}'}})",
270            escape_str(id),
271            escape_str(source_type),
272            escape_str(base_url),
273            escape_str(space_key),
274            chrono::Utc::now().to_rfc3339(),
275        ))
276        .map_err(|e| anyhow::anyhow!("create Source: {e}"))?;
277        Ok(())
278    }
279
280    pub fn link_doc_to_source(&self, doc_id: &str, source_id: &str) -> Result<()> {
281        let conn = self.connection()?;
282        conn.query(&format!(
283            "MATCH (d:Document), (s:Source) WHERE d.id = '{}' AND s.id = '{}' CREATE (d)-[:FROM_SOURCE]->(s)",
284            escape_str(doc_id),
285            escape_str(source_id),
286        ))
287        .map_err(|e| anyhow::anyhow!("link FROM_SOURCE: {e}"))?;
288        Ok(())
289    }
290
291    pub fn get_docs_by_source(&self, source_id: &str) -> Result<Vec<String>> {
292        let conn = self.connection()?;
293        let result = conn
294            .query(&format!(
295                "MATCH (d:Document)-[:FROM_SOURCE]->(s:Source) WHERE s.id = '{}' RETURN d.id",
296                escape_str(source_id)
297            ))
298            .map_err(|e| anyhow::anyhow!("query docs by source: {e}"))?;
299        let mut ids = Vec::new();
300        for row in result {
301            if !row.is_empty() {
302                ids.insert(ids.len(), row[0].to_string());
303            }
304        }
305        Ok(ids)
306    }
307
308    pub fn delete_docs_by_ids(&self, doc_ids: &[&str]) -> Result<()> {
309        if doc_ids.is_empty() {
310            return Ok(());
311        }
312        let conn = self.connection()?;
313        let id_list: String = doc_ids
314            .iter()
315            .map(|id| format!("'{}'", escape_str(id)))
316            .collect::<Vec<_>>()
317            .join(", ");
318        let _ = conn.query(&format!(
319            "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
320            id_list
321        ));
322        let _ = conn.query(&format!(
323            "MATCH (d:Document) WHERE d.id IN [{}] DETACH DELETE d",
324            id_list
325        ));
326        Ok(())
327    }
328
329    pub fn create_link(
330        &self,
331        from_doc_id: &str,
332        to_doc_id: &str,
333        url: &str,
334        link_type: &str,
335    ) -> Result<()> {
336        let conn = self.connection()?;
337        conn.query(&format!(
338            "MATCH (a:Document), (b:Document) WHERE a.id = '{}' AND b.id = '{}' \
339             CREATE (a)-[:LINKS_TO {{url: '{}', link_type: '{}'}}]->(b)",
340            escape_str(from_doc_id),
341            escape_str(to_doc_id),
342            escape_str(url),
343            escape_str(link_type),
344        ))
345        .map_err(|e| anyhow::anyhow!("create LINKS_TO: {e}"))?;
346        Ok(())
347    }
348
349    pub fn delete_links_from(&self, doc_id: &str) -> Result<()> {
350        let conn = self.connection()?;
351        let _ = conn.query(&format!(
352            "MATCH (a:Document)-[r:LINKS_TO]->() WHERE a.id = '{}' DELETE r",
353            escape_str(doc_id),
354        ));
355        Ok(())
356    }
357
358    pub fn stats(&self) -> Result<DocStoreStats> {
359        let conn = self.connection()?;
360        let doc_count = count_query(&conn, "MATCH (d:Document) RETURN count(d)");
361        let chunk_count = count_query(&conn, "MATCH (c:Chunk) RETURN count(c)");
362        Ok(DocStoreStats {
363            document_count: doc_count,
364            chunk_count,
365        })
366    }
367
368    // ── PipelineCore methods ──────────────────────────────────────────────
369
370    /// Create a per-plugin node table from schema definition.
371    pub fn ensure_plugin_table(&self, plugin_id: &str, columns: &[(String, String)]) -> Result<()> {
372        let conn = self.connection()?;
373        let mut col_defs = String::from("id STRING");
374        for (name, col_type) in columns {
375            col_defs.push_str(&format!(", {} {}", name, col_type));
376        }
377        let ddl = format!(
378            "CREATE NODE TABLE IF NOT EXISTS Pipeline_{}({}, PRIMARY KEY(id))",
379            plugin_id, col_defs
380        );
381        conn.query(&ddl)
382            .map_err(|e| anyhow::anyhow!("ensure_plugin_table DDL: {e}"))?;
383        Ok(())
384    }
385
386    /// Upsert a pipeline core record.
387    pub fn upsert_pipeline_core(&self, record: &PipelineCoreRecord) -> Result<()> {
388        let conn = self.connection()?;
389        // Delete existing if present
390        let _ = conn.query(&format!(
391            "MATCH (p:PipelineCore) WHERE p.id = '{}' DETACH DELETE p",
392            escape_str(&record.id)
393        ));
394        // Build list literals
395        let inputs_str = record
396            .inputs
397            .iter()
398            .map(|s| format!("'{}'", escape_str(s)))
399            .collect::<Vec<_>>()
400            .join(",");
401        let outputs_str = record
402            .outputs
403            .iter()
404            .map(|s| format!("'{}'", escape_str(s)))
405            .collect::<Vec<_>>()
406            .join(",");
407        conn.query(&format!(
408            "CREATE (p:PipelineCore {{id: '{}', name: '{}', doc_id: '{}', plugin_id: '{}', inputs: [{}], outputs: [{}]}})",
409            escape_str(&record.id),
410            escape_str(&record.name),
411            escape_str(&record.doc_id),
412            escape_str(&record.plugin_id),
413            inputs_str,
414            outputs_str,
415        ))
416        .map_err(|e| anyhow::anyhow!("create PipelineCore: {e}"))?;
417        Ok(())
418    }
419
420    /// Upsert plugin-specific properties into Pipeline_<plugin_id> table.
421    pub fn upsert_plugin_properties(
422        &self,
423        pipeline_id: &str,
424        plugin_id: &str,
425        properties: &serde_json::Map<String, serde_json::Value>,
426        schema: &[(String, String)],
427    ) -> Result<()> {
428        let conn = self.connection()?;
429        let table = format!("Pipeline_{}", plugin_id);
430        let esc_id = escape_str(pipeline_id);
431        // Delete existing
432        let _ = conn.query(&format!(
433            "MATCH (p:{}) WHERE p.id = '{}' DELETE p",
434            table, esc_id
435        ));
436        // Build property assignments
437        let mut props = format!("id: '{}'", esc_id);
438        for (col_name, _col_type) in schema {
439            if let Some(val) = properties.get(col_name.as_str()) {
440                let s = match val {
441                    serde_json::Value::String(s) => escape_str(s),
442                    other => escape_str(&other.to_string()),
443                };
444                props.push_str(&format!(", {}: '{}'", col_name, s));
445            }
446        }
447        conn.query(&format!("CREATE (p:{} {{{}}})", table, props))
448            .map_err(|e| anyhow::anyhow!("upsert plugin properties: {e}"))?;
449        Ok(())
450    }
451
452    /// Link a pipeline core to a document via DEFINED_IN edge.
453    pub fn link_pipeline_core_to_doc(&self, pipeline_id: &str, doc_id: &str) -> Result<()> {
454        let conn = self.connection()?;
455        conn.query(&format!(
456            "MATCH (p:PipelineCore), (d:Document) WHERE p.id = '{}' AND d.id = '{}' CREATE (p)-[:DEFINED_IN]->(d)",
457            escape_str(pipeline_id),
458            escape_str(doc_id),
459        ))
460        .map_err(|e| anyhow::anyhow!("link DEFINED_IN: {e}"))?;
461        Ok(())
462    }
463
464    /// Link pipeline dependencies using PipelineCore inputs/outputs matching.
465    /// Cross-plugin: if plugin A's output matches plugin B's input, creates DEPENDS_ON edge.
466    pub fn link_pipeline_dependencies(&self) -> Result<usize> {
467        let cores = self.get_all_pipeline_cores(None)?;
468        if cores.len() < 2 {
469            return Ok(0);
470        }
471
472        let conn = self.connection()?;
473        // Clear old edges
474        let _ = conn.query("MATCH ()-[r:DEPENDS_ON]->() DELETE r");
475
476        let mut count = 0;
477        for producer in &cores {
478            if producer.outputs.is_empty() {
479                continue;
480            }
481            for consumer in &cores {
482                if consumer.id == producer.id || consumer.inputs.is_empty() {
483                    continue;
484                }
485                // Check if any output of producer matches any input of consumer
486                let has_match = producer
487                    .outputs
488                    .iter()
489                    .any(|out| consumer.inputs.iter().any(|inp| inp == out));
490                if has_match {
491                    conn.query(&format!(
492                        "MATCH (a:PipelineCore), (b:PipelineCore) WHERE a.id = '{}' AND b.id = '{}' CREATE (a)-[:DEPENDS_ON {{dep_type: 'data'}}]->(b)",
493                        escape_str(&consumer.id),
494                        escape_str(&producer.id),
495                    ))
496                    .map_err(|e| anyhow::anyhow!("create DEPENDS_ON: {e}"))?;
497                    count += 1;
498                }
499            }
500        }
501        Ok(count)
502    }
503
504    /// Get all PipelineCore records, optionally filtered by plugin_id.
505    pub fn get_all_pipeline_cores(
506        &self,
507        plugin_id: Option<&str>,
508    ) -> Result<Vec<PipelineCoreRecord>> {
509        let conn = self.connection()?;
510        let query = match plugin_id {
511            Some(pid) => format!(
512                "MATCH (p:PipelineCore) WHERE p.plugin_id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
513                escape_str(pid)
514            ),
515            None => "MATCH (p:PipelineCore) RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs".to_string(),
516        };
517        let result = conn
518            .query(&query)
519            .map_err(|e| anyhow::anyhow!("query pipeline cores: {e}"))?;
520        let mut records = Vec::new();
521        for row in result {
522            if row.len() >= 6 {
523                records.push(PipelineCoreRecord {
524                    id: row[0].to_string(),
525                    name: row[1].to_string(),
526                    doc_id: row[2].to_string(),
527                    plugin_id: row[3].to_string(),
528                    inputs: parse_string_list(&row[4].to_string()),
529                    outputs: parse_string_list(&row[5].to_string()),
530                });
531            }
532        }
533        Ok(records)
534    }
535
536    /// Get a PipelineCore record by id.
537    pub fn get_pipeline_core(&self, pipeline_id: &str) -> Result<Option<PipelineCoreRecord>> {
538        let conn = self.connection()?;
539        let mut result = conn
540            .query(&format!(
541                "MATCH (p:PipelineCore) WHERE p.id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
542                escape_str(pipeline_id)
543            ))
544            .map_err(|e| anyhow::anyhow!("query pipeline core: {e}"))?;
545        if let Some(row) = result.next() {
546            if row.len() >= 6 {
547                return Ok(Some(PipelineCoreRecord {
548                    id: row[0].to_string(),
549                    name: row[1].to_string(),
550                    doc_id: row[2].to_string(),
551                    plugin_id: row[3].to_string(),
552                    inputs: parse_string_list(&row[4].to_string()),
553                    outputs: parse_string_list(&row[5].to_string()),
554                }));
555            }
556        }
557        Ok(None)
558    }
559
560    /// Impact analysis using PipelineCore inputs/outputs.
561    pub fn impact_analysis(&self, table_name: &str, max_depth: u32) -> Result<Vec<ImpactResult>> {
562        let conn = self.connection()?;
563        let esc = escape_str(table_name);
564        let mut results = Vec::new();
565
566        // Direct impact: pipelines that consume this table
567        let direct = conn
568            .query(&format!(
569                "MATCH (p:PipelineCore) WHERE list_contains(p.inputs, '{}') RETURN p.id, p.name",
570                esc
571            ))
572            .map_err(|e| anyhow::anyhow!("impact_analysis direct: {e}"))?;
573        let mut affected_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
574        for row in direct {
575            if row.len() >= 2 {
576                let id = row[0].to_string();
577                let name = row[1].to_string();
578                affected_ids.insert(id.clone());
579                results.push(ImpactResult {
580                    pipeline_id: id,
581                    pipeline_name: name,
582                    impact_type: "direct".to_string(),
583                    depth: 1,
584                    path: table_name.to_string(),
585                });
586            }
587        }
588
589        // Transitive impact via DEPENDS_ON edges
590        if max_depth > 1 && !affected_ids.is_empty() {
591            for depth in 2..=max_depth {
592                let current_ids: Vec<String> = affected_ids.iter().cloned().collect();
593                let mut new_ids = Vec::new();
594
595                for src_id in &current_ids {
596                    let trans = conn
597                        .query(&format!(
598                            "MATCH (a:PipelineCore)-[:DEPENDS_ON]->(b:PipelineCore) WHERE b.id = '{}' RETURN a.id, a.name",
599                            escape_str(src_id)
600                        ))
601                        .map_err(|e| anyhow::anyhow!("impact_analysis transitive: {e}"))?;
602
603                    for row in trans {
604                        if row.len() >= 2 {
605                            let id = row[0].to_string();
606                            if !affected_ids.contains(&id) {
607                                results.push(ImpactResult {
608                                    pipeline_id: id.clone(),
609                                    pipeline_name: row[1].to_string(),
610                                    impact_type: "transitive".to_string(),
611                                    depth,
612                                    path: format!("{} → ... (depth {})", table_name, depth),
613                                });
614                                new_ids.push(id);
615                            }
616                        }
617                    }
618                }
619
620                if new_ids.is_empty() {
621                    break;
622                }
623                affected_ids.extend(new_ids);
624            }
625        }
626
627        Ok(results)
628    }
629
630    /// Get all DEPENDS_ON edges as (from_name, to_name, dep_type) tuples.
631    pub fn get_pipeline_deps(&self) -> Result<Vec<(String, String, String)>> {
632        let conn = self.connection()?;
633        let result = conn
634            .query(
635                "MATCH (c:PipelineCore)-[r:DEPENDS_ON]->(p:PipelineCore) \
636                 RETURN c.name, p.name, r.dep_type",
637            )
638            .map_err(|e| anyhow::anyhow!("query pipeline deps: {e}"))?;
639        let mut deps = Vec::new();
640        for row in result {
641            if row.len() >= 3 {
642                deps.push((row[0].to_string(), row[1].to_string(), row[2].to_string()));
643            }
644        }
645        Ok(deps)
646    }
647
648    /// Query a plugin-specific table by field value.
649    pub fn query_plugin_table(
650        &self,
651        plugin_id: &str,
652        field: &str,
653        value: &str,
654    ) -> Result<Vec<serde_json::Value>> {
655        let conn = self.connection()?;
656        let table = format!("Pipeline_{}", plugin_id);
657        let esc_val = escape_str(value);
658        let result = conn
659            .query(&format!(
660                "MATCH (p:{}) WHERE lower(p.{}) CONTAINS lower('{}') RETURN p.*",
661                table, field, esc_val
662            ))
663            .map_err(|e| anyhow::anyhow!("query plugin table: {e}"))?;
664        let mut rows = Vec::new();
665        for row in result {
666            let vals: Vec<serde_json::Value> = row
667                .iter()
668                .map(|v| serde_json::Value::String(v.to_string()))
669                .collect();
670            rows.push(serde_json::Value::Array(vals));
671        }
672        Ok(rows)
673    }
674
675    /// Pipeline count for stats (using PipelineCore).
676    pub fn pipeline_core_count(&self) -> Result<usize> {
677        let conn = self.connection()?;
678        Ok(count_query(&conn, "MATCH (p:PipelineCore) RETURN count(p)"))
679    }
680
681    pub fn get_all_chunks(&self) -> Result<Vec<(String, String)>> {
682        let conn = self.connection()?;
683        let result = conn
684            .query("MATCH (c:Chunk) RETURN c.id, c.text")
685            .map_err(|e| anyhow::anyhow!("query chunks: {e}"))?;
686        let mut chunks = Vec::new();
687        for row in result {
688            if row.len() >= 2 {
689                chunks.push((row[0].to_string(), row[1].to_string()));
690            }
691        }
692        Ok(chunks)
693    }
694
695    pub fn get_chunk_ids(&self) -> Result<std::collections::HashSet<String>> {
696        let conn = self.connection()?;
697        let result = conn
698            .query("MATCH (c:Chunk) RETURN c.id")
699            .map_err(|e| anyhow::anyhow!("query chunk ids: {e}"))?;
700        let mut ids = std::collections::HashSet::new();
701        for row in result {
702            if !row.is_empty() {
703                ids.insert(row[0].to_string());
704            }
705        }
706        Ok(ids)
707    }
708
709    pub fn get_chunk_details(&self, chunk_ids: &[&str]) -> Result<Vec<ChunkDetail>> {
710        let conn = self.connection()?;
711        let id_list: String = chunk_ids
712            .iter()
713            .map(|id| format!("'{}'", escape_str(id)))
714            .collect::<Vec<_>>()
715            .join(", ");
716        let query = format!(
717            "MATCH (c:Chunk) WHERE c.id IN [{}] RETURN c.id, c.doc_file, c.idx, c.heading, c.text, c.start_offset, c.end_offset, c.page",
718            id_list
719        );
720        let result = conn
721            .query(&query)
722            .map_err(|e| anyhow::anyhow!("chunk details: {e}"))?;
723        let mut details = Vec::new();
724        for row in result {
725            if row.len() >= 8 {
726                let heading_str = row[3].to_string();
727                let page_val: i64 = row[7].to_string().parse().unwrap_or(0);
728                details.push(ChunkDetail {
729                    id: row[0].to_string(),
730                    doc_file: row[1].to_string(),
731                    index: row[2].to_string().parse().unwrap_or(0),
732                    heading: if heading_str.is_empty() {
733                        None
734                    } else {
735                        Some(heading_str)
736                    },
737                    text: row[4].to_string(),
738                    start_offset: row[5].to_string().parse().unwrap_or(0),
739                    end_offset: row[6].to_string().parse().unwrap_or(0),
740                    page: if page_val > 0 {
741                        Some(page_val as usize)
742                    } else {
743                        None
744                    },
745                });
746            }
747        }
748        Ok(details)
749    }
750}
751
/// Node counts reported by `DocStore::stats`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DocStoreStats {
    /// Number of `Document` nodes.
    pub document_count: usize,
    /// Number of `Chunk` nodes.
    pub chunk_count: usize,
}
757
/// A stored chunk as returned by `DocStore::get_chunk_details`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkDetail {
    /// Chunk primary key.
    pub id: String,
    /// File of the owning document.
    pub doc_file: String,
    /// Position of the chunk within its document (stored as `idx`).
    pub index: usize,
    /// Section heading, or `None` when none was stored.
    pub heading: Option<String>,
    /// Chunk text.
    pub text: String,
    /// Start offset of the chunk in the source text.
    pub start_offset: usize,
    /// End offset of the chunk in the source text.
    pub end_offset: usize,
    /// Page number, or `None` when the stored page is `0`.
    pub page: Option<usize>,
}
769
/// Plugin-agnostic description of a pipeline, stored as a `PipelineCore` node.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PipelineCoreRecord {
    /// Pipeline primary key.
    pub id: String,
    /// Display name.
    pub name: String,
    /// Id of the document the pipeline is defined in.
    pub doc_id: String,
    /// Id of the plugin that extracted this pipeline.
    pub plugin_id: String,
    /// Names this pipeline consumes; matched against other pipelines' `outputs`.
    pub inputs: Vec<String>,
    /// Names this pipeline produces.
    pub outputs: Vec<String>,
}
779
/// One pipeline affected by a table change, as reported by `DocStore::impact_analysis`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImpactResult {
    /// Id of the affected pipeline.
    pub pipeline_id: String,
    /// Name of the affected pipeline.
    pub pipeline_name: String,
    /// `"direct"` for consumers of the table, `"transitive"` otherwise.
    pub impact_type: String,
    /// Breadth-first distance from the table (1 = direct consumer).
    pub depth: u32,
    /// Human-readable description of how the pipeline was reached.
    pub path: String,
}
788
789fn count_query(conn: &Connection<'_>, query: &str) -> usize {
790    conn.query(query)
791        .ok()
792        .and_then(|mut r| r.next().map(|row| row[0].to_string().parse().unwrap_or(0)))
793        .unwrap_or(0)
794}
795
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Backslashes are escaped (`\` → `\\`) as well as single quotes (`'` → `\'`). Without
/// the former, input such as a Windows path `docs\foo` would be read as containing the
/// escape `\f`, and a trailing `\` would escape the closing quote and break the query.
fn escape_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '\\' => out.push_str("\\\\"),
            '\'' => out.push_str("\\'"),
            _ => out.push(ch),
        }
    }
    out
}
799
/// Parses a Kuzu `STRING[]` value rendered via `.to_string()`.
///
/// Kuzu renders `STRING[]` as `"[val1,val2,val3]"`. Exactly one leading `[` and one
/// trailing `]` are removed, not every bracket, so elements that themselves begin with
/// `[` or end with `]` (e.g. `col[1]`) are preserved. Each element is whitespace-trimmed,
/// surrounding `'`/`"` quote characters are stripped, and empty elements are dropped.
///
/// Elements containing `,` cannot be recovered from this rendering and will be split.
fn parse_string_list(s: &str) -> Vec<String> {
    let s = s.trim();
    let inner = s.strip_prefix('[').unwrap_or(s);
    let inner = inner.strip_suffix(']').unwrap_or(inner);
    if inner.is_empty() {
        return Vec::new();
    }
    inner
        .split(',')
        .map(|item| item.trim().trim_matches('\'').trim_matches('"').to_string())
        .filter(|item| !item.is_empty())
        .collect()
}