Skip to main content

rust_supervisor/event/
correlation.rs

1//! Correlation handle for end-to-end lifecycle tracking.
2//!
3//! This module owns the `CorrelationHandle` type that links supervisor events
4//! sharing the same correlation identifier, and the query errors that callers
5//! must handle when exporting event chains.
6
7use crate::event::payload::{SupervisorEvent, What};
8use crate::event::time::{CorrelationId, EventSequence};
9use crate::id::types::ChildId;
10use serde::{Deserialize, Serialize};
11use std::collections::BTreeSet;
12
13/// Duplicate event sequence error.
14#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15pub struct SequenceAlreadyRegistered {
16    /// The sequence that was already present.
17    pub sequence: EventSequence,
18}
19
20impl std::fmt::Display for SequenceAlreadyRegistered {
21    /// Formats the error showing the duplicate sequence number.
22    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
23        write!(f, "sequence {} already registered", self.sequence.value)
24    }
25}
26
27impl std::error::Error for SequenceAlreadyRegistered {}
28
29/// Error returned by [`CorrelationHandle::export_chain`].
30#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
31pub enum CorrelationQueryError {
32    /// No events found for the given correlation ID.
33    CorrelationNotFound {
34        /// The queried correlation identifier.
35        correlation_id: CorrelationId,
36    },
37    /// Event chain is truncated due to log rotation or journal capacity.
38    CorrelationTruncated {
39        /// The queried correlation identifier.
40        correlation_id: CorrelationId,
41        /// Total events found.
42        total_events: u64,
43        /// Maximum events before truncation.
44        max_events: u64,
45    },
46    /// One or more lifecycle stages are missing from the chain.
47    CorrelationGapDetected {
48        /// The queried correlation identifier.
49        correlation_id: CorrelationId,
50        /// Set of lifecycle stages that are missing.
51        missing_stages: Vec<String>,
52        /// Stages that are present in the chain.
53        present_stages: Vec<String>,
54    },
55    /// Sequence collision detected (possible UUID collision).
56    CorrelationConflict {
57        /// The queried correlation identifier.
58        correlation_id: CorrelationId,
59        /// Child identifiers that conflict.
60        conflicting_child_ids: Vec<ChildId>,
61    },
62}
63
64impl std::fmt::Display for CorrelationQueryError {
65    /// Formats the query error with correlation id and context.
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        match self {
68            Self::CorrelationNotFound { correlation_id } => {
69                write!(f, "correlation {} not found", correlation_id.value)
70            }
71            Self::CorrelationTruncated {
72                correlation_id,
73                total_events,
74                max_events,
75            } => {
76                write!(
77                    f,
78                    "correlation {} truncated: {} events (max {})",
79                    correlation_id.value, total_events, max_events
80                )
81            }
82            Self::CorrelationGapDetected {
83                correlation_id,
84                missing_stages,
85                present_stages,
86            } => {
87                write!(
88                    f,
89                    "correlation {} gap detected: missing {:?}, present {:?}",
90                    correlation_id.value, missing_stages, present_stages
91                )
92            }
93            Self::CorrelationConflict {
94                correlation_id,
95                conflicting_child_ids,
96            } => {
97                write!(
98                    f,
99                    "correlation {} conflict: child_ids {:?}",
100                    correlation_id.value, conflicting_child_ids
101                )
102            }
103        }
104    }
105}
106
107impl std::error::Error for CorrelationQueryError {}
108
109/// Stage names for the five mandatory lifecycle stages.
110const STAGES: &[&str] = &[
111    "spawn",
112    "ready",
113    "failure_decision",
114    "restart_attempt",
115    "shutdown",
116];
117
118/// Maps a `What` variant name to its lifecycle stage.
119fn what_to_stage(what: &What) -> Option<&'static str> {
120    match what {
121        What::ChildStarting { .. } => Some("spawn"),
122        What::ChildReady { .. } | What::HealthCheckPassed { .. } => Some("ready"),
123        What::ChildFailed { .. } | What::ChildPanicked { .. } | What::BudgetDenied { .. } => {
124            Some("failure_decision")
125        }
126        What::ChildRestarting { .. } | What::BackoffScheduled { .. } => Some("restart_attempt"),
127        What::ChildStopped { .. }
128        | What::ShutdownRequested { .. }
129        | What::ShutdownCompleted { .. } => Some("shutdown"),
130        _ => None,
131    }
132}
133
134/// Handle that correlates supervisor events sharing a common correlation ID.
135///
136/// Events are stored in insertion order and can be exported as a chronologically
137/// sorted chain. The chain is validated against five mandatory lifecycle stages.
138#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
139pub struct CorrelationHandle {
140    /// Correlation identifier for this chain.
141    pub correlation_id: CorrelationId,
142    /// Optional child identifier for scoped queries.
143    pub child_id: Option<ChildId>,
144    /// Events in insertion order.
145    events: Vec<SupervisorEvent>,
146    /// Set of registered sequence numbers for duplicate detection.
147    sequences: BTreeSet<u64>,
148}
149
150impl CorrelationHandle {
151    /// Creates a new correlation handle.
152    ///
153    /// # Arguments
154    ///
155    /// - `correlation_id`: UUID v4 that identifies this tracking chain.
156    /// - `child_id`: Optional child identifier for scoped queries.
157    ///
158    /// # Returns
159    ///
160    /// Returns a new [`CorrelationHandle`].
161    pub fn new(correlation_id: CorrelationId, child_id: Option<ChildId>) -> Self {
162        Self {
163            correlation_id,
164            child_id,
165            events: Vec::new(),
166            sequences: BTreeSet::new(),
167        }
168    }
169
170    /// Links a supervisor event to this correlation handle.
171    ///
172    /// The event is stored in chronological order. Duplicate sequence numbers
173    /// are rejected.
174    ///
175    /// # Arguments
176    ///
177    /// - `event`: The supervisor event to associate.
178    ///
179    /// # Returns
180    ///
181    /// Returns `Ok(())` on success, `Err(SequenceAlreadyRegistered)` if the
182    /// event's sequence was already linked.
183    pub fn link_event(&mut self, event: SupervisorEvent) -> Result<(), SequenceAlreadyRegistered> {
184        if !self.sequences.insert(event.sequence.value) {
185            return Err(SequenceAlreadyRegistered {
186                sequence: event.sequence,
187            });
188        }
189        self.events.push(event);
190        Ok(())
191    }
192
193    /// Exports all linked events in chronological order.
194    ///
195    /// # Arguments
196    ///
197    /// - `from_stage`: Optional stage filter (e.g., "spawn", "ready").
198    ///
199    /// # Returns
200    ///
201    /// Returns a vector of [`SupervisorEvent`] sorted by `when.when.unix_nanos`,
202    /// or a [`CorrelationQueryError`] if gaps are detected.
203    pub fn export_chain(
204        &self,
205        from_stage: Option<&str>,
206    ) -> Result<Vec<SupervisorEvent>, CorrelationQueryError> {
207        if self.events.is_empty() {
208            return Err(CorrelationQueryError::CorrelationNotFound {
209                correlation_id: self.correlation_id,
210            });
211        }
212
213        let mut sorted: Vec<SupervisorEvent> = self.events.clone();
214        sorted.sort_by(|a, b| {
215            a.when
216                .time
217                .monotonic_nanos
218                .cmp(&b.when.time.monotonic_nanos)
219                .then_with(|| a.when.time.unix_nanos.cmp(&b.when.time.unix_nanos))
220        });
221
222        // Gap detection runs on ALL events regardless of filter.
223        let present_stages_all: Vec<String> = {
224            let mut stages: Vec<String> = sorted
225                .iter()
226                .filter_map(|e| what_to_stage(&e.what))
227                .map(|s| s.to_string())
228                .collect();
229            stages.sort();
230            stages.dedup();
231            stages
232        };
233
234        let present_set: std::collections::HashSet<&str> =
235            present_stages_all.iter().map(|s| s.as_str()).collect();
236
237        let missing: Vec<String> = STAGES
238            .iter()
239            .filter(|s| !present_set.contains(**s))
240            .map(|s| s.to_string())
241            .collect();
242
243        if !missing.is_empty() {
244            return Err(CorrelationQueryError::CorrelationGapDetected {
245                correlation_id: self.correlation_id,
246                missing_stages: missing,
247                present_stages: present_stages_all,
248            });
249        }
250
251        // Filter by stage only for the returned events.
252        let filtered: Vec<SupervisorEvent> = if let Some(stage) = from_stage {
253            sorted
254                .into_iter()
255                .filter(|e| what_to_stage(&e.what) == Some(stage))
256                .collect()
257        } else {
258            sorted
259        };
260
261        Ok(filtered)
262    }
263
264    /// Returns the number of linked events.
265    ///
266    /// # Arguments
267    ///
268    /// This function has no arguments.
269    ///
270    /// # Returns
271    ///
272    /// Returns the event count.
273    pub fn len(&self) -> usize {
274        self.events.len()
275    }
276
277    /// Reports whether this handle has no linked events.
278    ///
279    /// # Arguments
280    ///
281    /// This function has no arguments.
282    ///
283    /// # Returns
284    ///
285    /// Returns `true` when there are no linked events.
286    pub fn is_empty(&self) -> bool {
287        self.events.is_empty()
288    }
289}