1use crate::prelude::*;
14
15use std::time::SystemTime;
16
17use crate::base::{schema::*, spec::IndexOptions, value::*};
18use crate::setup;
19use chrono::TimeZone;
20use serde::Serialize;
21
22pub struct FlowInstanceContext {
23 pub flow_instance_name: String,
24 pub auth_registry: Arc<AuthRegistry>,
25}
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
28pub struct Ordinal(pub Option<i64>);
29
30impl Ordinal {
31 pub fn unavailable() -> Self {
32 Self(None)
33 }
34
35 pub fn is_available(&self) -> bool {
36 self.0.is_some()
37 }
38}
39
40impl From<Ordinal> for Option<i64> {
41 fn from(val: Ordinal) -> Self {
42 val.0
43 }
44}
45
46impl TryFrom<SystemTime> for Ordinal {
47 type Error = anyhow::Error;
48
49 fn try_from(time: SystemTime) -> std::result::Result<Self, Self::Error> {
50 let duration = time.duration_since(std::time::UNIX_EPOCH)?;
51 Ok(Ordinal(Some(duration.as_micros().try_into()?)))
52 }
53}
54
55impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
56 type Error = anyhow::Error;
57
58 fn try_from(time: chrono::DateTime<TZ>) -> std::result::Result<Self, Self::Error> {
59 Ok(Ordinal(Some(time.timestamp_micros())))
60 }
61}
62
63#[derive(Debug)]
64pub enum SourceValue {
65 Existence(FieldValues),
66 NonExistence,
67}
68
69#[derive(Debug, Default)]
70pub struct PartialSourceRowData {
71 pub ordinal: Option<Ordinal>,
72
73 pub content_version_fp: Option<Vec<u8>>,
80
81 pub value: Option<SourceValue>,
82}
83
84pub struct PartialSourceRow {
85 pub key: KeyValue,
86 pub key_aux_info: serde_json::Value,
90
91 pub data: PartialSourceRowData,
92}
93
94impl SourceValue {
95 pub fn is_existent(&self) -> bool {
96 matches!(self, Self::Existence(_))
97 }
98
99 pub fn as_optional(&self) -> Option<&FieldValues> {
100 match self {
101 Self::Existence(value) => Some(value),
102 Self::NonExistence => None,
103 }
104 }
105
106 pub fn into_optional(self) -> Option<FieldValues> {
107 match self {
108 Self::Existence(value) => Some(value),
109 Self::NonExistence => None,
110 }
111 }
112}
113
114pub struct SourceChange {
115 pub key: KeyValue,
116 pub key_aux_info: serde_json::Value,
119
120 pub data: PartialSourceRowData,
122}
123
124pub struct SourceChangeMessage {
125 pub changes: Vec<SourceChange>,
126 pub ack_fn: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>>,
127}
128
129#[derive(Debug, Default, Serialize)]
130pub struct SourceExecutorReadOptions {
131 pub include_ordinal: bool,
133
134 pub include_content_version_fp: bool,
138
139 pub include_value: bool,
146}
147
148#[async_trait]
149pub trait SourceExecutor: Send + Sync {
150 async fn list(
152 &self,
153 options: &SourceExecutorReadOptions,
154 ) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>>;
155
156 async fn get_value(
158 &self,
159 key: &KeyValue,
160 key_aux_info: &serde_json::Value,
161 options: &SourceExecutorReadOptions,
162 ) -> Result<PartialSourceRowData>;
163
164 async fn change_stream(
165 &self,
166 ) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
167 Ok(None)
168 }
169
170 fn provides_ordinal(&self) -> bool;
171}
172
173#[async_trait]
174pub trait SourceFactory {
175 async fn build(
176 self: Arc<Self>,
177 source_name: &str,
178 spec: serde_json::Value,
179 context: Arc<FlowInstanceContext>,
180 ) -> Result<(
181 EnrichedValueType,
182 BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
183 )>;
184}
185
186#[async_trait]
187pub trait SimpleFunctionExecutor: Send + Sync {
188 async fn evaluate(&self, args: Vec<Value>) -> Result<Value>;
190
191 fn enable_cache(&self) -> bool {
192 false
193 }
194
195 fn timeout(&self) -> Option<std::time::Duration> {
197 None
198 }
199}
200
201pub struct SimpleFunctionBuildOutput {
202 pub output_type: EnrichedValueType,
203
204 pub behavior_version: Option<u32>,
207
208 pub executor: BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
209}
210
211#[async_trait]
212pub trait SimpleFunctionFactory {
213 async fn build(
214 self: Arc<Self>,
215 spec: serde_json::Value,
216 input_schema: Vec<OpArgSchema>,
217 context: Arc<FlowInstanceContext>,
218 ) -> Result<SimpleFunctionBuildOutput>;
219}
220
221#[derive(Debug)]
222pub struct ExportTargetUpsertEntry {
223 pub key: KeyValue,
224 pub additional_key: serde_json::Value,
225 pub value: FieldValues,
226}
227
228#[derive(Debug)]
229pub struct ExportTargetDeleteEntry {
230 pub key: KeyValue,
231 pub additional_key: serde_json::Value,
232}
233
234#[derive(Debug, Default)]
235pub struct ExportTargetMutation {
236 pub upserts: Vec<ExportTargetUpsertEntry>,
237 pub deletes: Vec<ExportTargetDeleteEntry>,
238}
239
240impl ExportTargetMutation {
241 pub fn is_empty(&self) -> bool {
242 self.upserts.is_empty() && self.deletes.is_empty()
243 }
244}
245
246#[derive(Debug)]
247pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
248 pub mutation: ExportTargetMutation,
249 pub export_context: &'ctx T,
250}
251
252pub struct ResourceSetupChangeItem<'a> {
253 pub key: &'a serde_json::Value,
254 pub setup_change: &'a dyn setup::ResourceSetupChange,
255}
256
257#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
258pub enum SetupStateCompatibility {
259 Compatible,
262 PartialCompatible,
266 NotCompatible,
268}
269
270pub struct ExportDataCollectionBuildOutput {
271 pub export_context: BoxFuture<'static, Result<Arc<dyn Any + Send + Sync>>>,
272 pub setup_key: serde_json::Value,
273 pub desired_setup_state: serde_json::Value,
274}
275
276pub struct ExportDataCollectionSpec {
277 pub name: String,
278 pub spec: serde_json::Value,
279 pub key_fields_schema: Box<[FieldSchema]>,
280 pub value_fields_schema: Vec<FieldSchema>,
281 pub index_options: IndexOptions,
282}
283
284#[async_trait]
285pub trait TargetFactory: Send + Sync {
286 async fn build(
287 self: Arc<Self>,
288 data_collections: Vec<ExportDataCollectionSpec>,
289 declarations: Vec<serde_json::Value>,
290 context: Arc<FlowInstanceContext>,
291 ) -> Result<(
292 Vec<ExportDataCollectionBuildOutput>,
293 Vec<(serde_json::Value, serde_json::Value)>,
294 )>;
295
296 async fn diff_setup_states(
299 &self,
300 key: &serde_json::Value,
301 desired_state: Option<serde_json::Value>,
302 existing_states: setup::CombinedState<serde_json::Value>,
303 context: Arc<interface::FlowInstanceContext>,
304 ) -> Result<Box<dyn setup::ResourceSetupChange>>;
305
306 fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
309
310 fn check_state_compatibility(
311 &self,
312 desired_state: &serde_json::Value,
313 existing_state: &serde_json::Value,
314 ) -> Result<SetupStateCompatibility>;
315
316 fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;
317
318 fn extract_additional_key(
319 &self,
320 key: &KeyValue,
321 value: &FieldValues,
322 export_context: &(dyn Any + Send + Sync),
323 ) -> Result<serde_json::Value>;
324
325 async fn apply_mutation(
326 &self,
327 mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
328 ) -> Result<()>;
329
330 async fn apply_setup_changes(
331 &self,
332 setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
333 context: Arc<FlowInstanceContext>,
334 ) -> Result<()>;
335}
336
337pub struct TargetAttachmentState {
338 pub setup_key: serde_json::Value,
339 pub setup_state: serde_json::Value,
340}
341
342#[async_trait]
343pub trait AttachmentSetupChange {
344 fn describe_changes(&self) -> Vec<String>;
345
346 async fn apply_change(&self) -> Result<()>;
347}
348
349#[async_trait]
350pub trait TargetAttachmentFactory: Send + Sync {
351 fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
354
355 fn get_state(
356 &self,
357 target_name: &str,
358 target_spec: &serde_json::Map<String, serde_json::Value>,
359 attachment_spec: serde_json::Value,
360 ) -> Result<TargetAttachmentState>;
361
362 async fn diff_setup_states(
364 &self,
365 target_key: &serde_json::Value,
366 attachment_key: &serde_json::Value,
367 new_state: Option<serde_json::Value>,
368 existing_states: setup::CombinedState<serde_json::Value>,
369 context: &interface::FlowInstanceContext,
370 ) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>>;
371}
372
373#[derive(Clone)]
374pub enum ExecutorFactory {
375 Source(Arc<dyn SourceFactory + Send + Sync>),
376 SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
377 ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
378 TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
379}
380
381#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
382pub struct AttachmentSetupKey(pub String, pub serde_json::Value);
383
384impl std::fmt::Display for AttachmentSetupKey {
385 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
386 write!(f, "{}:{}", self.0, self.1)
387 }
388}