Skip to main content

recoco_core/ops/
interface.rs

// ReCoco is a Rust-only fork of CocoIndex, by [CocoIndex](https://CocoIndex)
// Original code from CocoIndex is copyrighted by CocoIndex
// SPDX-FileCopyrightText: 2025-2026 CocoIndex (upstream)
// SPDX-FileContributor: CocoIndex Contributors
//
// All modifications from the upstream for ReCoco are copyrighted by Knitli Inc.
// SPDX-FileCopyrightText: 2026 Knitli Inc. (ReCoco)
// SPDX-FileContributor: Adam Poulemanos <adam@knit.li>
//
// Both the upstream CocoIndex code and the ReCoco modifications are licensed under the Apache-2.0 License.
// SPDX-License-Identifier: Apache-2.0
12
13use crate::prelude::*;
14
15use std::time::SystemTime;
16
17use crate::base::{schema::*, spec::IndexOptions, value::*};
18use crate::setup;
19use chrono::TimeZone;
20use serde::Serialize;
21
/// Context describing a flow instance.
///
/// It is passed (behind an `Arc` or by reference) to the factory `build`
/// methods and the setup-diffing/applying methods declared in this file.
pub struct FlowInstanceContext {
    /// Name of the flow instance.
    pub flow_instance_name: String,
    /// Shared handle to the auth registry.
    // NOTE(review): `AuthRegistry` is declared outside this file; how it is
    // consulted by operations is not visible here.
    pub auth_registry: Arc<AuthRegistry>,
}
26
/// An optional `i64` ordinal attached to a source row.
///
/// `Ordinal(None)` means no ordinal is available (see [`Ordinal::unavailable`]
/// and [`Ordinal::is_available`]). The `TryFrom` conversions in this file
/// populate it with a timestamp expressed in microseconds.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Ordinal(pub Option<i64>);
29
30impl Ordinal {
31    pub fn unavailable() -> Self {
32        Self(None)
33    }
34
35    pub fn is_available(&self) -> bool {
36        self.0.is_some()
37    }
38}
39
40impl From<Ordinal> for Option<i64> {
41    fn from(val: Ordinal) -> Self {
42        val.0
43    }
44}
45
46impl TryFrom<SystemTime> for Ordinal {
47    type Error = anyhow::Error;
48
49    fn try_from(time: SystemTime) -> std::result::Result<Self, Self::Error> {
50        let duration = time.duration_since(std::time::UNIX_EPOCH)?;
51        Ok(Ordinal(Some(duration.as_micros().try_into()?)))
52    }
53}
54
55impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
56    type Error = anyhow::Error;
57
58    fn try_from(time: chrono::DateTime<TZ>) -> std::result::Result<Self, Self::Error> {
59        Ok(Ordinal(Some(time.timestamp_micros())))
60    }
61}
62
/// Existence state of a source row, together with its content when it exists.
#[derive(Debug)]
pub enum SourceValue {
    /// The row exists; carries its field values.
    Existence(FieldValues),
    /// The row does not exist.
    NonExistence,
}
68
/// Data for a source row in which every part is optional.
///
/// Which parts a source is expected to fill in is described by
/// [`SourceExecutorReadOptions`].
#[derive(Debug, Default)]
pub struct PartialSourceRowData {
    /// Ordinal of the row, when provided.
    pub ordinal: Option<Ordinal>,

    /// A content version fingerprint can be anything that changes when the content of the row changes.
    /// Note that it's acceptable if the fingerprint sometimes differs even though the content is the same;
    /// this leads to fewer optimization opportunities but won't break correctness.
    ///
    /// It's optional. The source shouldn't compute it in a generic way, e.g. by hashing the content:
    /// the framework will do so. If there's no fast way to get it from the source, leave it as `None`.
    pub content_version_fp: Option<Vec<u8>>,

