use crate::event::payload::{FiniteF64, SupervisorEvent, What};
use crate::journal::ring::EventJournal;
use crate::observe::metrics::{MetricSample, MetricsFacade};
use crate::observe::tracing::{ChildStartCountSpan, TracingEvent};
use crate::spec::supervisor::{BackpressureConfig, BackpressureStrategy};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, VecDeque};
/// Flat, serializable log entry derived from a single `SupervisorEvent`.
///
/// Instances are produced by `structured_log`, which copies every field
/// straight from the event.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StructuredLogRecord {
/// The event's `sequence.value`.
pub sequence: u64,
/// The event's `correlation_id.value`, rendered with `to_string()`.
pub correlation_id: String,
/// The name reported by `event.what.name()`.
pub event_name: String,
/// The event's `config_version`.
pub config_version: u64,
}
/// Serializable audit entry describing one auditable `SupervisorEvent`.
///
/// Built by `audit_record` and its per-event-family helpers; events that no
/// helper recognises produce no record.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuditRecord {
/// The originating event's `sequence.value`.
pub sequence: u64,
/// Command identifier carried by the event, or a fixed label such as
/// `"shutdown-pipeline"` or `"child-control"` when the event has none.
pub command_id: String,
/// Requester carried by the event, or `"runtime"` for runtime-originated records.
pub requested_by: String,
/// Outcome label, either taken from the event or a fixed string per event kind.
pub result: String,
/// Explanation, either taken from the event or a fixed string per event kind.
pub reason: String,
/// Phase label, either taken from the event or a fixed string per event family.
pub phase: String,
/// String form of the affected child id; `None` for records that are not child-scoped.
pub child_id: Option<String>,
/// Additional key/value details; the key set depends on the event kind.
pub context: BTreeMap<String, String>,
}
/// In-memory capture of everything `ObservabilityPipeline` derives from emitted events.
///
/// All collections start empty (`Default`) and only ever grow.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct TestRecorder {
/// Emitted events, plus the backpressure alert/degradation events the pipeline
/// synthesises while fanning out to subscribers.
pub events: Vec<SupervisorEvent>,
/// Stage diagnostics appended via `record_pipeline_stage_diagnostics`.
pub pipeline_stage_diagnostics: Vec<PipelineStageDiagnostic>,
/// One structured log record per emitted event.
pub logs: Vec<StructuredLogRecord>,
/// One span per emitted event, built with `ChildStartCountSpan::from_event`.
pub spans: Vec<ChildStartCountSpan>,
/// One tracing event per emitted event, built with `TracingEvent::from_event`.
pub tracing_events: Vec<TracingEvent>,
/// Metric samples; the pipeline appends these only when metrics are enabled.
pub metrics: Vec<MetricSample>,
/// Audit records; the pipeline appends these only when auditing is enabled.
pub audits: Vec<AuditRecord>,
/// Saturating running total of the values passed to `record_lag`.
pub subscriber_lag: u64,
}
impl TestRecorder {
    /// Creates an empty recorder; identical to `TestRecorder::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds `missed` to the running lag total, saturating at `u64::MAX`.
    pub fn record_lag(&mut self, missed: u64) {
        let total = self.subscriber_lag.saturating_add(missed);
        self.subscriber_lag = total;
    }

