use super::{pipeline_context::PipelineContext, Transaction};
use crate::{
id::Id,
prelude::{PipelineExt, PipelineStatus},
};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display, sync::Arc};
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RootContext {
pub event_key: Id,
pub status: PipelineStatus,
pub stage: RootStage,
#[serde(with = "chrono::serde::ts_milliseconds")]
pub timestamp: DateTime<Utc>,
r#type: Arc<str>,
#[serde(flatten)]
pub transaction: Option<Transaction>,
}
impl RootContext {
pub fn new(event_key: Id) -> Self {
Self {
event_key,
status: PipelineStatus::Succeeded,
stage: RootStage::New,
timestamp: Utc::now(),
r#type: "root".into(),
transaction: None,
}
}
pub fn is_dropped(&self) -> bool {
matches!(self.status, PipelineStatus::Dropped { .. })
}
pub fn is_finished(&self) -> bool {
matches!(self.stage, RootStage::Finished)
}
}
#[async_trait]
impl PipelineExt for RootContext {
fn is_complete(&self) -> bool {
self.is_dropped() || self.is_finished()
}
fn context_key(&self) -> &Id {
&self.event_key
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum RootStage {
New,
Verified,
ProcessedDuplicates,
ProcessingPipelines(HashMap<String, PipelineContext>),
Finished,
}
impl Display for RootStage {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::New => write!(f, "New"),
Self::Verified => write!(f, "Verified"),
Self::ProcessedDuplicates => write!(f, "ProcessingDuplicates"),
Self::ProcessingPipelines(p) => {
write!(f, "ProcessingPipelines(")?;
for (s, p) in p.iter() {
write!(f, "{{{s} => {}: {}}}", p.stage, p.status)?;
}
write!(f, ")")
}
Self::Finished => write!(f, "Finished"),
}
}
}