use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use crate::channel::mpsc::{SendError, Sender};
use crate::cx::Cx;
use crate::evidence_sink::EvidenceSink;
use franken_evidence::EvidenceLedger;
use parking_lot::Mutex;
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ActorId(u64);
impl ActorId {
#[must_use]
pub const fn new(id: u64) -> Self {
Self(id)
}
#[must_use]
pub const fn raw(self) -> u64 {
self.0
}
}
impl std::fmt::Display for ActorId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "actor-{}", self.0)
}
}
#[derive(Debug, Clone, Default)]
pub struct PartitionStats {
pub partitions_created: u64,
pub partitions_healed: u64,
pub messages_dropped: u64,
pub messages_buffered: u64,
}
impl std::fmt::Display for PartitionStats {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"PartitionStats {{ created: {}, healed: {}, dropped: {}, buffered: {} }}",
self.partitions_created,
self.partitions_healed,
self.messages_dropped,
self.messages_buffered,
)
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PartitionBehavior {
Drop,
Error,
}
#[derive(Debug)]
pub struct PartitionController {
partitions: Mutex<HashSet<(u64, u64)>>,
behavior: PartitionBehavior,
partition_count: AtomicUsize,
evidence_seq: AtomicU64,
partitions_created: AtomicU64,
partitions_healed: AtomicU64,
messages_dropped: AtomicU64,
messages_buffered: AtomicU64,
evidence_sink: Arc<dyn EvidenceSink>,
}
impl PartitionController {
#[must_use]
pub fn new(behavior: PartitionBehavior, evidence_sink: Arc<dyn EvidenceSink>) -> Self {
Self {
partitions: Mutex::new(HashSet::new()),
behavior,
partition_count: AtomicUsize::new(0),
evidence_seq: AtomicU64::new(0),
partitions_created: AtomicU64::new(0),
partitions_healed: AtomicU64::new(0),
messages_dropped: AtomicU64::new(0),
messages_buffered: AtomicU64::new(0),
evidence_sink,
}
}
pub fn partition(&self, src: ActorId, dst: ActorId) {
let created = {
let mut partitions = self.partitions.lock();
let created = partitions.insert((src.0, dst.0));
if created {
self.partition_count.fetch_add(1, Ordering::Relaxed);
}
drop(partitions);
created
};
if created {
self.partitions_created.fetch_add(1, Ordering::Relaxed);
emit_partition_evidence(
&self.evidence_sink,
self.next_evidence_ts(),
"create",
src,
dst,
);
}
}
pub fn partition_symmetric(&self, a: ActorId, b: ActorId) {
self.partition(a, b);
self.partition(b, a);
}
pub fn heal(&self, src: ActorId, dst: ActorId) {
let healed = {
let mut partitions = self.partitions.lock();
let healed = partitions.remove(&(src.0, dst.0));
if healed {
self.partition_count.fetch_sub(1, Ordering::Relaxed);
}
drop(partitions);
healed
};
if healed {
self.partitions_healed.fetch_add(1, Ordering::Relaxed);
emit_partition_evidence(
&self.evidence_sink,
self.next_evidence_ts(),
"heal",
src,
dst,
);
}
}
pub fn heal_symmetric(&self, a: ActorId, b: ActorId) {
self.heal(a, b);
self.heal(b, a);
}
#[allow(clippy::cast_possible_truncation)]
pub fn heal_all(&self) {
let healed_edges: Vec<(u64, u64)> = {
let mut partitions = self.partitions.lock();
let edges = std::mem::take(&mut *partitions).into_iter().collect();
self.partition_count.store(0, Ordering::Relaxed);
drop(partitions);
edges
};
let count = healed_edges.len() as u64;
self.partitions_healed.fetch_add(count, Ordering::Relaxed);
for (src, dst) in healed_edges {
emit_partition_evidence(
&self.evidence_sink,
self.next_evidence_ts(),
"heal",
ActorId::new(src),
ActorId::new(dst),
);
}
}
#[inline]
#[must_use]
pub fn is_partitioned(&self, src: ActorId, dst: ActorId) -> bool {
if self.partition_count.load(Ordering::Relaxed) == 0 {
return false;
}
self.partitions.lock().contains(&(src.0, dst.0))
}
#[inline]
#[must_use]
pub fn active_partition_count(&self) -> usize {
self.partition_count.load(Ordering::Relaxed)
}
pub fn stats(&self) -> PartitionStats {
PartitionStats {
partitions_created: self.partitions_created.load(Ordering::Relaxed),
partitions_healed: self.partitions_healed.load(Ordering::Relaxed),
messages_dropped: self.messages_dropped.load(Ordering::Relaxed),
messages_buffered: self.messages_buffered.load(Ordering::Relaxed),
}
}
#[inline]
#[must_use]
pub fn behavior(&self) -> PartitionBehavior {
self.behavior
}
#[inline]
fn record_drop(&self) {
self.messages_dropped.fetch_add(1, Ordering::Relaxed);
}
#[inline]
fn next_evidence_ts(&self) -> u64 {
self.evidence_seq
.fetch_add(1, Ordering::Relaxed)
.saturating_add(1)
}
}
pub struct PartitionSender<T> {
inner: Sender<T>,
controller: Arc<PartitionController>,
src: ActorId,
dst: ActorId,
}
impl<T: std::fmt::Debug> std::fmt::Debug for PartitionSender<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PartitionSender")
.field("src", &self.src)
.field("dst", &self.dst)
.finish_non_exhaustive()
}
}
impl<T> PartitionSender<T> {
#[must_use]
pub fn new(
sender: Sender<T>,
controller: Arc<PartitionController>,
src: ActorId,
dst: ActorId,
) -> Self {
Self {
inner: sender,
controller,
src,
dst,
}
}
pub async fn send(&self, cx: &Cx, value: T) -> Result<(), SendError<T>> {
if cx.checkpoint().is_err() {
return Err(SendError::Cancelled(value));
}
if self.controller.is_partitioned(self.src, self.dst) {
return match self.controller.behavior() {
PartitionBehavior::Drop => {
self.controller.record_drop();
emit_partition_evidence(
&self.controller.evidence_sink,
self.controller.next_evidence_ts(),
"message_dropped",
self.src,
self.dst,
);
Ok(())
}
PartitionBehavior::Error => {
emit_partition_evidence(
&self.controller.evidence_sink,
self.controller.next_evidence_ts(),
"message_rejected",
self.src,
self.dst,
);
Err(SendError::Disconnected(value))
}
};
}
self.inner.send(cx, value).await
}
#[inline]
#[must_use]
pub fn src(&self) -> ActorId {
self.src
}
#[inline]
#[must_use]
pub fn dst(&self) -> ActorId {
self.dst
}
#[inline]
#[must_use]
pub fn inner(&self) -> &Sender<T> {
&self.inner
}
#[inline]
#[must_use]
pub fn controller(&self) -> &Arc<PartitionController> {
&self.controller
}
}
pub fn partition_channel<T>(
capacity: usize,
controller: Arc<PartitionController>,
src: ActorId,
dst: ActorId,
) -> (PartitionSender<T>, super::Receiver<T>) {
let (tx, rx) = super::mpsc::channel(capacity);
let ptx = PartitionSender::new(tx, controller, src, dst);
(ptx, rx)
}
fn emit_partition_evidence(
sink: &Arc<dyn EvidenceSink>,
ts_unix_ms: u64,
action: &str,
src: ActorId,
dst: ActorId,
) {
let action_str = format!("partition_{action}");
#[allow(clippy::cast_precision_loss)]
let entry = EvidenceLedger {
ts_unix_ms,
component: "channel_partition".to_string(),
action: action_str.clone(),
posterior: vec![1.0],
expected_loss_by_action: std::collections::BTreeMap::from([(action_str, 0.0)]),
chosen_expected_loss: 0.0,
calibration_score: 1.0,
fallback_active: false,
top_features: vec![
("src_actor".to_string(), src.0 as f64),
("dst_actor".to_string(), dst.0 as f64),
],
};
sink.emit(&entry);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::evidence_sink::CollectorSink;
use crate::types::Budget;
use crate::util::ArenaIndex;
use crate::{RegionId, TaskId};
use proptest::prelude::*;
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll};
fn test_cx() -> crate::cx::Cx {
crate::cx::Cx::new(
RegionId::from_arena(ArenaIndex::new(0, 0)),
TaskId::from_arena(ArenaIndex::new(0, 0)),
Budget::INFINITE,
)
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = std::task::Waker::noop().clone();
let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
fn make_controller(
behavior: PartitionBehavior,
) -> (Arc<PartitionController>, Arc<CollectorSink>) {
let collector = Arc::new(CollectorSink::new());
let sink: Arc<dyn EvidenceSink> = collector.clone();
let ctrl = Arc::new(PartitionController::new(behavior, sink));
(ctrl, collector)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RouteOutcome {
Delivered(u32),
Dropped,
Disconnected(u32),
}
fn route_outcome(
sender: &PartitionSender<u32>,
receiver: &mut crate::channel::mpsc::Receiver<u32>,
cx: &crate::cx::Cx,
value: u32,
) -> RouteOutcome {
match block_on(sender.send(cx, value)) {
Ok(()) => match receiver.try_recv() {
Ok(received) => RouteOutcome::Delivered(received),
Err(_) => RouteOutcome::Dropped,
},
Err(SendError::Disconnected(returned)) => RouteOutcome::Disconnected(returned),
Err(SendError::Full(returned)) => {
panic!("unexpected full channel while routing value {returned}")
}
Err(SendError::Cancelled(returned)) => {
panic!("unexpected cancellation while routing value {returned}")
}
}
}
#[test]
fn no_partition_passthrough() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, mut rx) = partition_channel::<u32>(16, ctrl, a, b);
let cx = test_cx();
for i in 0..5 {
block_on(ptx.send(&cx, i)).expect("send");
}
for i in 0..5 {
assert_eq!(rx.try_recv().unwrap(), i);
}
}
#[test]
fn partition_drops_messages() {
let (ctrl, collector) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, mut rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
ctrl.partition(a, b);
assert!(ctrl.is_partitioned(a, b));
for i in 0..5 {
block_on(ptx.send(&cx, i)).expect("send should succeed (drop mode)");
}
assert!(rx.try_recv().is_err(), "no messages should be delivered");
let stats = ctrl.stats();
assert_eq!(stats.partitions_created, 1);
assert_eq!(stats.messages_dropped, 5);
let entries = collector.entries();
let drop_entries = entries
.iter()
.filter(|e| e.action == "partition_message_dropped")
.count();
assert_eq!(drop_entries, 5);
assert!(
!entries
.iter()
.any(|e| e.action.starts_with("partition_partition_"))
);
}
#[test]
fn partition_error_mode() {
let (ctrl, collector) = make_controller(PartitionBehavior::Error);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, _rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
ctrl.partition(a, b);
let result = block_on(ptx.send(&cx, 42));
assert!(
matches!(result, Err(SendError::Disconnected(42))),
"expected Disconnected, got: {result:?}"
);
let stats = ctrl.stats();
assert_eq!(stats.messages_dropped, 0);
let entries = collector.entries();
assert!(
entries
.iter()
.any(|e| e.action == "partition_message_rejected")
);
assert!(
!entries
.iter()
.any(|e| e.action == "partition_message_dropped")
);
}
#[test]
fn partition_drop_mode_respects_cancellation() {
let (ctrl, _collector) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, mut rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
cx.set_cancel_requested(true);
ctrl.partition(a, b);
let result = block_on(ptx.send(&cx, 7));
assert!(matches!(result, Err(SendError::Cancelled(7))));
assert!(rx.try_recv().is_err());
assert_eq!(ctrl.stats().messages_dropped, 0);
}
#[test]
fn partition_error_mode_respects_cancellation_precedence() {
let (ctrl, _collector) = make_controller(PartitionBehavior::Error);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, _rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
cx.set_cancel_requested(true);
ctrl.partition(a, b);
let result = block_on(ptx.send(&cx, 11));
assert!(matches!(result, Err(SendError::Cancelled(11))));
}
#[test]
fn heal_restores_delivery() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, mut rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
ctrl.partition(a, b);
block_on(ptx.send(&cx, 1)).unwrap(); assert!(rx.try_recv().is_err());
ctrl.heal(a, b);
assert!(!ctrl.is_partitioned(a, b));
block_on(ptx.send(&cx, 2)).unwrap(); assert_eq!(rx.try_recv().unwrap(), 2);
let stats = ctrl.stats();
assert_eq!(stats.partitions_healed, 1);
}
#[test]
fn asymmetric_partition() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx_ab, mut rx_b) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let (ptx_ba, mut rx_a) = partition_channel::<u32>(16, ctrl.clone(), b, a);
let cx = test_cx();
ctrl.partition(a, b);
block_on(ptx_ab.send(&cx, 1)).unwrap();
assert!(rx_b.try_recv().is_err());
block_on(ptx_ba.send(&cx, 2)).unwrap();
assert_eq!(rx_a.try_recv().unwrap(), 2);
}
#[test]
fn symmetric_partition() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx_ab, mut rx_b) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let (ptx_ba, mut rx_a) = partition_channel::<u32>(16, ctrl.clone(), b, a);
let cx = test_cx();
ctrl.partition_symmetric(a, b);
block_on(ptx_ab.send(&cx, 1)).unwrap();
block_on(ptx_ba.send(&cx, 2)).unwrap();
assert!(rx_b.try_recv().is_err());
assert!(rx_a.try_recv().is_err());
ctrl.heal_symmetric(a, b);
block_on(ptx_ab.send(&cx, 3)).unwrap();
block_on(ptx_ba.send(&cx, 4)).unwrap();
assert_eq!(rx_b.try_recv().unwrap(), 3);
assert_eq!(rx_a.try_recv().unwrap(), 4);
}
#[test]
fn cascading_partitions() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let c = ActorId::new(3);
let (tx_a2b, mut rx_b) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let (tx_b2c, mut rx_c) = partition_channel::<u32>(16, ctrl.clone(), b, c);
let (tx_a2c, mut rx_c2) = partition_channel::<u32>(16, ctrl.clone(), a, c);
let cx = test_cx();
ctrl.partition(a, b);
ctrl.partition(b, c);
block_on(tx_a2b.send(&cx, 1)).unwrap();
block_on(tx_a2c.send(&cx, 2)).unwrap();
block_on(tx_b2c.send(&cx, 3)).unwrap();
assert!(rx_b.try_recv().is_err());
assert!(rx_c.try_recv().is_err());
assert_eq!(rx_c2.try_recv().unwrap(), 2);
}
#[test]
fn heal_all_clears_all_partitions() {
let (ctrl, collector) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let c = ActorId::new(3);
ctrl.partition_symmetric(a, b);
ctrl.partition_symmetric(b, c);
assert_eq!(ctrl.active_partition_count(), 4);
ctrl.heal_all();
assert_eq!(ctrl.active_partition_count(), 0);
assert_eq!(ctrl.stats().partitions_healed, 4);
let entries = collector.entries();
let heal_entries = entries
.iter()
.filter(|e| e.action == "partition_heal")
.count();
assert_eq!(heal_entries, 4);
}
#[test]
fn evidence_for_partition_lifecycle() {
let (ctrl, collector) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
ctrl.partition(a, b);
ctrl.heal(a, b);
let entries = collector.entries();
assert!(
entries.iter().any(|e| e.action == "partition_create"),
"should log partition creation"
);
assert!(
entries.iter().any(|e| e.action == "partition_heal"),
"should log partition heal"
);
assert!(
!entries
.iter()
.any(|e| e.action.starts_with("partition_partition_"))
);
for entry in &entries {
assert_eq!(entry.component, "channel_partition");
assert!(entry.is_valid());
}
}
#[test]
fn evidence_timestamps_follow_deterministic_event_sequence() {
let (ctrl, collector) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
let (ptx, _rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
ctrl.partition(a, b);
block_on(ptx.send(&cx, 1)).expect("send");
ctrl.heal(a, b);
let timestamps: Vec<u64> = collector
.entries()
.iter()
.map(|entry| entry.ts_unix_ms)
.collect();
assert_eq!(timestamps, vec![1, 2, 3]);
}
#[test]
fn idempotent_partition_and_heal() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let a = ActorId::new(1);
let b = ActorId::new(2);
ctrl.partition(a, b);
ctrl.partition(a, b);
assert_eq!(ctrl.stats().partitions_created, 1);
ctrl.heal(a, b);
ctrl.heal(a, b);
assert_eq!(ctrl.stats().partitions_healed, 1);
}
#[test]
fn actor_id_debug_clone_copy_ord_hash() {
use std::collections::HashSet;
let a = ActorId::new(1);
let b = ActorId::new(2);
let dbg = format!("{a:?}");
assert!(dbg.contains("ActorId"));
let cloned = a;
let copied = a; assert_eq!(cloned, a);
assert_eq!(copied, a);
assert_eq!(a, a);
assert_ne!(a, b);
assert!(a < b);
let mut set = HashSet::new();
set.insert(a);
set.insert(b);
set.insert(a);
assert_eq!(set.len(), 2);
}
#[test]
fn actor_id_raw_accessor() {
let a = ActorId::new(42);
assert_eq!(a.raw(), 42);
}
#[test]
fn partition_stats_debug_clone_default_display() {
let stats = PartitionStats::default();
assert_eq!(stats.partitions_created, 0);
assert_eq!(stats.partitions_healed, 0);
let dbg = format!("{stats:?}");
assert!(dbg.contains("PartitionStats"));
let display = format!("{stats}");
assert!(display.contains("created: 0"));
let cloned = stats;
assert_eq!(cloned.partitions_created, 0);
}
#[test]
fn partition_behavior_debug_clone_copy_eq() {
let drop_b = PartitionBehavior::Drop;
let err_b = PartitionBehavior::Error;
let dbg = format!("{drop_b:?}");
assert!(dbg.contains("Drop"));
let cloned = drop_b;
let copied = drop_b; assert_eq!(cloned, PartitionBehavior::Drop);
assert_eq!(copied, PartitionBehavior::Drop);
assert_eq!(drop_b, PartitionBehavior::Drop);
assert_ne!(drop_b, err_b);
}
#[test]
fn partition_controller_debug() {
let (ctrl, _) = make_controller(PartitionBehavior::Drop);
let dbg = format!("{ctrl:?}");
assert!(dbg.contains("PartitionController"));
}
#[test]
fn partition_controller_behavior_accessor() {
let (ctrl, _) = make_controller(PartitionBehavior::Error);
assert_eq!(ctrl.behavior(), PartitionBehavior::Error);
}
#[test]
fn actor_id_display() {
let id = ActorId::new(42);
assert_eq!(format!("{id}"), "actor-42");
}
proptest! {
#[test]
fn metamorphic_unrelated_partition_churn_preserves_route_outcome(
a_id in 1u64..1000,
b_id in 1001u64..2000,
c_id in 2001u64..3000,
d_id in 3001u64..4000,
baseline_value in any::<u32>(),
churn_value in any::<u32>(),
blocked_value in any::<u32>(),
blocked_churn_value in any::<u32>(),
drop_mode in any::<bool>(),
) {
let behavior = if drop_mode {
PartitionBehavior::Drop
} else {
PartitionBehavior::Error
};
let (ctrl, _) = make_controller(behavior);
let a = ActorId::new(a_id);
let b = ActorId::new(b_id);
let c = ActorId::new(c_id);
let d = ActorId::new(d_id);
let (ptx, mut rx) = partition_channel::<u32>(16, ctrl.clone(), a, b);
let cx = test_cx();
let baseline = route_outcome(&ptx, &mut rx, &cx, baseline_value);
prop_assert_eq!(
baseline,
RouteOutcome::Delivered(baseline_value),
"unpartitioned route should deliver"
);
ctrl.partition(c, d);
ctrl.partition(d, c);
ctrl.heal(c, d);
prop_assert!(
!ctrl.is_partitioned(a, b),
"unrelated edge churn must not partition the target route"
);
let after_unrelated_churn = route_outcome(&ptx, &mut rx, &cx, churn_value);
prop_assert_eq!(
after_unrelated_churn,
RouteOutcome::Delivered(churn_value),
"disjoint partition churn must not change delivery on the target route"
);
ctrl.partition(a, b);
let blocked = route_outcome(&ptx, &mut rx, &cx, blocked_value);
match behavior {
PartitionBehavior::Drop => prop_assert_eq!(blocked, RouteOutcome::Dropped),
PartitionBehavior::Error => {
prop_assert_eq!(blocked, RouteOutcome::Disconnected(blocked_value));
}
}
ctrl.partition(c, d);
ctrl.heal(d, c);
ctrl.partition(d, c);
prop_assert!(
ctrl.is_partitioned(a, b),
"unrelated churn must not heal an existing target partition"
);
let blocked_after_unrelated_churn =
route_outcome(&ptx, &mut rx, &cx, blocked_churn_value);
match behavior {
PartitionBehavior::Drop => {
prop_assert_eq!(
blocked_after_unrelated_churn,
RouteOutcome::Dropped,
"disjoint churn must preserve dropped routing behavior"
);
}
PartitionBehavior::Error => {
prop_assert_eq!(
blocked_after_unrelated_churn,
RouteOutcome::Disconnected(blocked_churn_value),
"disjoint churn must preserve rejected routing behavior"
);
}
}
}
}
}