use crate::event::payload::{SupervisorEvent, What};
use crate::journal::ring::EventJournal;
use crate::observe::metrics::{MetricSample, MetricsFacade};
use crate::observe::tracing::{AttemptSpan, TracingEvent};
use serde::{Deserialize, Serialize};
use std::collections::VecDeque;
/// Flat, serializable log line derived from a single supervisor event.
///
/// Instances are produced by `structured_log` in this module, which copies
/// each field straight out of the originating `SupervisorEvent`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StructuredLogRecord {
    /// Sequence value of the originating event.
    pub sequence: u64,
    /// String rendering of the event's correlation id value.
    pub correlation_id: String,
    /// Name reported by the event's `what` payload.
    pub event_name: String,
    /// Configuration version carried by the event.
    pub config_version: u64,
}
/// Serializable audit entry extracted from a command-related event.
///
/// Built by `audit_record` in this module for `What::CommandAccepted` and
/// `What::CommandCompleted` payloads; other payloads yield no record.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AuditRecord {
    /// Sequence value of the event that carried the audit payload.
    pub sequence: u64,
    /// Command identifier cloned from the audit payload.
    pub command_id: String,
    /// Requester cloned from the audit payload.
    pub requested_by: String,
    /// Result string cloned from the audit payload.
    pub result: String,
}
/// In-memory capture of everything the observability pipeline derives from
/// emitted events.
///
/// `ObservabilityPipeline::emit` appends to every collection below on each
/// call; nothing in this type removes entries.
#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
pub struct TestRecorder {
    /// Every event handed to `emit`, in emission order.
    pub events: Vec<SupervisorEvent>,
    /// One structured log record per emitted event.
    pub logs: Vec<StructuredLogRecord>,
    /// One span per emitted event, built with `AttemptSpan::from_event`.
    pub spans: Vec<AttemptSpan>,
    /// One tracing event per emitted event, built with `TracingEvent::from_event`.
    pub tracing_events: Vec<TracingEvent>,
    /// Metric samples returned by `MetricsFacade::samples_for_event`.
    pub metrics: Vec<MetricSample>,
    /// Audit records; at most one is added per emitted event.
    pub audits: Vec<AuditRecord>,
    /// Running total of subscriber lag reported through `record_lag`.
    pub subscriber_lag: u64,
}
impl TestRecorder {
    /// Creates an empty recorder; equivalent to `TestRecorder::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Adds `missed` to the cumulative subscriber lag counter.
    ///
    /// The addition saturates at `u64::MAX` rather than wrapping or panicking.
    pub fn record_lag(&mut self, missed: u64) {
        let total = self.subscriber_lag.saturating_add(missed);
        self.subscriber_lag = total;
    }
}
/// Fan-in/fan-out hub for supervisor events.
///
/// Each emitted event is pushed to the journal, copied into every subscriber
/// queue, and mirrored (together with its derived log, span, tracing event,
/// metrics and audit record) into the test recorder.
#[derive(Debug, Clone)]
pub struct ObservabilityPipeline {
    /// Journal that receives a copy of every emitted event.
    pub journal: EventJournal,
    /// Facade used to turn an event into metric samples.
    pub metrics: MetricsFacade,
    /// Recorder that accumulates every artefact produced by `emit`.
    pub test_recorder: TestRecorder,
    /// One queue per subscriber, addressed by the index returned from
    /// `add_subscriber`.
    subscribers: Vec<VecDeque<SupervisorEvent>>,
    /// Queue length at which the oldest queued event is evicted before a new
    /// one is appended.
    subscriber_capacity: usize,
}
impl ObservabilityPipeline {
    /// Builds a pipeline with a new journal, a new metrics facade, an empty
    /// recorder and no subscribers.
    ///
    /// * `journal_capacity` is forwarded to `EventJournal::new`.
    /// * `subscriber_capacity` is the maximum number of events each subscriber
    ///   queue retains. A value of `0` means subscribers retain nothing and
    ///   every emitted event is counted as lag for each of them.
    pub fn new(journal_capacity: usize, subscriber_capacity: usize) -> Self {
        Self {
            journal: EventJournal::new(journal_capacity),
            metrics: MetricsFacade::new(),
            test_recorder: TestRecorder::new(),
            subscribers: Vec::new(),
            subscriber_capacity,
        }
    }

