integrationos-domain 8.0.0

Shared library for IntegrationOS
Documentation
use super::{pipeline_context::PipelineContext, Transaction};
use crate::{
    id::Id,
    prelude::{PipelineExt, PipelineStatus},
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display, sync::Arc};

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RootContext {
    pub event_key: Id,
    pub status: PipelineStatus,
    pub stage: RootStage,
    #[serde(with = "chrono::serde::ts_milliseconds")]
    pub timestamp: DateTime<Utc>,
    r#type: Arc<str>,

    #[serde(flatten)]
    pub transaction: Option<Transaction>,
}

impl RootContext {
    pub fn new(event_key: Id) -> Self {
        Self {
            event_key,
            status: PipelineStatus::Succeeded,
            stage: RootStage::New,
            timestamp: Utc::now(),
            r#type: "root".into(),
            transaction: None,
        }
    }

    pub fn is_dropped(&self) -> bool {
        matches!(self.status, PipelineStatus::Dropped { .. })
    }

    pub fn is_finished(&self) -> bool {
        matches!(self.stage, RootStage::Finished)
    }
}

#[async_trait]
impl PipelineExt for RootContext {
    fn is_complete(&self) -> bool {
        self.is_dropped() || self.is_finished()
    }

    fn context_key(&self) -> &Id {
        &self.event_key
    }
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RootStage {
    New,
    Verified,
    ProcessedDuplicates,
    ProcessingPipelines(HashMap<String, PipelineContext>),
    Finished,
}

impl Display for RootStage {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::New => write!(f, "New"),
            Self::Verified => write!(f, "Verified"),
            Self::ProcessedDuplicates => write!(f, "ProcessingDuplicates"),
            Self::ProcessingPipelines(p) => {
                write!(f, "ProcessingPipelines(")?;
                for (s, p) in p.iter() {
                    write!(f, "{{{s} => {}: {}}}", p.stage, p.status)?;
                }
                write!(f, ")")
            }
            Self::Finished => write!(f, "Finished"),
        }
    }
}