    /// Existence state and content of the row, when provided.
    pub value: Option<SourceValue>,
}
83
/// A source row as yielded by [`SourceExecutor::list`]: its key, auxiliary key
/// information, and whatever partial data is available for it.
pub struct PartialSourceRow {
    /// Key of the row.
    pub key: KeyValue,
    /// Auxiliary information for the source row, to be used when reading the content.
    /// e.g. it can be used to uniquely identify version of the row.
    /// Use serde_json::Value::Null to represent no auxiliary information.
    pub key_aux_info: serde_json::Value,

    /// Partial data for the row.
    pub data: PartialSourceRowData,
}
93
94impl SourceValue {
95    pub fn is_existent(&self) -> bool {
96        matches!(self, Self::Existence(_))
97    }
98
99    pub fn as_optional(&self) -> Option<&FieldValues> {
100        match self {
101            Self::Existence(value) => Some(value),
102            Self::NonExistence => None,
103        }
104    }
105
106    pub fn into_optional(self) -> Option<FieldValues> {
107        match self {
108            Self::Existence(value) => Some(value),
109            Self::NonExistence => None,
110        }
111    }
112}
113
/// A single change reported for a source row; carried in
/// [`SourceChangeMessage::changes`].
pub struct SourceChange {
    /// Key of the changed row.
    pub key: KeyValue,
    /// Auxiliary information for the source row, to be used when reading the content.
    /// e.g. it can be used to uniquely identify version of the row.
    pub key_aux_info: serde_json::Value,

    /// Partial data known for the changed row.
    // NOTE(review): the previous comment read "If None, the engine will poll to
    // get the latest existence state and value", but this field is not an
    // `Option`. Presumably it refers to the optional parts inside
    // `PartialSourceRowData` (e.g. `value`) being `None` — confirm.
    pub data: PartialSourceRowData,
}
123
/// A batch of source changes, as yielded by the stream returned from
/// [`SourceExecutor::change_stream`].
pub struct SourceChangeMessage {
    /// The changes carried by this message.
    pub changes: Vec<SourceChange>,
    /// Optional one-shot callback that returns a future.
    // NOTE(review): presumably called to acknowledge the message once its
    // changes have been handled — confirm against the consumer.
    pub ack_fn: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>>,
}
128
/// Options passed to [`SourceExecutor::list`] and [`SourceExecutor::get_value`]
/// that select which parts of [`PartialSourceRowData`] should be returned.
#[derive(Debug, Default, Serialize)]
pub struct SourceExecutorReadOptions {
    /// When set to true, the implementation must return a non-None `ordinal`.
    pub include_ordinal: bool,

    /// When set to true, the implementation has the discretion to decide whether or not to return a non-None `content_version_fp`.
    /// The guideline is to return it only if it's very efficient to get it.
    /// If it's returned in `list()`, it must be returned in `get_value()`.
    pub include_content_version_fp: bool,

    /// For get calls, when set to true, the implementation must return a non-None `value`.
    ///
    /// For list calls, when set to true, the implementation has the discretion to decide whether or not to include it.
    /// The guideline is to only include it if a single "list() with content" call is significantly more efficient than "list() without content + series of get_value()" calls.
    ///
    /// Even if `list()` already returns `value` when it's true, `get_value()` must still return `value` when it's true.
    pub include_value: bool,
}
147
/// Executor for a source: lists rows, reads individual rows, and optionally
/// exposes a stream of changes.
#[async_trait]
pub trait SourceExecutor: Send + Sync {
    /// Get the list of keys for the source.
    ///
    /// Returns a stream of batches of [`PartialSourceRow`]. `options` selects
    /// which optional parts of each row's data should be populated.
    async fn list(
        &self,
        options: &SourceExecutorReadOptions,
    ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>>;

    /// Get the value for the given key.
    ///
    /// `key_aux_info` is the auxiliary information associated with the key
    /// (see [`PartialSourceRow::key_aux_info`]); `options` selects which
    /// optional parts of the returned data should be populated.
    async fn get_value(
        &self,
        key: &KeyValue,
        key_aux_info: &serde_json::Value,
        options: &SourceExecutorReadOptions,
    ) -> Result<PartialSourceRowData>;

    /// Returns a stream of [`SourceChangeMessage`]s, if there is one.
    ///
    /// The default implementation returns `Ok(None)`.
    async fn change_stream(
        &self,
    ) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
        Ok(None)
    }

