1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use arrow::array::{Int64Array, StringArray};
7use arrow::datatypes::{DataType, Field, Schema};
8use arrow::record_batch::RecordBatch;
9use kuzu::{Connection, Database, SystemConfig};
10use parquet::arrow::ArrowWriter;
11use parquet::file::properties::WriterProperties;
12
13use crate::chunk::Chunk;
14use crate::extract::ExtractedDoc;
15
/// DDL statements that define the graph schema.
///
/// `DocStore::init_schema` runs them in the order listed. Every statement uses
/// `IF NOT EXISTS`, and relationship tables are listed after the node tables
/// they reference.
const CREATE_SCHEMA: &[&str] = &[
 // One node per ingested document.
 "CREATE NODE TABLE IF NOT EXISTS Document(
 id STRING,
 title STRING,
 file STRING,
 format STRING,
 content_hash STRING,
 page_count INT64,
 chunk_count INT64,
 PRIMARY KEY(id)
 )",
 // One node per text chunk; `doc_file` holds the owning document's file.
 "CREATE NODE TABLE IF NOT EXISTS Chunk(
 id STRING,
 doc_file STRING,
 idx INT64,
 heading STRING,
 text STRING,
 start_offset INT64,
 end_offset INT64,
 page INT64,
 content_hash STRING,
 PRIMARY KEY(id)
 )",
 // Document -> Chunk ownership edge.
 "CREATE REL TABLE IF NOT EXISTS HAS_CHUNK(FROM Document TO Chunk)",
 // Origin a document was synced from.
 "CREATE NODE TABLE IF NOT EXISTS Source(
 id STRING,
 source_type STRING,
 base_url STRING,
 space_key STRING,
 last_synced STRING,
 PRIMARY KEY(id)
 )",
 // Document -> Source provenance edge.
 "CREATE REL TABLE IF NOT EXISTS FROM_SOURCE(FROM Document TO Source)",
 // Document -> Document hyperlink edge carrying the URL and a link type.
 "CREATE REL TABLE IF NOT EXISTS LINKS_TO(FROM Document TO Document, url STRING, link_type STRING)",
 // Plugin-independent pipeline description with its input and output lists.
 "CREATE NODE TABLE IF NOT EXISTS PipelineCore(id STRING, name STRING, doc_id STRING, plugin_id STRING, inputs STRING[], outputs STRING[], PRIMARY KEY(id))",
 // PipelineCore -> Document edge.
 "CREATE REL TABLE IF NOT EXISTS DEFINED_IN(FROM PipelineCore TO Document, ONE_MANY)",
 // PipelineCore -> PipelineCore dependency edge.
 "CREATE REL TABLE IF NOT EXISTS DEPENDS_ON(FROM PipelineCore TO PipelineCore, dep_type STRING, MANY_MANY)",
];
54
/// Document store backed by a Kuzu database.
///
/// Created with `DocStore::open`; every operation obtains a fresh connection
/// through `DocStore::connection`.
pub struct DocStore {
 /// The underlying Kuzu database handle.
 db: Database,
}
58
59impl DocStore {
60 pub fn open(path: &Path) -> Result<Self> {
61 if let Some(parent) = path.parent() {
62 std::fs::create_dir_all(parent)?;
63 }
64 let db = Database::new(path, SystemConfig::default())
65 .map_err(|e| anyhow::anyhow!("failed to open docs kuzu db: {e}"))?;
66 let store = Self { db };
67 store.init_schema()?;
68 Ok(store)
69 }
70
71 fn init_schema(&self) -> Result<()> {
72 let conn = self.connection()?;
73 for ddl in CREATE_SCHEMA {
74 conn.query(ddl)
75 .map_err(|e| anyhow::anyhow!("schema DDL failed: {e}"))?;
76 }
77 Ok(())
78 }
79
80 pub fn connection(&self) -> Result<Connection<'_>> {
81 Connection::new(&self.db).map_err(|e| anyhow::anyhow!("failed to create connection: {e}"))
82 }
83
84 pub fn get_doc_hashes(&self) -> Result<HashMap<String, String>> {
85 let conn = self.connection()?;
86 let result = conn
87 .query("MATCH (d:Document) RETURN d.file, d.content_hash")
88 .map_err(|e| anyhow::anyhow!("query doc hashes: {e}"))?;
89 let mut hashes = HashMap::new();
90 for row in result {
91 if row.len() >= 2 {
92 hashes.insert(row[0].to_string(), row[1].to_string());
93 }
94 }
95 Ok(hashes)
96 }
97
98 pub fn upsert_all_parquet(&self, docs: &[&ExtractedDoc], chunks: &[&Chunk]) -> Result<()> {
99 let conn = self.connection()?;
100
101 let file_list: Vec<String> = docs
103 .iter()
104 .map(|d| format!("'{}'", escape_str(&d.file)))
105 .collect();
106 if !file_list.is_empty() {
107 let files_in = file_list.join(", ");
108 let _ = conn.query(&format!(
109 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
110 files_in
111 ));
112 let _ = conn.query(&format!(
113 "MATCH (d:Document) WHERE d.file IN [{}] DETACH DELETE d",
114 files_in
115 ));
116 }
117
118 let tmp_dir = tempfile::tempdir().context("create temp dir")?;
119
120 {
122 let ids: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
123 let titles: Vec<Option<&str>> = docs.iter().map(|d| d.title.as_deref()).collect();
124 let files: Vec<&str> = docs.iter().map(|d| d.file.as_str()).collect();
125 let formats: Vec<&str> = docs.iter().map(|d| d.format.as_str()).collect();
126 let hashes: Vec<&str> = docs.iter().map(|d| d.content_hash.as_str()).collect();
127 let page_counts: Vec<i64> = docs
128 .iter()
129 .map(|d| d.page_count.unwrap_or(0) as i64)
130 .collect();
131 let chunk_counts: Vec<i64> = docs
132 .iter()
133 .map(|d| chunks.iter().filter(|c| c.doc_file == d.file).count() as i64)
134 .collect();
135
136 let schema = Arc::new(Schema::new(vec![
137 Field::new("id", DataType::Utf8, false),
138 Field::new("title", DataType::Utf8, true),
139 Field::new("file", DataType::Utf8, false),
140 Field::new("format", DataType::Utf8, false),
141 Field::new("content_hash", DataType::Utf8, false),
142 Field::new("page_count", DataType::Int64, false),
143 Field::new("chunk_count", DataType::Int64, false),
144 ]));
145
146 let batch = RecordBatch::try_new(
147 schema.clone(),
148 vec![
149 Arc::new(StringArray::from(ids)),
150 Arc::new(StringArray::from(titles)),
151 Arc::new(StringArray::from(files)),
152 Arc::new(StringArray::from(formats)),
153 Arc::new(StringArray::from(hashes)),
154 Arc::new(Int64Array::from(page_counts)),
155 Arc::new(Int64Array::from(chunk_counts)),
156 ],
157 )?;
158
159 let path = tmp_dir.path().join("documents.parquet");
160 let file = std::fs::File::create(&path)?;
161 let props = WriterProperties::builder().build();
162 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
163 writer.write(&batch)?;
164 writer.close()?;
165
166 conn.query(&format!("COPY Document FROM '{}'", path.to_string_lossy()))
167 .map_err(|e| anyhow::anyhow!("COPY Document: {e}"))?;
168 }
169
170 if !chunks.is_empty() {
172 let ids: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
173 let doc_files: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
174 let indices: Vec<i64> = chunks.iter().map(|c| c.index as i64).collect();
175 let headings: Vec<Option<&str>> = chunks.iter().map(|c| c.heading.as_deref()).collect();
176 let texts: Vec<&str> = chunks.iter().map(|c| c.text.as_str()).collect();
177 let start_offsets: Vec<i64> = chunks.iter().map(|c| c.start_offset as i64).collect();
178 let end_offsets: Vec<i64> = chunks.iter().map(|c| c.end_offset as i64).collect();
179 let pages: Vec<i64> = chunks.iter().map(|c| c.page.unwrap_or(0) as i64).collect();
180 let hashes: Vec<&str> = chunks.iter().map(|c| c.content_hash.as_str()).collect();
181
182 let schema = Arc::new(Schema::new(vec![
183 Field::new("id", DataType::Utf8, false),
184 Field::new("doc_file", DataType::Utf8, false),
185 Field::new("idx", DataType::Int64, false),
186 Field::new("heading", DataType::Utf8, true),
187 Field::new("text", DataType::Utf8, false),
188 Field::new("start_offset", DataType::Int64, false),
189 Field::new("end_offset", DataType::Int64, false),
190 Field::new("page", DataType::Int64, false),
191 Field::new("content_hash", DataType::Utf8, false),
192 ]));
193
194 let batch = RecordBatch::try_new(
195 schema.clone(),
196 vec![
197 Arc::new(StringArray::from(ids)),
198 Arc::new(StringArray::from(doc_files)),
199 Arc::new(Int64Array::from(indices)),
200 Arc::new(StringArray::from(headings)),
201 Arc::new(StringArray::from(texts)),
202 Arc::new(Int64Array::from(start_offsets)),
203 Arc::new(Int64Array::from(end_offsets)),
204 Arc::new(Int64Array::from(pages)),
205 Arc::new(StringArray::from(hashes)),
206 ],
207 )?;
208
209 let path = tmp_dir.path().join("chunks.parquet");
210 let file = std::fs::File::create(&path)?;
211 let props = WriterProperties::builder().build();
212 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
213 writer.write(&batch)?;
214 writer.close()?;
215
216 conn.query(&format!("COPY Chunk FROM '{}'", path.to_string_lossy()))
217 .map_err(|e| anyhow::anyhow!("COPY Chunk: {e}"))?;
218 }
219
220 if !chunks.is_empty() {
222 let froms: Vec<&str> = chunks.iter().map(|c| c.doc_file.as_str()).collect();
223 let tos: Vec<&str> = chunks.iter().map(|c| c.id.as_str()).collect();
224
225 let schema = Arc::new(Schema::new(vec![
226 Field::new("from", DataType::Utf8, false),
227 Field::new("to", DataType::Utf8, false),
228 ]));
229
230 let batch = RecordBatch::try_new(
231 schema.clone(),
232 vec![
233 Arc::new(StringArray::from(froms)),
234 Arc::new(StringArray::from(tos)),
235 ],
236 )?;
237
238 let path = tmp_dir.path().join("has_chunk.parquet");
239 let file = std::fs::File::create(&path)?;
240 let props = WriterProperties::builder().build();
241 let mut writer = ArrowWriter::try_new(file, schema, Some(props))?;
242 writer.write(&batch)?;
243 writer.close()?;
244
245 conn.query(&format!("COPY HAS_CHUNK FROM '{}'", path.to_string_lossy()))
246 .map_err(|e| anyhow::anyhow!("COPY HAS_CHUNK: {e}"))?;
247 }
248
249 Ok(())
250 }
251
252 pub fn upsert_source(
253 &self,
254 id: &str,
255 source_type: &str,
256 base_url: &str,
257 space_key: &str,
258 ) -> Result<()> {
259 let conn = self.connection()?;
260 let _ = conn.query(&format!(
261 "MATCH (s:Source) WHERE s.id = '{}' DETACH DELETE s",
262 escape_str(id)
263 ));
264 conn.query(&format!(
265 "CREATE (s:Source {{id: '{}', source_type: '{}', base_url: '{}', space_key: '{}', last_synced: '{}'}})",
266 escape_str(id),
267 escape_str(source_type),
268 escape_str(base_url),
269 escape_str(space_key),
270 chrono::Utc::now().to_rfc3339(),
271 ))
272 .map_err(|e| anyhow::anyhow!("create Source: {e}"))?;
273 Ok(())
274 }
275
276 pub fn link_doc_to_source(&self, doc_id: &str, source_id: &str) -> Result<()> {
277 let conn = self.connection()?;
278 conn.query(&format!(
279 "MATCH (d:Document), (s:Source) WHERE d.id = '{}' AND s.id = '{}' CREATE (d)-[:FROM_SOURCE]->(s)",
280 escape_str(doc_id),
281 escape_str(source_id),
282 ))
283 .map_err(|e| anyhow::anyhow!("link FROM_SOURCE: {e}"))?;
284 Ok(())
285 }
286
287 pub fn get_docs_by_source(&self, source_id: &str) -> Result<Vec<String>> {
288 let conn = self.connection()?;
289 let result = conn
290 .query(&format!(
291 "MATCH (d:Document)-[:FROM_SOURCE]->(s:Source) WHERE s.id = '{}' RETURN d.id",
292 escape_str(source_id)
293 ))
294 .map_err(|e| anyhow::anyhow!("query docs by source: {e}"))?;
295 let mut ids = Vec::new();
296 for row in result {
297 if !row.is_empty() {
298 ids.insert(ids.len(), row[0].to_string());
299 }
300 }
301 Ok(ids)
302 }
303
304 pub fn delete_docs_by_ids(&self, doc_ids: &[&str]) -> Result<()> {
305 if doc_ids.is_empty() {
306 return Ok(());
307 }
308 let conn = self.connection()?;
309 let id_list: String = doc_ids
310 .iter()
311 .map(|id| format!("'{}'", escape_str(id)))
312 .collect::<Vec<_>>()
313 .join(", ");
314 let _ = conn.query(&format!(
315 "MATCH (c:Chunk) WHERE c.doc_file IN [{}] DETACH DELETE c",
316 id_list
317 ));
318 let _ = conn.query(&format!(
319 "MATCH (d:Document) WHERE d.id IN [{}] DETACH DELETE d",
320 id_list
321 ));
322 Ok(())
323 }
324
325 pub fn create_link(
326 &self,
327 from_doc_id: &str,
328 to_doc_id: &str,
329 url: &str,
330 link_type: &str,
331 ) -> Result<()> {
332 let conn = self.connection()?;
333 conn.query(&format!(
334 "MATCH (a:Document), (b:Document) WHERE a.id = '{}' AND b.id = '{}' \
335 CREATE (a)-[:LINKS_TO {{url: '{}', link_type: '{}'}}]->(b)",
336 escape_str(from_doc_id),
337 escape_str(to_doc_id),
338 escape_str(url),
339 escape_str(link_type),
340 ))
341 .map_err(|e| anyhow::anyhow!("create LINKS_TO: {e}"))?;
342 Ok(())
343 }
344
345 pub fn delete_links_from(&self, doc_id: &str) -> Result<()> {
346 let conn = self.connection()?;
347 let _ = conn.query(&format!(
348 "MATCH (a:Document)-[r:LINKS_TO]->() WHERE a.id = '{}' DELETE r",
349 escape_str(doc_id),
350 ));
351 Ok(())
352 }
353
354 pub fn stats(&self) -> Result<DocStoreStats> {
355 let conn = self.connection()?;
356 let doc_count = count_query(&conn, "MATCH (d:Document) RETURN count(d)");
357 let chunk_count = count_query(&conn, "MATCH (c:Chunk) RETURN count(c)");
358 Ok(DocStoreStats {
359 document_count: doc_count,
360 chunk_count,
361 })
362 }
363
364 pub fn ensure_plugin_table(&self, plugin_id: &str, columns: &[(String, String)]) -> Result<()> {
368 let conn = self.connection()?;
369 let mut col_defs = String::from("id STRING");
370 for (name, col_type) in columns {
371 col_defs.push_str(&format!(", {} {}", name, col_type));
372 }
373 let ddl = format!(
374 "CREATE NODE TABLE IF NOT EXISTS Pipeline_{}({}, PRIMARY KEY(id))",
375 plugin_id, col_defs
376 );
377 conn.query(&ddl)
378 .map_err(|e| anyhow::anyhow!("ensure_plugin_table DDL: {e}"))?;
379 Ok(())
380 }
381
382 pub fn upsert_pipeline_core(&self, record: &PipelineCoreRecord) -> Result<()> {
384 let conn = self.connection()?;
385 let _ = conn.query(&format!(
387 "MATCH (p:PipelineCore) WHERE p.id = '{}' DETACH DELETE p",
388 escape_str(&record.id)
389 ));
390 let inputs_str = record
392 .inputs
393 .iter()
394 .map(|s| format!("'{}'", escape_str(s)))
395 .collect::<Vec<_>>()
396 .join(",");
397 let outputs_str = record
398 .outputs
399 .iter()
400 .map(|s| format!("'{}'", escape_str(s)))
401 .collect::<Vec<_>>()
402 .join(",");
403 conn.query(&format!(
404 "CREATE (p:PipelineCore {{id: '{}', name: '{}', doc_id: '{}', plugin_id: '{}', inputs: [{}], outputs: [{}]}})",
405 escape_str(&record.id),
406 escape_str(&record.name),
407 escape_str(&record.doc_id),
408 escape_str(&record.plugin_id),
409 inputs_str,
410 outputs_str,
411 ))
412 .map_err(|e| anyhow::anyhow!("create PipelineCore: {e}"))?;
413 Ok(())
414 }
415
416 pub fn upsert_plugin_properties(
418 &self,
419 pipeline_id: &str,
420 plugin_id: &str,
421 properties: &serde_json::Map<String, serde_json::Value>,
422 schema: &[(String, String)],
423 ) -> Result<()> {
424 let conn = self.connection()?;
425 let table = format!("Pipeline_{}", plugin_id);
426 let esc_id = escape_str(pipeline_id);
427 let _ = conn.query(&format!(
429 "MATCH (p:{}) WHERE p.id = '{}' DELETE p",
430 table, esc_id
431 ));
432 let mut props = format!("id: '{}'", esc_id);
434 for (col_name, _col_type) in schema {
435 if let Some(val) = properties.get(col_name.as_str()) {
436 let s = match val {
437 serde_json::Value::String(s) => escape_str(s),
438 other => escape_str(&other.to_string()),
439 };
440 props.push_str(&format!(", {}: '{}'", col_name, s));
441 }
442 }
443 conn.query(&format!("CREATE (p:{} {{{}}})", table, props))
444 .map_err(|e| anyhow::anyhow!("upsert plugin properties: {e}"))?;
445 Ok(())
446 }
447
448 pub fn link_pipeline_core_to_doc(&self, pipeline_id: &str, doc_id: &str) -> Result<()> {
450 let conn = self.connection()?;
451 conn.query(&format!(
452 "MATCH (p:PipelineCore), (d:Document) WHERE p.id = '{}' AND d.id = '{}' CREATE (p)-[:DEFINED_IN]->(d)",
453 escape_str(pipeline_id),
454 escape_str(doc_id),
455 ))
456 .map_err(|e| anyhow::anyhow!("link DEFINED_IN: {e}"))?;
457 Ok(())
458 }
459
460 pub fn link_pipeline_dependencies(&self) -> Result<usize> {
463 let cores = self.get_all_pipeline_cores(None)?;
464 if cores.len() < 2 {
465 return Ok(0);
466 }
467
468 let conn = self.connection()?;
469 let _ = conn.query("MATCH ()-[r:DEPENDS_ON]->() DELETE r");
471
472 let mut count = 0;
473 for producer in &cores {
474 if producer.outputs.is_empty() {
475 continue;
476 }
477 for consumer in &cores {
478 if consumer.id == producer.id || consumer.inputs.is_empty() {
479 continue;
480 }
481 let has_match = producer
483 .outputs
484 .iter()
485 .any(|out| consumer.inputs.iter().any(|inp| inp == out));
486 if has_match {
487 conn.query(&format!(
488 "MATCH (a:PipelineCore), (b:PipelineCore) WHERE a.id = '{}' AND b.id = '{}' CREATE (a)-[:DEPENDS_ON {{dep_type: 'data'}}]->(b)",
489 escape_str(&consumer.id),
490 escape_str(&producer.id),
491 ))
492 .map_err(|e| anyhow::anyhow!("create DEPENDS_ON: {e}"))?;
493 count += 1;
494 }
495 }
496 }
497 Ok(count)
498 }
499
500 pub fn get_all_pipeline_cores(
502 &self,
503 plugin_id: Option<&str>,
504 ) -> Result<Vec<PipelineCoreRecord>> {
505 let conn = self.connection()?;
506 let query = match plugin_id {
507 Some(pid) => format!(
508 "MATCH (p:PipelineCore) WHERE p.plugin_id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
509 escape_str(pid)
510 ),
511 None => "MATCH (p:PipelineCore) RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs".to_string(),
512 };
513 let mut result = conn
514 .query(&query)
515 .map_err(|e| anyhow::anyhow!("query pipeline cores: {e}"))?;
516 let mut records = Vec::new();
517 while let Some(row) = result.next() {
518 if row.len() >= 6 {
519 records.push(PipelineCoreRecord {
520 id: row[0].to_string(),
521 name: row[1].to_string(),
522 doc_id: row[2].to_string(),
523 plugin_id: row[3].to_string(),
524 inputs: parse_string_list(&row[4].to_string()),
525 outputs: parse_string_list(&row[5].to_string()),
526 });
527 }
528 }
529 Ok(records)
530 }
531
532 pub fn get_pipeline_core(&self, pipeline_id: &str) -> Result<Option<PipelineCoreRecord>> {
534 let conn = self.connection()?;
535 let mut result = conn
536 .query(&format!(
537 "MATCH (p:PipelineCore) WHERE p.id = '{}' RETURN p.id, p.name, p.doc_id, p.plugin_id, p.inputs, p.outputs",
538 escape_str(pipeline_id)
539 ))
540 .map_err(|e| anyhow::anyhow!("query pipeline core: {e}"))?;
541 if let Some(row) = result.next() {
542 if row.len() >= 6 {
543 return Ok(Some(PipelineCoreRecord {
544 id: row[0].to_string(),
545 name: row[1].to_string(),
546 doc_id: row[2].to_string(),
547 plugin_id: row[3].to_string(),
548 inputs: parse_string_list(&row[4].to_string()),
549 outputs: parse_string_list(&row[5].to_string()),
550 }));
551 }
552 }
553 Ok(None)
554 }
555
556 pub fn impact_analysis(&self, table_name: &str, max_depth: u32) -> Result<Vec<ImpactResult>> {
558 let conn = self.connection()?;
559 let esc = escape_str(table_name);
560 let mut results = Vec::new();
561
562 let mut direct = conn
564 .query(&format!(
565 "MATCH (p:PipelineCore) WHERE list_contains(p.inputs, '{}') RETURN p.id, p.name",
566 esc
567 ))
568 .map_err(|e| anyhow::anyhow!("impact_analysis direct: {e}"))?;
569 let mut affected_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
570 while let Some(row) = direct.next() {
571 if row.len() >= 2 {
572 let id = row[0].to_string();
573 let name = row[1].to_string();
574 affected_ids.insert(id.clone());
575 results.push(ImpactResult {
576 pipeline_id: id,
577 pipeline_name: name,
578 impact_type: "direct".to_string(),
579 depth: 1,
580 path: table_name.to_string(),
581 });
582 }
583 }
584
585 if max_depth > 1 && !affected_ids.is_empty() {
587 for depth in 2..=max_depth {
588 let current_ids: Vec<String> = affected_ids.iter().cloned().collect();
589 let mut new_ids = Vec::new();
590
591 for src_id in ¤t_ids {
592 let mut trans = conn
593 .query(&format!(
594 "MATCH (a:PipelineCore)-[:DEPENDS_ON]->(b:PipelineCore) WHERE b.id = '{}' RETURN a.id, a.name",
595 escape_str(src_id)
596 ))
597 .map_err(|e| anyhow::anyhow!("impact_analysis transitive: {e}"))?;
598
599 while let Some(row) = trans.next() {
600 if row.len() >= 2 {
601 let id = row[0].to_string();
602 if !affected_ids.contains(&id) {
603 results.push(ImpactResult {
604 pipeline_id: id.clone(),
605 pipeline_name: row[1].to_string(),
606 impact_type: "transitive".to_string(),
607 depth,
608 path: format!("{} → ... (depth {})", table_name, depth),
609 });
610 new_ids.push(id);
611 }
612 }
613 }
614 }
615
616 if new_ids.is_empty() {
617 break;
618 }
619 affected_ids.extend(new_ids);
620 }
621 }
622
623 Ok(results)
624 }
625
626 pub fn get_pipeline_deps(&self) -> Result<Vec<(String, String, String)>> {
628 let conn = self.connection()?;
629 let mut result = conn
630 .query(
631 "MATCH (c:PipelineCore)-[r:DEPENDS_ON]->(p:PipelineCore) \
632 RETURN c.name, p.name, r.dep_type",
633 )
634 .map_err(|e| anyhow::anyhow!("query pipeline deps: {e}"))?;
635 let mut deps = Vec::new();
636 while let Some(row) = result.next() {
637 if row.len() >= 3 {
638 deps.push((row[0].to_string(), row[1].to_string(), row[2].to_string()));
639 }
640 }
641 Ok(deps)
642 }
643
644 pub fn query_plugin_table(
646 &self,
647 plugin_id: &str,
648 field: &str,
649 value: &str,
650 ) -> Result<Vec<serde_json::Value>> {
651 let conn = self.connection()?;
652 let table = format!("Pipeline_{}", plugin_id);
653 let esc_val = escape_str(value);
654 let mut result = conn
655 .query(&format!(
656 "MATCH (p:{}) WHERE lower(p.{}) CONTAINS lower('{}') RETURN p.*",
657 table, field, esc_val
658 ))
659 .map_err(|e| anyhow::anyhow!("query plugin table: {e}"))?;
660 let mut rows = Vec::new();
661 while let Some(row) = result.next() {
662 let vals: Vec<serde_json::Value> = row
663 .iter()
664 .map(|v| serde_json::Value::String(v.to_string()))
665 .collect();
666 rows.push(serde_json::Value::Array(vals));
667 }
668 Ok(rows)
669 }
670
671 pub fn pipeline_core_count(&self) -> Result<usize> {
673 let conn = self.connection()?;
674 Ok(count_query(&conn, "MATCH (p:PipelineCore) RETURN count(p)"))
675 }
676
677 pub fn get_all_chunks(&self) -> Result<Vec<(String, String)>> {
678 let conn = self.connection()?;
679 let result = conn
680 .query("MATCH (c:Chunk) RETURN c.id, c.text")
681 .map_err(|e| anyhow::anyhow!("query chunks: {e}"))?;
682 let mut chunks = Vec::new();
683 for row in result {
684 if row.len() >= 2 {
685 chunks.push((row[0].to_string(), row[1].to_string()));
686 }
687 }
688 Ok(chunks)
689 }
690
691 pub fn get_chunk_ids(&self) -> Result<std::collections::HashSet<String>> {
692 let conn = self.connection()?;
693 let result = conn
694 .query("MATCH (c:Chunk) RETURN c.id")
695 .map_err(|e| anyhow::anyhow!("query chunk ids: {e}"))?;
696 let mut ids = std::collections::HashSet::new();
697 for row in result {
698 if !row.is_empty() {
699 ids.insert(row[0].to_string());
700 }
701 }
702 Ok(ids)
703 }
704
705 pub fn get_chunk_details(&self, chunk_ids: &[&str]) -> Result<Vec<ChunkDetail>> {
706 let conn = self.connection()?;
707 let id_list: String = chunk_ids
708 .iter()
709 .map(|id| format!("'{}'", escape_str(id)))
710 .collect::<Vec<_>>()
711 .join(", ");
712 let query = format!(
713 "MATCH (c:Chunk) WHERE c.id IN [{}] RETURN c.id, c.doc_file, c.idx, c.heading, c.text, c.start_offset, c.end_offset, c.page",
714 id_list
715 );
716 let result = conn
717 .query(&query)
718 .map_err(|e| anyhow::anyhow!("chunk details: {e}"))?;
719 let mut details = Vec::new();
720 for row in result {
721 if row.len() >= 8 {
722 let heading_str = row[3].to_string();
723 let page_val: i64 = row[7].to_string().parse().unwrap_or(0);
724 details.push(ChunkDetail {
725 id: row[0].to_string(),
726 doc_file: row[1].to_string(),
727 index: row[2].to_string().parse().unwrap_or(0),
728 heading: if heading_str.is_empty() {
729 None
730 } else {
731 Some(heading_str)
732 },
733 text: row[4].to_string(),
734 start_offset: row[5].to_string().parse().unwrap_or(0),
735 end_offset: row[6].to_string().parse().unwrap_or(0),
736 page: if page_val > 0 {
737 Some(page_val as usize)
738 } else {
739 None
740 },
741 });
742 }
743 }
744 Ok(details)
745 }
746}
747
/// Node counts reported by `DocStore::stats`.
///
/// Derives the usual value-type traits so the counts can be copied, compared
/// and defaulted to zero.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct DocStoreStats {
    /// Number of `Document` nodes.
    pub document_count: usize,
    /// Number of `Chunk` nodes.
    pub chunk_count: usize,
}
753
/// One chunk row as returned by `DocStore::get_chunk_details`.
#[derive(Debug, Clone)]
pub struct ChunkDetail {
 /// Chunk id (`Chunk.id`).
 pub id: String,
 /// Value of the chunk's `doc_file` column.
 pub doc_file: String,
 /// Value of the chunk's `idx` column; `0` if it could not be parsed.
 pub index: usize,
 /// Chunk heading; `None` when the stored heading renders as an empty string.
 pub heading: Option<String>,
 /// Chunk text.
 pub text: String,
 /// Value of the chunk's `start_offset` column; `0` if it could not be parsed.
 pub start_offset: usize,
 /// Value of the chunk's `end_offset` column; `0` if it could not be parsed.
 pub end_offset: usize,
 /// Page value; `None` when the stored value is not greater than zero.
 pub page: Option<usize>,
}
765
/// In-memory form of a `PipelineCore` node; the fields mirror the table's columns.
#[derive(Debug, Clone)]
pub struct PipelineCoreRecord {
 /// Primary key of the node.
 pub id: String,
 /// Pipeline name.
 pub name: String,
 /// Value of the `doc_id` column.
 pub doc_id: String,
 /// Value of the `plugin_id` column; `get_all_pipeline_cores` can filter on it.
 pub plugin_id: String,
 /// Input names; `link_pipeline_dependencies` matches them against other pipelines' outputs.
 pub inputs: Vec<String>,
 /// Output names; `link_pipeline_dependencies` matches them against other pipelines' inputs.
 pub outputs: Vec<String>,
}
775
/// One affected pipeline reported by `DocStore::impact_analysis`.
#[derive(Debug, Clone)]
pub struct ImpactResult {
 /// Id of the affected pipeline.
 pub pipeline_id: String,
 /// Name of the affected pipeline.
 pub pipeline_name: String,
 /// `"direct"` or `"transitive"`, as set by `impact_analysis`.
 pub impact_type: String,
 /// `1` for direct results; the search depth (2 or more) for transitive ones.
 pub depth: u32,
 /// The analysed table name for direct results; a `"<table> → ... (depth N)"` summary otherwise.
 pub path: String,
}
784
785fn count_query(conn: &Connection<'_>, query: &str) -> usize {
786 conn.query(query)
787 .ok()
788 .and_then(|mut r| r.next().map(|row| row[0].to_string().parse().unwrap_or(0)))
789 .unwrap_or(0)
790}
791
/// Escapes `s` for use inside a single-quoted string literal in a query.
///
/// Backslashes and single quotes are each prefixed with a backslash.
/// Backslashes must be escaped as well: otherwise an input ending in `\`
/// would escape the literal's closing quote.
fn escape_str(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        if matches!(ch, '\\' | '\'') {
            escaped.push('\\');
        }
        escaped.push(ch);
    }
    escaped
}
795
/// Parses the textual form of a string list such as `[a,b]` into its elements.
///
/// Surrounding whitespace and at most one enclosing `[` … `]` pair are
/// removed, so elements that themselves begin or end with a bracket are kept
/// intact. The remainder is split on commas; each element is trimmed of
/// whitespace and of surrounding single or double quotes, and empty elements
/// are dropped.
///
/// Elements that contain a comma cannot be recovered from this format and are
/// split apart.
fn parse_string_list(s: &str) -> Vec<String> {
    let s = s.trim();
    let inner = s.strip_prefix('[').unwrap_or(s);
    let inner = inner.strip_suffix(']').unwrap_or(inner);

    inner
        .split(',')
        .map(|item| item.trim().trim_matches('\'').trim_matches('"'))
        .filter(|item| !item.is_empty())
        .map(str::to_string)
        .collect()
}