use crate::event::payload::{SupervisorEvent, What};
use crate::event::time::{CorrelationId, EventSequence};
use crate::id::types::ChildId;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
/// Error reported when an event's sequence value has already been recorded.
///
/// Returned by `CorrelationHandle::link_event` when the handle already holds
/// an event with the same `sequence.value`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct SequenceAlreadyRegistered {
/// Sequence carried by the event that was rejected.
pub sequence: EventSequence,
}
/// Human-readable form: `sequence <value> already registered`.
impl std::fmt::Display for SequenceAlreadyRegistered {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the raw sequence value is shown, not the wrapper type.
        let sequence_value = &self.sequence.value;
        f.write_fmt(format_args!(
            "sequence {} already registered",
            sequence_value
        ))
    }
}
// Marker impl: lets the type be used wherever a `std::error::Error` is
// expected. It adds no behavior beyond the derived `Debug` and the `Display`
// impl above.
impl std::error::Error for SequenceAlreadyRegistered {}
/// Errors describing why the events of a correlation could not be returned.
///
/// In this file only `CorrelationNotFound` and `CorrelationGapDetected` are
/// constructed (by `CorrelationHandle::export_chain`).
/// NOTE(review): `CorrelationTruncated` and `CorrelationConflict` are
/// presumably produced by other modules — confirm against callers.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum CorrelationQueryError {
/// No events are linked to the correlation.
CorrelationNotFound {
/// Correlation id that was queried.
correlation_id: CorrelationId,
},
/// NOTE(review): presumably the correlation holds more events than a limit
/// allows; not produced in this file — confirm.
CorrelationTruncated {
/// Correlation id that was queried.
correlation_id: CorrelationId,
/// Event count shown first in the message ("<total> events").
total_events: u64,
/// Limit shown in the message ("max <max>").
max_events: u64,
},
/// At least one required stage has no event in the correlation.
CorrelationGapDetected {
/// Correlation id that was queried.
correlation_id: CorrelationId,
/// Required stage names with no matching event, in `STAGES` order.
missing_stages: Vec<String>,
/// Stage names that do have an event, sorted and de-duplicated.
present_stages: Vec<String>,
},
/// NOTE(review): presumably more than one child is tied to the same
/// correlation; not produced in this file — confirm.
CorrelationConflict {
/// Correlation id that was queried.
correlation_id: CorrelationId,
/// Child ids listed in the message.
conflicting_child_ids: Vec<ChildId>,
},
}
/// One-line, human-readable rendering of each query error.
///
/// Every message starts with `correlation <id value>`; list-valued fields are
/// rendered with their `Debug` form.
impl std::fmt::Display for CorrelationQueryError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::CorrelationNotFound { correlation_id: id } => {
                f.write_fmt(format_args!("correlation {} not found", id.value))
            }
            Self::CorrelationTruncated {
                correlation_id: id,
                total_events: total,
                max_events: limit,
            } => f.write_fmt(format_args!(
                "correlation {} truncated: {} events (max {})",
                id.value, total, limit
            )),
            Self::CorrelationGapDetected {
                correlation_id: id,
                missing_stages: missing,
                present_stages: present,
            } => f.write_fmt(format_args!(
                "correlation {} gap detected: missing {:?}, present {:?}",
                id.value, missing, present
            )),
            Self::CorrelationConflict {
                correlation_id: id,
                conflicting_child_ids: children,
            } => f.write_fmt(format_args!(
                "correlation {} conflict: child_ids {:?}",
                id.value, children
            )),
        }
    }
}
// Marker impl: lets the enum be used wherever a `std::error::Error` is
// expected. It adds no behavior beyond the derived `Debug` and the `Display`
// impl above.
impl std::error::Error for CorrelationQueryError {}
/// Stage names that `CorrelationHandle::export_chain` requires to be present.
///
/// The names match the strings returned by `what_to_stage`. The order of this
/// list is the order in which missing stages are reported.
const STAGES: &[&str] = &[
"spawn",
"ready",
"failure_decision",
"restart_attempt",
"shutdown",
];
/// Classifies an event payload into one of the stage names.
///
/// Returns `None` for any `What` variant that is not listed below.
fn what_to_stage(what: &What) -> Option<&'static str> {
    let stage = match what {
        What::ChildStarting { .. } => "spawn",
        What::ChildReady { .. } | What::HealthCheckPassed { .. } => "ready",
        What::ChildFailed { .. } | What::ChildPanicked { .. } | What::BudgetDenied { .. } => {
            "failure_decision"
        }
        What::ChildRestarting { .. } | What::BackoffScheduled { .. } => "restart_attempt",
        What::ChildStopped { .. }
        | What::ShutdownRequested { .. }
        | What::ShutdownCompleted { .. } => "shutdown",
        // Everything else does not belong to a stage.
        _ => return None,
    };
    Some(stage)
}
/// Collects supervisor events linked under one correlation id.
///
/// Events are added with `link_event`, which rejects duplicate sequence
/// values, and read back with `export_chain`. The events' own correlation
/// data is not inspected by this type.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CorrelationHandle {
/// Correlation id this handle was created for; echoed in query errors.
pub correlation_id: CorrelationId,
/// Optional child id supplied to `new`; stored but not otherwise read in this file.
pub child_id: Option<ChildId>,
// Linked events, kept in the order they were accepted by `link_event`.
events: Vec<SupervisorEvent>,
// Sequence values of the accepted events; used to detect duplicates.
sequences: BTreeSet<u64>,
}
impl CorrelationHandle {
    /// Creates a handle with no linked events.
    pub fn new(correlation_id: CorrelationId, child_id: Option<ChildId>) -> Self {
        let events = Vec::new();
        let sequences = BTreeSet::new();
        Self {
            correlation_id,
            child_id,
            events,
            sequences,
        }
    }