    /// Reports whether this source provides ordinals.
    fn provides_ordinal(&self) -> bool;
}
172
/// Factory that builds a [`SourceExecutor`] from a JSON spec.
#[async_trait]
pub trait SourceFactory {
    /// Builds a source named `source_name` from `spec` within the given flow
    /// instance `context`.
    ///
    /// On success returns a pair: an `EnrichedValueType`, and a future that
    /// resolves to the boxed executor.
    // NOTE(review): the `EnrichedValueType` is presumably the output schema of
    // the source — confirm against implementations.
    async fn build(
        self: Arc<Self>,
        source_name: &str,
        spec: serde_json::Value,
        context: Arc<FlowInstanceContext>,
    ) -> Result<(
        EnrichedValueType,
        BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
    )>;
}
185
/// Executor for a simple function operation: maps argument values to a single
/// output value.
#[async_trait]
pub trait SimpleFunctionExecutor: Send + Sync {
    /// Evaluate the operation.
    async fn evaluate(&self, args: Vec<Value>) -> Result<Value>;

    /// Whether caching is enabled for this executor. Defaults to `false`.
    ///
    /// When it is `true`, `SimpleFunctionBuildOutput::behavior_version` must
    /// be `Some`.
    fn enable_cache(&self) -> bool {
        false
    }

    /// Returns None to use the default timeout (1800s)
    fn timeout(&self) -> Option<std::time::Duration> {
        None
    }
}
200
/// Result of [`SimpleFunctionFactory::build`].
pub struct SimpleFunctionBuildOutput {
    /// Type of the value produced by the function.
    pub output_type: EnrichedValueType,

    /// Must be Some if `enable_cache` is true.
    /// If it changes, the cache will be invalidated.
    pub behavior_version: Option<u32>,

