1use crate::event::payload::{SupervisorEvent, What};
8use crate::event::time::{CorrelationId, EventSequence};
9use crate::id::types::ChildId;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeSet;
12
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct SequenceAlreadyRegistered {
16 pub sequence: EventSequence,
18}
19
20impl std::fmt::Display for SequenceAlreadyRegistered {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23 write!(f, "sequence {} already registered", self.sequence.value)
24 }
25}
26
27impl std::error::Error for SequenceAlreadyRegistered {}
28
29#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub enum CorrelationQueryError {
32 CorrelationNotFound {
34 correlation_id: CorrelationId,
36 },
37 CorrelationTruncated {
39 correlation_id: CorrelationId,
41 total_events: u64,
43 max_events: u64,
45 },
46 CorrelationGapDetected {
48 correlation_id: CorrelationId,
50 missing_stages: Vec<String>,
52 present_stages: Vec<String>,
54 },
55 CorrelationConflict {
57 correlation_id: CorrelationId,
59 conflicting_child_ids: Vec<ChildId>,
61 },
62}
63
64impl std::fmt::Display for CorrelationQueryError {
65 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67 match self {
68 Self::CorrelationNotFound { correlation_id } => {
69 write!(f, "correlation {} not found", correlation_id.value)
70 }
71 Self::CorrelationTruncated {
72 correlation_id,
73 total_events,
74 max_events,
75 } => {
76 write!(
77 f,
78 "correlation {} truncated: {} events (max {})",
79 correlation_id.value, total_events, max_events
80 )
81 }
82 Self::CorrelationGapDetected {
83 correlation_id,
84 missing_stages,
85 present_stages,
86 } => {
87 write!(
88 f,
89 "correlation {} gap detected: missing {:?}, present {:?}",
90 correlation_id.value, missing_stages, present_stages
91 )
92 }
93 Self::CorrelationConflict {
94 correlation_id,
95 conflicting_child_ids,
96 } => {
97 write!(
98 f,
99 "correlation {} conflict: child_ids {:?}",
100 correlation_id.value, conflicting_child_ids
101 )
102 }
103 }
104 }
105}
106
107impl std::error::Error for CorrelationQueryError {}
108
109const STAGES: &[&str] = &[
111 "spawn",
112 "ready",
113 "failure_decision",
114 "restart_attempt",
115 "shutdown",
116];
117
118fn what_to_stage(what: &What) -> Option<&'static str> {
120 match what {
121 What::ChildStarting { .. } => Some("spawn"),
122 What::ChildReady { .. } | What::HealthCheckPassed { .. } => Some("ready"),
123 What::ChildFailed { .. } | What::ChildPanicked { .. } | What::BudgetDenied { .. } => {
124 Some("failure_decision")
125 }
126 What::ChildRestarting { .. } | What::BackoffScheduled { .. } => Some("restart_attempt"),
127 What::ChildStopped { .. }
128 | What::ShutdownRequested { .. }
129 | What::ShutdownCompleted { .. } => Some("shutdown"),
130 _ => None,
131 }
132}
133
134#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139pub struct CorrelationHandle {
140 pub correlation_id: CorrelationId,
142 pub child_id: Option<ChildId>,
144 events: Vec<SupervisorEvent>,
146 sequences: BTreeSet<u64>,
148}
149
150impl CorrelationHandle {
151 pub fn new(correlation_id: CorrelationId, child_id: Option<ChildId>) -> Self {
162 Self {
163 correlation_id,
164 child_id,
165 events: Vec::new(),
166 sequences: BTreeSet::new(),
167 }
168 }
169
170 pub fn link_event(&mut self, event: SupervisorEvent) -> Result<(), SequenceAlreadyRegistered> {
184 if !self.sequences.insert(event.sequence.value) {
185 return Err(SequenceAlreadyRegistered {
186 sequence: event.sequence,
187 });
188 }
189 self.events.push(event);
190 Ok(())
191 }
192
193 pub fn export_chain(
204 &self,
205 from_stage: Option<&str>,
206 ) -> Result<Vec<SupervisorEvent>, CorrelationQueryError> {
207 if self.events.is_empty() {
208 return Err(CorrelationQueryError::CorrelationNotFound {
209 correlation_id: self.correlation_id,
210 });
211 }
212
213 let mut sorted: Vec<SupervisorEvent> = self.events.clone();
214 sorted.sort_by(|a, b| {
215 a.when
216 .time
217 .monotonic_nanos
218 .cmp(&b.when.time.monotonic_nanos)
219 .then_with(|| a.when.time.unix_nanos.cmp(&b.when.time.unix_nanos))
220 });
221
222 let present_stages_all: Vec<String> = {
224 let mut stages: Vec<String> = sorted
225 .iter()
226 .filter_map(|e| what_to_stage(&e.what))
227 .map(|s| s.to_string())
228 .collect();
229 stages.sort();
230 stages.dedup();
231 stages
232 };
233
234 let present_set: std::collections::HashSet<&str> =
235 present_stages_all.iter().map(|s| s.as_str()).collect();
236
237 let missing: Vec<String> = STAGES
238 .iter()
239 .filter(|s| !present_set.contains(**s))
240 .map(|s| s.to_string())
241 .collect();
242
243 if !missing.is_empty() {
244 return Err(CorrelationQueryError::CorrelationGapDetected {
245 correlation_id: self.correlation_id,
246 missing_stages: missing,
247 present_stages: present_stages_all,
248 });
249 }
250
251 let filtered: Vec<SupervisorEvent> = if let Some(stage) = from_stage {
253 sorted
254 .into_iter()
255 .filter(|e| what_to_stage(&e.what) == Some(stage))
256 .collect()
257 } else {
258 sorted
259 };
260
261 Ok(filtered)
262 }
263
264 pub fn len(&self) -> usize {
274 self.events.len()
275 }
276
277 pub fn is_empty(&self) -> bool {
287 self.events.is_empty()
288 }
289}