Skip to main content

thread_flow/flows/
builder.rs

1// SPDX-FileCopyrightText: 2025 Knitli Inc. <knitli@knit.li>
2// SPDX-License-Identifier: AGPL-3.0-or-later
3
4use recoco::base::spec::{ExecutionOptions, FlowInstanceSpec, IndexOptions, SourceRefreshOptions};
5use recoco::builder::flow_builder::FlowBuilder;
6use recoco::prelude::Error as RecocoError;
7use serde_json::json;
8use thread_services::error::{ServiceError, ServiceResult};
9
/// Settings for the local-file source that feeds a flow.
///
/// `build` forwards these values to the `local_file` source as the `path`,
/// `included_patterns` and `excluded_patterns` spec entries.
#[derive(Debug, Clone, PartialEq, Eq)]
struct SourceConfig {
    /// Location handed to the source as its `path` entry.
    path: String,
    /// Patterns handed to the source as `included_patterns`.
    included: Vec<String>,
    /// Patterns handed to the source as `excluded_patterns`.
    excluded: Vec<String>,
}
16
/// One stage of the analysis pipeline, applied by `build` in insertion order.
///
/// The enum carries no data, so it is `Copy` and cheaply comparable.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Step {
    /// Adds the `thread_parse` transform over the source node.
    Parse,
    /// Collects the parsed `symbols` field; requires a preceding `Parse`.
    ExtractSymbols,
    /// Collects the parsed `imports` field; requires a preceding `Parse`.
    ExtractImports,
    /// Collects the parsed `calls` field; requires a preceding `Parse`.
    ExtractCalls,
}
24
/// Export destination for the collectors created during `build`.
#[derive(Clone)]
enum Target {
    /// Export through the `postgres` target type.
    Postgres {
        /// Base table name; the import export appends `_imports` and the
        /// call export appends `_calls`.
        table: String,
        /// Field names used as the primary key of the exported rows.
        primary_key: Vec<String>,
    },
    /// Export through the `d1` target type.
    D1 {
        /// Cloudflare account ID, sent as `account_id` in the target spec.
        account_id: String,
        /// D1 database ID, sent as `database_id` in the target spec.
        database_id: String,
        /// Cloudflare API token, sent as `api_token` in the target spec.
        api_token: String,
        /// Base table name, sent as `table_name` in the target spec.
        table: String,
        /// Field names used as the primary key of the exported rows.
        primary_key: Vec<String>,
    },
}

