integrationos_domain/domain/context/
root_context.rs

1use super::{pipeline_context::PipelineContext, Transaction};
2use crate::{
3    id::Id,
4    prelude::{PipelineExt, PipelineStatus},
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "camelCase")]
13pub struct RootContext {
14    pub event_key: Id,
15    pub status: PipelineStatus,
16    pub stage: RootStage,
17    #[serde(with = "chrono::serde::ts_milliseconds")]
18    pub timestamp: DateTime<Utc>,
19    r#type: Arc<str>,
20
21    #[serde(flatten)]
22    pub transaction: Option<Transaction>,
23}
24
25impl RootContext {
26    pub fn new(event_key: Id) -> Self {
27        Self {
28            event_key,
29            status: PipelineStatus::Succeeded,
30            stage: RootStage::New,
31            timestamp: Utc::now(),
32            r#type: "root".into(),
33            transaction: None,
34        }
35    }
36
37    pub fn is_dropped(&self) -> bool {
38        matches!(self.status, PipelineStatus::Dropped { .. })
39    }
40
41    pub fn is_finished(&self) -> bool {
42        matches!(self.stage, RootStage::Finished)
43    }
44}
45
46#[async_trait]
47impl PipelineExt for RootContext {
48    fn is_complete(&self) -> bool {
49        self.is_dropped() || self.is_finished()
50    }
51
52    fn context_key(&self) -> &Id {
53        &self.event_key
54    }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub enum RootStage {
59    New,
60    Verified,
61    ProcessedDuplicates,
62    ProcessingPipelines(HashMap<String, PipelineContext>),
63    Finished,
64}
65
66impl Display for RootStage {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            Self::New => write!(f, "New"),
70            Self::Verified => write!(f, "Verified"),
71            Self::ProcessedDuplicates => write!(f, "ProcessingDuplicates"),
72            Self::ProcessingPipelines(p) => {
73                write!(f, "ProcessingPipelines(")?;
74                for (s, p) in p.iter() {
75                    write!(f, "{{{s} => {}: {}}}", p.stage, p.status)?;
76                }
77                write!(f, ")")
78            }
79            Self::Finished => write!(f, "Finished"),
80        }
81    }
82}