    /// Future that resolves to the boxed executor.
    pub executor: BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
}
210
/// Factory that builds a [`SimpleFunctionExecutor`] from a JSON spec and the
/// schema of its arguments.
#[async_trait]
pub trait SimpleFunctionFactory {
    /// Builds the function from `spec` and `input_schema` within the given
    /// flow instance `context`, returning its output type, optional behavior
    /// version, and a future resolving to the executor.
    async fn build(
        self: Arc<Self>,
        spec: serde_json::Value,
        input_schema: Vec<OpArgSchema>,
        context: Arc<FlowInstanceContext>,
    ) -> Result<SimpleFunctionBuildOutput>;
}
220
/// One row to upsert into an export target.
#[derive(Debug)]
pub struct ExportTargetUpsertEntry {
    /// Key of the row.
    pub key: KeyValue,
    /// Additional key in JSON form.
    // NOTE(review): presumably the value produced by
    // `TargetFactory::extract_additional_key` — confirm.
    pub additional_key: serde_json::Value,
    /// Field values of the row.
    pub value: FieldValues,
}
227
/// One row to delete from an export target.
#[derive(Debug)]
pub struct ExportTargetDeleteEntry {
    /// Key of the row.
    pub key: KeyValue,
    /// Additional key in JSON form.
    // NOTE(review): presumably the value produced by
    // `TargetFactory::extract_additional_key` — confirm.
    pub additional_key: serde_json::Value,
}
233
/// A set of upserts and deletes to apply to an export target.
///
/// The `Default` value has no upserts and no deletes.
#[derive(Debug, Default)]
pub struct ExportTargetMutation {
    /// Rows to upsert.
    pub upserts: Vec<ExportTargetUpsertEntry>,
    /// Rows to delete.
    pub deletes: Vec<ExportTargetDeleteEntry>,
}
239
240impl ExportTargetMutation {
241    pub fn is_empty(&self) -> bool {
242        self.upserts.is_empty() && self.deletes.is_empty()
243    }
244}
245
/// An [`ExportTargetMutation`] paired with a borrowed export context of type
/// `T`; this is the element type accepted by `TargetFactory::apply_mutation`.
#[derive(Debug)]
pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
    /// The upserts and deletes to apply.
    pub mutation: ExportTargetMutation,
    /// Export context the mutation applies to.
    pub export_context: &'ctx T,
}
251
/// A borrowed (key, setup change) pair; this is the element type accepted by
/// `TargetFactory::apply_setup_changes`.
pub struct ResourceSetupChangeItem<'a> {
    /// JSON key of the resource.
    pub key: &'a serde_json::Value,
    /// The setup change for that resource.
    pub setup_change: &'a dyn setup::ResourceSetupChange,
}
256
/// How compatible an existing resource is with a desired setup state; returned
/// by `TargetFactory::check_state_compatibility`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum SetupStateCompatibility {
    /// The resource is fully compatible with the desired state.
    /// This means the resource can be updated to the desired state without any loss of data.
    Compatible,
    /// The resource is partially compatible with the desired state.
    /// This means data from some existing fields will be lost after applying the setup change,
    /// but at least the key fields of all rows are still preserved.
    PartialCompatible,
    /// The resource needs to be rebuilt. After applying the setup change, all data will be gone.
    NotCompatible,
}
269
/// Per-collection output of `TargetFactory::build`.
pub struct ExportDataCollectionBuildOutput {
    /// Future resolving to the type-erased export context.
    pub export_context: BoxFuture<'static, Result<Arc<dyn Any + Send + Sync>>>,
    /// JSON key identifying the setup resource.
    pub setup_key: serde_json::Value,
    /// Desired setup state in JSON form.
    pub desired_setup_state: serde_json::Value,
}
275
/// Description of one data collection to export; input to
/// `TargetFactory::build`.
pub struct ExportDataCollectionSpec {
    /// Name of the collection.
    pub name: String,
    /// Target spec in JSON form.
    pub spec: serde_json::Value,
    /// Schemas of the key fields.
    pub key_fields_schema: Box<[FieldSchema]>,
    /// Schemas of the value fields.
    pub value_fields_schema: Vec<FieldSchema>,
    /// Index options for the collection.
    pub index_options: IndexOptions,
}
283
/// Factory for an export target: builds export contexts, diffs and applies
/// setup changes, and applies data mutations.
#[async_trait]
pub trait TargetFactory: Send + Sync {
    /// Builds the target for the given data collections and declarations.
    ///
    /// On success returns one [`ExportDataCollectionBuildOutput`] per built
    /// collection, plus a list of JSON value pairs.
    // NOTE(review): the pairs are presumably (setup key, desired setup state)
    // for `declarations` — confirm against implementations.
    async fn build(
        self: Arc<Self>,
        data_collections: Vec<ExportDataCollectionSpec>,
        declarations: Vec<serde_json::Value>,
        context: Arc<FlowInstanceContext>,
    ) -> Result<(
        Vec<ExportDataCollectionBuildOutput>,
        Vec<(serde_json::Value, serde_json::Value)>,
    )>;

    /// Will not be called if it's setup by user.
    /// It returns an error if the target only supports setup by user.
    // NOTE(review): `context` is spelled `interface::FlowInstanceContext` here
    // but plain `FlowInstanceContext` in `build`/`apply_setup_changes`;
    // presumably the same type — confirm and unify.
    async fn diff_setup_states(
        &self,
        key: &serde_json::Value,
        desired_state: Option<serde_json::Value>,
        existing_states: setup::CombinedState<serde_json::Value>,
        context: Arc<interface::FlowInstanceContext>,
    ) -> Result<Box<dyn setup::ResourceSetupChange>>;