    /// Registers a new subscriber with an empty queue and returns its index,
    /// which is the handle expected by `drain_subscriber`.
    pub fn add_subscriber(&mut self) -> usize {
        // The new queue lands at the current length, so capture it before pushing.
        let index = self.subscribers.len();
        self.subscribers.push(VecDeque::new());
        index
    }

    /// Publishes `event` to every sink of the pipeline.
    ///
    /// The derived artefacts (metric samples, structured log, span, tracing
    /// event, optional audit record) are computed first; the event is then
    /// copied to each subscriber queue, pushed to the journal and appended to
    /// the test recorder along with the artefacts.
    ///
    /// Returns the number of events subscribers lost because of this
    /// emission (evicted from a full queue, or not retained at all when the
    /// subscriber capacity is zero). The same number is added to
    /// `test_recorder.subscriber_lag`.
    ///
    /// Note: the recorder collections are appended to on every call and are
    /// never trimmed here.
    pub fn emit(&mut self, event: SupervisorEvent) -> u64 {
        let metrics = self.metrics.samples_for_event(&event);
        let log = structured_log(&event);
        let span = AttemptSpan::from_event(&event);
        let tracing_event = TracingEvent::from_event(&event);
        let audit = audit_record(&event);

        let lagged = self.fan_out(event.clone());
        self.journal.push(event.clone());

        let recorder = &mut self.test_recorder;
        recorder.events.push(event);
        recorder.logs.push(log);
        recorder.spans.push(span);
        recorder.tracing_events.push(tracing_event);
        recorder.metrics.extend(metrics);
        // `audit` is an `Option`, so this adds zero or one record.
        recorder.audits.extend(audit);
        recorder.record_lag(lagged);

        lagged
    }

    /// Removes and returns everything queued for the given subscriber, oldest
    /// first. An index that does not name a subscriber yields an empty vector.
    pub fn drain_subscriber(&mut self, subscriber_index: usize) -> Vec<SupervisorEvent> {
        match self.subscribers.get_mut(subscriber_index) {
            Some(queue) => queue.drain(..).collect(),
            None => Vec::new(),
        }
    }

    /// Copies `event` into every subscriber queue, enforcing the per-queue
    /// capacity, and returns how many events subscribers lost in the process.
    fn fan_out(&mut self, event: SupervisorEvent) -> u64 {
        let capacity = self.subscriber_capacity;
        let mut lagged = 0_u64;
        for queue in &mut self.subscribers {
            if capacity == 0 {
                // A zero-capacity queue cannot hold the event. Pushing anyway
                // would let the queue grow without bound, so the event is
                // counted as missed for this subscriber instead.
                lagged = lagged.saturating_add(1);
                continue;
            }
            // `>=` (rather than `==`) keeps the bound even if a queue were
            // ever found above capacity; with capacity > 0 the queue is
            // non-empty here, so each pop really evicts one event.
            while queue.len() >= capacity {
                queue.pop_front();
                lagged = lagged.saturating_add(1);
            }
            queue.push_back(event.clone());
        }
        lagged
    }
}
/// Projects an event onto the flat fields of a [`StructuredLogRecord`].
///
/// Reads the event's sequence value, correlation id value (rendered with
/// `to_string`), payload name and configuration version; the event itself is
/// left untouched.
fn structured_log(event: &SupervisorEvent) -> StructuredLogRecord {
    let sequence = event.sequence.value;
    let correlation_id = event.correlation_id.value.to_string();
    let event_name = event.what.name().to_owned();
    let config_version = event.config_version;

    StructuredLogRecord {
        sequence,
        correlation_id,
        event_name,
        config_version,
    }
}
/// Extracts an [`AuditRecord`] from events that carry an audit payload.
///
/// Only `What::CommandAccepted` and `What::CommandCompleted` produce a
/// record; every other payload returns `None`. The record's sequence comes
/// from the event, the remaining fields are cloned from the audit payload.
fn audit_record(event: &SupervisorEvent) -> Option<AuditRecord> {
    let payload = match &event.what {
        What::CommandAccepted { audit } | What::CommandCompleted { audit } => audit,
        _ => return None,
    };

    Some(AuditRecord {
        sequence: event.sequence.value,
        command_id: payload.command_id.clone(),
        requested_by: payload.requested_by.clone(),
        result: payload.result.clone(),
    })
}