integrationos_domain/domain/context/
root_context.rs1use super::{pipeline_context::PipelineContext, Transaction};
2use crate::{
3 id::Id,
4 prelude::{PipelineExt, PipelineStatus},
5};
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use std::{collections::HashMap, fmt::Display, sync::Arc};
10
11#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
12#[serde(rename_all = "camelCase")]
13pub struct RootContext {
14 pub event_key: Id,
15 pub status: PipelineStatus,
16 pub stage: RootStage,
17 #[serde(with = "chrono::serde::ts_milliseconds")]
18 pub timestamp: DateTime<Utc>,
19 r#type: Arc<str>,
20
21 #[serde(flatten)]
22 pub transaction: Option<Transaction>,
23}
24
25impl RootContext {
26 pub fn new(event_key: Id) -> Self {
27 Self {
28 event_key,
29 status: PipelineStatus::Succeeded,
30 stage: RootStage::New,
31 timestamp: Utc::now(),
32 r#type: "root".into(),
33 transaction: None,
34 }
35 }
36
37 pub fn is_dropped(&self) -> bool {
38 matches!(self.status, PipelineStatus::Dropped { .. })
39 }
40
41 pub fn is_finished(&self) -> bool {
42 matches!(self.stage, RootStage::Finished)
43 }
44}
45
46#[async_trait]
47impl PipelineExt for RootContext {
48 fn is_complete(&self) -> bool {
49 self.is_dropped() || self.is_finished()
50 }
51
52 fn context_key(&self) -> &Id {
53 &self.event_key
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
58pub enum RootStage {
59 New,
60 Verified,
61 ProcessedDuplicates,
62 ProcessingPipelines(HashMap<String, PipelineContext>),
63 Finished,
64}
65
66impl Display for RootStage {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 Self::New => write!(f, "New"),
70 Self::Verified => write!(f, "Verified"),
71 Self::ProcessedDuplicates => write!(f, "ProcessingDuplicates"),
72 Self::ProcessingPipelines(p) => {
73 write!(f, "ProcessingPipelines(")?;
74 for (s, p) in p.iter() {
75 write!(f, "{{{s} => {}: {}}}", p.stage, p.status)?;
76 }
77 write!(f, ")")
78 }
79 Self::Finished => write!(f, "Finished"),
80 }
81 }
82}