1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use arrow::array::{Int64Array, StringArray};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use kuzu::{Connection, Database, SystemConfig};
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::chunk::Chunk;
14use crate::extract::ExtractedDoc;
15
/// Renders `p` as text that uses `/` as its only separator.
///
/// The path is converted lossily to UTF-8 and every backslash becomes a
/// forward slash, which is the form spliced into `COPY ... FROM '<path>'`.
fn fwd_slash_path(p: &Path) -> String {
    p.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
19
/// Schema statements executed, in order, by `DocStore::init_schema`.
///
/// Every statement uses `IF NOT EXISTS`, so running the list against an
/// already-initialised database leaves existing tables untouched. Node tables
/// are declared before the relationship tables that reference them.
const CREATE_SCHEMA: &[&str] = &[
    "CREATE NODE TABLE IF NOT EXISTS Document(
        id STRING,
        title STRING,
        file STRING,
        format STRING,
        content_hash STRING,
        page_count INT64,
        chunk_count INT64,
        PRIMARY KEY(id)
    )",
    "CREATE NODE TABLE IF NOT EXISTS Chunk(
        id STRING,
        doc_file STRING,
        idx INT64,
        heading STRING,
        text STRING,
        start_offset INT64,
        end_offset INT64,
        page INT64,
        content_hash STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS HAS_CHUNK(FROM Document TO Chunk)",
    "CREATE NODE TABLE IF NOT EXISTS Source(
        id STRING,
        source_type STRING,
        base_url STRING,
        space_key STRING,
        last_synced STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS FROM_SOURCE(FROM Document TO Source)",
    "CREATE REL TABLE IF NOT EXISTS LINKS_TO(FROM Document TO Document, url STRING, link_type STRING)",
    "CREATE NODE TABLE IF NOT EXISTS PipelineCore(id STRING, name STRING, doc_id STRING, plugin_id STRING, inputs STRING[], outputs STRING[], PRIMARY KEY(id))",
    // NOTE(review): ONE_MANY presumably limits each Document to a single incoming
    // DEFINED_IN edge; if one document can define several pipelines this likely
    // wants MANY_ONE. Confirm against the Kuzu multiplicity docs before changing.
    "CREATE REL TABLE IF NOT EXISTS DEFINED_IN(FROM PipelineCore TO Document, ONE_MANY)",
    "CREATE REL TABLE IF NOT EXISTS DEPENDS_ON(FROM PipelineCore TO PipelineCore, dep_type STRING, MANY_MANY)",
];
58
/// Store backed by a Kuzu `Database`.
///
/// Built with `DocStore::open`; the methods on this type obtain a `Connection`
/// through `connection()` for each call.
59pub struct DocStore {
 // Database handle that `connection()` borrows when it creates a `Connection`.
60 db: Database,
61}
62
63impl DocStore {
64 pub fn open(path: &Path) -> Result<Self> {
65 if let Some(parent) = path.parent() {
66 std::fs::create_dir_all(parent)?;
67 }
68 let db = Database::new(path, SystemConfig::default())
69 .map_err(|e| anyhow::anyhow!("failed to open docs kuzu db: {e}"))?;
70 let store = Self { db };
71 store.init_schema()?;
72 Ok(store)
73 }
74
75 fn init_schema(&self) -> Result<()> {
76 let conn = self.connection()?;
77 for ddl in CREATE_SCHEMA {
78 conn.query(ddl)
79 .map_err(|e| anyhow::anyhow!("schema DDL failed: {e}"))?;
80 }
81 Ok(())
82 }
83
84 pub fn connection(&self) -> Result<Connection<'_>> {
85 Connection::new(&self.db).map_err(|e| anyhow::anyhow!("failed to create connection: {e}"))
86 }
87
88 pub fn get_doc_hashes(&self) -> Result<HashMap<String, String>> {
89 let conn = self.connection()?;
90 let result = conn
91 .query("MATCH (d:Document) RETURN d.file, d.content_hash")
92 .map_err(|e| anyhow::anyhow!("query doc hashes: {e}"))?;
93 let mut hashes = HashMap::new();
94 for row in result {
95 if row.len() >= 2 {
96 hashes.insert(row[0].to_string(), row[1].to_string());
97 }
98 }
99 Ok(hashes)
100 }
101
102 pub fn upsert_all_parquet(&self, docs: &[&ExtractedDoc], chunks: &[&Chunk]) -> Result<()> {
103 let conn = self.connection()?;
104
105 let file_list: Vec<String> = docs
107 .iter()
108 .map(|d| format!("'{}'", escape_str(&d.file)))
109 .collect();
110 if !file_list.is_empty() {
111 let files_in = file_list.join(", ");
112 let _ = conn.query(&format!(
113 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
114 files_in
115 ));
116 let _ = conn.query(&format!(
117 "MATCH (d:Document) WHERE d.file IN [{}] DETACH DELETE d",
118 files_in
119 ));
120 }
121
122 let tmp_dir = tempfile::tempdir().context("create temp dir")?;
123
124 {
126 let ids: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
127 let titles: Vec<Option<&str>> = docs.iter().map(|d| d.title.as_deref()).collect();
128 let files: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
129 let formats: Vec<&str> = docs.iter().map(|d| d.format.as_str()).collect();
130 let hashes: Vec<&str> = docs.iter().map(|d| d.content_hash.as_str()).collect();
131 let page_counts: Vec<i64> = docs
132 .iter()
133 .map(|d| d.page_count.unwrap_or(0) as i64)
134 .collect();
135 let chunk_counts: Vec<i64> = docs
136 .iter()
137 .map(|d| chunks.iter().filter(|c| c.doc_file == d.file).count() as i64)
138 .collect();
139
140 let schema = Arc::new(Schema::new(vec![
141 Field::new("id", DataType::Utf8, false),
142 Field::new("title", DataType::Utf8, true),
143 Field::new("file", DataType::Utf8, false),
144 Field::new("format", DataType::Utf8, false),
145 Field::new("content_hash", DataType::Utf8, false),
146 Field::new("page_count", DataType::Int64, false),
147 Field::new("chunk_count", DataType::Int64, false),
148 ]));
149
150 let batch = RecordBatch::try_new(
151 schema.clone(),
152 vec![
153 Arc::new(StringArray::from(ids)),
154 Arc::new(StringArray::from(titles)),
155 Arc::new(StringArray::from(files)),
156 Arc::new(StringArray::from(formats)),
157 Arc::new(StringArray::from(hashes)),
158 Arc::new(Int64Array::from(page_counts)),
159 Arc::new(Int64Array::from(chunk_counts)),
160 ],
161 )?;
162
163 let path = tmp_dir.path().join("documents.parquet");
164 let file = std::fs::File::create(&path)?;
165 let props = WriterProperties::builder().build();
166 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
167 writer.write(&batch)?;
168 writer.close()?;
169
170 conn.query(&format!("COPY Document FROM '{}'", fwd_slash_path(&path)))
171 .map_err(|e| anyhow::anyhow!("COPY Document: {e}"))?;
172 }
173
174 if !chunks.is_empty() {
176 let ids: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
177 let doc_files: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
178 let indices: Vec<i64> = chunks.iter().map(|c| c.index as i64).collect();
179 let headings: Vec<Option<&str>> = chunks.iter().map(|c| c.heading.as_deref()).collect();
180 let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
181 let start_offsets: Vec<i64> = chunks.iter().map(|c| c.start_offset as i64).collect();
182 let end_offsets: Vec<i64> = chunks.iter().map(|c| c.end_offset as i64).collect();
183 let pages: Vec<i64> = chunks.iter().map(|c| c.page.unwrap_or(0) as i64).collect();
184 let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
185
186 let schema = Arc::new(Schema::new(vec![
187 Field::new("id", DataType::Utf8, false),
188 Field::new("doc_file", DataType::Utf8, false),
189 Field::new("idx", DataType::Int64, false),
190 Field::new("heading", DataType::Utf8, true),
191 Field::new("text", DataType::Utf8, false),
192 Field::new("start_offset", DataType::Int64, false),
193 Field::new("end_offset", DataType::Int64, false),
194 Field::new("page", DataType::Int64, false),
195 Field::new("content_hash", DataType::Utf8, false),
196 ]));
197
198 let batch = RecordBatch::try_new(
199 schema.clone(),
200 vec![
201 Arc::new(StringArray::from(ids)),
202 Arc::new(StringArray::from(doc_files)),
203 Arc::new(Int64Array::from(indices)),
204 Arc::new(StringArray::from(headings)),
205 Arc::new(StringArray::from(texts)),
206 Arc::new(Int64Array::from(start_offsets)),
207 Arc::new(Int64Array::from(end_offsets)),
208 Arc::new(Int64Array::from(pages)),
209 Arc::new(StringArray::from(hashes)),
210 ],
211 )?;
212
213 let path = tmp_dir.path().join("chunks.parquet");
214 let file = std::fs::File::create(&path)?;
215 let props = WriterProperties::builder().build();
216 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
217 writer.write(&batch)?;
218 writer.close()?;
219
220 conn.query(&format!("COPY Chunk FROM '{}'", fwd_slash_path(&path)))
221 .map_err(|e| anyhow::anyhow!("COPY Chunk: {e}"))?;
222 }
223
224 if !chunks.is_empty() {
226 let froms: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
227 let tos: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
228
229 let schema = Arc::new(Schema::new(vec![
230 Field::new("from", DataType::Utf8, false),
231 Field::new("to", DataType::Utf8, false),
232 ]));
233
234 let batch = RecordBatch::try_new(
235 schema.clone(),
236 vec![
237 Arc::new(StringArray::from(froms)),
238 Arc::new(StringArray::from(tos)),
239 ],
240 )?;
241
242 let path = tmp_dir.path().join("has_chunk.parquet");
243 let file = std::fs::File::create(&path)?;
244 let props = WriterProperties::builder().build();
245 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
246 writer.write(&batch)?;
247 writer.close()?;
248
249 conn.query(&format!("COPY HAS_CHUNK FROM '{}'", fwd_slash_path(&path)))
250 .map_err(|e| anyhow::anyhow!("COPY HAS_CHUNK: {e}"))?;
251 }
252
253 Ok(())
254 }
255
256 pub fn upsert_source(
257 &self,
258 id: &str,
259 source_type: &str,
260 base_url: &str,
261 space_key: &str,
262 ) -> Result<()> {
263 let conn = self.connection()?;
264 let _ = conn.query(&format!(
265 "MATCH (s:Source) WHERE s.id = '{}' DETACH DELETE s",
266 escape_str(id)
267 ));
268 conn.query(&format!(
269 "CREATE (s:Source {{id: '{}', source_type: '{}', base_url: '{}', space_key: '{}', last_synced: '{}'}})",
270 escape_str(id),
271 escape_str(source_type),
272 escape_str(base_url),
273 escape_str(space_key),
274 chrono::Utc::now().to_rfc3339(),
275 ))
276 .map_err(|e| anyhow::anyhow!("create Source: {e}"))?;
277 Ok(())
278 }
279
280 pub fn link_doc_to_source(&self, doc_id: &str, source_id: &str) -> Result<()> {
281 let conn = self.connection()?;
282 conn.query(&format!(
283 "MATCH (d:Document), (s:Source) WHERE d.id = '{}' AND s.id = '{}' CREATE (d)-[:FROM_SOURCE]->(s)",
284 escape_str(doc_id),
285 escape_str(source_id),
286 ))
287 .map_err(|e| anyhow::anyhow!("link FROM_SOURCE: {e}"))?;
288 Ok(())
289 }
290
291 pub fn get_docs_by_source(&self, source_id: &str) -> Result<Vec<String>> {
292 let conn = self.connection()?;
293 let result = conn
294 .query(&format!(
295 "MATCH (d:Document)-[:FROM_SOURCE]->(s:Source) WHERE s.id = '{}' RETURN d.id",
296 escape_str(source_id)
297 ))
298 .map_err(|e| anyhow::anyhow!("query docs by source: {e}"))?;
299 let mut ids = Vec::new();
300 for row in result {
301 if !row.is_empty() {
302 ids.insert(ids.len(), row[0].to_string());
303 }
304 }
305 Ok(ids)
306 }
307
308 pub fn delete_docs_by_ids(&self, doc_ids: &[&str]) -> Result<()> {
309 if doc_ids.is_empty() {
310 return Ok(());
311 }
312 let conn = self.connection()?;
313 let id_list: String = doc_ids
314 .iter()
315 .map(|id| format!("'{}'", escape_str(id)))
316 .collect::<Vec<_>>()
317 .join(", ");
318 let _ = conn.query(&format!(
319 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
320 id_list
321 ));
322 let _ = conn.query(&format!(
323 "MATCH (d:Document) WHERE d.id IN [{}] DETACH DELETE d",
324 id_list
325 ));
326 Ok(())
327 }
328
329 pub fn create_link(
330 &self,
331 from_doc_id: &str,
332 to_doc_id: &str,
333 url: &str,
334 link_type: &str,
335 ) -> Result<()> {
336 let conn = self.connection()?;
337 conn.query(&format!(
338 "MATCH (a:Document), (b:Document) WHERE a.id = '{}' AND b.id = '{}' \
339 CREATE (a)-[:LINKS_TO {{url: '{}', link_type: '{}'}}]->(b)",
340 escape_str(from_doc_id),
341 escape_str(to_doc_id),
342 escape_str(url),
343 escape_str(link_type),
344 ))
345 .map_err(|e| anyhow::anyhow!("create LINKS_TO: {e}"))?;
346 Ok(())
347 }
348
349 pub fn delete_links_from(&self, doc_id: &str) -> Result<()> {
350 let conn = self.connection()?;
351 let _ = conn.query(&format!(
352 "MATCH (a:Document)-[r:LINKS_TO]->() WHERE a.id = '{}' DELETE r",
353 escape_str(doc_id),
354 ));
355 Ok(())
356 }
357
358 pub fn stats(&self) -> Result<DocStoreStats> {
359 let conn = self.connection()?;
360 let doc_count = count_query(&conn, "MATCH (d:Document) RETURN count(d)");
361 let chunk_count = count_query(&conn, "MATCH (c:Chunk) RETURN count(c)");
362 Ok(DocStoreStats {
363 document_count: doc_count,
364 chunk_count,
365 })
366 }
367
368 pub fn ensure_plugin_table(&self, plugin_id: &str, columns: &[(String, String)]) -> Result<()> {
372 let conn = self.connection()?;
373 let mut col_defs = String::from("id STRING");
374 for (name, col_type) in columns {
375 col_defs.push_str(&format!(", {} {}", name, col_type));
376 }
377 let ddl = format!(
378 "CREATE NODE TABLE IF NOT EXISTS Pipeline_{}({}, PRIMARY KEY(id))",
379 plugin_id, col_defs
380 );
381 conn.query(&ddl)
382 .map_err(|e| anyhow::anyhow!("ensure_plugin_table DDL: {e}"))?;
383 Ok(())
384 }
385
386 pub fn upsert_pipeline_core(&self, record: &PipelineCoreRecord) -> Result<()> {
388 let conn = self.connection()?;
389 let _ = conn.query(&format!(
391 "MATCH (p:PipelineCore) WHERE p.id = '{}' DETACH DELETE p",
392 escape_str(&record.id)
393 ));
394 let inputs_str = record
396 .inputs
397 .iter()
398 .map(|s| format!("'{}'", escape_str(s)))
399 .collect::<Vec<_>>()
400 .join(",");
401 let outputs_str = record
402 .outputs
403 .iter()
404 .map(|s| format!("'{}'", escape_str(s)))
405 .collect::<Vec<_>>()
406 .join(",");
407 conn.query(&format!(
408 "CREATE (p:PipelineCore {{id: '{}', name: '{}', doc_id: '{}', plugin_id: '{}', inputs: [{}], outputs: [{}]}})",
409 escape_str(&record.id),
410 escape_str(&record.name),
411 escape_str(&record.doc_id),
412 escape_str(&record.plugin_id),
413 inputs_str,
414 outputs_str,
415 ))
416 .map_err(|e| anyhow::anyhow!("create PipelineCore: {e}"))?;
417 Ok(())
418 }
419
420 pub fn upsert_plugin_properties(
422 &self,
423 pipeline_id: &str,
424 plugin_id: &str,
425 properties: &serde_json::Map<String, serde_json::Value>,
426 schema: &[(String, String)],
427 ) -> Result<()> {
428 let conn = self.connection()?;
429 let table = format!("Pipeline_{}", plugin_id);
430 let esc_id = escape_str(pipeline_id);
431 let _ = conn.query(&format!(
433 "MATCH (p:{}) WHERE p.id = '{}' DELETE p",
434 table, esc_id
435 ));
436 let mut props = format!("id: '{}'", esc_id);
438 for (col_name, _col_type) in schema {
439 if let Some(val) = properties.get(col_name.as_str()) {
440 let s = match val {
441 serde_json::Value::String(s) => escape_str(s),
442 other => escape_str(&other.to_string()),
443 };
444 props.push_str(&format!(", {}: '{}'", col_name, s));
445 }
446 }
447 conn.query(&format!("CREATE (p:{} {{{}}})", table, props))
448 .map_err(|e| anyhow::anyhow!("upsert plugin properties: {e}"))?;
449 Ok(())
450 }
451
452 pub fn link_pipeline_core_to_doc(&self, pipeline_id: &str, doc_id: &str) -> Result<()> {
454 let conn = self.connection()?;
455 conn.query(&format!(
456 "MATCH (p:PipelineCore), (d:Document) WHERE p.id = '{}' AND d.id = '{}' CREATE (p)-[:DEFINED_IN]->(d)",
457 escape_str(pipeline_id),
458 escape_str(doc_id),
459 ))
460 .map_err(|e| anyhow::anyhow!("link DEFINED_IN: {e}"))?;
461 Ok(())
462 }
463
464 pub fn link_pipeline_dependencies(&self) -> Result<usize> {
467 let cores = self.get_all_pipeline_cores(None)?;
468 if cores.len() < 2 {
469 return Ok(0);
470 }
471
472 let conn = self.connection()?;
473 let _ = conn.query("MATCH ()-[r:DEPENDS_ON]->() DELETE r");
475
476 let mut count = 0;
477 for producer in &cores {
478 if producer.outputs.is_empty() {
479 continue;
480 }
481 for consumer in &cores {
482 if consumer.id == producer.id || consumer.inputs.is_empty() {
483 continue;
484 }
485 let has_match = producer
487 .outputs
488 .iter()
489 .any(|out| consumer.inputs.iter().any(|inp| inp == out));
490 if has_match {
491 conn.query(&format!(
492 "MATCH (a:PipelineCore), (b:PipelineCore) WHERE a.id = '{}' AND b.id = '{}' CREATE (a)-[:DEPENDS_ON {{dep_type: 'data'}}]->(b)",
493 escape_str(&consumer.id),
494 escape_str(&producer.id),
495 ))
496 .map_err(|e| anyhow::anyhow!("create DEPENDS_ON: {e}"))?;
497 count += 1;
498 }
499 }
500 }
501 Ok(count)
502 }
503
504 pub fn get_all_pipeline_cores(
506 &self,
507 plugin_id: Option<&str>,
508 ) -> Result<Vec<PipelineCoreRecord>> {
509 let conn = self.connection()?;
510 let query = match plugin_id {
511 Some(pid) => format!(
512 "MATCH (p:PipelineCore) WHERE p.plugin_id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
513 escape_str(pid)
514 ),
515 None => "MATCH (p:PipelineCore) RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs".to_string(),
516 };
517 let result = conn
518 .query(&query)
519 .map_err(|e| anyhow::anyhow!("query pipeline cores: {e}"))?;
520 let mut records = Vec::new();
521 for row in result {
522 if row.len() >= 6 {
523 records.push(PipelineCoreRecord {
524 id: row[0].to_string(),
525 name: row[1].to_string(),
526 doc_id: row[2].to_string(),
527 plugin_id: row[3].to_string(),
528 inputs: parse_string_list(&row[4].to_string()),
529 outputs: parse_string_list(&row[5].to_string()),
530 });
531 }
532 }
533 Ok(records)
534 }
535
536 pub fn get_pipeline_core(&self, pipeline_id: &str) -> Result<Option<PipelineCoreRecord>> {
538 let conn = self.connection()?;
539 let mut result = conn
540 .query(&format!(
541 "MATCH (p:PipelineCore) WHERE p.id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
542 escape_str(pipeline_id)
543 ))
544 .map_err(|e| anyhow::anyhow!("query pipeline core: {e}"))?;
545 if let Some(row) = result.next() {
546 if row.len() >= 6 {
547 return Ok(Some(PipelineCoreRecord {
548 id: row[0].to_string(),
549 name: row[1].to_string(),
550 doc_id: row[2].to_string(),
551 plugin_id: row[3].to_string(),
552 inputs: parse_string_list(&row[4].to_string()),
553 outputs: parse_string_list(&row[5].to_string()),
554 }));
555 }
556 }
557 Ok(None)
558 }
559
560 pub fn impact_analysis(&self, table_name: &str, max_depth: u32) -> Result<Vec<ImpactResult>> {
562 let conn = self.connection()?;
563 let esc = escape_str(table_name);
564 let mut results = Vec::new();
565
566 let direct = conn
568 .query(&format!(
569 "MATCH (p:PipelineCore) WHERE list_contains(p.inputs, '{}') RETURN p.id, p.name",
570 esc
571 ))
572 .map_err(|e| anyhow::anyhow!("impact_analysis direct: {e}"))?;
573 let mut affected_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
574 for row in direct {
575 if row.len() >= 2 {
576 let id = row[0].to_string();
577 let name = row[1].to_string();
578 affected_ids.insert(id.clone());
579 results.push(ImpactResult {
580 pipeline_id: id,
581 pipeline_name: name,
582 impact_type: "direct".to_string(),
583 depth: 1,
584 path: table_name.to_string(),
585 });
586 }
587 }
588
589 if max_depth > 1 && !affected_ids.is_empty() {
591 for depth in 2..=max_depth {
592 let current_ids: Vec<String> = affected_ids.iter().cloned().collect();
593 let mut new_ids = Vec::new();
594
595 for src_id in ¤t_ids {
596 let trans = conn
597 .query(&format!(
598 "MATCH (a:PipelineCore)-[:DEPENDS_ON]->(b:PipelineCore) WHERE b.id = '{}' RETURN a.id, a.name",
599 escape_str(src_id)
600 ))
601 .map_err(|e| anyhow::anyhow!("impact_analysis transitive: {e}"))?;
602
603 for row in trans {
604 if row.len() >= 2 {
605 let id = row[0].to_string();
606 if !affected_ids.contains(&id) {
607 results.push(ImpactResult {
608 pipeline_id: id.clone(),
609 pipeline_name: row[1].to_string(),
610 impact_type: "transitive".to_string(),
611 depth,
612 path: format!("{} → ... (depth {})", table_name, depth),
613 });
614 new_ids.push(id);
615 }
616 }
617 }
618 }
619
620 if new_ids.is_empty() {
621 break;
622 }
623 affected_ids.extend(new_ids);
624 }
625 }
626
627 Ok(results)
628 }
629
630 pub fn get_pipeline_deps(&self) -> Result<Vec<(String, String, String)>> {
632 let conn = self.connection()?;
633 let result = conn
634 .query(
635 "MATCH (c:PipelineCore)-[r:DEPENDS_ON]->(p:PipelineCore) \
636 RETURN c.name, p.name, r.dep_type",
637 )
638 .map_err(|e| anyhow::anyhow!("query pipeline deps: {e}"))?;
639 let mut deps = Vec::new();
640 for row in result {
641 if row.len() >= 3 {
642 deps.push((row[0].to_string(), row[1].to_string(), row[2].to_string()));
643 }
644 }
645 Ok(deps)
646 }
647
648 pub fn query_plugin_table(
650 &self,
651 plugin_id: &str,
652 field: &str,
653 value: &str,
654 ) -> Result<Vec<serde_json::Value>> {
655 let conn = self.connection()?;
656 let table = format!("Pipeline_{}", plugin_id);
657 let esc_val = escape_str(value);
658 let result = conn
659 .query(&format!(
660 "MATCH (p:{}) WHERE lower(p.{}) CONTAINS lower('{}') RETURN p.*",
661 table, field, esc_val
662 ))
663 .map_err(|e| anyhow::anyhow!("query plugin table: {e}"))?;
664 let mut rows = Vec::new();
665 for row in result {
666 let vals: Vec<serde_json::Value> = row
667 .iter()
668 .map(|v| serde_json::Value::String(v.to_string()))
669 .collect();
670 rows.push(serde_json::Value::Array(vals));
671 }
672 Ok(rows)
673 }
674
675 pub fn pipeline_core_count(&self) -> Result<usize> {
677 let conn = self.connection()?;
678 Ok(count_query(&conn, "MATCH (p:PipelineCore) RETURN count(p)"))
679 }
680
681 pub fn get_all_chunks(&self) -> Result<Vec<(String, String)>> {
682 let conn = self.connection()?;
683 let result = conn
684 .query("MATCH (c:Chunk) RETURN c.id, c.text")
685 .map_err(|e| anyhow::anyhow!("query chunks: {e}"))?;
686 let mut chunks = Vec::new();
687 for row in result {
688 if row.len() >= 2 {
689 chunks.push((row[0].to_string(), row[1].to_string()));
690 }
691 }
692 Ok(chunks)
693 }
694
695 pub fn get_chunk_ids(&self) -> Result<std::collections::HashSet<String>> {
696 let conn = self.connection()?;
697 let result = conn
698 .query("MATCH (c:Chunk) RETURN c.id")
699 .map_err(|e| anyhow::anyhow!("query chunk ids: {e}"))?;
700 let mut ids = std::collections::HashSet::new();
701 for row in result {
702 if !row.is_empty() {
703 ids.insert(row[0].to_string());
704 }
705 }
706 Ok(ids)
707 }
708
709 pub fn get_chunk_details(&self, chunk_ids: &[&str]) -> Result<Vec<ChunkDetail>> {
710 let conn = self.connection()?;
711 let id_list: String = chunk_ids
712 .iter()
713 .map(|id| format!("'{}'", escape_str(id)))
714 .collect::<Vec<_>>()
715 .join(", ");
716 let query = format!(
717 "MATCH (c:Chunk) WHERE c.id IN [{}] RETURN c.id, c.doc_file, c.idx, c.heading, c.text, c.start_offset, c.end_offset, c.page",
718 id_list
719 );
720 let result = conn
721 .query(&query)
722 .map_err(|e| anyhow::anyhow!("chunk details: {e}"))?;
723 let mut details = Vec::new();
724 for row in result {
725 if row.len() >= 8 {
726 let heading_str = row[3].to_string();
727 let page_val: i64 = row[7].to_string().parse().unwrap_or(0);
728 details.push(ChunkDetail {
729 id: row[0].to_string(),
730 doc_file: row[1].to_string(),
731 index: row[2].to_string().parse().unwrap_or(0),
732 heading: if heading_str.is_empty() {
733 None
734 } else {
735 Some(heading_str)
736 },
737 text: row[4].to_string(),
738 start_offset: row[5].to_string().parse().unwrap_or(0),
739 end_offset: row[6].to_string().parse().unwrap_or(0),
740 page: if page_val > 0 {
741 Some(page_val as usize)
742 } else {
743 None
744 },
745 });
746 }
747 }
748 Ok(details)
749 }
750}
751
/// Node counts reported by `DocStore::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct DocStoreStats {
    /// Number of `Document` nodes.
    pub document_count: usize,
    /// Number of `Chunk` nodes.
    pub chunk_count: usize,
}
757
/// One chunk as returned by `DocStore::get_chunk_details`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ChunkDetail {
    /// Chunk id (primary key of the `Chunk` table).
    pub id: String,
    /// File of the document the chunk belongs to.
    pub doc_file: String,
    /// Value of the chunk's `idx` column.
    pub index: usize,
    /// Heading stored with the chunk, if any.
    pub heading: Option<String>,
    /// Chunk text.
    pub text: String,
    /// Value of the chunk's `start_offset` column.
    pub start_offset: usize,
    /// Value of the chunk's `end_offset` column.
    pub end_offset: usize,
    /// Page stored with the chunk; `None` when the stored value is not positive.
    pub page: Option<usize>,
}
769
/// A row of the `PipelineCore` node table.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PipelineCoreRecord {
    /// Pipeline id (primary key).
    pub id: String,
    /// Pipeline name.
    pub name: String,
    /// Value of the `doc_id` column.
    pub doc_id: String,
    /// Value of the `plugin_id` column.
    pub plugin_id: String,
    /// Names listed in the `inputs` column.
    pub inputs: Vec<String>,
    /// Names listed in the `outputs` column.
    pub outputs: Vec<String>,
}
779
/// One affected pipeline reported by `DocStore::impact_analysis`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImpactResult {
    /// Id of the affected pipeline.
    pub pipeline_id: String,
    /// Name of the affected pipeline.
    pub pipeline_name: String,
    /// `"direct"` or `"transitive"`.
    pub impact_type: String,
    /// 1 for direct hits, 2 and above for transitive ones.
    pub depth: u32,
    /// Textual description of how the pipeline was reached.
    pub path: String,
}
788
789fn count_query(conn: &Connection<'_>, query: &str) -> usize {
790 conn.query(query)
791 .ok()
792 .and_then(|mut r| r.next().map(|row| row[0].to_string().parse().unwrap_or(0)))
793 .unwrap_or(0)
794}
795
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Both `\` and `'` are prefixed with a backslash. Escaping the backslash
/// matters: otherwise a value ending in `\` would swallow the closing quote,
/// and sequences such as `\n` inside a value (for example a Windows path)
/// would be read as escape sequences instead of literal text.
fn escape_str(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
799
/// Splits the textual rendering of a string list, such as `[a,b]`, into its items.
///
/// Leading and trailing `[` / `]` are removed, the remainder is split on `,`,
/// and each piece is trimmed of whitespace, then of surrounding single quotes,
/// then of surrounding double quotes. Pieces that end up empty are dropped.
fn parse_string_list(s: &str) -> Vec<String> {
    let inner = s.trim_matches(|c: char| matches!(c, '[' | ']'));
    let mut items = Vec::new();
    for piece in inner.split(',') {
        let cleaned = piece.trim().trim_matches('\'').trim_matches('"');
        if !cleaned.is_empty() {
            items.push(cleaned.to_string());
        }
    }
    items
}