// Written by hand rather than derived so that the API token can never end up
// in logs or panic messages through `{:?}` formatting.
impl std::fmt::Debug for Target {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Postgres { table, primary_key } => f
                .debug_struct("Postgres")
                .field("table", table)
                .field("primary_key", primary_key)
                .finish(),
            Self::D1 {
                account_id,
                database_id,
                api_token: _,
                table,
                primary_key,
            } => f
                .debug_struct("D1")
                .field("account_id", account_id)
                .field("database_id", database_id)
                .field("api_token", &"<redacted>")
                .field("table", table)
                .field("primary_key", primary_key)
                .finish(),
        }
    }
}
39
40/// Builder for constructing standard Thread analysis pipelines.
41///
42/// This implements the Builder pattern to simplify the complexity of
43/// constructing CocoIndex flows with multiple operators.
44pub struct ThreadFlowBuilder {
45    name: String,
46    source: Option<SourceConfig>,
47    steps: Vec<Step>,
48    target: Option<Target>,
49}
50
51impl ThreadFlowBuilder {
52    pub fn new(name: impl Into<String>) -> Self {
53        Self {
54            name: name.into(),
55            source: None,
56            steps: Vec::new(),
57            target: None,
58        }
59    }
60
61    pub fn source_local(
62        mut self,
63        path: impl Into<String>,
64        included: &[&str],
65        excluded: &[&str],
66    ) -> Self {
67        self.source = Some(SourceConfig {
68            path: path.into(),
69            included: included.iter().map(|s| s.to_string()).collect(),
70            excluded: excluded.iter().map(|s| s.to_string()).collect(),
71        });
72        self
73    }
74
75    pub fn parse(mut self) -> Self {
76        self.steps.push(Step::Parse);
77        self
78    }
79
80    pub fn extract_symbols(mut self) -> Self {
81        self.steps.push(Step::ExtractSymbols);
82        self
83    }
84
85    pub fn extract_imports(mut self) -> Self {
86        self.steps.push(Step::ExtractImports);
87        self
88    }
89
90    pub fn extract_calls(mut self) -> Self {
91        self.steps.push(Step::ExtractCalls);
92        self
93    }
94
95    pub fn target_postgres(mut self, table: impl Into<String>, primary_key: &[&str]) -> Self {
96        self.target = Some(Target::Postgres {
97            table: table.into(),
98            primary_key: primary_key.iter().map(|s| s.to_string()).collect(),
99        });
100        self
101    }
102
103    /// Configure D1 as the export target
104    ///
105    /// # Arguments
106    /// * `account_id` - Cloudflare account ID
107    /// * `database_id` - D1 database ID
108    /// * `api_token` - Cloudflare API token
109    /// * `table` - Table name to export to
110    /// * `primary_key` - Primary key field names for content-addressed deduplication
111    pub fn target_d1(
112        mut self,
113        account_id: impl Into<String>,
114        database_id: impl Into<String>,
115        api_token: impl Into<String>,
116        table: impl Into<String>,
117        primary_key: &[&str],
118    ) -> Self {
119        self.target = Some(Target::D1 {
120            account_id: account_id.into(),
121            database_id: database_id.into(),
122            api_token: api_token.into(),
123            table: table.into(),
124            primary_key: primary_key.iter().map(|s| s.to_string()).collect(),
125        });
126        self
127    }
128
129    pub async fn build(self) -> ServiceResult<FlowInstanceSpec> {
130        let mut builder = FlowBuilder::new(&self.name)
131            .await
132            .map_err(|e: RecocoError| {
133                ServiceError::execution_dynamic(format!("Failed to create builder: {}", e))
134            })?;
135
136        let source_cfg = self
137            .source
138            .ok_or_else(|| ServiceError::config_static("Missing source configuration"))?;
139
140        // 1. SOURCE
141        let source_node = builder
142            .add_source(
143                "local_file".to_string(),
144                json!({
145                    "path": source_cfg.path,
146                    "included_patterns": source_cfg.included,
147                    "excluded_patterns": source_cfg.excluded
148                })
149                .as_object()
150                .ok_or_else(|| ServiceError::config_static("Invalid source spec"))?
151                .clone(),
152                None,
153                "source".to_string(),
154                Some(SourceRefreshOptions::default()),
155                Some(ExecutionOptions::default()),
156            )
157            .await
158            .map_err(|e: RecocoError| {
159                ServiceError::execution_dynamic(format!("Failed to add source: {}", e))
160            })?;
161
162        let current_node = source_node;
163        let mut parsed_node = None;
164
165        for step in self.steps {
166            match step {
167                Step::Parse => {
168                    // 2. TRANSFORM: Parse with Thread
169                    let content_field = current_node
170                        .field("content")
171                        .map_err(|e| {
172                            ServiceError::config_dynamic(format!("Missing content field: {}", e))
173                        })?
174                        .ok_or_else(|| ServiceError::config_static("Content field not found"))?;
175
176                    // Attempt to get language field, fallback to path if needed or error
177                    let language_field = current_node
178                        .field("language")
179                        .or_else(|_| current_node.field("path"))
180                        .map_err(|e| {
181                            ServiceError::config_dynamic(format!(
182                                "Missing language/path field: {}",
183                                e
184                            ))
185                        })?
186                        .ok_or_else(|| {
187                            ServiceError::config_static("Language/Path field not found")
188                        })?;
189
190                    let parsed = builder
191                        .transform(
192                            "thread_parse".to_string(),
193                            serde_json::Map::new(),
194                            vec![
195                                (content_field, Some("content".to_string())),
196                                (language_field, Some("language".to_string())),
197                            ],
198                            None,
199                            "parsed".to_string(),
200                        )
201                        .await
202                        .map_err(|e: RecocoError| {
203                            ServiceError::execution_dynamic(format!(
204                                "Failed to add parse step: {}",
205                                e
206                            ))
207                        })?;
208
209                    parsed_node = Some(parsed);
210                }
211                Step::ExtractSymbols => {
212                    // 3. COLLECT: Symbols
213                    let parsed = parsed_node.as_ref().ok_or_else(|| {
214                        ServiceError::config_static("Extract symbols requires parse step first")
215                    })?;
216
217                    let mut root_scope = builder.root_scope();
218                    let symbols_collector = root_scope
219                        .add_collector("symbols".to_string())
220                        .map_err(|e: RecocoError| {
221                            ServiceError::execution_dynamic(format!(
222                                "Failed to add collector: {}",
223                                e
224                            ))
225                        })?;
226
227                    // We need source node for file_path
228                    let path_field = current_node
229                        .field("path")
230                        .map_err(|e| {
231                            ServiceError::config_dynamic(format!("Missing path field: {}", e))
232                        })?
233                        .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
234
235                    let symbols = parsed
236                        .field("symbols")
237                        .map_err(|e| {
238                            ServiceError::config_dynamic(format!(
239                                "Missing symbols field in parsed output: {}",
240                                e
241                            ))
242                        })?
243                        .ok_or_else(|| ServiceError::config_static("Symbols field not found"))?;
244
245                    // Get content_fingerprint field for content-addressed deduplication
246                    let content_fingerprint = parsed
247                        .field("content_fingerprint")
248                        .map_err(|e| {
249                            ServiceError::config_dynamic(format!(
250                                "Missing content_fingerprint field in parsed output: {}",
251                                e
252                            ))
253                        })?
254                        .ok_or_else(|| {
255                            ServiceError::config_static("Content fingerprint field not found")
256                        })?;
257
258                    builder
259                        .collect(
260                            &symbols_collector,
261                            vec![
262                                ("file_path".to_string(), path_field),
263                                ("content_fingerprint".to_string(), content_fingerprint),
264                                (
265                                    "name".to_string(),
266                                    symbols
267                                        .field("name")
268                                        .map_err(|e: RecocoError| {
269                                            ServiceError::config_dynamic(e.to_string())
270                                        })?
271                                        .ok_or_else(|| {
272                                            ServiceError::config_static(
273                                                "Symbol Name field not found",
274                                            )
275                                        })?,
276                                ),
277                                (
278                                    "kind".to_string(),
279                                    symbols
280                                        .field("kind")
281                                        .map_err(|e: RecocoError| {
282                                            ServiceError::config_dynamic(e.to_string())
283                                        })?
284                                        .ok_or_else(|| {
285                                            ServiceError::config_static(
286                                                "Symbol Kind field not found",
287                                            )
288                                        })?,
289                                ),
290                                (
291                                    "signature".to_string(),
292                                    symbols
293                                        .field("scope")
294                                        .map_err(|e: RecocoError| {
295                                            ServiceError::config_dynamic(e.to_string())
296                                        })?
297                                        .ok_or_else(|| {
298                                            ServiceError::config_static(
299                                                "Symbol Scope field not found",
300                                            )
301                                        })?,
302                                ),
303                            ],
304                            None,
305                        )
306                        .await
307                        .map_err(|e: RecocoError| {
308                            ServiceError::execution_dynamic(format!(
309                                "Failed to configure collector: {}",
310                                e
311                            ))
312                        })?;
313
314                    // 4. EXPORT
315                    if let Some(target_cfg) = &self.target {
316                        match target_cfg {
317                            Target::Postgres { table, primary_key } => {
318                                builder
319                                    .export(
320                                        "symbols_table".to_string(),
321                                        "postgres".to_string(), // target type name
322                                        json!({
323                                            "table": table,
324                                            "primary_key": primary_key
325                                        })
326                                        .as_object()
327                                        .ok_or_else(|| {
328                                            ServiceError::config_static("Invalid target spec")
329                                        })?
330                                        .clone(),
331                                        vec![],
332                                        IndexOptions {
333                                            primary_key_fields: Some(
334                                                primary_key.iter().map(|s| s.to_string()).collect(),
335                                            ),
336                                            vector_indexes: vec![],
337                                            fts_indexes: vec![],
338                                        },
339                                        &symbols_collector,
340                                        false, // setup_by_user
341                                    )
342                                    .map_err(|e: RecocoError| {
343                                        ServiceError::execution_dynamic(format!(
344                                            "Failed to add export: {}",
345                                            e
346                                        ))
347                                    })?;
348                            }
349                            Target::D1 {
350                                account_id,
351                                database_id,
352                                api_token,
353                                table,
354                                primary_key,
355                            } => {
356                                builder
357                                    .export(
358                                        "symbols_table".to_string(),
359                                        "d1".to_string(), // target type name matching D1TargetFactory::name()
360                                        json!({
361                                            "account_id": account_id,
362                                            "database_id": database_id,
363                                            "api_token": api_token,
364                                            "table_name": table
365                                        })
366                                        .as_object()
367                                        .ok_or_else(|| {
368                                            ServiceError::config_static("Invalid target spec")
369                                        })?
370                                        .clone(),
371                                        vec![],
372                                        IndexOptions {
373                                            primary_key_fields: Some(
374                                                primary_key.iter().map(|s| s.to_string()).collect(),
375                                            ),
376                                            vector_indexes: vec![],
377                                            fts_indexes: vec![],
378                                        },
379                                        &symbols_collector,
380                                        false, // setup_by_user
381                                    )
382                                    .map_err(|e: RecocoError| {
383                                        ServiceError::execution_dynamic(format!(
384                                            "Failed to add export: {}",
385                                            e
386                                        ))
387                                    })?;
388                            }
389                        }
390                    }
391                }
392                Step::ExtractImports => {
393                    // Similar to ExtractSymbols but for imports
394                    let parsed = parsed_node.as_ref().ok_or_else(|| {
395                        ServiceError::config_static("Extract imports requires parse step first")
396                    })?;
397
398                    let mut root_scope = builder.root_scope();
399                    let imports_collector = root_scope
400                        .add_collector("imports".to_string())
401                        .map_err(|e: RecocoError| {
402                            ServiceError::execution_dynamic(format!(
403                                "Failed to add collector: {}",
404                                e
405                            ))
406                        })?;
407
408                    let path_field = current_node
409                        .field("path")
410                        .map_err(|e| {
411                            ServiceError::config_dynamic(format!("Missing path field: {}", e))
412                        })?
413                        .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
414
415                    let imports = parsed
416                        .field("imports")
417                        .map_err(|e| {
418                            ServiceError::config_dynamic(format!(
419                                "Missing imports field in parsed output: {}",
420                                e
421                            ))
422                        })?
423                        .ok_or_else(|| ServiceError::config_static("Imports field not found"))?;
424
425                    // Get content_fingerprint field for content-addressed deduplication
426                    let content_fingerprint = parsed
427                        .field("content_fingerprint")
428                        .map_err(|e| {
429                            ServiceError::config_dynamic(format!(
430                                "Missing content_fingerprint field in parsed output: {}",
431                                e
432                            ))
433                        })?
434                        .ok_or_else(|| {
435                            ServiceError::config_static("Content fingerprint field not found")
436                        })?;
437
438                    builder
439                        .collect(
440                            &imports_collector,
441                            vec![
442                                ("file_path".to_string(), path_field),
443                                ("content_fingerprint".to_string(), content_fingerprint),
444                                (
445                                    "symbol_name".to_string(),
446                                    imports
447                                        .field("symbol_name")
448                                        .map_err(|e: RecocoError| {
449                                            ServiceError::config_dynamic(e.to_string())
450                                        })?
451                                        .ok_or_else(|| {
452                                            ServiceError::config_static(
453                                                "Import symbol_name field not found",
454                                            )
455                                        })?,
456                                ),
457                                (
458                                    "source_path".to_string(),
459                                    imports
460                                        .field("source_path")
461                                        .map_err(|e: RecocoError| {
462                                            ServiceError::config_dynamic(e.to_string())
463                                        })?
464                                        .ok_or_else(|| {
465                                            ServiceError::config_static(
466                                                "Import source_path field not found",
467                                            )
468                                        })?,
469                                ),
470                                (
471                                    "kind".to_string(),
472                                    imports
473                                        .field("kind")
474                                        .map_err(|e: RecocoError| {
475                                            ServiceError::config_dynamic(e.to_string())
476                                        })?
477                                        .ok_or_else(|| {
478                                            ServiceError::config_static(
479                                                "Import kind field not found",
480                                            )
481                                        })?,
482                                ),
483                            ],
484                            None,
485                        )
486                        .await
487                        .map_err(|e: RecocoError| {
488                            ServiceError::execution_dynamic(format!(
489                                "Failed to configure collector: {}",
490                                e
491                            ))
492                        })?;
493
494                    // Export if target configured
495                    if let Some(target_cfg) = &self.target {
496                        match target_cfg {
497                            Target::Postgres { table, primary_key } => {
498                                builder
499                                    .export(
500                                        "imports_table".to_string(),
501                                        "postgres".to_string(),
502                                        json!({
503                                            "table": format!("{}_imports", table),
504                                            "primary_key": primary_key
505                                        })
506                                        .as_object()
507                                        .ok_or_else(|| {
508                                            ServiceError::config_static("Invalid target spec")
509                                        })?
510                                        .clone(),
511                                        vec![],
512                                        IndexOptions {
513                                            primary_key_fields: Some(
514                                                primary_key.iter().map(|s| s.to_string()).collect(),
515                                            ),
516                                            vector_indexes: vec![],
517                                            fts_indexes: vec![],
518                                        },
519                                        &imports_collector,
520                                        false,
521                                    )
522                                    .map_err(|e: RecocoError| {
523                                        ServiceError::execution_dynamic(format!(
524                                            "Failed to add export: {}",
525                                            e
526                                        ))
527                                    })?;
528                            }
529                            Target::D1 {
530                                account_id,
531                                database_id,
532                                api_token,
533                                table,
534                                primary_key,
535                            } => {
536                                builder
537                                    .export(
538                                        "imports_table".to_string(),
539                                        "d1".to_string(),
540                                        json!({
541                                            "account_id": account_id,
542                                            "database_id": database_id,
543                                            "api_token": api_token,
544                                            "table_name": format!("{}_imports", table)
545                                        })
546                                        .as_object()
547                                        .ok_or_else(|| {
548                                            ServiceError::config_static("Invalid target spec")
549                                        })?
550                                        .clone(),
551                                        vec![],
552                                        IndexOptions {
553                                            primary_key_fields: Some(
554                                                primary_key.iter().map(|s| s.to_string()).collect(),
555                                            ),
556                                            vector_indexes: vec![],
557                                            fts_indexes: vec![],
558                                        },
559                                        &imports_collector,
560                                        false,
561                                    )
562                                    .map_err(|e: RecocoError| {
563                                        ServiceError::execution_dynamic(format!(
564                                            "Failed to add export: {}",
565                                            e
566                                        ))
567                                    })?;
568                            }
569                        }
570                    }
571                }
572                Step::ExtractCalls => {
573                    // Similar to ExtractSymbols but for function calls
574                    let parsed = parsed_node.as_ref().ok_or_else(|| {
575                        ServiceError::config_static("Extract calls requires parse step first")
576                    })?;
577
578                    let mut root_scope = builder.root_scope();
579                    let calls_collector = root_scope.add_collector("calls".to_string()).map_err(
580                        |e: RecocoError| {
581                            ServiceError::execution_dynamic(format!(
582                                "Failed to add collector: {}",
583                                e
584                            ))
585                        },
586                    )?;
587
588                    let path_field = current_node
589                        .field("path")
590                        .map_err(|e| {
591                            ServiceError::config_dynamic(format!("Missing path field: {}", e))
592                        })?
593                        .ok_or_else(|| ServiceError::config_static("Path field not found"))?;
594
595                    let calls = parsed
596                        .field("calls")
597                        .map_err(|e| {
598                            ServiceError::config_dynamic(format!(
599                                "Missing calls field in parsed output: {}",
600                                e
601                            ))
602                        })?
603                        .ok_or_else(|| ServiceError::config_static("Calls field not found"))?;
604
605                    // Get content_fingerprint field for content-addressed deduplication
606                    let content_fingerprint = parsed
607                        .field("content_fingerprint")
608                        .map_err(|e| {
609                            ServiceError::config_dynamic(format!(
610                                "Missing content_fingerprint field in parsed output: {}",
611                                e
612                            ))
613                        })?
614                        .ok_or_else(|| {
615                            ServiceError::config_static("Content fingerprint field not found")
616                        })?;
617
618                    builder
619                        .collect(
620                            &calls_collector,
621                            vec![
622                                ("file_path".to_string(), path_field),
623                                ("content_fingerprint".to_string(), content_fingerprint),
624                                (
625                                    "function_name".to_string(),
626                                    calls
627                                        .field("function_name")
628                                        .map_err(|e: RecocoError| {
629                                            ServiceError::config_dynamic(e.to_string())
630                                        })?
631                                        .ok_or_else(|| {
632                                            ServiceError::config_static(
633                                                "Call function_name field not found",
634                                            )
635                                        })?,
636                                ),
637                                (
638                                    "arguments_count".to_string(),
639                                    calls
640                                        .field("arguments_count")
641                                        .map_err(|e: RecocoError| {
642                                            ServiceError::config_dynamic(e.to_string())
643                                        })?
644                                        .ok_or_else(|| {
645                                            ServiceError::config_static(
646                                                "Call arguments_count field not found",
647                                            )
648                                        })?,
649                                ),
650                            ],
651                            None,
652                        )
653                        .await
654                        .map_err(|e: RecocoError| {
655                            ServiceError::execution_dynamic(format!(
656                                "Failed to configure collector: {}",
657                                e
658                            ))
659                        })?;
660
661                    // Export if target configured
662                    if let Some(target_cfg) = &self.target {
663                        match target_cfg {
664                            Target::Postgres { table, primary_key } => {
665                                builder
666                                    .export(
667                                        "calls_table".to_string(),
668                                        "postgres".to_string(),
669                                        json!({
670                                            "table": format!("{}_calls", table),
671                                            "primary_key": primary_key
672                                        })
673                                        .as_object()
674                                        .ok_or_else(|| {
675                                            ServiceError::config_static("Invalid target spec")
676                                        })?
677                                        .clone(),
678                                        vec![],
679                                        IndexOptions {
680                                            primary_key_fields: Some(
681                                                primary_key.iter().map(|s| s.to_string()).collect(),
682                                            ),
683                                            vector_indexes: vec![],
684                                            fts_indexes: vec![],
685                                        },
686                                        &calls_collector,
687                                        false,
688                                    )
689                                    .map_err(|e: RecocoError| {
690                                        ServiceError::execution_dynamic(format!(
691                                            "Failed to add export: {}",
692                                            e
693                                        ))
694                                    })?;
695                            }
696                            Target::D1 {
697                                account_id,
698                                database_id,
699                                api_token,
700                                table,
701                                primary_key,
702                            } => {
703                                builder
704                                    .export(
705                                        "calls_table".to_string(),
706                                        "d1".to_string(),
707                                        json!({
708                                            "account_id": account_id,
709                                            "database_id": database_id,
710                                            "api_token": api_token,
711                                            "table_name": format!("{}_calls", table)
712                                        })
713                                        .as_object()
714                                        .ok_or_else(|| {
715                                            ServiceError::config_static("Invalid target spec")
716                                        })?
717                                        .clone(),
718                                        vec![],
719                                        IndexOptions {
720                                            primary_key_fields: Some(
721                                                primary_key.iter().map(|s| s.to_string()).collect(),
722                                            ),
723                                            vector_indexes: vec![],
724                                            fts_indexes: vec![],
725                                        },
726                                        &calls_collector,
727                                        false,
728                                    )
729                                    .map_err(|e: RecocoError| {
730                                        ServiceError::execution_dynamic(format!(
731                                            "Failed to add export: {}",
732                                            e
733                                        ))
734                                    })?;
735                            }
736                        }
737                    }
738                }
739            }
740        }
741
742        let ctx = builder.build_flow().await.map_err(|e: RecocoError| {
743            ServiceError::execution_dynamic(format!("Failed to build flow: {}", e))
744        })?;
745
746        Ok(ctx.0.flow.flow_instance.clone())
747    }
748}