use std::sync::Arc;
use serde::{Deserialize, Serialize};
use crate::adapter::net::redex::{RedexError, RedexEvent, RedexFold};
use super::action::{MeshOsAction, PendingAction};
use super::snapshot::{action_kind_str, FailureRecord, MeshOsSnapshot, RECENT_FAILURES_CAPACITY};
/// Version tag written as the first byte of every payload produced by [`encode_record`].
///
/// [`decode_record`] rejects any payload whose leading byte differs from this value.
pub const WIRE_FORMAT_VERSION: u8 = 1;
/// Serialises `record` into its wire form: a single [`WIRE_FORMAT_VERSION`]
/// byte followed by the postcard encoding of the record.
///
/// # Errors
///
/// Returns an [`AppendError`] whose `reason` embeds the postcard error text
/// when serialisation fails.
pub fn encode_record(record: &ActionChainRecord) -> Result<Vec<u8>, AppendError> {
    match postcard::to_allocvec(record) {
        Ok(body) => {
            // One extra byte up front for the version tag.
            let mut framed = Vec::with_capacity(body.len() + 1);
            framed.push(WIRE_FORMAT_VERSION);
            framed.extend_from_slice(&body);
            Ok(framed)
        }
        Err(e) => Err(AppendError {
            reason: format!("postcard encode: {e}"),
        }),
    }
}
/// Parses a payload produced by [`encode_record`] back into an
/// [`ActionChainRecord`].
///
/// The first byte must equal [`WIRE_FORMAT_VERSION`]; the remaining bytes are
/// handed to postcard.
///
/// # Errors
///
/// * [`DecodeError::Empty`] when `bytes` has no version byte at all.
/// * [`DecodeError::UnsupportedVersion`] when the version byte is not the one
///   this build writes.
/// * [`DecodeError::Postcard`] when postcard cannot deserialise the body.
pub fn decode_record(bytes: &[u8]) -> Result<ActionChainRecord, DecodeError> {
    match bytes {
        [] => Err(DecodeError::Empty),
        [seen, body @ ..] if *seen == WIRE_FORMAT_VERSION => {
            postcard::from_bytes(body).map_err(|e| DecodeError::Postcard(e.to_string()))
        }
        [seen, ..] => Err(DecodeError::UnsupportedVersion {
            seen: *seen,
            supported: WIRE_FORMAT_VERSION,
        }),
    }
}
/// Reasons [`decode_record`] can reject a payload.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum DecodeError {
/// The payload had no bytes, so not even the version tag was present.
Empty,
/// The leading version byte did not match [`WIRE_FORMAT_VERSION`].
UnsupportedVersion {
/// Version byte found at the start of the payload.
seen: u8,
/// Version this build decodes (`decode_record` fills in [`WIRE_FORMAT_VERSION`]).
supported: u8,
},
/// postcard failed on the bytes after the version tag; holds the error's string form.
Postcard(String),
}
impl std::fmt::Display for DecodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DecodeError::Empty => f.write_str("empty action-chain record payload"),
DecodeError::UnsupportedVersion { seen, supported } => write!(
f,
"unsupported action-chain wire version {seen}; this build expects {supported}",
),
DecodeError::Postcard(s) => write!(f, "postcard decode: {s}"),
}
}
}
// Marker impl with no overridden methods: lets `DecodeError` be used wherever
// a `std::error::Error` is expected.
impl std::error::Error for DecodeError {}
/// One entry in the action chain: what happened to a single pending action.
///
/// [`encode_record`] serialises it with postcard behind a version byte.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ActionChainRecord {
/// Numeric id of the action; [`record_from`] copies it from the pending action's id.
pub id: u64,
/// Action kind label; [`record_from`] fills it from `action_kind_str`.
pub kind: String,
/// Milliseconds since the Unix epoch; [`record_from`] stamps it from the wall clock.
pub emitted_at_ms: u64,
/// Outcome recorded for the action.
pub disposition: ActionDisposition,
}
/// Outcome attached to an [`ActionChainRecord`].
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[non_exhaustive]
pub enum ActionDisposition {
/// The action was dispatched; [`MeshOsSnapshotFold`] records nothing for this variant.
Dispatched,
/// The action failed; [`MeshOsSnapshotFold`] surfaces `reason` as a recent failure.
Failed {
/// Failure description.
reason: String,
/// Optional retry delay in milliseconds. The fold in this file does not read it;
/// presumably a hint for whoever re-issues the action — confirm with the executor.
retry_after_ms: Option<u64>,
},
/// The action was gated; [`MeshOsSnapshotFold`] also surfaces this as a recent failure.
Gated {
/// Why the action was gated.
reason: String,
/// Optional cooldown in milliseconds; the fold appends it to the failure text when present.
cooldown_ms: Option<u64>,
},
}
/// Builds the [`ActionChainRecord`] describing `pending` with the given
/// `disposition`.
///
/// `id` and `kind` come from the pending action. `emitted_at_ms` is the wall
/// clock at the time of *this call*, in milliseconds since the Unix epoch.
///
/// The timestamp never fails the call: a system clock set before the epoch
/// yields `0`, and a millisecond count that does not fit in `u64` saturates at
/// `u64::MAX` instead of wrapping.
pub fn record_from(pending: &PendingAction, disposition: ActionDisposition) -> ActionChainRecord {
    let emitted_at_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        // `as_millis` returns a u128. Clamp to the u64 range first so the
        // narrowing cast below is lossless rather than silently truncating.
        .map(|since_epoch| since_epoch.as_millis().min(u128::from(u64::MAX)) as u64)
        // Best effort: a pre-epoch clock is recorded as 0.
        .unwrap_or(0);
    ActionChainRecord {
        id: pending.id.0,
        kind: action_kind_str(&pending.action).to_string(),
        emitted_at_ms,
        disposition,
    }
}
/// Sink for [`ActionChainRecord`]s.
///
/// Implementors must be `Send + Sync + 'static`; the `append_*` helpers in this
/// file call the trait through `Arc<dyn ActionChainAppender>`.
pub trait ActionChainAppender: Send + Sync + 'static {
/// Appends one record, returning an [`AppendError`] if it could not be stored.
fn append(&self, record: ActionChainRecord) -> Result<(), AppendError>;
}
/// Error returned by [`ActionChainAppender::append`] and by [`encode_record`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AppendError {
/// Free-form description of what went wrong.
pub reason: String,
}
/// Renders the error as `action-chain append failed: <reason>`.
impl std::fmt::Display for AppendError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { reason } = self;
        write!(f, "action-chain append failed: {reason}")
    }
}
// Marker impl with no overridden methods: lets `AppendError` be used wherever
// a `std::error::Error` is expected.
impl std::error::Error for AppendError {}
/// [`ActionChainAppender`] that discards every record and always reports success.
#[derive(Debug, Default)]
pub struct NoOpActionChainAppender;
impl ActionChainAppender for NoOpActionChainAppender {
// Drops the record without storing it; never fails.
fn append(&self, _record: ActionChainRecord) -> Result<(), AppendError> {
Ok(())
}
}
/// Record bound used by `BufferingActionChainAppender::default()` (and therefore `new()`).
pub const DEFAULT_BUFFERING_APPENDER_CAPACITY: usize = 4096;
/// In-memory [`ActionChainAppender`] that keeps the most recent records in a bounded queue.
///
/// Once `capacity` records are held, each further append evicts the oldest
/// record and increments `dropped`.
#[derive(Debug)]
pub struct BufferingActionChainAppender {
/// Buffered records, oldest at the front.
records: parking_lot::Mutex<std::collections::VecDeque<ActionChainRecord>>,
/// Maximum number of records retained (`with_capacity` clamps it to at least 1).
capacity: usize,
/// Number of records evicted so far to make room for newer ones.
dropped: std::sync::atomic::AtomicU64,
}
impl Default for BufferingActionChainAppender {
// Builds an appender bounded at `DEFAULT_BUFFERING_APPENDER_CAPACITY` records.
fn default() -> Self {
Self::with_capacity(DEFAULT_BUFFERING_APPENDER_CAPACITY)
}
}
impl BufferingActionChainAppender {
    /// Creates an appender with the default bound (same as [`Default::default`]).
    pub fn new() -> Self {
        Default::default()
    }

