use crate::prelude::*;
use std::time::SystemTime;
use crate::base::{schema::*, spec::IndexOptions, value::*};
use crate::setup;
use chrono::TimeZone;
use serde::Serialize;
/// Context passed (as `Arc<FlowInstanceContext>` or by reference) to the
/// factory methods declared in this file.
pub struct FlowInstanceContext {
/// Name of the flow instance, as the field name indicates.
pub flow_instance_name: String,
/// Shared handle to an `AuthRegistry`.
pub auth_registry: Arc<AuthRegistry>,
}
/// Optional 64-bit ordinal; `None` means no ordinal is available.
///
/// The `TryFrom` conversions in this file (from `SystemTime` and
/// `chrono::DateTime`) store microseconds relative to the Unix epoch.
/// The derived `Default` is `Ordinal(None)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub struct Ordinal(pub Option<i64>);
impl Ordinal {
pub fn unavailable() -> Self {
Self(None)
}
pub fn is_available(&self) -> bool {
self.0.is_some()
}
}
impl From<Ordinal> for Option<i64> {
fn from(val: Ordinal) -> Self {
val.0
}
}
impl TryFrom<SystemTime> for Ordinal {
    type Error = anyhow::Error;

    /// Converts a `SystemTime` into an ordinal holding whole microseconds
    /// relative to the Unix epoch.
    ///
    /// Times before the epoch yield a negative value instead of an error. This
    /// agrees with the `chrono::DateTime` conversion in this file, where
    /// `timestamp_micros()` is already negative for such instants.
    /// Sub-microsecond remainders are truncated toward zero.
    ///
    /// # Errors
    /// Fails when the microsecond count does not fit in an `i64`.
    fn try_from(time: SystemTime) -> std::result::Result<Self, Self::Error> {
        let micros = match time.duration_since(std::time::UNIX_EPOCH) {
            Ok(since_epoch) => i64::try_from(since_epoch.as_micros())?,
            // `SystemTimeError::duration()` is the positive distance back to
            // the epoch; negating a non-negative `i64` cannot overflow.
            Err(before_epoch) => -i64::try_from(before_epoch.duration().as_micros())?,
        };
        Ok(Ordinal(Some(micros)))
    }
}
impl<TZ: TimeZone> TryFrom<chrono::DateTime<TZ>> for Ordinal {
    type Error = anyhow::Error;

    /// Converts a `chrono::DateTime` into an ordinal holding the value of
    /// `timestamp_micros()` (microseconds relative to the Unix epoch).
    ///
    /// This body has no failing path: it always returns `Ok`.
    fn try_from(time: chrono::DateTime<TZ>) -> std::result::Result<Self, Self::Error> {
        let micros_since_epoch = time.timestamp_micros();
        Ok(Self(Some(micros_since_epoch)))
    }
}
/// Value of a source row, distinguishing a present row from an absent one.
#[derive(Debug)]
pub enum SourceValue {
/// The row exists and carries these field values.
Existence(FieldValues),
/// The row does not exist.
NonExistence,
}
/// Data for a source row in which every part is optional.
///
/// The derived `Default` leaves all three fields as `None`.
/// NOTE(review): presumably each field is populated only when the matching
/// `include_*` flag of `SourceExecutorReadOptions` is set — confirm against implementors.
#[derive(Debug, Default)]
pub struct PartialSourceRowData {
/// Ordinal of the row, if provided.
pub ordinal: Option<Ordinal>,
/// Raw bytes for the content version; presumably a fingerprint, judging by
/// the `_fp` suffix — TODO confirm.
pub content_version_fp: Option<Vec<u8>>,
/// Row value (existing or non-existing), if provided.
pub value: Option<SourceValue>,
}
/// A source row: its key, auxiliary key information, and partial data.
///
/// `SourceExecutor::list` yields these in batches (`Vec<PartialSourceRow>`).
pub struct PartialSourceRow {
/// Key identifying the row.
pub key: KeyValue,
/// Auxiliary JSON information accompanying the key.
pub key_aux_info: serde_json::Value,
/// The (possibly partial) data for the row.
pub data: PartialSourceRowData,
}
impl SourceValue {
    /// Returns `true` for the `Existence` variant.
    pub fn is_existent(&self) -> bool {
        self.as_optional().is_some()
    }

    /// Borrows the field values when present; `None` for `NonExistence`.
    pub fn as_optional(&self) -> Option<&FieldValues> {
        if let Self::Existence(fields) = self {
            Some(fields)
        } else {
            None
        }
    }