    /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
    /// This should always return the canonical serialized form.
    fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;

    /// Classifies how compatible `existing_state` is with `desired_state`.
    fn check_state_compatibility(
        &self,
        desired_state: &serde_json::Value,
        existing_state: &serde_json::Value,
    ) -> Result<SetupStateCompatibility>;

    /// Returns a textual description of the resource identified by `key`.
    fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;

    /// Derives the additional key, as JSON, from a row's key and value and
    /// the type-erased export context.
    fn extract_additional_key(
        &self,
        key: &KeyValue,
        value: &FieldValues,
        export_context: &(dyn Any + Send + Sync),
    ) -> Result<serde_json::Value>;

    /// Applies the given mutations, each paired with its export context.
    async fn apply_mutation(
        &self,
        mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
    ) -> Result<()>;

    /// Applies the given setup changes within the flow instance `context`.
    async fn apply_setup_changes(
        &self,
        setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
        context: Arc<FlowInstanceContext>,
    ) -> Result<()>;
}
336
/// Setup key and setup state of a target attachment; returned by
/// `TargetAttachmentFactory::get_state`.
pub struct TargetAttachmentState {
    /// JSON key identifying the attachment's setup.
    pub setup_key: serde_json::Value,
    /// Setup state in JSON form.
    pub setup_state: serde_json::Value,
}
341
/// A pending setup change for a target attachment; produced by
/// `TargetAttachmentFactory::diff_setup_states`.
#[async_trait]
pub trait AttachmentSetupChange {
    /// Returns textual descriptions of the changes.
    fn describe_changes(&self) -> Vec<String>;

    /// Applies the change.
    async fn apply_change(&self) -> Result<()>;
}
348
/// Factory for target attachments: computes their setup state and diffs it
/// against existing states.
#[async_trait]
pub trait TargetAttachmentFactory: Send + Sync {
    /// Normalize the key. e.g. the JSON format may change (after code change, e.g. new optional field or field ordering), even if the underlying value is not changed.
    /// This should always return the canonical serialized form.
    fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;

    /// Computes the attachment's setup key and setup state from the target's
    /// name and spec and the attachment's own spec.
    fn get_state(
        &self,
        target_name: &str,
        target_spec: &serde_json::Map<String, serde_json::Value>,
        attachment_spec: serde_json::Value,
    ) -> Result<TargetAttachmentState>;

    /// Should return Some if and only if any changes are needed.
    // NOTE(review): `context` is spelled `interface::FlowInstanceContext`
    // here; presumably the same type as the `FlowInstanceContext` declared in
    // this file — confirm.
    async fn diff_setup_states(
        &self,
        target_key: &serde_json::Value,
        attachment_key: &serde_json::Value,
        new_state: Option<serde_json::Value>,
        existing_states: setup::CombinedState<serde_json::Value>,
        context: &interface::FlowInstanceContext,
    ) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>>;
}
372
/// A shared, type-erased factory for one of the operation kinds declared in
/// this file. Cloning clones the inner `Arc`.
#[derive(Clone)]
pub enum ExecutorFactory {
    /// Factory for sources.
    Source(Arc<dyn SourceFactory + Send + Sync>),
    /// Factory for simple functions.
    SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
    /// Factory for export targets.
    ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
    /// Factory for target attachments.
    TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
}
380
/// Setup key for an attachment: a string paired with a JSON value.
///
/// Its `Display` form is the two parts joined by `:`.
// NOTE(review): the string part is presumably the attachment kind/name and the
// JSON part the normalized attachment key — confirm against callers.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AttachmentSetupKey(pub String, pub serde_json::Value);
383
384impl std::fmt::Display for AttachmentSetupKey {
385    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386        write!(f, "{}:{}", self.0, self.1)
387    }
388}