    /// Creates an appender that retains at most `capacity` records.
    ///
    /// A `capacity` of zero is raised to one.
    pub fn with_capacity(capacity: usize) -> Self {
        use std::collections::VecDeque;
        use std::sync::atomic::AtomicU64;

        let bound = if capacity == 0 { 1 } else { capacity };
        // Reserve at most 64 slots up front; the queue grows on demand up to `bound`.
        let initial_slots = if bound > 64 { 64 } else { bound };
        Self {
            records: parking_lot::Mutex::new(VecDeque::with_capacity(initial_slots)),
            capacity: bound,
            dropped: AtomicU64::new(0),
        }
    }

    /// Returns a copy of the buffered records, oldest first.
    pub fn records(&self) -> Vec<ActionChainRecord> {
        let buffered = self.records.lock();
        buffered.iter().cloned().collect()
    }

    /// Number of records currently buffered.
    pub fn len(&self) -> usize {
        let buffered = self.records.lock();
        buffered.len()
    }

    /// `true` when no records are buffered.
    pub fn is_empty(&self) -> bool {
        let buffered = self.records.lock();
        buffered.is_empty()
    }

    /// Number of records evicted so far because the buffer was full.
    pub fn dropped_count(&self) -> u64 {
        use std::sync::atomic::Ordering;

        self.dropped.load(Ordering::Relaxed)
    }
}
impl ActionChainAppender for BufferingActionChainAppender {
    /// Stores `record` at the back of the buffer, evicting the oldest record
    /// first when the buffer already holds `capacity` entries. Never fails.
    fn append(&self, record: ActionChainRecord) -> Result<(), AppendError> {
        use std::sync::atomic::Ordering;

        let mut queue = self.records.lock();
        let is_full = queue.len() >= self.capacity;
        if is_full {
            // Make room by discarding the oldest record, and account for the loss.
            drop(queue.pop_front());
            self.dropped.fetch_add(1, Ordering::Relaxed);
        }
        queue.push_back(record);
        Ok(())
    }
}
/// Stateless fold that applies encoded [`ActionChainRecord`] events to a `MeshOsSnapshot`,
/// recording failed and gated actions in its `recent_failures`.
#[derive(Debug, Default)]
pub struct MeshOsSnapshotFold;
impl RedexFold<MeshOsSnapshot> for MeshOsSnapshotFold {
fn apply(&mut self, ev: &RedexEvent, state: &mut MeshOsSnapshot) -> Result<(), RedexError> {
let record = decode_record(&ev.payload)
.map_err(|e| RedexError::Decode(format!("ActionChainRecord wire decode: {e}")))?;
let recorded_at_ms = record.emitted_at_ms;
match record.disposition {
ActionDisposition::Dispatched => {
}
ActionDisposition::Failed { reason, .. } => {
push_failure(
state,
format!("action-id:{}:{}", record.id, record.kind),
reason,
recorded_at_ms,
);
}
ActionDisposition::Gated {
reason,
cooldown_ms,
} => {
let detail = match cooldown_ms {
Some(ms) => format!("gated ({reason}); cooldown {ms} ms"),
None => format!("gated ({reason})"),
};
push_failure(
state,
format!("action-id:{}:{}", record.id, record.kind),
detail,
recorded_at_ms,
);
}
}
Ok(())
}
}
/// Appends a failure to `state.recent_failures`, first evicting the oldest
/// entry when the ring already holds `RECENT_FAILURES_CAPACITY` entries.
fn push_failure(state: &mut MeshOsSnapshot, source: String, reason: String, recorded_at_ms: u64) {
    let entry = FailureRecord {
        // NOTE(review): every entry pushed here gets `seq` 0 — confirm that is
        // intended, or whether a real sequence number should be threaded in.
        seq: 0,
        source,
        reason,
        recorded_at_ms,
    };
    let ring = &mut state.recent_failures;
    if ring.len() >= RECENT_FAILURES_CAPACITY {
        ring.pop_front();
    }
    ring.push_back(entry);
}
/// Appends a `Dispatched` record for `pending` to `appender`.
///
/// # Errors
///
/// Propagates whatever error the appender returns.
pub fn append_dispatched(
    appender: &Arc<dyn ActionChainAppender>,
    pending: &PendingAction,
) -> Result<(), AppendError> {
    let record = record_from(pending, ActionDisposition::Dispatched);
    appender.append(record)
}
/// Appends a `Failed` record for `pending` to `appender`, carrying `reason`
/// and the optional `retry_after_ms`.
///
/// # Errors
///
/// Propagates whatever error the appender returns.
pub fn append_failed(
    appender: &Arc<dyn ActionChainAppender>,
    pending: &PendingAction,
    reason: String,
    retry_after_ms: Option<u64>,
) -> Result<(), AppendError> {
    let disposition = ActionDisposition::Failed {
        reason,
        retry_after_ms,
    };
    appender.append(record_from(pending, disposition))
}
/// Appends a `Gated` record for `pending` to `appender`, carrying `reason`
/// and the optional `cooldown_ms`.
///
/// # Errors
///
/// Propagates whatever error the appender returns.
pub fn append_gated(
    appender: &Arc<dyn ActionChainAppender>,
    pending: &PendingAction,
    reason: String,
    cooldown_ms: Option<u64>,
) -> Result<(), AppendError> {
    let disposition = ActionDisposition::Gated {
        reason,
        cooldown_ms,
    };
    appender.append(record_from(pending, disposition))
}
// Anonymous constant whose only content is a mention of the `MeshOsAction` type.
// NOTE(review): presumably this keeps the `MeshOsAction` import referenced in
// non-test builds (its other uses in this file are inside the test module) —
// confirm before removing.
#[allow(dead_code)]
const _: Option<MeshOsAction> = None;
// Unit tests for the wire format, the appenders, and the snapshot fold, plus
// one end-to-end test that drives a real executor into the buffering appender.
#[cfg(test)]
mod tests {
use std::time::Instant;
use super::super::action::{ActionId, MaintenanceTransition};
use super::*;
// Builds a record with a fixed `emitted_at_ms` so tests only vary id, kind and disposition.
fn record(id: u64, kind: &str, disposition: ActionDisposition) -> ActionChainRecord {
ActionChainRecord {
id,
kind: kind.into(),
emitted_at_ms: 1_000_000,
disposition,
}
}
// Wraps `payload` in a `RedexEvent`, prepending the wire version byte first.
// Callers therefore pass the bare postcard body, not the output of `encode_record`.
fn redex_event(payload: Vec<u8>) -> RedexEvent {
let payload = {
let mut versioned = Vec::with_capacity(1 + payload.len());
versioned.push(WIRE_FORMAT_VERSION);
versioned.extend_from_slice(&payload);
versioned
};
use crate::adapter::net::redex::RedexEntry;
RedexEvent {
entry: RedexEntry {
seq: 1,
payload_offset: 0,
payload_len: payload.len() as u32,
flags_and_checksum: 0,
},
payload: bytes::Bytes::from(payload),
}
}
// A payload whose version byte is overwritten with an unknown value must be
// rejected with `UnsupportedVersion` reporting both the seen and supported versions.
#[test]
fn decode_rejects_payload_with_unknown_wire_version() {
let r = record(1, "start_daemon", ActionDisposition::Dispatched);
let mut bytes = encode_record(&r).unwrap();
bytes[0] = 99;
let err = decode_record(&bytes).unwrap_err();
match err {
DecodeError::UnsupportedVersion { seen, supported } => {
assert_eq!(seen, 99);
assert_eq!(supported, WIRE_FORMAT_VERSION);
}
other => panic!("expected UnsupportedVersion, got {other:?}"),
}
}
// An empty slice has no version byte and must yield `DecodeError::Empty`.
#[test]
fn decode_rejects_empty_payload() {
assert_eq!(decode_record(&[]).unwrap_err(), DecodeError::Empty);
}
// `encode_record` followed by `decode_record` returns the original record,
// and the encoded form starts with the version byte.
#[test]
fn encode_decode_round_trip_preserves_record() {
let r = record(
42,
"pull_replica",
ActionDisposition::Failed {
reason: "boom".into(),
retry_after_ms: Some(500),
},
);
let bytes = encode_record(&r).unwrap();
assert_eq!(bytes[0], WIRE_FORMAT_VERSION);
let back = decode_record(&bytes).unwrap();
assert_eq!(back, r);
}
// Appending 5 records to a capacity-3 buffer keeps the newest 3 and counts 2 drops.
#[test]
fn buffering_appender_drops_oldest_when_at_capacity() {
let appender = BufferingActionChainAppender::with_capacity(3);
for i in 0..5 {
appender
.append(record(i, "test", ActionDisposition::Dispatched))
.unwrap();
}
assert_eq!(appender.len(), 3, "buffer capped at capacity");
assert_eq!(appender.dropped_count(), 2, "two oldest evicted");
let ids: Vec<u64> = appender.records().iter().map(|r| r.id).collect();
assert_eq!(ids, vec![2, 3, 4]);
}
// The record type round-trips through raw postcard (no version byte involved).
#[test]
fn record_round_trips_through_postcard() {
let r = record(
42,
"start_daemon",
ActionDisposition::Failed {
reason: "boom".into(),
retry_after_ms: Some(500),
},
);
let bytes = postcard::to_allocvec(&r).unwrap();
let back: ActionChainRecord = postcard::from_bytes(&bytes).unwrap();
assert_eq!(back, r);
}
// `record_from` copies the pending action's id and derives `kind` via `action_kind_str`.
#[test]
fn record_from_pending_action_uses_action_kind_str() {
let pending = PendingAction {
id: ActionId(7),
action: MeshOsAction::CommitMaintenanceTransition {
node: 0,
target: MaintenanceTransition::Maintenance,
},
emitted_at: Instant::now(),
};
let rec = record_from(&pending, ActionDisposition::Dispatched);
assert_eq!(rec.id, 7);
assert_eq!(rec.kind, "commit_maintenance_transition");
assert!(matches!(rec.disposition, ActionDisposition::Dispatched));
}
// Records appended below capacity are all retained, in insertion order.
#[test]
fn buffering_appender_collects_records() {
let appender = BufferingActionChainAppender::new();
appender
.append(record(1, "start_daemon", ActionDisposition::Dispatched))
.unwrap();
appender
.append(record(
2,
"stop_daemon",
ActionDisposition::Failed {
reason: "boom".into(),
retry_after_ms: None,
},
))
.unwrap();
assert_eq!(appender.len(), 2);
assert_eq!(appender.records()[0].id, 1);
assert_eq!(appender.records()[1].id, 2);
}
// The no-op appender accepts a record and reports success.
#[test]
fn noop_appender_swallows_all_records() {
let appender = NoOpActionChainAppender;
let r = record(1, "start_daemon", ActionDisposition::Dispatched);
appender.append(r).unwrap();
}
// Folding a `Dispatched` record must not add anything to `recent_failures`.
#[test]
fn fold_dispatched_record_leaves_recent_failures_empty() {
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
let r = record(1, "start_daemon", ActionDisposition::Dispatched);
let bytes = postcard::to_allocvec(&r).unwrap();
fold.apply(&redex_event(bytes), &mut state).unwrap();
assert!(state.recent_failures.is_empty());
}
// Folding a `Failed` record pushes one failure with source `action-id:<id>:<kind>`
// and the record's reason unchanged.
#[test]
fn fold_failed_record_pushes_failure_with_action_id_source() {
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
let r = record(
42,
"start_daemon",
ActionDisposition::Failed {
reason: "process died".into(),
retry_after_ms: None,
},
);
let bytes = postcard::to_allocvec(&r).unwrap();
fold.apply(&redex_event(bytes), &mut state).unwrap();
assert_eq!(state.recent_failures.len(), 1);
assert_eq!(state.recent_failures[0].source, "action-id:42:start_daemon",);
assert_eq!(state.recent_failures[0].reason, "process died");
}
// Folding a `Gated` record pushes one failure whose reason mentions both the
// gate reason and the cooldown value.
#[test]
fn fold_gated_record_pushes_failure_with_cooldown_detail() {
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
let r = record(
7,
"stop_daemon",
ActionDisposition::Gated {
reason: "daemon-backoff".into(),
cooldown_ms: Some(5000),
},
);
let bytes = postcard::to_allocvec(&r).unwrap();
fold.apply(&redex_event(bytes), &mut state).unwrap();
assert_eq!(state.recent_failures.len(), 1);
assert!(
state.recent_failures[0].reason.contains("daemon-backoff"),
"got reason {}",
state.recent_failures[0].reason
);
assert!(
state.recent_failures[0].reason.contains("5000"),
"cooldown ms not in reason: {}",
state.recent_failures[0].reason
);
}
// Folding 5 more failures than the ring holds leaves it at capacity, with the
// failure for id 5 as the oldest survivor.
#[test]
fn fold_drops_oldest_failure_at_ring_capacity() {
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
for i in 0..(RECENT_FAILURES_CAPACITY + 5) {
let r = record(
i as u64,
"start_daemon",
ActionDisposition::Failed {
reason: format!("err {i}"),
retry_after_ms: None,
},
);
let bytes = postcard::to_allocvec(&r).unwrap();
fold.apply(&redex_event(bytes), &mut state).unwrap();
}
assert_eq!(state.recent_failures.len(), RECENT_FAILURES_CAPACITY);
assert!(
state.recent_failures[0].source.contains(":5"),
"expected oldest survivor id=5, got source {}",
state.recent_failures[0].source
);
}
// End-to-end: an executor whose dispatcher is primed to fail the next dispatch
// writes one `Failed` record into the buffering appender; folding that record
// reproduces the failure reason on a fresh snapshot.
#[tokio::test]
async fn end_to_end_executor_buffer_fold_reproduces_failures_on_snapshot() {
use std::sync::Arc;
use tokio::sync::mpsc;
use super::super::action::ActionId;
use super::super::config::MeshOsConfig;
use super::super::executor::{ActionExecutor, DispatchError, LoggingDispatcher};
let (tx, rx) = mpsc::channel(8);
let dispatcher = Arc::new(LoggingDispatcher::new());
dispatcher.fail_next(DispatchError::drop("test failure"));
let appender = Arc::new(BufferingActionChainAppender::new());
let exec = ActionExecutor::new(
rx,
Arc::new(MeshOsConfig::default()),
Arc::clone(&dispatcher),
)
.with_chain_appender(Arc::clone(&appender) as Arc<dyn ActionChainAppender>);
let pending = PendingAction {
id: ActionId(99),
action: MeshOsAction::CommitMaintenanceTransition {
node: 0,
target: MaintenanceTransition::Active,
},
emitted_at: Instant::now(),
};
tx.send(pending).await.unwrap();
let task = tokio::spawn(exec.run());
// Dropping the sender closes the channel; presumably that is what lets
// `exec.run()` finish so the join below completes — confirm against the executor.
drop(tx);
let _ = task.await.expect("executor join");
let records = appender.records();
assert_eq!(
records.len(),
1,
"expected one chain record, got {records:?}"
);
assert_eq!(records[0].id, 99);
assert!(matches!(
records[0].disposition,
ActionDisposition::Failed { .. }
));
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
for record in records {
let bytes = postcard::to_allocvec(&record).unwrap();
fold.apply(&redex_event(bytes), &mut state).unwrap();
}
assert_eq!(state.recent_failures.len(), 1);
assert_eq!(state.recent_failures[0].reason, "test failure");
}
// A body that is not a valid record (behind a correct version byte) must surface
// as `RedexError::Decode` with a message naming `ActionChainRecord`.
#[test]
fn fold_decode_error_surfaces_as_redex_error() {
let mut fold = MeshOsSnapshotFold;
let mut state = MeshOsSnapshot::default();
let ev = redex_event(vec![0xFF, 0xFF, 0xFF]);
let err = fold.apply(&ev, &mut state).unwrap_err();
match err {
RedexError::Decode(msg) => {
assert!(msg.contains("ActionChainRecord"));
}
other => panic!("expected Decode, got {other:?}"),
}
}
}