    /// Consumes `self`, yielding the field values when present and `None`
    /// for `NonExistence`.
    pub fn into_optional(self) -> Option<FieldValues> {
        if let Self::Existence(fields) = self {
            Some(fields)
        } else {
            None
        }
    }
}
/// A single change to a source row; carried in `SourceChangeMessage::changes`.
pub struct SourceChange {
/// Key identifying the changed row.
pub key: KeyValue,
/// Auxiliary JSON information accompanying the key.
pub key_aux_info: serde_json::Value,
/// The (possibly partial) data known about the changed row.
pub data: PartialSourceRowData,
}
/// A batch of source changes together with an optional acknowledgement callback.
pub struct SourceChangeMessage {
/// The changes contained in this message.
pub changes: Vec<SourceChange>,
/// Optional one-shot callback returning a boxed future that resolves to `Result<()>`.
/// NOTE(review): presumably invoked once the changes have been processed — confirm with callers.
pub ack_fn: Option<Box<dyn FnOnce() -> BoxFuture<'static, Result<()>> + Send + Sync>>,
}
/// Flags passed to `SourceExecutor::list` and `SourceExecutor::get_value`.
///
/// The derived `Default` sets every flag to `false`.
/// NOTE(review): presumably each flag requests the same-named part of
/// `PartialSourceRowData` — confirm against implementors.
#[derive(Debug, Default, Serialize)]
pub struct SourceExecutorReadOptions {
/// Whether to include the ordinal.
pub include_ordinal: bool,
/// Whether to include the content version bytes (`content_version_fp`).
pub include_content_version_fp: bool,
/// Whether to include the value.
pub include_value: bool,
}
/// Executor interface for a source: lists rows, fetches a row by key, and
/// optionally exposes a stream of changes.
#[async_trait]
pub trait SourceExecutor: Send + Sync {
/// Returns a stream whose items are batches (`Vec`) of `PartialSourceRow`,
/// each batch wrapped in a `Result`.
async fn list(
&self,
options: &SourceExecutorReadOptions,
) -> Result<BoxStream<'async_trait, Result<Vec<PartialSourceRow>>>>;
/// Returns the data for the row identified by `key` and `key_aux_info`.
async fn get_value(
&self,
key: &KeyValue,
key_aux_info: &serde_json::Value,
options: &SourceExecutorReadOptions,
) -> Result<PartialSourceRowData>;
/// Returns an optional stream of `SourceChangeMessage`s.
///
/// The default implementation returns `Ok(None)`.
async fn change_stream(
&self,
) -> Result<Option<BoxStream<'async_trait, Result<SourceChangeMessage>>>> {
Ok(None)
}
/// Reports whether this source provides ordinals.
fn provides_ordinal(&self) -> bool;
}
/// Factory interface that builds a `SourceExecutor` from a JSON spec.
#[async_trait]
pub trait SourceFactory {
/// Builds a source named `source_name` from `spec`.
///
/// Returns a pair: an `EnrichedValueType` (presumably the source's output
/// type — TODO confirm) and a future that resolves to the boxed executor.
async fn build(
self: Arc<Self>,
source_name: &str,
spec: serde_json::Value,
context: Arc<FlowInstanceContext>,
) -> Result<(
EnrichedValueType,
BoxFuture<'static, Result<Box<dyn SourceExecutor>>>,
)>;
}
/// Executor interface for a function that maps a list of argument values to
/// a single value.
#[async_trait]
pub trait SimpleFunctionExecutor: Send + Sync {
/// Evaluates the function on `args` and returns the resulting value.
async fn evaluate(&self, args: Vec<Value>) -> Result<Value>;
/// Whether caching is enabled for this executor; defaults to `false`.
fn enable_cache(&self) -> bool {
false
}
/// Optional timeout for this executor; defaults to `None`.
fn timeout(&self) -> Option<std::time::Duration> {
None
}
}
/// Result of `SimpleFunctionFactory::build`.
pub struct SimpleFunctionBuildOutput {
/// Type of the function's output.
pub output_type: EnrichedValueType,
/// Optional behavior version number.
/// NOTE(review): presumably used to tell apart changes in function behavior — confirm.
pub behavior_version: Option<u32>,
/// Future that resolves to the boxed executor.
pub executor: BoxFuture<'static, Result<Box<dyn SimpleFunctionExecutor>>>,
}
/// Factory interface that builds a `SimpleFunctionExecutor` from a JSON spec
/// and the schemas of its arguments.
#[async_trait]
pub trait SimpleFunctionFactory {
/// Builds the function from `spec` and `input_schema`, returning its
/// `SimpleFunctionBuildOutput`.
async fn build(
self: Arc<Self>,
spec: serde_json::Value,
input_schema: Vec<OpArgSchema>,
context: Arc<FlowInstanceContext>,
) -> Result<SimpleFunctionBuildOutput>;
}
/// One upsert entry of an `ExportTargetMutation`.
#[derive(Debug)]
pub struct ExportTargetUpsertEntry {
/// Key of the entry.
pub key: KeyValue,
/// Additional JSON key data.
/// NOTE(review): presumably what `TargetFactory::extract_additional_key` produces — confirm.
pub additional_key: serde_json::Value,
/// Field values of the entry.
pub value: FieldValues,
}
/// One delete entry of an `ExportTargetMutation`.
#[derive(Debug)]
pub struct ExportTargetDeleteEntry {
/// Key of the entry.
pub key: KeyValue,
/// Additional JSON key data.
/// NOTE(review): presumably what `TargetFactory::extract_additional_key` produces — confirm.
pub additional_key: serde_json::Value,
}
/// A set of upserts and deletes for an export target.
///
/// The derived `Default` is a mutation with both lists empty.
#[derive(Debug, Default)]
pub struct ExportTargetMutation {
/// Upsert entries.
pub upserts: Vec<ExportTargetUpsertEntry>,
/// Delete entries.
pub deletes: Vec<ExportTargetDeleteEntry>,
}
impl ExportTargetMutation {
    /// Returns `true` when the mutation contains neither upserts nor deletes.
    pub fn is_empty(&self) -> bool {
        if !self.upserts.is_empty() {
            return false;
        }
        self.deletes.is_empty()
    }
}
/// Pairs an `ExportTargetMutation` with a borrowed export context of type `T`.
///
/// `TargetFactory::apply_mutation` takes a list of these with
/// `T = dyn Any + Send + Sync`.
#[derive(Debug)]
pub struct ExportTargetMutationWithContext<'ctx, T: ?Sized + Send + Sync> {
/// The mutation to apply.
pub mutation: ExportTargetMutation,
/// Borrowed export context the mutation is associated with.
pub export_context: &'ctx T,
}
/// Borrowed pair of a JSON key and a setup change; taken in a list by
/// `TargetFactory::apply_setup_changes`.
pub struct ResourceSetupChangeItem<'a> {
/// JSON key the change is associated with.
pub key: &'a serde_json::Value,
/// The setup change itself.
pub setup_change: &'a dyn setup::ResourceSetupChange,
}
/// Result of `TargetFactory::check_state_compatibility`, which compares a
/// desired setup state with an existing one.
///
/// NOTE(review): the exact consequences of each level are decided by callers
/// outside this file — confirm before relying on them.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Deserialize)]
pub enum SetupStateCompatibility {
/// The states are compatible.
Compatible,
/// The states are partially compatible.
PartialCompatible,
/// The states are not compatible.
NotCompatible,
}
/// Output element returned by `TargetFactory::build`.
pub struct ExportDataCollectionBuildOutput {
/// Future that resolves to the type-erased, shared export context.
pub export_context: BoxFuture<'static, Result<Arc<dyn Any + Send + Sync>>>,
/// JSON setup key.
pub setup_key: serde_json::Value,
/// JSON description of the desired setup state.
pub desired_setup_state: serde_json::Value,
}
/// Specification of one data collection passed to `TargetFactory::build`.
pub struct ExportDataCollectionSpec {
/// Name of the data collection.
pub name: String,
/// JSON spec.
pub spec: serde_json::Value,
/// Schemas of the key fields.
pub key_fields_schema: Box<[FieldSchema]>,
/// Schemas of the value fields.
pub value_fields_schema: Vec<FieldSchema>,
/// Index options.
pub index_options: IndexOptions,
}
/// Factory interface for export targets: builds export contexts, diffs and
/// applies setup changes, and applies data mutations.
#[async_trait]
pub trait TargetFactory: Send + Sync {
/// Builds from the given data collection specs and declarations.
///
/// Returns a list of `ExportDataCollectionBuildOutput` and a list of JSON
/// value pairs.
/// NOTE(review): presumably one output per entry of `data_collections`, and
/// (setup key, desired setup state) pairs for `declarations` — confirm.
async fn build(
self: Arc<Self>,
data_collections: Vec<ExportDataCollectionSpec>,
declarations: Vec<serde_json::Value>,
context: Arc<FlowInstanceContext>,
) -> Result<(
Vec<ExportDataCollectionBuildOutput>,
Vec<(serde_json::Value, serde_json::Value)>,
)>;
/// Computes the setup change for the resource identified by `key`, given
/// the optional desired state and the combined existing states.
/// NOTE(review): presumably `desired_state == None` means the resource is
/// no longer wanted — confirm.
async fn diff_setup_states(
&self,
key: &serde_json::Value,
desired_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
context: Arc<interface::FlowInstanceContext>,
) -> Result<Box<dyn setup::ResourceSetupChange>>;
/// Returns a normalized form of the JSON setup key.
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
/// Compares a desired setup state with an existing one and reports their
/// compatibility level.
fn check_state_compatibility(
&self,
desired_state: &serde_json::Value,
existing_state: &serde_json::Value,
) -> Result<SetupStateCompatibility>;
/// Returns a textual description of the resource identified by `key`.
fn describe_resource(&self, key: &serde_json::Value) -> Result<String>;
/// Derives an additional JSON key from a row's key and value, given the
/// type-erased export context.
fn extract_additional_key(
&self,
key: &KeyValue,
value: &FieldValues,
export_context: &(dyn Any + Send + Sync),
) -> Result<serde_json::Value>;
/// Applies the given mutations, each paired with its type-erased export context.
async fn apply_mutation(
&self,
mutations: Vec<ExportTargetMutationWithContext<'async_trait, dyn Any + Send + Sync>>,
) -> Result<()>;
/// Applies the given setup changes.
async fn apply_setup_changes(
&self,
setup_change: Vec<ResourceSetupChangeItem<'async_trait>>,
context: Arc<FlowInstanceContext>,
) -> Result<()>;
}
/// Setup key and setup state returned by `TargetAttachmentFactory::get_state`.
pub struct TargetAttachmentState {
/// JSON setup key.
pub setup_key: serde_json::Value,
/// JSON setup state.
pub setup_state: serde_json::Value,
}
/// A setup change for a target attachment that can be described and applied.
#[async_trait]
pub trait AttachmentSetupChange {
/// Returns textual descriptions of the changes.
fn describe_changes(&self) -> Vec<String>;
/// Applies the change.
async fn apply_change(&self) -> Result<()>;
}
/// Factory interface for target attachments: derives their setup state and
/// diffs it against existing states.
#[async_trait]
pub trait TargetAttachmentFactory: Send + Sync {
/// Returns a normalized form of the JSON setup key.
fn normalize_setup_key(&self, key: &serde_json::Value) -> Result<serde_json::Value>;
/// Computes the attachment's setup key and setup state from the target's
/// name and spec plus the attachment's own spec.
fn get_state(
&self,
target_name: &str,
target_spec: &serde_json::Map<String, serde_json::Value>,
attachment_spec: serde_json::Value,
) -> Result<TargetAttachmentState>;
/// Computes the setup change for an attachment, given the optional new
/// state and the combined existing states.
///
/// Returns `Ok(None)` or a boxed change.
/// NOTE(review): presumably `None` means nothing needs to be applied — confirm.
async fn diff_setup_states(
&self,
target_key: &serde_json::Value,
attachment_key: &serde_json::Value,
new_state: Option<serde_json::Value>,
existing_states: setup::CombinedState<serde_json::Value>,
context: &interface::FlowInstanceContext,
) -> Result<Option<Box<dyn AttachmentSetupChange + Send + Sync>>>;
}
/// A shared factory of one of the four kinds declared in this file.
///
/// Each variant holds an `Arc`, so cloning copies the handle, not the factory.
#[derive(Clone)]
pub enum ExecutorFactory {
/// Factory for sources.
Source(Arc<dyn SourceFactory + Send + Sync>),
/// Factory for simple functions.
SimpleFunction(Arc<dyn SimpleFunctionFactory + Send + Sync>),
/// Factory for export targets.
ExportTarget(Arc<dyn TargetFactory + Send + Sync>),
/// Factory for target attachments.
TargetAttachment(Arc<dyn TargetAttachmentFactory + Send + Sync>),
}
/// Setup key for an attachment: a string paired with a JSON value.
///
/// Its `Display` implementation prints the two fields as `<string>:<json>`.
/// NOTE(review): presumably the string is the attachment kind and the JSON
/// value is the kind-specific key — confirm against the code that constructs it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct AttachmentSetupKey(pub String, pub serde_json::Value);
impl std::fmt::Display for AttachmentSetupKey {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}:{}", self.0, self.1)
}
}