Skip to main content

recoco_core/base/
spec.rs

1// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
2// Original code from CocoIndex is copyrighted by CocoIndex
3// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
4// SPDX-FileContributor: CocoIndex Contributors
5//
6// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
7// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
8// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
9//
10// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
11// SPDX-License-Identifier: Apache-2.0
12
13use crate::prelude::*;
14
15use super::schema::{EnrichedValueType, FieldSchema};
16use serde::{Deserialize, Serialize};
17use std::fmt;
18use std::ops::Deref;
19
20/// OutputMode enum for displaying spec info in different granularity
21#[derive(Debug, Clone, Copy, Eq, PartialEq, Serialize, Deserialize)]
22#[serde(rename_all = "lowercase")]
23pub enum OutputMode {
24    Concise,
25    Verbose,
26}
27
28/// Formatting spec per output mode
29pub trait SpecFormatter {
30    fn format(&self, mode: OutputMode) -> String;
31}
32
/// Name of a scope within a flow.
pub type ScopeName = String;

/// Identifies a data field within a flow.
///
/// Within each scope of a flow, field names must be unique:
/// - every field is defined by the `outputs` of exactly one operation;
/// - a field may be consumed as input by any number of operations.
pub type FieldName = String;

/// Reserved scope name that refers to the top-level (root) scope of a flow.
pub const ROOT_SCOPE_NAME: &str = "_root";
42
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash, Default)]
44pub struct FieldPath(pub Vec<FieldName>);
45
46impl Deref for FieldPath {
47    type Target = Vec<FieldName>;
48
49    fn deref(&self) -> &Self::Target {
50        &self.0
51    }
52}
53
54impl fmt::Display for FieldPath {
55    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56        if self.is_empty() {
57            write!(f, "*")
58        } else {
59            write!(f, "{}", self.join("."))
60        }
61    }
62}
63
64/// Used to identify an input or output argument for an operator.
65/// Useful to identify different inputs/outputs of the same operation. Usually omitted for operations with the same purpose of input/output.
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Default)]
67pub struct OpArgName(pub Option<String>);
68
69impl fmt::Display for OpArgName {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        if let Some(arg_name) = &self.0 {
72            write!(f, "${arg_name}")
73        } else {
74            write!(f, "?")
75        }
76    }
77}
78
79impl OpArgName {
80    pub fn is_unnamed(&self) -> bool {
81        self.0.is_none()
82    }
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
86pub struct NamedSpec<T> {
87    pub name: String,
88
89    #[serde(flatten)]
90    pub spec: T,
91}
92
93impl<T: fmt::Display> fmt::Display for NamedSpec<T> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
95        write!(f, "{}: {}", self.name, self.spec)
96    }
97}
98
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub struct FieldMapping {
101    /// If unspecified, means the current scope.
102    /// "_root" refers to the top-level scope.
103    #[serde(default, skip_serializing_if = "Option::is_none")]
104    pub scope: Option<ScopeName>,
105
106    pub field_path: FieldPath,
107}
108
109impl fmt::Display for FieldMapping {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        let scope = self.scope.as_deref().unwrap_or("");
112        write!(
113            f,
114            "{}{}",
115            if scope.is_empty() {
116                "".to_string()
117            } else {
118                format!("{scope}.")
119            },
120            self.field_path
121        )
122    }
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
126pub struct ConstantMapping {
127    pub schema: EnrichedValueType,
128    pub value: serde_json::Value,
129}
130
131impl fmt::Display for ConstantMapping {
132    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
133        let value = serde_json::to_string(&self.value).unwrap_or("#serde_error".to_string());
134        write!(f, "{value}")
135    }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize)]
139pub struct StructMapping {
140    pub fields: Vec<NamedSpec<ValueMapping>>,
141}
142
143impl fmt::Display for StructMapping {
144    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
145        let fields = self
146            .fields
147            .iter()
148            .map(|field| field.name.clone())
149            .collect::<Vec<_>>()
150            .join(",");
151        write!(f, "{fields}")
152    }
153}
154
155#[derive(Debug, Clone, Serialize, Deserialize)]
156#[serde(tag = "kind")]
157pub enum ValueMapping {
158    Constant(ConstantMapping),
159    Field(FieldMapping),
160    // TODO: Add support for collections
161}
162
163impl ValueMapping {
164    pub fn is_entire_scope(&self) -> bool {
165        match self {
166            ValueMapping::Field(FieldMapping {
167                scope: None,
168                field_path,
169            }) => field_path.is_empty(),
170            _ => false,
171        }
172    }
173}
174
175impl std::fmt::Display for ValueMapping {
176    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> fmt::Result {
177        match self {
178            ValueMapping::Constant(v) => write!(
179                f,
180                "{}",
181                serde_json::to_string(&v.value)
182                    .unwrap_or_else(|_| "#(invalid json value)".to_string())
183            ),
184            ValueMapping::Field(v) => {
185                write!(f, "{}.{}", v.scope.as_deref().unwrap_or(""), v.field_path)
186            }
187        }
188    }
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct OpArgBinding {
193    #[serde(default, skip_serializing_if = "OpArgName::is_unnamed")]
194    pub arg_name: OpArgName,
195
196    #[serde(flatten)]
197    pub value: ValueMapping,
198}
199
200impl fmt::Display for OpArgBinding {
201    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
202        if self.arg_name.is_unnamed() {
203            write!(f, "{}", self.value)
204        } else {
205            write!(f, "{}={}", self.arg_name, self.value)
206        }
207    }
208}
209
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct OpSpec {
212    pub kind: String,
213    #[serde(flatten, default)]
214    pub spec: serde_json::Map<String, serde_json::Value>,
215}
216
217impl SpecFormatter for OpSpec {
218    fn format(&self, mode: OutputMode) -> String {
219        match mode {
220            OutputMode::Concise => self.kind.clone(),
221            OutputMode::Verbose => {
222                let spec_str = serde_json::to_string_pretty(&self.spec)
223                    .map(|s| {
224                        let lines: Vec<&str> = s.lines().collect();
225                        if lines.len() < s.lines().count() {
226                            lines
227                                .into_iter()
228                                .chain(["..."])
229                                .collect::<Vec<_>>()
230                                .join("\n  ")
231                        } else {
232                            lines.join("\n  ")
233                        }
234                    })
235                    .unwrap_or("#serde_error".to_string());
236                format!("{}({})", self.kind, spec_str)
237            }
238        }
239    }
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize, Default)]
243pub struct ExecutionOptions {
244    #[serde(default, skip_serializing_if = "Option::is_none")]
245    pub max_inflight_rows: Option<usize>,
246
247    #[serde(default, skip_serializing_if = "Option::is_none")]
248    pub max_inflight_bytes: Option<usize>,
249
250    #[serde(default, skip_serializing_if = "Option::is_none")]
251    pub timeout: Option<std::time::Duration>,
252}
253
254impl ExecutionOptions {
255    pub fn get_concur_control_options(&self) -> concur_control::Options {
256        concur_control::Options {
257            max_inflight_rows: self.max_inflight_rows,
258            max_inflight_bytes: self.max_inflight_bytes,
259        }
260    }
261}
262
263#[derive(Debug, Clone, Serialize, Deserialize, Default)]
264pub struct SourceRefreshOptions {
265    pub refresh_interval: Option<std::time::Duration>,
266}
267
268impl fmt::Display for SourceRefreshOptions {
269    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
270        let refresh = self
271            .refresh_interval
272            .map(|d| format!("{d:?}"))
273            .unwrap_or("none".to_string());
274        write!(f, "{refresh}")
275    }
276}
277
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct ImportOpSpec {
280    pub source: OpSpec,
281
282    #[serde(default)]
283    pub refresh_options: SourceRefreshOptions,
284
285    #[serde(default)]
286    pub execution_options: ExecutionOptions,
287}
288
289impl SpecFormatter for ImportOpSpec {
290    fn format(&self, mode: OutputMode) -> String {
291        let source = self.source.format(mode);
292        format!("source={}, refresh={}", source, self.refresh_options)
293    }
294}
295
296impl fmt::Display for ImportOpSpec {
297    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
298        write!(f, "{}", self.format(OutputMode::Concise))
299    }
300}
301
302/// Transform data using a given operator.
303#[derive(Debug, Clone, Serialize, Deserialize)]
304pub struct TransformOpSpec {
305    pub inputs: Vec<OpArgBinding>,
306    pub op: OpSpec,
307
308    #[serde(default)]
309    pub execution_options: ExecutionOptions,
310}
311
312impl SpecFormatter for TransformOpSpec {
313    fn format(&self, mode: OutputMode) -> String {
314        let inputs = self
315            .inputs
316            .iter()
317            .map(ToString::to_string)
318            .collect::<Vec<_>>()
319            .join(",");
320        let op_str = self.op.format(mode);
321        match mode {
322            OutputMode::Concise => format!("op={op_str}, inputs={inputs}"),
323            OutputMode::Verbose => format!("op={op_str}, inputs=[{inputs}]"),
324        }
325    }
326}
327
328/// Apply reactive operations to each row of the input field.
329#[derive(Debug, Clone, Serialize, Deserialize)]
330pub struct ForEachOpSpec {
331    /// Mapping that provides a table to apply reactive operations to.
332    pub field_path: FieldPath,
333    pub op_scope: ReactiveOpScope,
334
335    #[serde(default)]
336    pub execution_options: ExecutionOptions,
337}
338
339impl ForEachOpSpec {
340    pub fn get_label(&self) -> String {
341        format!("Loop over {}", self.field_path)
342    }
343}
344
345impl SpecFormatter for ForEachOpSpec {
346    fn format(&self, mode: OutputMode) -> String {
347        match mode {
348            OutputMode::Concise => self.get_label(),
349            OutputMode::Verbose => format!("field={}", self.field_path),
350        }
351    }
352}
353
354/// Emit data to a given collector at the given scope.
355#[derive(Debug, Clone, Serialize, Deserialize)]
356pub struct CollectOpSpec {
357    /// Field values to be collected.
358    pub input: StructMapping,
359    /// Scope for the collector.
360    pub scope_name: ScopeName,
361    /// Name of the collector.
362    pub collector_name: FieldName,
363    /// If specified, the collector will have an automatically generated UUID field with the given name.
364    /// The uuid will remain stable when collected input values remain unchanged.
365    pub auto_uuid_field: Option<FieldName>,
366}
367
368impl SpecFormatter for CollectOpSpec {
369    fn format(&self, mode: OutputMode) -> String {
370        let uuid = self.auto_uuid_field.as_deref().unwrap_or("none");
371        match mode {
372            OutputMode::Concise => {
373                format!(
374                    "collector={}, input={}, uuid={}",
375                    self.collector_name, self.input, uuid
376                )
377            }
378            OutputMode::Verbose => {
379                format!(
380                    "scope={}, collector={}, input=[{}], uuid={}",
381                    self.scope_name, self.collector_name, self.input, uuid
382                )
383            }
384        }
385    }
386}
387
388#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
389pub enum VectorSimilarityMetric {
390    CosineSimilarity,
391    L2Distance,
392    InnerProduct,
393}
394
395impl fmt::Display for VectorSimilarityMetric {
396    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
397        match self {
398            VectorSimilarityMetric::CosineSimilarity => write!(f, "Cosine"),
399            VectorSimilarityMetric::L2Distance => write!(f, "L2"),
400            VectorSimilarityMetric::InnerProduct => write!(f, "InnerProduct"),
401        }
402    }
403}
404
405#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
406#[serde(tag = "kind")]
407pub enum VectorIndexMethod {
408    Hnsw {
409        #[serde(default, skip_serializing_if = "Option::is_none")]
410        m: Option<u32>,
411        #[serde(default, skip_serializing_if = "Option::is_none")]
412        ef_construction: Option<u32>,
413    },
414    IvfFlat {
415        #[serde(default, skip_serializing_if = "Option::is_none")]
416        lists: Option<u32>,
417    },
418}
419
420impl VectorIndexMethod {
421    pub fn kind(&self) -> &'static str {
422        match self {
423            Self::Hnsw { .. } => "Hnsw",
424            Self::IvfFlat { .. } => "IvfFlat",
425        }
426    }
427}
428
429impl fmt::Display for VectorIndexMethod {
430    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
431        match self {
432            Self::Hnsw { m, ef_construction } => {
433                let mut parts = Vec::new();
434                if let Some(m) = m {
435                    parts.push(format!("m={}", m));
436                }
437                if let Some(ef) = ef_construction {
438                    parts.push(format!("ef_construction={}", ef));
439                }
440                if parts.is_empty() {
441                    write!(f, "Hnsw")
442                } else {
443                    write!(f, "Hnsw({})", parts.join(","))
444                }
445            }
446            Self::IvfFlat { lists } => {
447                if let Some(lists) = lists {
448                    write!(f, "IvfFlat(lists={lists})")
449                } else {
450                    write!(f, "IvfFlat")
451                }
452            }
453        }
454    }
455}
456
457#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
458pub struct VectorIndexDef {
459    pub field_name: FieldName,
460    pub metric: VectorSimilarityMetric,
461    #[serde(default, skip_serializing_if = "Option::is_none")]
462    pub method: Option<VectorIndexMethod>,
463}
464
465impl fmt::Display for VectorIndexDef {
466    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
467        match &self.method {
468            None => write!(f, "{}:{}", self.field_name, self.metric),
469            Some(method) => write!(f, "{}:{}:{}", self.field_name, self.metric, method),
470        }
471    }
472}
473
474#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
475pub struct FtsIndexDef {
476    pub field_name: FieldName,
477    #[serde(default, skip_serializing_if = "Option::is_none")]
478    pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
479}
480
481impl fmt::Display for FtsIndexDef {
482    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
483        match &self.parameters {
484            None => write!(f, "{}", self.field_name),
485            Some(params) => {
486                let params_str = serde_json::to_string(params).unwrap_or_else(|_| "{}".to_string());
487                write!(f, "{}:{}", self.field_name, params_str)
488            }
489        }
490    }
491}
492
493#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
494pub struct IndexOptions {
495    #[serde(default, skip_serializing_if = "Option::is_none")]
496    pub primary_key_fields: Option<Vec<FieldName>>,
497    #[serde(default, skip_serializing_if = "Vec::is_empty")]
498    pub vector_indexes: Vec<VectorIndexDef>,
499    #[serde(default, skip_serializing_if = "Vec::is_empty")]
500    pub fts_indexes: Vec<FtsIndexDef>,
501}
502
503impl IndexOptions {
504    pub fn primary_key_fields(&self) -> Result<&[FieldName]> {
505        Ok(self
506            .primary_key_fields
507            .as_ref()
508            .ok_or(api_error!("Primary key fields are not set"))?
509            .as_ref())
510    }
511}
512
513impl fmt::Display for IndexOptions {
514    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
515        let primary_keys = self
516            .primary_key_fields
517            .as_ref()
518            .map(|p| p.join(","))
519            .unwrap_or_default();
520        let vector_indexes = self
521            .vector_indexes
522            .iter()
523            .map(|v| v.to_string())
524            .collect::<Vec<_>>()
525            .join(",");
526        let fts_indexes = self
527            .fts_indexes
528            .iter()
529            .map(|f| f.to_string())
530            .collect::<Vec<_>>()
531            .join(",");
532        write!(
533            f,
534            "keys={primary_keys}, vector_indexes={vector_indexes}, fts_indexes={fts_indexes}"
535        )
536    }
537}
538
539/// Store data to a given sink.
540#[derive(Debug, Clone, Serialize, Deserialize)]
541pub struct ExportOpSpec {
542    pub collector_name: FieldName,
543    pub target: OpSpec,
544
545    #[serde(default, skip_serializing_if = "Vec::is_empty")]
546    pub attachments: Vec<OpSpec>,
547
548    pub index_options: IndexOptions,
549    pub setup_by_user: bool,
550}
551
552impl SpecFormatter for ExportOpSpec {
553    fn format(&self, mode: OutputMode) -> String {
554        let target_str = self.target.format(mode);
555        let base = format!(
556            "collector={}, target={}, {}",
557            self.collector_name, target_str, self.index_options
558        );
559        match mode {
560            OutputMode::Concise => base,
561            OutputMode::Verbose => format!("{}, setup_by_user={}", base, self.setup_by_user),
562        }
563    }
564}
565
566/// A reactive operation reacts on given input values.
567#[derive(Debug, Clone, Serialize, Deserialize)]
568#[serde(tag = "action")]
569pub enum ReactiveOpSpec {
570    Transform(TransformOpSpec),
571    ForEach(ForEachOpSpec),
572    Collect(CollectOpSpec),
573}
574
575impl SpecFormatter for ReactiveOpSpec {
576    fn format(&self, mode: OutputMode) -> String {
577        match self {
578            ReactiveOpSpec::Transform(t) => format!("Transform: {}", t.format(mode)),
579            ReactiveOpSpec::ForEach(fe) => match mode {
580                OutputMode::Concise => fe.get_label().to_string(),
581                OutputMode::Verbose => format!("ForEach: {}", fe.format(mode)),
582            },
583            ReactiveOpSpec::Collect(c) => format!("Collect: {}", c.format(mode)),
584        }
585    }
586}
587
588#[derive(Debug, Clone, Serialize, Deserialize)]
589pub struct ReactiveOpScope {
590    pub name: ScopeName,
591    pub ops: Vec<NamedSpec<ReactiveOpSpec>>,
592    // TODO: Suport collectors
593}
594
595impl fmt::Display for ReactiveOpScope {
596    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
597        write!(f, "Scope: name={}", self.name)
598    }
599}
600
601/// A flow defines the rule to sync data from given sources to given sinks with given transformations.
602#[derive(Debug, Clone, Serialize, Deserialize)]
603pub struct FlowInstanceSpec {
604    /// Name of the flow instance.
605    pub name: String,
606
607    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
608    pub import_ops: Vec<NamedSpec<ImportOpSpec>>,
609
610    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
611    pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
612
613    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
614    pub export_ops: Vec<NamedSpec<ExportOpSpec>>,
615
616    #[serde(default = "Vec::new", skip_serializing_if = "Vec::is_empty")]
617    pub declarations: Vec<OpSpec>,
618}
619
620#[derive(Debug, Clone, Serialize, Deserialize)]
621pub struct TransientFlowSpec {
622    pub name: String,
623    pub input_fields: Vec<FieldSchema>,
624    pub reactive_ops: Vec<NamedSpec<ReactiveOpSpec>>,
625    pub output_value: ValueMapping,
626}
627
/// A typed reference to an auth entry, identified by its string `key`.
///
/// `T` is only a phantom marker; no `T` value is stored. The `Debug`, `Display` and
/// `Clone` impls below are hand-written. Presumably this is so they do not require
/// bounds on `T`, which a `#[derive]` would add.
pub struct AuthEntryReference<T> {
    pub key: String,
    _phantom: std::marker::PhantomData<T>,
}