    /// Appends a clone of every entry in `diagnostics`, preserving their order.
    pub fn record_pipeline_stage_diagnostics(&mut self, diagnostics: &[PipelineStageDiagnostic]) {
        let copies = diagnostics.iter().cloned();
        self.pipeline_stage_diagnostics.extend(copies);
    }
}
/// Central sink for supervisor events: journals them, fans them out to
/// subscriber queues and records derived logs, spans, metrics and audits.
#[derive(Debug, Clone)]
pub struct ObservabilityPipeline {
/// Journal that receives a clone of every emitted event.
pub journal: EventJournal,
/// Facade used to derive metric samples for each emitted event.
pub metrics: MetricsFacade,
/// When `false`, derived metric samples are not stored in `test_recorder`.
metrics_enabled: bool,
/// When `false`, derived audit records are not stored in `test_recorder`.
audit_enabled: bool,
/// In-memory capture of everything the pipeline produces.
pub test_recorder: TestRecorder,
/// One queue per subscriber, indexed by the value returned from `add_subscriber`.
subscribers: Vec<VecDeque<SupervisorEvent>>,
/// Queue length at which the oldest queued event is dropped before a new one is pushed.
subscriber_capacity: usize,
/// Warn/critical thresholds and strategy consulted while fanning out.
backpressure_config: BackpressureConfig,
/// Number of events dropped under the `SampleAndAudit` strategy at or above
/// the critical threshold.
discarded_count: u64,
}
impl ObservabilityPipeline {
pub fn new(journal_capacity: usize, subscriber_capacity: usize) -> Self {
Self::with_observability_switches(journal_capacity, subscriber_capacity, true, true)
}
pub fn with_observability_switches(
journal_capacity: usize,
subscriber_capacity: usize,
metrics_enabled: bool,
audit_enabled: bool,
) -> Self {
Self::with_backpressure_config(
journal_capacity,
subscriber_capacity,
metrics_enabled,
audit_enabled,
BackpressureConfig::default(),
)
}
pub fn with_backpressure_config(
journal_capacity: usize,
subscriber_capacity: usize,
metrics_enabled: bool,
audit_enabled: bool,
backpressure_config: BackpressureConfig,
) -> Self {
Self {
journal: EventJournal::new(journal_capacity),
metrics: MetricsFacade::new(),
metrics_enabled,
audit_enabled,
test_recorder: TestRecorder::new(),
subscribers: Vec::new(),
subscriber_capacity,
backpressure_config,
discarded_count: 0,
}
}
pub fn add_subscriber(&mut self) -> usize {
self.subscribers.push(VecDeque::new());
self.subscribers.len().saturating_sub(1)
}
pub fn emit(&mut self, event: SupervisorEvent) -> u64 {
let metrics = self.metrics.samples_for_event(&event);
let log = structured_log(&event);
let span = ChildStartCountSpan::from_event(&event);
let tracing_event = TracingEvent::from_event(&event);
let audit = audit_record(&event);
self.emit_policy_diagnostic(&event);
let lagged = self.fan_out(event.clone());
self.journal.push(event.clone());
self.test_recorder.events.push(event);
self.test_recorder.logs.push(log);
self.test_recorder.spans.push(span);
self.test_recorder.tracing_events.push(tracing_event);
if self.metrics_enabled {
self.test_recorder.metrics.extend(metrics);
}
if self.audit_enabled {
self.test_recorder.audits.extend(audit);
}
self.test_recorder.record_lag(lagged);
lagged
}
fn emit_policy_diagnostic(&mut self, event: &SupervisorEvent) {
let diagnostic = match &event.what {
What::BudgetExhausted {
child_id,
retry_after_ns,
budget_source_group,
} => {
let mut diag = PipelineStageDiagnostic::new(
event.sequence.value,
event.correlation_id.value.to_string(),
PipelineStage::EvaluateBudget,
event.when.time.unix_nanos,
)
.with_child_id(child_id.to_string());
if let Some(group) = budget_source_group {
diag = diag.with_group_id(group.clone());
}
diag = diag.with_supervisor_path(event.r#where.supervisor_path.to_string());
diag.budget_evaluation =
Some(format!("BudgetExhausted:retry_after_ns={}", retry_after_ns));
Some(diag)
}
What::GroupFuseTriggered {
group_name,
propagated_from_group,
} => {
let mut diag = PipelineStageDiagnostic::new(
event.sequence.value,
event.correlation_id.value.to_string(),
PipelineStage::EvaluateBudget,
event.when.time.unix_nanos,
)
.with_group_id(group_name.clone());
diag = diag.with_supervisor_path(event.r#where.supervisor_path.to_string());
diag.budget_evaluation = Some(format!(
"GroupFuseTriggered:propagated_from={}",
propagated_from_group.as_deref().unwrap_or("self")
));
Some(diag)
}
What::EscalationBifurcated {
severity,
budget_verdict,
fuse_outcome,
tie_break_reason: _,
} => {
let mut diag = PipelineStageDiagnostic::new(
event.sequence.value,
event.correlation_id.value.to_string(),
PipelineStage::EvaluateBudget,
event.when.time.unix_nanos,
);
if let Some(ref child_id) = event.r#where.child_id {
diag = diag.with_child_id(child_id.to_string());
}
diag = diag.with_supervisor_path(event.r#where.supervisor_path.to_string());
diag.budget_evaluation = Some(format!(
"EscalationBifurcated:severity={}:budget={}:fuse={}",
severity,
budget_verdict.as_deref().unwrap_or("none"),
fuse_outcome.as_deref().unwrap_or("none"),
));
Some(diag)
}
_ => None,
};
if let Some(diag) = diagnostic {
self.test_recorder
.record_pipeline_stage_diagnostics(&[diag]);
}
}
pub fn record_pipeline_stage_diagnostics(&mut self, diagnostics: &[PipelineStageDiagnostic]) {
self.test_recorder
.record_pipeline_stage_diagnostics(diagnostics);
}
pub fn drain_subscriber(&mut self, subscriber_index: usize) -> Vec<SupervisorEvent> {
self.subscribers
.get_mut(subscriber_index)
.map(|queue| queue.drain(..).collect())
.unwrap_or_default()
}
fn fan_out(&mut self, event: SupervisorEvent) -> u64 {
let mut lagged = 0_u64;
let cfg = &self.backpressure_config;
for (idx, subscriber) in &mut self.subscribers.iter_mut().enumerate() {
let occupancy_pct = if self.subscriber_capacity == 0 {
100_u8
} else {
(subscriber.len().saturating_mul(100) / self.subscriber_capacity) as u8
};
if occupancy_pct >= cfg.warn_threshold_pct && occupancy_pct < cfg.critical_threshold_pct
{
self.test_recorder.events.push(make_backpressure_alert(
&event,
idx,
occupancy_pct,
cfg.warn_threshold_pct,
));
}
if occupancy_pct >= cfg.critical_threshold_pct {
match cfg.strategy {
BackpressureStrategy::AlertAndBlock => {
if subscriber.len() == self.subscriber_capacity {
subscriber.pop_front();
lagged = lagged.saturating_add(1);
}
}
BackpressureStrategy::SampleAndAudit => {
if subscriber.len() == self.subscriber_capacity {
subscriber.pop_front();
self.discarded_count = self.discarded_count.saturating_add(1);
lagged = lagged.saturating_add(1);
if self.discarded_count % 10 == 1 {
let deg = make_backpressure_degradation(
&event,
idx,
occupancy_pct,
cfg,
self.discarded_count,
);
self.test_recorder.events.push(deg);
}
}
}
}
} else {
if subscriber.len() == self.subscriber_capacity {
subscriber.pop_front();
lagged = lagged.saturating_add(1);
}
}
subscriber.push_back(event.clone());
}
lagged
}
}
/// Builds a `BackpressureAlert` event for one subscriber.
///
/// The result is a clone of `event` whose `what` payload is replaced; every
/// other field is inherited from the triggering event.
fn make_backpressure_alert(
    event: &SupervisorEvent,
    subscriber_index: usize,
    occupancy_pct: u8,
    threshold_pct: u8,
) -> SupervisorEvent {
    let payload = What::BackpressureAlert {
        subscriber: format!("subscriber_{}", subscriber_index),
        buffer_pct: occupancy_pct,
        threshold_pct,
    };
    let mut alert = event.clone();
    alert.what = payload;
    alert
}
/// Builds a `BackpressureDegradation` event for one subscriber.
///
/// The result is a clone of `event` whose `what` payload is replaced; the
/// sample ratio is fixed at `0.5` and `recovered` is always `false`.
fn make_backpressure_degradation(
    event: &SupervisorEvent,
    subscriber_index: usize,
    occupancy_pct: u8,
    cfg: &BackpressureConfig,
    total_discarded: u64,
) -> SupervisorEvent {
    let strategy_label = match cfg.strategy {
        BackpressureStrategy::SampleAndAudit => "sample_and_audit",
        BackpressureStrategy::AlertAndBlock => "alert_and_block",
    };
    let trigger_reason = format!(
        "subscriber {} buffer {}% >= critical {}%",
        subscriber_index, occupancy_pct, cfg.critical_threshold_pct
    );

    let mut degradation = event.clone();
    degradation.what = What::BackpressureDegradation {
        subscriber: format!("subscriber_{}", subscriber_index),
        strategy: strategy_label.to_owned(),
        sample_ratio: FiniteF64::new(0.5),
        buffer_peak_pct: occupancy_pct,
        recovered: false,
    };

    let audit_payload = What::AuditRecorded {
        command_id: String::new(),
        event_type: "backpressure_degradation".to_owned(),
        sample_ratio: FiniteF64::new(0.5),
        correlation_id: degradation.correlation_id,
        trigger_reason,
        events_discarded: total_discarded,
    };
    // NOTE(review): this companion audit event is constructed but never
    // returned or recorded, so `total_discarded` and the trigger reason do not
    // reach any sink from here — confirm whether it was meant to be emitted.
    let _audit_event = SupervisorEvent::new(
        degradation.when,
        degradation.r#where.clone(),
        audit_payload,
        degradation.sequence,
        degradation.correlation_id,
        degradation.config_version,
    );

    degradation
}
/// Projects an event onto its flat structured-log representation.
fn structured_log(event: &SupervisorEvent) -> StructuredLogRecord {
    let sequence = event.sequence.value;
    let correlation_id = event.correlation_id.value.to_string();
    let event_name = event.what.name().to_owned();
    let config_version = event.config_version;
    StructuredLogRecord {
        sequence,
        correlation_id,
        event_name,
        config_version,
    }
}
fn audit_record(event: &SupervisorEvent) -> Option<AuditRecord> {
audit_record_control_commands_and_runtime_shutdown(event)
.or_else(|| audit_record_child_shutdown_pipeline(event))
.or_else(|| audit_record_child_control_early(event))
.or_else(|| audit_record_child_control_late(event))
.or_else(|| audit_record_child_heartbeat_stale(event))
.or_else(|| audit_record_generation_fence_entered(event))
.or_else(|| audit_record_generation_fence_abort_requested(event))
.or_else(|| audit_record_generation_fence_released(event))
.or_else(|| audit_record_generation_fence_conflict(event))
.or_else(|| audit_record_generation_fence_stale_attempt(event))
}
/// Audits control-command acceptance/completion and runtime control-loop /
/// shutdown lifecycle events.
///
/// Each recognised variant contributes the record's variable fields; the
/// record itself (never child-scoped) is assembled once at the end. Returns
/// `None` for every other event kind.
fn audit_record_control_commands_and_runtime_shutdown(
    event: &SupervisorEvent,
) -> Option<AuditRecord> {
    let (command_id, requested_by, result, reason, phase, context) = match &event.what {
        What::CommandAccepted { audit } | What::CommandCompleted { audit } => (
            audit.command_id.clone(),
            audit.requested_by.clone(),
            audit.result.clone(),
            audit.reason.clone(),
            "control_command".to_owned(),
            BTreeMap::new(),
        ),
        What::RuntimeControlLoopShutdownRequested {
            command_id,
            requested_by,
            reason,
        } => (
            command_id.clone(),
            requested_by.clone(),
            "accepted".to_owned(),
            reason.clone(),
            "shutdown".to_owned(),
            BTreeMap::new(),
        ),
        What::RuntimeControlLoopJoinCompleted {
            command_id,
            requested_by,
            state,
            phase,
            reason,
        } => (
            command_id.clone(),
            requested_by.clone(),
            state.clone(),
            reason.clone(),
            phase.clone(),
            BTreeMap::new(),
        ),
        What::RuntimeControlLoopFailed { phase, reason, .. } => (
            "runtime-control-loop".to_owned(),
            "runtime".to_owned(),
            "failed".to_owned(),
            reason.clone(),
            phase.clone(),
            BTreeMap::new(),
        ),
        What::ShutdownCompleted {
            phase,
            result,
            duration_ms,
        } => (
            "shutdown-pipeline".to_owned(),
            "runtime".to_owned(),
            result.clone(),
            "shutdown pipeline completed".to_owned(),
            phase.clone(),
            BTreeMap::from([("duration_ms".to_owned(), duration_ms.to_string())]),
        ),
        _ => return None,
    };
    Some(AuditRecord {
        sequence: event.sequence.value,
        command_id,
        requested_by,
        result,
        reason,
        phase,
        child_id: None,
        context,
    })
}
/// Audits per-child shutdown pipeline events (cancel delivered, graceful
/// exit, abort, late report).
///
/// Every record uses the `"shutdown-pipeline"` command id and the `"runtime"`
/// requester; the context always carries the child's generation and start
/// count, plus `exit` for graceful and late-report events. Returns `None` for
/// every other event kind.
fn audit_record_child_shutdown_pipeline(event: &SupervisorEvent) -> Option<AuditRecord> {
    let (child, phase, result, reason, context) = match &event.what {
        What::ChildShutdownCancelDelivered {
            child_id,
            generation,
            child_start_count,
            phase,
        } => (
            child_id.to_string(),
            phase.clone(),
            "cancel_delivered".to_owned(),
            "cancellation token delivered".to_owned(),
            child_child_start_count_context(generation.value, child_start_count.value),
        ),
        What::ChildShutdownGraceful {
            child_id,
            generation,
            child_start_count,
            phase,
            exit,
        } => {
            let mut details =
                child_child_start_count_context(generation.value, child_start_count.value);
            details.insert("exit".to_owned(), exit.clone());
            (
                child_id.to_string(),
                phase.clone(),
                "graceful".to_owned(),
                "child completed during graceful drain".to_owned(),
                details,
            )
        }
        What::ChildShutdownAborted {
            child_id,
            generation,
            child_start_count,
            phase,
            result,
            reason,
        } => (
            child_id.to_string(),
            phase.clone(),
            result.clone(),
            reason.clone(),
            child_child_start_count_context(generation.value, child_start_count.value),
        ),
        What::ChildShutdownLateReport {
            child_id,
            generation,
            child_start_count,
            phase,
            exit,
        } => {
            let mut details =
                child_child_start_count_context(generation.value, child_start_count.value);
            details.insert("exit".to_owned(), exit.clone());
            (
                child_id.to_string(),
                phase.clone(),
                "late_report".to_owned(),
                "child reported after shutdown accounting window".to_owned(),
                details,
            )
        }
        _ => return None,
    };
    Some(AuditRecord {
        sequence: event.sequence.value,
        command_id: "shutdown-pipeline".to_owned(),
        requested_by: "runtime".to_owned(),
        result,
        reason,
        phase,
        child_id: Some(child),
        context,
    })
}
/// Audits child-control command completion, cancel delivery and stop
/// completion events.
///
/// Command completions are delegated to `audit_child_control`; the other two
/// variants build their record inline with the `"child_control"` phase.
/// Returns `None` for every other event kind.
fn audit_record_child_control_early(event: &SupervisorEvent) -> Option<AuditRecord> {
    let sequence = event.sequence.value;
    let record = match &event.what {
        What::ChildControlCommandCompleted {
            child_id,
            command,
            command_id,
            requested_by,
            reason,
            result,
            outcome,
        } => audit_child_control(ChildControlAuditInput {
            sequence,
            command_id,
            requested_by,
            reason,
            result,
            child_id,
            command,
            outcome,
        }),
        What::ChildControlCancelDelivered {
            child_id,
            generation,
            attempt,
            command,
            command_id,
        } => {
            let mut details = child_child_start_count_context(generation.value, attempt.value);
            details.extend([
                ("command".to_owned(), command.clone()),
                ("cancel_delivered".to_owned(), true.to_string()),
                ("stop_state".to_owned(), "CancelDelivered".to_owned()),
                ("idempotent".to_owned(), false.to_string()),
                ("failure".to_owned(), "none".to_owned()),
            ]);
            AuditRecord {
                sequence,
                command_id: command_id.clone(),
                requested_by: "runtime".to_owned(),
                result: "cancel_delivered".to_owned(),
                reason: "child control cancellation delivered".to_owned(),
                phase: "child_control".to_owned(),
                child_id: Some(child_id.to_string()),
                context: details,
            }
        }
        What::ChildControlStopCompleted {
            child_id,
            generation,
            attempt,
            exit_kind,
        } => {
            let mut details = child_child_start_count_context(generation.value, attempt.value);
            details.extend([
                ("exit_kind".to_owned(), format!("{exit_kind:?}")),
                ("stop_state".to_owned(), "Completed".to_owned()),
                ("failure".to_owned(), "none".to_owned()),
            ]);
            AuditRecord {
                sequence,
                command_id: "child-control".to_owned(),
                requested_by: "runtime".to_owned(),
                result: "completed".to_owned(),
                reason: "child control stop completed".to_owned(),
                phase: "child_control".to_owned(),
                child_id: Some(child_id.to_string()),
                context: details,
            }
        }
        _ => return None,
    };
    Some(record)
}
/// Audits child-control stop failures, operation changes and runtime-state
/// removal events.
///
/// All records are attributed to the `"runtime"` requester. Returns `None`
/// for every other event kind.
fn audit_record_child_control_late(event: &SupervisorEvent) -> Option<AuditRecord> {
    let sequence = event.sequence.value;
    let record = match &event.what {
        What::ChildControlStopFailed {
            child_id,
            generation,
            attempt,
            status,
            stop_state,
            phase,
            reason,
            recoverable,
        } => {
            // The debug form of the failing phase doubles as the record's phase.
            let failure_phase = format!("{phase:?}");
            let mut details = child_child_start_count_context(generation.value, attempt.value);
            details.extend([
                ("status".to_owned(), format!("{status:?}")),
                ("stop_state".to_owned(), format!("{stop_state:?}")),
                ("recoverable".to_owned(), recoverable.to_string()),
                ("failure_phase".to_owned(), failure_phase.clone()),
                ("failure".to_owned(), reason.clone()),
            ]);
            AuditRecord {
                sequence,
                command_id: "child-control".to_owned(),
                requested_by: "runtime".to_owned(),
                result: "failed".to_owned(),
                reason: reason.clone(),
                phase: failure_phase,
                child_id: Some(child_id.to_string()),
                context: details,
            }
        }
        What::ChildControlOperationChanged {
            child_id,
            from,
            to,
            command,
            command_id,
        } => {
            let details = BTreeMap::from([
                ("operation_before".to_owned(), format!("{from:?}")),
                ("operation_after".to_owned(), format!("{to:?}")),
                ("command".to_owned(), command.clone()),
                ("idempotent".to_owned(), false.to_string()),
                ("failure".to_owned(), "none".to_owned()),
            ]);
            AuditRecord {
                sequence,
                command_id: command_id.clone(),
                requested_by: "runtime".to_owned(),
                result: "operation_changed".to_owned(),
                reason: "child control operation changed".to_owned(),
                phase: "child_control".to_owned(),
                child_id: Some(child_id.to_string()),
                context: details,
            }
        }
        What::ChildRuntimeStateRemoved {
            child_id,
            path,
            final_status,
        } => {
            let mut details = BTreeMap::from([
                ("path".to_owned(), path.to_string()),
                ("operation_after".to_owned(), "Removed".to_owned()),
                ("failure".to_owned(), "none".to_owned()),
            ]);
            match final_status {
                // A known final status is exposed under both keys.
                Some(status) => {
                    let rendered = format!("{status:?}");
                    details.insert("final_status".to_owned(), rendered.clone());
                    details.insert("status".to_owned(), rendered);
                }
                None => {
                    details.insert("final_status".to_owned(), "none".to_owned());
                }
            }
            AuditRecord {
                sequence,
                command_id: "child-control".to_owned(),
                requested_by: "runtime".to_owned(),
                result: "removed".to_owned(),
                reason: "child runtime state removed".to_owned(),
                phase: "child_control".to_owned(),
                child_id: Some(child_id.to_string()),
                context: details,
            }
        }
        _ => return None,
    };
    Some(record)
}
/// Audits `ChildHeartbeatStale` events under the `"liveness"` phase.
///
/// The context records the attempt number and the `since_unix_nanos` value
/// from the event. Returns `None` for every other event kind.
fn audit_record_child_heartbeat_stale(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildHeartbeatStale {
        child_id,
        attempt,
        since_unix_nanos,
    } = &event.what
    {
        let details = BTreeMap::from([
            ("attempt".to_owned(), attempt.value.to_string()),
            ("since_unix_nanos".to_owned(), since_unix_nanos.to_string()),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: "child-liveness".to_owned(),
            requested_by: "runtime".to_owned(),
            result: "heartbeat_stale".to_owned(),
            reason: "child heartbeat became stale".to_owned(),
            phase: "liveness".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Audits `ChildRestartFenceEntered` events under the `"generation_fence"` phase.
///
/// The context carries the old generation/attempt, the target generation and
/// the stop deadline. Returns `None` for every other event kind.
fn audit_record_generation_fence_entered(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildRestartFenceEntered {
        child_id,
        old_generation,
        old_attempt,
        target_generation,
        command_id,
        requested_by,
        reason,
        stop_deadline_at_unix_nanos,
    } = &event.what
    {
        let mut details = child_child_start_count_context(old_generation.value, old_attempt.value);
        details.extend([
            (
                "target_generation".to_owned(),
                target_generation.value.to_string(),
            ),
            (
                "stop_deadline_at_unix_nanos".to_owned(),
                stop_deadline_at_unix_nanos.to_string(),
            ),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: command_id.clone(),
            requested_by: requested_by.clone(),
            result: "fence_entered".to_owned(),
            reason: reason.clone(),
            phase: "generation_fence".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Audits `ChildRestartFenceAbortRequested` events under the
/// `"generation_fence"` phase, attributed to the `"runtime"` requester.
///
/// The context carries the old generation/attempt, the target generation and
/// the deadline from the event. Returns `None` for every other event kind.
fn audit_record_generation_fence_abort_requested(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildRestartFenceAbortRequested {
        child_id,
        old_generation,
        old_attempt,
        target_generation,
        command_id,
        deadline_unix_nanos,
    } = &event.what
    {
        let mut details = child_child_start_count_context(old_generation.value, old_attempt.value);
        details.extend([
            (
                "target_generation".to_owned(),
                target_generation.value.to_string(),
            ),
            (
                "deadline_unix_nanos".to_owned(),
                deadline_unix_nanos.to_string(),
            ),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: command_id.clone(),
            requested_by: "runtime".to_owned(),
            result: "abort_requested".to_owned(),
            reason: "generation fence escalation after cooperative deadline".to_owned(),
            phase: "generation_fence".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Audits `ChildRestartFenceReleased` events under the `"generation_fence"`
/// phase, using the fixed `"generation-fence"` command id.
///
/// The context carries the old generation/attempt, the target generation and
/// the debug form of the exit kind. Returns `None` for every other event kind.
fn audit_record_generation_fence_released(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildRestartFenceReleased {
        child_id,
        old_generation,
        old_attempt,
        target_generation,
        exit_kind,
    } = &event.what
    {
        let mut details = child_child_start_count_context(old_generation.value, old_attempt.value);
        details.extend([
            (
                "target_generation".to_owned(),
                target_generation.value.to_string(),
            ),
            ("exit_kind".to_owned(), format!("{exit_kind:?}")),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: "generation-fence".to_owned(),
            requested_by: "runtime".to_owned(),
            result: "released".to_owned(),
            reason: "old attempt drained; queued generation may spawn".to_owned(),
            phase: "generation_fence".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Audits `ChildRestartConflict` events under the `"generation_fence"` phase.
///
/// The event's decision becomes the record's result and its reason is also
/// stored under the `failure` context key; absent generations/attempts are
/// rendered as `"none"`. Returns `None` for every other event kind.
fn audit_record_generation_fence_conflict(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildRestartConflict {
        child_id,
        current_generation,
        current_attempt,
        target_generation,
        command_id,
        decision,
        reason,
    } = &event.what
    {
        let details = BTreeMap::from([
            (
                "old_generation".to_owned(),
                optional_u64(current_generation.map(|generation| generation.value)),
            ),
            (
                "old_attempt".to_owned(),
                optional_u64(current_attempt.map(|attempt| attempt.value)),
            ),
            (
                "target_generation".to_owned(),
                optional_u64(target_generation.map(|generation| generation.value)),
            ),
            ("generation_fence_decision".to_owned(), decision.clone()),
            ("failure".to_owned(), reason.clone()),
            ("stale_report".to_owned(), "none".to_owned()),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: command_id.clone(),
            requested_by: "runtime".to_owned(),
            result: decision.clone(),
            reason: reason.clone(),
            phase: "generation_fence".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Audits `ChildAttemptStaleReport` events under the `"generation_fence"`
/// phase, using the fixed `"generation-fence"` command id.
///
/// The context carries the reported generation/attempt, the current ones
/// (`"none"` when absent), the exit kind, how the report was handled and a
/// combined `stale_report` summary. Returns `None` for every other event kind.
fn audit_record_generation_fence_stale_attempt(event: &SupervisorEvent) -> Option<AuditRecord> {
    if let What::ChildAttemptStaleReport {
        child_id,
        reported_generation,
        reported_attempt,
        current_generation,
        current_attempt,
        exit_kind,
        handled_as,
    } = &event.what
    {
        let mut details =
            child_child_start_count_context(reported_generation.value, reported_attempt.value);
        details.extend([
            (
                "current_generation".to_owned(),
                optional_u64(current_generation.map(|generation| generation.value)),
            ),
            (
                "current_attempt".to_owned(),
                optional_u64(current_attempt.map(|attempt| attempt.value)),
            ),
            ("exit_kind".to_owned(), format!("{exit_kind:?}")),
            ("handled_as".to_owned(), format!("{handled_as:?}")),
            (
                "stale_report".to_owned(),
                format!(
                    "reported_generation={} reported_attempt={}",
                    reported_generation.value, reported_attempt.value
                ),
            ),
        ]);
        Some(AuditRecord {
            sequence: event.sequence.value,
            command_id: "generation-fence".to_owned(),
            requested_by: "runtime".to_owned(),
            result: "stale_report".to_owned(),
            reason: "completion triple did not match active or pending restart identities"
                .to_owned(),
            phase: "generation_fence".to_owned(),
            child_id: Some(child_id.to_string()),
            context: details,
        })
    } else {
        None
    }
}
/// Writes the `generation_fence_*` keys for a child-control outcome into `context`.
///
/// With a fence present, seven keys describe it (absent generations/attempts
/// render as `"none"`). Without one, only the decision, abort-requested,
/// cancel-delivered and conflict keys are written, with neutral values.
fn merge_generation_fence_child_control_audit_fields(
    context: &mut BTreeMap<String, String>,
    outcome: &crate::control::outcome::ChildControlResult,
) {
    match &outcome.generation_fence {
        Some(fence) => context.extend([
            (
                "generation_fence_decision".to_owned(),
                format!("{:?}", fence.decision),
            ),
            (
                "generation_fence_abort_requested".to_owned(),
                fence.abort_requested.to_string(),
            ),
            (
                "generation_fence_cancel_delivered".to_owned(),
                fence.cancel_delivered.to_string(),
            ),
            (
                "generation_fence_old_generation".to_owned(),
                optional_u64(fence.old_generation.map(|generation| generation.value)),
            ),
            (
                "generation_fence_old_attempt".to_owned(),
                optional_u64(fence.old_attempt.map(|attempt| attempt.value)),
            ),
            (
                "generation_fence_target_generation".to_owned(),
                optional_u64(fence.target_generation.map(|generation| generation.value)),
            ),
            (
                "generation_fence_conflict".to_owned(),
                failure_context(&fence.conflict),
            ),
        ]),
        None => context.extend([
            ("generation_fence_decision".to_owned(), "none".to_owned()),
            (
                "generation_fence_abort_requested".to_owned(),
                "false".to_owned(),
            ),
            (
                "generation_fence_cancel_delivered".to_owned(),
                "false".to_owned(),
            ),
            ("generation_fence_conflict".to_owned(), "none".to_owned()),
        ]),
    }
}
/// Borrowed inputs for `audit_child_control`, bundled so the call site can
/// name each argument.
struct ChildControlAuditInput<'a> {
/// Sequence number copied into the audit record.
sequence: u64,
/// Command identifier copied into the audit record.
command_id: &'a str,
/// Requester copied into the audit record.
requested_by: &'a str,
/// Reason copied into the audit record.
reason: &'a str,
/// Result label copied into the audit record.
result: &'a str,
/// Child the command targeted; stored in the record via `to_string()`.
child_id: &'a crate::id::types::ChildId,
/// Command name; stored under the `command` context key.
command: &'a str,
/// Command outcome whose fields populate the rest of the context map.
outcome: &'a crate::control::outcome::ChildControlResult,
}
fn audit_child_control(input: ChildControlAuditInput<'_>) -> AuditRecord {
let mut context = BTreeMap::new();
context.insert("command".to_owned(), input.command.to_owned());
context.insert(
"generation".to_owned(),
optional_u64(input.outcome.generation.map(|generation| generation.value)),
);
context.insert(
"attempt".to_owned(),
optional_u64(input.outcome.attempt.map(|attempt| attempt.value)),
);
context.insert("status".to_owned(), optional_debug(input.outcome.status));
context.insert(
"operation_before".to_owned(),
format!("{:?}", input.outcome.operation_before),
);
context.insert(
"operation_after".to_owned(),
format!("{:?}", input.outcome.operation_after),
);
context.insert(
"cancel_delivered".to_owned(),
input.outcome.cancel_delivered.to_string(),
);
context.insert(
"stop_state".to_owned(),
format!("{:?}", input.outcome.stop_state),
);
context.insert(
"restart_limit_remaining".to_owned(),
input.outcome.restart_limit.remaining.to_string(),
);
context.insert(
"idempotent".to_owned(),
input.outcome.idempotent.to_string(),
);
context.insert(
"failure".to_owned(),
failure_context(&input.outcome.failure),
);
merge_generation_fence_child_control_audit_fields(&mut context, input.outcome);
context.insert("stale_report".to_owned(), "none".to_owned());
AuditRecord {
sequence: input.sequence,
command_id: input.command_id.to_owned(),
requested_by: input.requested_by.to_owned(),
result: input.result.to_owned(),
reason: input.reason.to_owned(),
phase: "child_control".to_owned(),
child_id: Some(input.child_id.to_string()),
context,
}
}
/// Renders an optional number as its decimal form, or `"none"` when absent.
fn optional_u64(value: Option<u64>) -> String {
    match value {
        Some(number) => number.to_string(),
        None => "none".to_owned(),
    }
}
/// Renders an optional value with its `Debug` form, or `"none"` when absent.
fn optional_debug<T: std::fmt::Debug>(value: Option<T>) -> String {
    match value {
        Some(inner) => format!("{inner:?}"),
        None => "none".to_owned(),
    }
}
/// Renders an optional child-control failure as
/// `<phase debug>:<reason>:recoverable=<bool>`, or `"none"` when absent.
fn failure_context(failure: &Option<crate::control::outcome::ChildControlFailure>) -> String {
    match failure {
        Some(details) => format!(
            "{:?}:{}:recoverable={}",
            details.phase, details.reason, details.recoverable
        ),
        None => "none".to_owned(),
    }
}
/// Builds the base context map shared by child-scoped audit records.
///
/// The start count is stored under both `attempt` and `child_start_count`;
/// `generation` holds the generation number. All values are decimal strings.
fn child_child_start_count_context(
    generation: u64,
    child_start_count: u64,
) -> BTreeMap<String, String> {
    let start_count = child_start_count.to_string();
    BTreeMap::from([
        ("generation".to_owned(), generation.to_string()),
        ("attempt".to_owned(), start_count.clone()),
        ("child_start_count".to_owned(), start_count),
    ])
}
/// Stage a `PipelineStageDiagnostic` is attributed to.
///
/// The `Display` implementation renders each variant as its snake_case name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum PipelineStage {
/// Displayed as `classify_exit`.
ClassifyExit,
/// Displayed as `record_failure_window`.
RecordFailureWindow,
/// Displayed as `evaluate_budget`; the stage used for the budget, fuse and
/// escalation diagnostics recorded by `ObservabilityPipeline`.
EvaluateBudget,
/// Displayed as `decide_action`.
DecideAction,
/// Displayed as `emit_typed_event`.
EmitTypedEvent,
/// Displayed as `execute_action`.
ExecuteAction,
}
impl std::fmt::Display for PipelineStage {
    /// Writes the stage's snake_case name verbatim (no padding or alignment applied).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::ClassifyExit => "classify_exit",
            Self::RecordFailureWindow => "record_failure_window",
            Self::EvaluateBudget => "evaluate_budget",
            Self::DecideAction => "decide_action",
            Self::EmitTypedEvent => "emit_typed_event",
            Self::ExecuteAction => "execute_action",
        };
        f.write_str(label)
    }
}
/// Serializable diagnostic describing one pipeline stage for one event.
///
/// `new` fills the identifying fields and leaves every optional detail unset;
/// callers then populate whichever detail fields apply to the stage.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PipelineStageDiagnostic {
/// Sequence number supplied to `new`.
pub sequence: u64,
/// Correlation id supplied to `new`.
pub correlation_id: String,
/// Stage this diagnostic belongs to.
pub stage: PipelineStage,
/// Affected child, set via `with_child_id`; `None` by default.
pub child_id: Option<String>,
/// Affected group, set via `with_group_id`; `None` by default.
pub group_id: Option<String>,
/// Supervisor path, set via `with_supervisor_path`; empty by default.
pub supervisor_path: String,
/// Initialised to `true` by `new`.
/// NOTE(review): presumably `false` marks a skipped stage — confirm against producers.
pub evaluated: bool,
/// `None` by default; presumably explains why a stage was not evaluated — TODO confirm.
pub skip_reason: Option<String>,
/// `None` by default; free-form exit classification detail.
pub exit_classification: Option<String>,
/// `None` by default; free-form failure-window detail.
pub failure_window_state: Option<String>,
/// `None` by default; `ObservabilityPipeline` stores a summary string here for
/// budget-exhausted, group-fuse and escalation events.
pub budget_evaluation: Option<String>,
/// `None` by default; free-form decided-action detail.
pub decided_action: Option<String>,
/// Initialised to `false` by `new`.
pub event_emitted: bool,
/// `None` by default; free-form execution-result detail.
pub execution_result: Option<String>,
/// Completion timestamp supplied to `new`, in Unix nanoseconds per the field name.
pub completed_at_unix_nanos: u128,
}
impl PipelineStageDiagnostic {
    /// Creates a diagnostic for `stage` with the identifying fields set.
    ///
    /// `evaluated` starts as `true`, `event_emitted` as `false`, the
    /// supervisor path empty and every optional detail `None`.
    pub fn new(
        sequence: u64,
        correlation_id: impl Into<String>,
        stage: PipelineStage,
        completed_at_unix_nanos: u128,
    ) -> Self {
        let correlation_id = correlation_id.into();
        Self {
            // Identity supplied by the caller.
            sequence,
            correlation_id,
            stage,
            // Scope, filled in later through the `with_*` builders.
            child_id: None,
            group_id: None,
            supervisor_path: String::new(),
            // Stage details, unset until a producer fills them in.
            evaluated: true,
            skip_reason: None,
            exit_classification: None,
            failure_window_state: None,
            budget_evaluation: None,
            decided_action: None,
            event_emitted: false,
            execution_result: None,
            completed_at_unix_nanos,
        }
    }

    /// Returns the diagnostic with `child_id` set.
    pub fn with_child_id(self, child_id: impl Into<String>) -> Self {
        Self {
            child_id: Some(child_id.into()),
            ..self
        }
    }

    /// Returns the diagnostic with `group_id` set.
    pub fn with_group_id(self, group_id: impl Into<String>) -> Self {
        Self {
            group_id: Some(group_id.into()),
            ..self
        }
    }

    /// Returns the diagnostic with `supervisor_path` replaced.
    pub fn with_supervisor_path(self, supervisor_path: impl Into<String>) -> Self {
        Self {
            supervisor_path: supervisor_path.into(),
            ..self
        }
    }
}