    /// Adds `event` to the handle.
    ///
    /// # Errors
    ///
    /// Returns [`SequenceAlreadyRegistered`] (carrying the event's sequence)
    /// when an event with the same sequence value was linked before; the
    /// handle is left unchanged in that case.
    pub fn link_event(&mut self, event: SupervisorEvent) -> Result<(), SequenceAlreadyRegistered> {
        // `insert` reports whether the value was new, so it doubles as the
        // duplicate check.
        let newly_seen = self.sequences.insert(event.sequence.value);
        if newly_seen {
            self.events.push(event);
            Ok(())
        } else {
            Err(SequenceAlreadyRegistered {
                sequence: event.sequence,
            })
        }
    }

    /// Returns the linked events ordered by monotonic time, then by unix time.
    ///
    /// Events with equal timestamps keep the order in which they were linked.
    /// With `from_stage = Some(name)`, only events whose stage (per
    /// `what_to_stage`) equals `name` are returned; a name that matches no
    /// event yields an empty list.
    ///
    /// # Errors
    ///
    /// * [`CorrelationQueryError::CorrelationNotFound`] when no events are linked.
    /// * [`CorrelationQueryError::CorrelationGapDetected`] when any stage in
    ///   `STAGES` has no event. This is checked regardless of `from_stage`.
    pub fn export_chain(
        &self,
        from_stage: Option<&str>,
    ) -> Result<Vec<SupervisorEvent>, CorrelationQueryError> {
        if self.is_empty() {
            return Err(CorrelationQueryError::CorrelationNotFound {
                correlation_id: self.correlation_id,
            });
        }

        // Stage presence does not depend on event order, so it is established
        // before the events are copied and sorted. The ordered set yields the
        // names sorted and without duplicates.
        let seen: BTreeSet<&'static str> = self
            .events
            .iter()
            .filter_map(|event| what_to_stage(&event.what))
            .collect();
        let absent: Vec<String> = STAGES
            .iter()
            .filter(|name| !seen.contains(**name))
            .map(|name| name.to_string())
            .collect();
        if !absent.is_empty() {
            return Err(CorrelationQueryError::CorrelationGapDetected {
                correlation_id: self.correlation_id,
                missing_stages: absent,
                present_stages: seen.into_iter().map(String::from).collect(),
            });
        }

        let mut chain = self.events.clone();
        // Stable sort: monotonic clock first, wall clock as the tie-breaker.
        chain.sort_by(|lhs, rhs| {
            let (lhs_time, rhs_time) = (&lhs.when.time, &rhs.when.time);
            match lhs_time.monotonic_nanos.cmp(&rhs_time.monotonic_nanos) {
                std::cmp::Ordering::Equal => lhs_time.unix_nanos.cmp(&rhs_time.unix_nanos),
                unequal => unequal,
            }
        });
        if let Some(wanted) = from_stage {
            chain.retain(|event| what_to_stage(&event.what) == Some(wanted));
        }
        Ok(chain)
    }

    /// Number of events linked so far.
    pub fn len(&self) -> usize {
        self.events.len()
    }

    /// `true` when no event has been linked.
    pub fn is_empty(&self) -> bool {
        self.events.is_empty()
    }
}