impl<T> AuthEntryReference<T> {
    /// Creates a reference to the auth entry with the given key.
    pub fn new(key: String) -> Self {
        let _phantom = std::marker::PhantomData;
        Self { key, _phantom }
    }
}

impl<T> fmt::Debug for AuthEntryReference<T> {
    /// Renders as `AuthEntryReference(<key>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "AuthEntryReference({key})", key = self.key)
    }
}

impl<T> fmt::Display for AuthEntryReference<T> {
    /// Same text as the `Debug` rendering: `AuthEntryReference(<key>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(self, f)
    }
}

impl<T> Clone for AuthEntryReference<T> {
    /// Copies the key; works for any `T` because nothing of type `T` is stored.
    fn clone(&self) -> Self {
        Self {
            key: self.key.clone(),
            _phantom: std::marker::PhantomData,
        }
    }
}
658
659#[derive(Serialize, Deserialize)]
660struct UntypedAuthEntryReference<T> {
661    key: T,
662}
663
664impl<T> Serialize for AuthEntryReference<T> {
665    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
666    where
667        S: serde::Serializer,
668    {
669        UntypedAuthEntryReference { key: &self.key }.serialize(serializer)
670    }
671}
672
673impl<'de, T> Deserialize<'de> for AuthEntryReference<T> {
674    fn deserialize<D>(deserializer: D) -> std::result::Result<Self, D::Error>
675    where
676        D: serde::Deserializer<'de>,
677    {
678        let untyped_ref = UntypedAuthEntryReference::<String>::deserialize(deserializer)?;
679        Ok(AuthEntryReference::new(untyped_ref.key))
680    }
681}
682
683impl<T> PartialEq for AuthEntryReference<T> {
684    fn eq(&self, other: &Self) -> bool {
685        self.key == other.key
686    }
687}
688
689impl<T> Eq for AuthEntryReference<T> {}
690
691impl<T> std::hash::Hash for AuthEntryReference<T> {
692    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
693        self.key.hash(state);
694    }
695}