thread_flow/flows/builder.rs
1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4use recoco::base::spec::{ExecutionOptions, FlowInstanceSpec, IndexOptions, SourceRefreshOptions};
5use recoco::builder::flow_builder::FlowBuilder;
6use recoco::prelude::Error as RecocoError;
7use serde_json::json;
8use thread_services::error::{ServiceError, ServiceResult};
9
/// Settings for the local-file source of a flow.
///
/// Filled in by `ThreadFlowBuilder::source_local` and serialized into the
/// `local_file` source spec when the flow is built.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SourceConfig {
    /// Path handed to the source under the `path` key.
    path: String,
    /// Patterns handed to the source under the `included_patterns` key.
    included: Vec<String>,
    /// Patterns handed to the source under the `excluded_patterns` key.
    excluded: Vec<String>,
}
16
/// A single pipeline stage queued on the builder.
///
/// Stages run in the order they were queued; the extract stages report a
/// configuration error unless a `Parse` stage was queued before them.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Step {
    /// Run the `thread_parse` transform over the source rows.
    Parse,
    /// Collect the `symbols` produced by the parse stage.
    ExtractSymbols,
    /// Collect the `imports` produced by the parse stage.
    ExtractImports,
    /// Collect the `calls` produced by the parse stage.
    ExtractCalls,
}
24
/// Destination that collected rows are exported to.
///
/// NOTE(review): the `D1` variant carries an API token; if `Debug` is ever
/// needed here, implement it by hand and redact `api_token`.
#[derive(Clone)]
enum Target {
    /// Export through the `postgres` target.
    Postgres {
        /// Base table name; the import and call exports append a suffix to it.
        table: String,
        /// Field names used as the primary key of every exported table.
        primary_key: Vec<String>,
    },
    /// Export through the `d1` target.
    D1 {
        /// Value sent as `account_id` in the target spec.
        account_id: String,
        /// Value sent as `database_id` in the target spec.
        database_id: String,
        /// Value sent as `api_token` in the target spec.
        api_token: String,
        /// Base table name; the import and call exports append a suffix to it.
        table: String,
        /// Field names used as the primary key of every exported table.
        primary_key: Vec<String>,
    },
}
39
40/// Builder for constructing standard Thread analysis pipelines.
41///
42/// This implements the Builder pattern to simplify the complexity of
43/// constructing CocoIndex flows with multiple operators.
44pub struct ThreadFlowBuilder {
45 name: String,
46 source: Option<SourceConfig>,
47 steps: Vec<Step>,
48 target: Option<Target>,
49}
50
51impl ThreadFlowBuilder {
52 pub fn new(name: impl Into<String>) -> Self {
53 Self {
54 name: name.into(),
55 source: None,
56 steps: Vec::new(),
57 target: None,
58 }
59 }
60
61 pub fn source_local(
62 mut self,
63 path: impl Into<String>,
64 included: &[&str],
65 excluded: &[&str],
66 ) -> Self {
67 self.source = Some(SourceConfig {
68 path: path.into(),
69 included: included.iter().map(|s| s.to_string()).collect(),
70 excluded: excluded.iter().map(|s| s.to_string()).collect(),
71 });
72 self
73 }
74
75 pub fn parse(mut self) -> Self {
76 self.steps.push(Step::Parse);
77 self
78 }
79
80 pub fn extract_symbols(mut self) -> Self {
81 self.steps.push(Step::ExtractSymbols);
82 self
83 }
84
85 pub fn extract_imports(mut self) -> Self {
86 self.steps.push(Step::ExtractImports);
87 self
88 }
89
90 pub fn extract_calls(mut self) -> Self {
91 self.steps.push(Step::ExtractCalls);
92 self
93 }
94
95 pub fn target_postgres(mut self, table: impl Into<String>, primary_key: &[&str]) -> Self {
96 self.target = Some(Target::Postgres {
97 table: table.into(),
98 primary_key: primary_key.iter().map(|s| s.to_string()).collect(),
99 });
100 self
101 }
102
103 /// Configure D1 as the export target
104 ///
105 /// # Arguments
106 /// * `account_id` - Cloudflare account ID
107 /// * `database_id` - D1 database ID
108 /// * `api_token` - Cloudflare API token
109 /// * `table` - Table name to export to
110 /// * `primary_key` - Primary key field names for content-addressed deduplication
111 pub fn target_d1(
112 mut self,
113 account_id: impl Into<String>,
114 database_id: impl Into<String>,
115 api_token: impl Into<String>,
116 table: impl Into<String>,
117 primary_key: &[&str],
118 ) -> Self {
119 self.target = Some(Target::D1 {
120 account_id: account_id.into(),
121 database_id: database_id.into(),
122 api_token: api_token.into(),
123 table: table.into(),
124 primary_key: primary_key.iter().map(|s| s.to_string()).collect(),
125 });
126 self
127 }
128
129 pub async fn build(self) -> ServiceResult<FlowInstanceSpec> {
130 let mut builder = FlowBuilder::new(&self.name)
131 .await
132 .map_err(|e: RecocoError| {
133 ServiceError::execution_dynamic(format!("Failed to create builder: {}", e))
134 })?;
135
136 let source_cfg = self
137 .source
138 .ok_or_else(|| ServiceError::config_static("Missing source configuration"))?;
139
140 // 1. SOURCE
141 let source_node = builder
142 .add_source(
143 "local_file".to_string(),
144 json!({
145 "path": source_cfg.path,
146 "included_patterns": source_cfg.included,
147 "excluded_patterns": source_cfg.excluded
148 })
149 .as_object()
150 .ok_or_else(|| ServiceError::config_static("Invalid source spec"))?
151 .clone(),
152 None,
153 "source".to_string(),
154 Some(SourceRefreshOptions::default()),
155 Some(ExecutionOptions::default()),
156 )
157 .await
158 .map_err(|e: RecocoError| {
159 ServiceError::execution_dynamic(format!("Failed to add source: {}", e))
160 })?;
161
162 let current_node = source_node;
163 let mut parsed_node = None;
164
165 for step in self.steps {
166 match step {
167 Step::Parse => {
168 // 2. TRANSFORM: Parse with Thread
169 let content_field = current_node
170 .field("content")
171 .map_err(|e| {
172 ServiceError::config_dynamic(format!("Missing content field: {}", e))
173 })?
174 .ok_or_else(|| ServiceError::config_static("Content field not found"))?;
175
176 // Attempt to get language field, fallback to path if needed or error
177 let language_field = current_node
178 .field("language")
179 .or_else(|_| current_node.field("path"))
180 .map_err(|e| {
181 ServiceError::config_dynamic(format!(
182 "Missing language/path field: {}",
183 e
184 ))
185 })?
186 .ok_or_else(|| {
187 ServiceError::config_static("Language/Path field not found")
188 })?;
189
190 let parsed = builder
191 .transform(
192 "thread_parse".to_string(),
193 serde_json::Map::new(),
194 vec![
195 (content_field, Some("content".to_string())),
196 (language_field, Some("language".to_string())),
197 ],
198 None,
199 "parsed".to_string(),
200 )
201 .await
202 .map_err(|e: RecocoError| {
203 ServiceError::execution_dynamic(format!(
204 "Failed to add parse step: {}",
205 e
206 ))
207 })?;
208
209 parsed_node = Some(parsed);
210 }
211 Step::ExtractSymbols => {
212 // 3. COLLECT: Symbols
213 let parsed = parsed_node.as_ref().ok_or_else(|| {
214 ServiceError::config_static("Extract symbols requires parse step first")
215 })?;
216
217 let mut root_scope = builder.root_scope();
218 let symbols_collector = root_scope
219 .add_collector("symbols".to_string())
220 .map_err(|e: RecocoError| {
221 ServiceError::execution_dynamic(format!(
222 "Failed to add collector: {}",
223 e
224 ))
225 })?;
226
227 // We need source node for file_path
228 let path_field = current_node
229 .field("path")
230 .map_err(|e| {
231 ServiceError::config_dynamic(format!("Missing path field: {}", e))
232 })?
233 .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
234
235 let symbols = parsed
236 .field("symbols")
237 .map_err(|e| {
238 ServiceError::config_dynamic(format!(
239 "Missing symbols field in parsed output: {}",
240 e
241 ))
242 })?
243 .ok_or_else(|| ServiceError::config_static("Symbols field not found"))?;
244
245 // Get content_fingerprint field for content-addressed deduplication
246 let content_fingerprint = parsed
247 .field("content_fingerprint")
248 .map_err(|e| {
249 ServiceError::config_dynamic(format!(
250 "Missing content_fingerprint field in parsed output: {}",
251 e
252 ))
253 })?
254 .ok_or_else(|| {
255 ServiceError::config_static("Content fingerprint field not found")
256 })?;
257
258 builder
259 .collect(
260 &symbols_collector,
261 vec![
262 ("file_path".to_string(), path_field),
263 ("content_fingerprint".to_string(), content_fingerprint),
264 (
265 "name".to_string(),
266 symbols
267 .field("name")
268 .map_err(|e: RecocoError| {
269 ServiceError::config_dynamic(e.to_string())
270 })?
271 .ok_or_else(|| {
272 ServiceError::config_static(
273 "Symbol Name field not found",
274 )
275 })?,
276 ),
277 (
278 "kind".to_string(),
279 symbols
280 .field("kind")
281 .map_err(|e: RecocoError| {
282 ServiceError::config_dynamic(e.to_string())
283 })?
284 .ok_or_else(|| {
285 ServiceError::config_static(
286 "Symbol Kind field not found",
287 )
288 })?,
289 ),
290 (
291 "signature".to_string(),
292 symbols
293 .field("scope")
294 .map_err(|e: RecocoError| {
295 ServiceError::config_dynamic(e.to_string())
296 })?
297 .ok_or_else(|| {
298 ServiceError::config_static(
299 "Symbol Scope field not found",
300 )
301 })?,
302 ),
303 ],
304 None,
305 )
306 .await
307 .map_err(|e: RecocoError| {
308 ServiceError::execution_dynamic(format!(
309 "Failed to configure collector: {}",
310 e
311 ))
312 })?;
313
314 // 4. EXPORT
315 if let Some(target_cfg) = &self.target {
316 match target_cfg {
317 Target::Postgres { table, primary_key } => {
318 builder
319 .export(
320 "symbols_table".to_string(),
321 "postgres".to_string(), // target type name
322 json!({
323 "table": table,
324 "primary_key": primary_key
325 })
326 .as_object()
327 .ok_or_else(|| {
328 ServiceError::config_static("Invalid target spec")
329 })?
330 .clone(),
331 vec![],
332 IndexOptions {
333 primary_key_fields: Some(
334 primary_key.iter().map(|s| s.to_string()).collect(),
335 ),
336 vector_indexes: vec![],
337 fts_indexes: vec![],
338 },
339 &symbols_collector,
340 false, // setup_by_user
341 )
342 .map_err(|e: RecocoError| {
343 ServiceError::execution_dynamic(format!(
344 "Failed to add export: {}",
345 e
346 ))
347 })?;
348 }
349 Target::D1 {
350 account_id,
351 database_id,
352 api_token,
353 table,
354 primary_key,
355 } => {
356 builder
357 .export(
358 "symbols_table".to_string(),
359 "d1".to_string(), // target type name matching D1TargetFactory::name()
360 json!({
361 "account_id": account_id,
362 "database_id": database_id,
363 "api_token": api_token,
364 "table_name": table
365 })
366 .as_object()
367 .ok_or_else(|| {
368 ServiceError::config_static("Invalid target spec")
369 })?
370 .clone(),
371 vec![],
372 IndexOptions {
373 primary_key_fields: Some(
374 primary_key.iter().map(|s| s.to_string()).collect(),
375 ),
376 vector_indexes: vec![],
377 fts_indexes: vec![],
378 },
379 &symbols_collector,
380 false, // setup_by_user
381 )
382 .map_err(|e: RecocoError| {
383 ServiceError::execution_dynamic(format!(
384 "Failed to add export: {}",
385 e
386 ))
387 })?;
388 }
389 }
390 }
391 }
392 Step::ExtractImports => {
393 // Similar to ExtractSymbols but for imports
394 let parsed = parsed_node.as_ref().ok_or_else(|| {
395 ServiceError::config_static("Extract imports requires parse step first")
396 })?;
397
398 let mut root_scope = builder.root_scope();
399 let imports_collector = root_scope
400 .add_collector("imports".to_string())
401 .map_err(|e: RecocoError| {
402 ServiceError::execution_dynamic(format!(
403 "Failed to add collector: {}",
404 e
405 ))
406 })?;
407
408 let path_field = current_node
409 .field("path")
410 .map_err(|e| {
411 ServiceError::config_dynamic(format!("Missing path field: {}", e))
412 })?
413 .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
414
415 let imports = parsed
416 .field("imports")
417 .map_err(|e| {
418 ServiceError::config_dynamic(format!(
419 "Missing imports field in parsed output: {}",
420 e
421 ))
422 })?
423 .ok_or_else(|| ServiceError::config_static("Imports field not found"))?;
424
425 // Get content_fingerprint field for content-addressed deduplication
426 let content_fingerprint = parsed
427 .field("content_fingerprint")
428 .map_err(|e| {
429 ServiceError::config_dynamic(format!(
430 "Missing content_fingerprint field in parsed output: {}",
431 e
432 ))
433 })?
434 .ok_or_else(|| {
435 ServiceError::config_static("Content fingerprint field not found")
436 })?;
437
438 builder
439 .collect(
440 &imports_collector,
441 vec![
442 ("file_path".to_string(), path_field),
443 ("content_fingerprint".to_string(), content_fingerprint),
444 (
445 "symbol_name".to_string(),
446 imports
447 .field("symbol_name")
448 .map_err(|e: RecocoError| {
449 ServiceError::config_dynamic(e.to_string())
450 })?
451 .ok_or_else(|| {
452 ServiceError::config_static(
453 "Import symbol_name field not found",
454 )
455 })?,
456 ),
457 (
458 "source_path".to_string(),
459 imports
460 .field("source_path")
461 .map_err(|e: RecocoError| {
462 ServiceError::config_dynamic(e.to_string())
463 })?
464 .ok_or_else(|| {
465 ServiceError::config_static(
466 "Import source_path field not found",
467 )
468 })?,
469 ),
470 (
471 "kind".to_string(),
472 imports
473 .field("kind")
474 .map_err(|e: RecocoError| {
475 ServiceError::config_dynamic(e.to_string())
476 })?
477 .ok_or_else(|| {
478 ServiceError::config_static(
479 "Import kind field not found",
480 )
481 })?,
482 ),
483 ],
484 None,
485 )
486 .await
487 .map_err(|e: RecocoError| {
488 ServiceError::execution_dynamic(format!(
489 "Failed to configure collector: {}",
490 e
491 ))
492 })?;
493
494 // Export if target configured
495 if let Some(target_cfg) = &self.target {
496 match target_cfg {
497 Target::Postgres { table, primary_key } => {
498 builder
499 .export(
500 "imports_table".to_string(),
501 "postgres".to_string(),
502 json!({
503 "table": format!("{}_imports", table),
504 "primary_key": primary_key
505 })
506 .as_object()
507 .ok_or_else(|| {
508 ServiceError::config_static("Invalid target spec")
509 })?
510 .clone(),
511 vec![],
512 IndexOptions {
513 primary_key_fields: Some(
514 primary_key.iter().map(|s| s.to_string()).collect(),
515 ),
516 vector_indexes: vec![],
517 fts_indexes: vec![],
518 },
519 &imports_collector,
520 false,
521 )
522 .map_err(|e: RecocoError| {
523 ServiceError::execution_dynamic(format!(
524 "Failed to add export: {}",
525 e
526 ))
527 })?;
528 }
529 Target::D1 {
530 account_id,
531 database_id,
532 api_token,
533 table,
534 primary_key,
535 } => {
536 builder
537 .export(
538 "imports_table".to_string(),
539 "d1".to_string(),
540 json!({
541 "account_id": account_id,
542 "database_id": database_id,
543 "api_token": api_token,
544 "table_name": format!("{}_imports", table)
545 })
546 .as_object()
547 .ok_or_else(|| {
548 ServiceError::config_static("Invalid target spec")
549 })?
550 .clone(),
551 vec![],
552 IndexOptions {
553 primary_key_fields: Some(
554 primary_key.iter().map(|s| s.to_string()).collect(),
555 ),
556 vector_indexes: vec![],
557 fts_indexes: vec![],
558 },
559 &imports_collector,
560 false,
561 )
562 .map_err(|e: RecocoError| {
563 ServiceError::execution_dynamic(format!(
564 "Failed to add export: {}",
565 e
566 ))
567 })?;
568 }
569 }
570 }
571 }
572 Step::ExtractCalls => {
573 // Similar to ExtractSymbols but for function calls
574 let parsed = parsed_node.as_ref().ok_or_else(|| {
575 ServiceError::config_static("Extract calls requires parse step first")
576 })?;
577
578 let mut root_scope = builder.root_scope();
579 let calls_collector = root_scope.add_collector("calls".to_string()).map_err(
580 |e: RecocoError| {
581 ServiceError::execution_dynamic(format!(
582 "Failed to add collector: {}",
583 e
584 ))
585 },
586 )?;
587
588 let path_field = current_node
589 .field("path")
590 .map_err(|e| {
591 ServiceError::config_dynamic(format!("Missing path field: {}", e))
592 })?
593 .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
594
595 let calls = parsed
596 .field("calls")
597 .map_err(|e| {
598 ServiceError::config_dynamic(format!(
599 "Missing calls field in parsed output: {}",
600 e
601 ))
602 })?
603 .ok_or_else(|| ServiceError::config_static("Calls field not found"))?;
604
605 // Get content_fingerprint field for content-addressed deduplication
606 let content_fingerprint = parsed
607 .field("content_fingerprint")
608 .map_err(|e| {
609 ServiceError::config_dynamic(format!(
610 "Missing content_fingerprint field in parsed output: {}",
611 e
612 ))
613 })?
614 .ok_or_else(|| {
615 ServiceError::config_static("Content fingerprint field not found")
616 })?;
617
618 builder
619 .collect(
620 &calls_collector,
621 vec![
622 ("file_path".to_string(), path_field),
623 ("content_fingerprint".to_string(), content_fingerprint),
624 (
625 "function_name".to_string(),
626 calls
627 .field("function_name")
628 .map_err(|e: RecocoError| {
629 ServiceError::config_dynamic(e.to_string())
630 })?
631 .ok_or_else(|| {
632 ServiceError::config_static(
633 "Call function_name field not found",
634 )
635 })?,
636 ),
637 (
638 "arguments_count".to_string(),
639 calls
640 .field("arguments_count")
641 .map_err(|e: RecocoError| {
642 ServiceError::config_dynamic(e.to_string())
643 })?
644 .ok_or_else(|| {
645 ServiceError::config_static(
646 "Call arguments_count field not found",
647 )
648 })?,
649 ),
650 ],
651 None,
652 )
653 .await
654 .map_err(|e: RecocoError| {
655 ServiceError::execution_dynamic(format!(
656 "Failed to configure collector: {}",
657 e
658 ))
659 })?;
660
661 // Export if target configured
662 if let Some(target_cfg) = &self.target {
663 match target_cfg {
664 Target::Postgres { table, primary_key } => {
665 builder
666 .export(
667 "calls_table".to_string(),
668 "postgres".to_string(),
669 json!({
670 "table": format!("{}_calls", table),
671 "primary_key": primary_key
672 })
673 .as_object()
674 .ok_or_else(|| {
675 ServiceError::config_static("Invalid target spec")
676 })?
677 .clone(),
678 vec![],
679 IndexOptions {
680 primary_key_fields: Some(
681 primary_key.iter().map(|s| s.to_string()).collect(),
682 ),
683 vector_indexes: vec![],
684 fts_indexes: vec![],
685 },
686 &calls_collector,
687 false,
688 )
689 .map_err(|e: RecocoError| {
690 ServiceError::execution_dynamic(format!(
691 "Failed to add export: {}",
692 e
693 ))
694 })?;
695 }
696 Target::D1 {
697 account_id,
698 database_id,
699 api_token,
700 table,
701 primary_key,
702 } => {
703 builder
704 .export(
705 "calls_table".to_string(),
706 "d1".to_string(),
707 json!({
708 "account_id": account_id,
709 "database_id": database_id,
710 "api_token": api_token,
711 "table_name": format!("{}_calls", table)
712 })
713 .as_object()
714 .ok_or_else(|| {
715 ServiceError::config_static("Invalid target spec")
716 })?
717 .clone(),
718 vec![],
719 IndexOptions {
720 primary_key_fields: Some(
721 primary_key.iter().map(|s| s.to_string()).collect(),
722 ),
723 vector_indexes: vec![],
724 fts_indexes: vec![],
725 },
726 &calls_collector,
727 false,
728 )
729 .map_err(|e: RecocoError| {
730 ServiceError::execution_dynamic(format!(
731 "Failed to add export: {}",
732 e
733 ))
734 })?;
735 }
736 }
737 }
738 }
739 }
740 }
741
742 let ctx = builder.build_flow().await.map_err(|e: RecocoError| {
743 ServiceError::execution_dynamic(format!("Failed to build flow: {}", e))
744 })?;
745
746 Ok(ctx.0.flow.flow_instance.clone())
747 }
748}