1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use arrow::array::{Int64Array, StringArray};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use kuzu::{Connection, Database, SystemConfig};
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::chunk::Chunk;
14use crate::extract::ExtractedDoc;
15
/// DDL statements run (in order) when a `DocStore` is opened.
///
/// Every statement uses `IF NOT EXISTS`, so running the list against an
/// existing database is a no-op for tables that are already present. Note that
/// this also means a table created by an older version of this list keeps its
/// old definition.
const CREATE_SCHEMA: &[&str] = &[
    "CREATE NODE TABLE IF NOT EXISTS Document(
        id STRING,
        title STRING,
        file STRING,
        format STRING,
        content_hash STRING,
        page_count INT64,
        chunk_count INT64,
        PRIMARY KEY(id)
    )",
    "CREATE NODE TABLE IF NOT EXISTS Chunk(
        id STRING,
        doc_file STRING,
        idx INT64,
        heading STRING,
        text STRING,
        start_offset INT64,
        end_offset INT64,
        page INT64,
        content_hash STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS HAS_CHUNK(FROM Document TO Chunk)",
    "CREATE NODE TABLE IF NOT EXISTS Source(
        id STRING,
        source_type STRING,
        base_url STRING,
        space_key STRING,
        last_synced STRING,
        PRIMARY KEY(id)
    )",
    "CREATE REL TABLE IF NOT EXISTS FROM_SOURCE(FROM Document TO Source)",
    "CREATE REL TABLE IF NOT EXISTS LINKS_TO(FROM Document TO Document, url STRING, link_type STRING)",
    "CREATE NODE TABLE IF NOT EXISTS PipelineCore(id STRING, name STRING, doc_id STRING, plugin_id STRING, inputs STRING[], outputs STRING[], PRIMARY KEY(id))",
    // A PipelineCore carries a single `doc_id`, so each pipeline has at most one
    // DEFINED_IN edge while one document may be pointed at by several
    // pipelines: that is MANY_ONE. (ONE_MANY would limit every Document to a
    // single incoming DEFINED_IN edge.)
    "CREATE REL TABLE IF NOT EXISTS DEFINED_IN(FROM PipelineCore TO Document, MANY_ONE)",
    "CREATE REL TABLE IF NOT EXISTS DEPENDS_ON(FROM PipelineCore TO PipelineCore, dep_type STRING, MANY_MANY)",
];
54
/// Handle to the Kuzu database holding the `Document`, `Chunk`, `Source` and
/// `PipelineCore` graph (see `CREATE_SCHEMA`).
///
/// Each operation opens its own short-lived [`Connection`] via
/// [`DocStore::connection`].
pub struct DocStore {
    /// The underlying embedded Kuzu database.
    db: Database,
}
58
59impl DocStore {
60 pub fn open(path: &Path) -> Result<Self> {
61 if let Some(parent) = path.parent() {
62 std::fs::create_dir_all(parent)?;
63 }
64 let db = Database::new(path, SystemConfig::default())
65 .map_err(|e| anyhow::anyhow!("failed to open docs kuzu db: {e}"))?;
66 let store = Self { db };
67 store.init_schema()?;
68 Ok(store)
69 }
70
71 fn init_schema(&self) -> Result<()> {
72 let conn = self.connection()?;
73 for ddl in CREATE_SCHEMA {
74 conn.query(ddl)
75 .map_err(|e| anyhow::anyhow!("schema DDL failed: {e}"))?;
76 }
77 Ok(())
78 }
79
80 pub fn connection(&self) -> Result<Connection<'_>> {
81 Connection::new(&self.db).map_err(|e| anyhow::anyhow!("failed to create connection: {e}"))
82 }
83
84 pub fn get_doc_hashes(&self) -> Result<HashMap<String, String>> {
85 let conn = self.connection()?;
86 let result = conn
87 .query("MATCH (d:Document) RETURN d.file, d.content_hash")
88 .map_err(|e| anyhow::anyhow!("query doc hashes: {e}"))?;
89 let mut hashes = HashMap::new();
90 for row in result {
91 if row.len() >= 2 {
92 hashes.insert(row[0].to_string(), row[1].to_string());
93 }
94 }
95 Ok(hashes)
96 }
97
98 pub fn upsert_all_parquet(&self, docs: &[&ExtractedDoc], chunks: &[&Chunk]) -> Result<()> {
99 let conn = self.connection()?;
100
101 let file_list: Vec<String> = docs
103 .iter()
104 .map(|d| format!("'{}'", escape_str(&d.file)))
105 .collect();
106 if !file_list.is_empty() {
107 let files_in = file_list.join(", ");
108 let _ = conn.query(&format!(
109 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
110 files_in
111 ));
112 let _ = conn.query(&format!(
113 "MATCH (d:Document) WHERE d.file IN [{}] DETACH DELETE d",
114 files_in
115 ));
116 }
117
118 let tmp_dir = tempfile::tempdir().context("create temp dir")?;
119
120 {
122 let ids: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
123 let titles: Vec<Option<&str>> = docs.iter().map(|d| d.title.as_deref()).collect();
124 let files: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
125 let formats: Vec<&str> = docs.iter().map(|d| d.format.as_str()).collect();
126 let hashes: Vec<&str> = docs.iter().map(|d| d.content_hash.as_str()).collect();
127 let page_counts: Vec<i64> = docs
128 .iter()
129 .map(|d| d.page_count.unwrap_or(0) as i64)
130 .collect();
131 let chunk_counts: Vec<i64> = docs
132 .iter()
133 .map(|d| chunks.iter().filter(|c| c.doc_file == d.file).count() as i64)
134 .collect();
135
136 let schema = Arc::new(Schema::new(vec![
137 Field::new("id", DataType::Utf8, false),
138 Field::new("title", DataType::Utf8, true),
139 Field::new("file", DataType::Utf8, false),
140 Field::new("format", DataType::Utf8, false),
141 Field::new("content_hash", DataType::Utf8, false),
142 Field::new("page_count", DataType::Int64, false),
143 Field::new("chunk_count", DataType::Int64, false),
144 ]));
145
146 let batch = RecordBatch::try_new(
147 schema.clone(),
148 vec![
149 Arc::new(StringArray::from(ids)),
150 Arc::new(StringArray::from(titles)),
151 Arc::new(StringArray::from(files)),
152 Arc::new(StringArray::from(formats)),
153 Arc::new(StringArray::from(hashes)),
154 Arc::new(Int64Array::from(page_counts)),
155 Arc::new(Int64Array::from(chunk_counts)),
156 ],
157 )?;
158
159 let path = tmp_dir.path().join("documents.parquet");
160 let file = std::fs::File::create(&path)?;
161 let props = WriterProperties::builder().build();
162 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
163 writer.write(&batch)?;
164 writer.close()?;
165
166 conn.query(&format!("COPY Document FROM '{}'", path.to_string_lossy()))
167 .map_err(|e| anyhow::anyhow!("COPY Document: {e}"))?;
168 }
169
170 if !chunks.is_empty() {
172 let ids: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
173 let doc_files: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
174 let indices: Vec<i64> = chunks.iter().map(|c| c.index as i64).collect();
175 let headings: Vec<Option<&str>> = chunks.iter().map(|c| c.heading.as_deref()).collect();
176 let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
177 let start_offsets: Vec<i64> = chunks.iter().map(|c| c.start_offset as i64).collect();
178 let end_offsets: Vec<i64> = chunks.iter().map(|c| c.end_offset as i64).collect();
179 let pages: Vec<i64> = chunks.iter().map(|c| c.page.unwrap_or(0) as i64).collect();
180 let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
181
182 let schema = Arc::new(Schema::new(vec![
183 Field::new("id", DataType::Utf8, false),
184 Field::new("doc_file", DataType::Utf8, false),
185 Field::new("idx", DataType::Int64, false),
186 Field::new("heading", DataType::Utf8, true),
187 Field::new("text", DataType::Utf8, false),
188 Field::new("start_offset", DataType::Int64, false),
189 Field::new("end_offset", DataType::Int64, false),
190 Field::new("page", DataType::Int64, false),
191 Field::new("content_hash", DataType::Utf8, false),
192 ]));
193
194 let batch = RecordBatch::try_new(
195 schema.clone(),
196 vec![
197 Arc::new(StringArray::from(ids)),
198 Arc::new(StringArray::from(doc_files)),
199 Arc::new(Int64Array::from(indices)),
200 Arc::new(StringArray::from(headings)),
201 Arc::new(StringArray::from(texts)),
202 Arc::new(Int64Array::from(start_offsets)),
203 Arc::new(Int64Array::from(end_offsets)),
204 Arc::new(Int64Array::from(pages)),
205 Arc::new(StringArray::from(hashes)),
206 ],
207 )?;
208
209 let path = tmp_dir.path().join("chunks.parquet");
210 let file = std::fs::File::create(&path)?;
211 let props = WriterProperties::builder().build();
212 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
213 writer.write(&batch)?;
214 writer.close()?;
215
216 conn.query(&format!("COPY Chunk FROM '{}'", path.to_string_lossy()))
217 .map_err(|e| anyhow::anyhow!("COPY Chunk: {e}"))?;
218 }
219
220 if !chunks.is_empty() {
222 let froms: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
223 let tos: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
224
225 let schema = Arc::new(Schema::new(vec![
226 Field::new("from", DataType::Utf8, false),
227 Field::new("to", DataType::Utf8, false),
228 ]));
229
230 let batch = RecordBatch::try_new(
231 schema.clone(),
232 vec![
233 Arc::new(StringArray::from(froms)),
234 Arc::new(StringArray::from(tos)),
235 ],
236 )?;
237
238 let path = tmp_dir.path().join("has_chunk.parquet");
239 let file = std::fs::File::create(&path)?;
240 let props = WriterProperties::builder().build();
241 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
242 writer.write(&batch)?;
243 writer.close()?;
244
245 conn.query(&format!("COPY HAS_CHUNK FROM '{}'", path.to_string_lossy()))
246 .map_err(|e| anyhow::anyhow!("COPY HAS_CHUNK: {e}"))?;
247 }
248
249 Ok(())
250 }
251
252 pub fn upsert_source(
253 &self,
254 id: &str,
255 source_type: &str,
256 base_url: &str,
257 space_key: &str,
258 ) -> Result<()> {
259 let conn = self.connection()?;
260 let _ = conn.query(&format!(
261 "MATCH (s:Source) WHERE s.id = '{}' DETACH DELETE s",
262 escape_str(id)
263 ));
264 conn.query(&format!(
265 "CREATE (s:Source {{id: '{}', source_type: '{}', base_url: '{}', space_key: '{}', last_synced: '{}'}})",
266 escape_str(id),
267 escape_str(source_type),
268 escape_str(base_url),
269 escape_str(space_key),
270 chrono::Utc::now().to_rfc3339(),
271 ))
272 .map_err(|e| anyhow::anyhow!("create Source: {e}"))?;
273 Ok(())
274 }
275
276 pub fn link_doc_to_source(&self, doc_id: &str, source_id: &str) -> Result<()> {
277 let conn = self.connection()?;
278 conn.query(&format!(
279 "MATCH (d:Document), (s:Source) WHERE d.id = '{}' AND s.id = '{}' CREATE (d)-[:FROM_SOURCE]->(s)",
280 escape_str(doc_id),
281 escape_str(source_id),
282 ))
283 .map_err(|e| anyhow::anyhow!("link FROM_SOURCE: {e}"))?;
284 Ok(())
285 }
286
287 pub fn get_docs_by_source(&self, source_id: &str) -> Result<Vec<String>> {
288 let conn = self.connection()?;
289 let result = conn
290 .query(&format!(
291 "MATCH (d:Document)-[:FROM_SOURCE]->(s:Source) WHERE s.id = '{}' RETURN d.id",
292 escape_str(source_id)
293 ))
294 .map_err(|e| anyhow::anyhow!("query docs by source: {e}"))?;
295 let mut ids = Vec::new();
296 for row in result {
297 if !row.is_empty() {
298 ids.insert(ids.len(), row[0].to_string());
299 }
300 }
301 Ok(ids)
302 }
303
304 pub fn delete_docs_by_ids(&self, doc_ids: &[&str]) -> Result<()> {
305 if doc_ids.is_empty() {
306 return Ok(());
307 }
308 let conn = self.connection()?;
309 let id_list: String = doc_ids
310 .iter()
311 .map(|id| format!("'{}'", escape_str(id)))
312 .collect::<Vec<_>>()
313 .join(", ");
314 let _ = conn.query(&format!(
315 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
316 id_list
317 ));
318 let _ = conn.query(&format!(
319 "MATCH (d:Document) WHERE d.id IN [{}] DETACH DELETE d",
320 id_list
321 ));
322 Ok(())
323 }
324
325 pub fn create_link(
326 &self,
327 from_doc_id: &str,
328 to_doc_id: &str,
329 url: &str,
330 link_type: &str,
331 ) -> Result<()> {
332 let conn = self.connection()?;
333 conn.query(&format!(
334 "MATCH (a:Document), (b:Document) WHERE a.id = '{}' AND b.id = '{}' \
335 CREATE (a)-[:LINKS_TO {{url: '{}', link_type: '{}'}}]->(b)",
336 escape_str(from_doc_id),
337 escape_str(to_doc_id),
338 escape_str(url),
339 escape_str(link_type),
340 ))
341 .map_err(|e| anyhow::anyhow!("create LINKS_TO: {e}"))?;
342 Ok(())
343 }
344
345 pub fn delete_links_from(&self, doc_id: &str) -> Result<()> {
346 let conn = self.connection()?;
347 let _ = conn.query(&format!(
348 "MATCH (a:Document)-[r:LINKS_TO]->() WHERE a.id = '{}' DELETE r",
349 escape_str(doc_id),
350 ));
351 Ok(())
352 }
353
354 pub fn stats(&self) -> Result<DocStoreStats> {
355 let conn = self.connection()?;
356 let doc_count = count_query(&conn, "MATCH (d:Document) RETURN count(d)");
357 let chunk_count = count_query(&conn, "MATCH (c:Chunk) RETURN count(c)");
358 Ok(DocStoreStats {
359 document_count: doc_count,
360 chunk_count,
361 })
362 }
363
364 pub fn ensure_plugin_table(&self, plugin_id: &str, columns: &[(String, String)]) -> Result<()> {
368 let conn = self.connection()?;
369 let mut col_defs = String::from("id STRING");
370 for (name, col_type) in columns {
371 col_defs.push_str(&format!(", {} {}", name, col_type));
372 }
373 let ddl = format!(
374 "CREATE NODE TABLE IF NOT EXISTS Pipeline_{}({}, PRIMARY KEY(id))",
375 plugin_id, col_defs
376 );
377 conn.query(&ddl)
378 .map_err(|e| anyhow::anyhow!("ensure_plugin_table DDL: {e}"))?;
379 Ok(())
380 }
381
382 pub fn upsert_pipeline_core(&self, record: &PipelineCoreRecord) -> Result<()> {
384 let conn = self.connection()?;
385 let _ = conn.query(&format!(
387 "MATCH (p:PipelineCore) WHERE p.id = '{}' DETACH DELETE p",
388 escape_str(&record.id)
389 ));
390 let inputs_str = record
392 .inputs
393 .iter()
394 .map(|s| format!("'{}'", escape_str(s)))
395 .collect::<Vec<_>>()
396 .join(",");
397 let outputs_str = record
398 .outputs
399 .iter()
400 .map(|s| format!("'{}'", escape_str(s)))
401 .collect::<Vec<_>>()
402 .join(",");
403 conn.query(&format!(
404 "CREATE (p:PipelineCore {{id: '{}', name: '{}', doc_id: '{}', plugin_id: '{}', inputs: [{}], outputs: [{}]}})",
405 escape_str(&record.id),
406 escape_str(&record.name),
407 escape_str(&record.doc_id),
408 escape_str(&record.plugin_id),
409 inputs_str,
410 outputs_str,
411 ))
412 .map_err(|e| anyhow::anyhow!("create PipelineCore: {e}"))?;
413 Ok(())
414 }
415
416 pub fn upsert_plugin_properties(
418 &self,
419 pipeline_id: &str,
420 plugin_id: &str,
421 properties: &serde_json::Map<String, serde_json::Value>,
422 schema: &[(String, String)],
423 ) -> Result<()> {
424 let conn = self.connection()?;
425 let table = format!("Pipeline_{}", plugin_id);
426 let esc_id = escape_str(pipeline_id);
427 let _ = conn.query(&format!(
429 "MATCH (p:{}) WHERE p.id = '{}' DELETE p",
430 table, esc_id
431 ));
432 let mut props = format!("id: '{}'", esc_id);
434 for (col_name, _col_type) in schema {
435 if let Some(val) = properties.get(col_name.as_str()) {
436 let s = match val {
437 serde_json::Value::String(s) => escape_str(s),
438 other => escape_str(&other.to_string()),
439 };
440 props.push_str(&format!(", {}: '{}'", col_name, s));
441 }
442 }
443 conn.query(&format!("CREATE (p:{} {{{}}})", table, props))
444 .map_err(|e| anyhow::anyhow!("upsert plugin properties: {e}"))?;
445 Ok(())
446 }
447
448 pub fn link_pipeline_core_to_doc(&self, pipeline_id: &str, doc_id: &str) -> Result<()> {
450 let conn = self.connection()?;
451 conn.query(&format!(
452 "MATCH (p:PipelineCore), (d:Document) WHERE p.id = '{}' AND d.id = '{}' CREATE (p)-[:DEFINED_IN]->(d)",
453 escape_str(pipeline_id),
454 escape_str(doc_id),
455 ))
456 .map_err(|e| anyhow::anyhow!("link DEFINED_IN: {e}"))?;
457 Ok(())
458 }
459
460 pub fn link_pipeline_dependencies(&self) -> Result<usize> {
463 let cores = self.get_all_pipeline_cores(None)?;
464 if cores.len() < 2 {
465 return Ok(0);
466 }
467
468 let conn = self.connection()?;
469 let _ = conn.query("MATCH ()-[r:DEPENDS_ON]->() DELETE r");
471
472 let mut count = 0;
473 for producer in &cores {
474 if producer.outputs.is_empty() {
475 continue;
476 }
477 for consumer in &cores {
478 if consumer.id == producer.id || consumer.inputs.is_empty() {
479 continue;
480 }
481 let has_match = producer
483 .outputs
484 .iter()
485 .any(|out| consumer.inputs.iter().any(|inp| inp == out));
486 if has_match {
487 conn.query(&format!(
488 "MATCH (a:PipelineCore), (b:PipelineCore) WHERE a.id = '{}' AND b.id = '{}' CREATE (a)-[:DEPENDS_ON {{dep_type: 'data'}}]->(b)",
489 escape_str(&consumer.id),
490 escape_str(&producer.id),
491 ))
492 .map_err(|e| anyhow::anyhow!("create DEPENDS_ON: {e}"))?;
493 count += 1;
494 }
495 }
496 }
497 Ok(count)
498 }
499
500 pub fn get_all_pipeline_cores(
502 &self,
503 plugin_id: Option<&str>,
504 ) -> Result<Vec<PipelineCoreRecord>> {
505 let conn = self.connection()?;
506 let query = match plugin_id {
507 Some(pid) => format!(
508 "MATCH (p:PipelineCore) WHERE p.plugin_id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
509 escape_str(pid)
510 ),
511 None => "MATCH (p:PipelineCore) RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs".to_string(),
512 };
513 let mut result = conn
514 .query(&query)
515 .map_err(|e| anyhow::anyhow!("query pipeline cores: {e}"))?;
516 let mut records = Vec::new();
517 while let Some(row) = result.next() {
518 if row.len() >= 6 {
519 records.push(PipelineCoreRecord {
520 id: row[0].to_string(),
521 name: row[1].to_string(),
522 doc_id: row[2].to_string(),
523 plugin_id: row[3].to_string(),
524 inputs: parse_string_list(&row[4].to_string()),
525 outputs: parse_string_list(&row[5].to_string()),
526 });
527 }
528 }
529 Ok(records)
530 }
531
532 pub fn get_pipeline_core(&self, pipeline_id: &str) -> Result<Option<PipelineCoreRecord>> {
534 let conn = self.connection()?;
535 let mut result = conn
536 .query(&format!(
537 "MATCH (p:PipelineCore) WHERE p.id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
538 escape_str(pipeline_id)
539 ))
540 .map_err(|e| anyhow::anyhow!("query pipeline core: {e}"))?;
541 if let Some(row) = result.next() {
542 if row.len() >= 6 {
543 return Ok(Some(PipelineCoreRecord {
544 id: row[0].to_string(),
545 name: row[1].to_string(),
546 doc_id: row[2].to_string(),
547 plugin_id: row[3].to_string(),
548 inputs: parse_string_list(&row[4].to_string()),
549 outputs: parse_string_list(&row[5].to_string()),
550 }));
551 }
552 }
553 Ok(None)
554 }
555
556 pub fn impact_analysis(
558 &self,
559 table_name: &str,
560 max_depth: u32,
561 ) -> Result<Vec<ImpactResult>> {
562 let conn = self.connection()?;
563 let esc = escape_str(table_name);
564 let mut results = Vec::new();
565
566 let mut direct = conn
568 .query(&format!(
569 "MATCH (p:PipelineCore) WHERE list_contains(p.inputs, '{}') RETURN p.id, p.name",
570 esc
571 ))
572 .map_err(|e| anyhow::anyhow!("impact_analysis direct: {e}"))?;
573 let mut affected_ids: std::collections::HashSet<String> =
574 std::collections::HashSet::new();
575 while let Some(row) = direct.next() {
576 if row.len() >= 2 {
577 let id = row[0].to_string();
578 let name = row[1].to_string();
579 affected_ids.insert(id.clone());
580 results.push(ImpactResult {
581 pipeline_id: id,
582 pipeline_name: name,
583 impact_type: "direct".to_string(),
584 depth: 1,
585 path: table_name.to_string(),
586 });
587 }
588 }
589
590 if max_depth > 1 && !affected_ids.is_empty() {
592 for depth in 2..=max_depth {
593 let current_ids: Vec<String> = affected_ids.iter().cloned().collect();
594 let mut new_ids = Vec::new();
595
596 for src_id in ¤t_ids {
597 let mut trans = conn
598 .query(&format!(
599 "MATCH (a:PipelineCore)-[:DEPENDS_ON]->(b:PipelineCore) WHERE b.id = '{}' RETURN a.id, a.name",
600 escape_str(src_id)
601 ))
602 .map_err(|e| anyhow::anyhow!("impact_analysis transitive: {e}"))?;
603
604 while let Some(row) = trans.next() {
605 if row.len() >= 2 {
606 let id = row[0].to_string();
607 if !affected_ids.contains(&id) {
608 results.push(ImpactResult {
609 pipeline_id: id.clone(),
610 pipeline_name: row[1].to_string(),
611 impact_type: "transitive".to_string(),
612 depth,
613 path: format!("{} → ... (depth {})", table_name, depth),
614 });
615 new_ids.push(id);
616 }
617 }
618 }
619 }
620
621 if new_ids.is_empty() {
622 break;
623 }
624 affected_ids.extend(new_ids);
625 }
626 }
627
628 Ok(results)
629 }
630
631 pub fn get_pipeline_deps(&self) -> Result<Vec<(String, String, String)>> {
633 let conn = self.connection()?;
634 let mut result = conn
635 .query(
636 "MATCH (c:PipelineCore)-[r:DEPENDS_ON]->(p:PipelineCore) \
637 RETURN c.name, p.name, r.dep_type",
638 )
639 .map_err(|e| anyhow::anyhow!("query pipeline deps: {e}"))?;
640 let mut deps = Vec::new();
641 while let Some(row) = result.next() {
642 if row.len() >= 3 {
643 deps.push((row[0].to_string(), row[1].to_string(), row[2].to_string()));
644 }
645 }
646 Ok(deps)
647 }
648
649 pub fn query_plugin_table(
651 &self,
652 plugin_id: &str,
653 field: &str,
654 value: &str,
655 ) -> Result<Vec<serde_json::Value>> {
656 let conn = self.connection()?;
657 let table = format!("Pipeline_{}", plugin_id);
658 let esc_val = escape_str(value);
659 let mut result = conn
660 .query(&format!(
661 "MATCH (p:{}) WHERE lower(p.{}) CONTAINS lower('{}') RETURN p.*",
662 table, field, esc_val
663 ))
664 .map_err(|e| anyhow::anyhow!("query plugin table: {e}"))?;
665 let mut rows = Vec::new();
666 while let Some(row) = result.next() {
667 let vals: Vec<serde_json::Value> = row
668 .iter()
669 .map(|v| serde_json::Value::String(v.to_string()))
670 .collect();
671 rows.push(serde_json::Value::Array(vals));
672 }
673 Ok(rows)
674 }
675
676 pub fn pipeline_core_count(&self) -> Result<usize> {
678 let conn = self.connection()?;
679 Ok(count_query(&conn, "MATCH (p:PipelineCore) RETURN count(p)"))
680 }
681
682 pub fn get_all_chunks(&self) -> Result<Vec<(String, String)>> {
683 let conn = self.connection()?;
684 let result = conn
685 .query("MATCH (c:Chunk) RETURN c.id, c.text")
686 .map_err(|e| anyhow::anyhow!("query chunks: {e}"))?;
687 let mut chunks = Vec::new();
688 for row in result {
689 if row.len() >= 2 {
690 chunks.push((row[0].to_string(), row[1].to_string()));
691 }
692 }
693 Ok(chunks)
694 }
695
696 pub fn get_chunk_ids(&self) -> Result<std::collections::HashSet<String>> {
697 let conn = self.connection()?;
698 let result = conn
699 .query("MATCH (c:Chunk) RETURN c.id")
700 .map_err(|e| anyhow::anyhow!("query chunk ids: {e}"))?;
701 let mut ids = std::collections::HashSet::new();
702 for row in result {
703 if !row.is_empty() {
704 ids.insert(row[0].to_string());
705 }
706 }
707 Ok(ids)
708 }
709
710 pub fn get_chunk_details(&self, chunk_ids: &[&str]) -> Result<Vec<ChunkDetail>> {
711 let conn = self.connection()?;
712 let id_list: String = chunk_ids
713 .iter()
714 .map(|id| format!("'{}'", escape_str(id)))
715 .collect::<Vec<_>>()
716 .join(", ");
717 let query = format!(
718 "MATCH (c:Chunk) WHERE c.id IN [{}] RETURN c.id, c.doc_file, c.idx, c.heading, c.text, c.start_offset, c.end_offset, c.page",
719 id_list
720 );
721 let result = conn
722 .query(&query)
723 .map_err(|e| anyhow::anyhow!("chunk details: {e}"))?;
724 let mut details = Vec::new();
725 for row in result {
726 if row.len() >= 8 {
727 let heading_str = row[3].to_string();
728 let page_val: i64 = row[7].to_string().parse().unwrap_or(0);
729 details.push(ChunkDetail {
730 id: row[0].to_string(),
731 doc_file: row[1].to_string(),
732 index: row[2].to_string().parse().unwrap_or(0),
733 heading: if heading_str.is_empty() {
734 None
735 } else {
736 Some(heading_str)
737 },
738 text: row[4].to_string(),
739 start_offset: row[5].to_string().parse().unwrap_or(0),
740 end_offset: row[6].to_string().parse().unwrap_or(0),
741 page: if page_val > 0 {
742 Some(page_val as usize)
743 } else {
744 None
745 },
746 });
747 }
748 }
749 Ok(details)
750 }
751}
752
/// Node counts reported by `DocStore::stats`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct DocStoreStats {
    /// Number of `Document` nodes.
    pub document_count: usize,
    /// Number of `Chunk` nodes.
    pub chunk_count: usize,
}
758
/// A chunk as returned by `DocStore::get_chunk_details`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ChunkDetail {
    /// Chunk primary key.
    pub id: String,
    /// File of the document the chunk belongs to.
    pub doc_file: String,
    /// Value of the stored `idx` column.
    pub index: usize,
    /// Section heading; `None` when the stored heading is empty.
    pub heading: Option<String>,
    /// Chunk text.
    pub text: String,
    /// Stored `start_offset` column.
    pub start_offset: usize,
    /// Stored `end_offset` column.
    pub end_offset: usize,
    /// Page number; `None` when the stored page is 0 (the value written for "no page").
    pub page: Option<usize>,
}
770
/// One `PipelineCore` node: a pipeline, the document that defines it, and the
/// names it reads (`inputs`) and writes (`outputs`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct PipelineCoreRecord {
    /// Pipeline primary key.
    pub id: String,
    /// Display name.
    pub name: String,
    /// Id of the defining `Document`.
    pub doc_id: String,
    /// Plugin that extracted this pipeline.
    pub plugin_id: String,
    /// Names consumed; matched against other pipelines' `outputs` to derive `DEPENDS_ON`.
    pub inputs: Vec<String>,
    /// Names produced.
    pub outputs: Vec<String>,
}
780
/// One pipeline reported by `DocStore::impact_analysis`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ImpactResult {
    /// Affected pipeline id.
    pub pipeline_id: String,
    /// Affected pipeline name.
    pub pipeline_name: String,
    /// `"direct"` (reads the table itself) or `"transitive"` (reached via `DEPENDS_ON`).
    pub impact_type: String,
    /// Hop count from the table; 1 for direct impacts.
    pub depth: u32,
    /// Human-readable description of how the pipeline was reached.
    pub path: String,
}
789
790fn count_query(conn: &Connection<'_>, query: &str) -> usize {
791 conn.query(query)
792 .ok()
793 .and_then(|mut r| r.next().map(|row| row[0].to_string().parse().unwrap_or(0)))
794 .unwrap_or(0)
795}
796
/// Escapes `s` for use inside a single-quoted Cypher string literal.
///
/// Backslashes are escaped first-class alongside quotes. Otherwise a value
/// ending in `\` would escape the closing quote, and sequences such as `\n` in
/// Windows paths would be read as escape codes.
fn escape_str(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '\\' => out.push_str("\\\\"),
            '\'' => out.push_str("\\'"),
            _ => out.push(c),
        }
    }
    out
}
800
/// Parses the textual form of a string list such as `[a,b]` or `['a', "b"]`
/// into its items.
///
/// Surrounding brackets are stripped. Each comma-separated piece is trimmed of
/// whitespace and then of single and double quotes, and empty pieces are dropped.
fn parse_string_list(s: &str) -> Vec<String> {
    let body = s.trim_matches(&['[', ']'][..]);
    let mut items = Vec::new();
    if body.is_empty() {
        return items;
    }
    for piece in body.split(',') {
        let item = piece.trim().trim_matches('\'').trim_matches('"');
        if !item.is_empty() {
            items.push(item.to_owned());
        }
    }
    items
}