use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use boardwalk::runtime::{
Actor, ActorHandle, DynFuture, Resource, ResourceCtx, ResourceError, TransitionCtx,
TransitionError,
};
use boardwalk::{ResourceSnapshot, ResourceSpec, TransitionInput, TransitionOutcome};
struct Slow {
delay: Duration,
enters: Arc<AtomicU64>,
exits: Arc<AtomicU64>,
}
impl Resource for Slow {
fn spec(&self) -> ResourceSpec {
ResourceSpec {
kind: "slow".into(),
name: None,
labels: BTreeMap::new(),
property_schema: None,
streams: vec![],
}
}
fn snapshot<'a>(
&'a self,
_ctx: ResourceCtx,
) -> DynFuture<'a, Result<ResourceSnapshot, ResourceError>> {
Box::pin(
async move { Err::<ResourceSnapshot, _>(ResourceError::Unavailable("stub".into())) },
)
}
}
impl Actor for Slow {
fn transition<'a>(
&'a mut self,
_ctx: TransitionCtx,
_name: &'a str,
_input: TransitionInput,
) -> DynFuture<'a, Result<TransitionOutcome, TransitionError>> {
let enters = self.enters.clone();
let exits = self.exits.clone();
let delay = self.delay;
Box::pin(async move {
enters.fetch_add(1, Ordering::SeqCst);
tokio::time::sleep(delay).await;
exits.fetch_add(1, Ordering::SeqCst);
Err::<TransitionOutcome, _>(TransitionError::NotAllowed("stub".into()))
})
}
}
fn slow_actor(delay_ms: u64) -> (Slow, Arc<AtomicU64>, Arc<AtomicU64>) {
let enters = Arc::new(AtomicU64::new(0));
let exits = Arc::new(AtomicU64::new(0));
(
Slow {
delay: Duration::from_millis(delay_ms),
enters: enters.clone(),
exits: exits.clone(),
},
enters,
exits,
)
}
#[tokio::test]
async fn actor_transitions_are_serialized_per_actor() {
let (actor, enters, _exits) = slow_actor(80);
let handle = ActorHandle::spawn(actor, 4);
let h1 = handle.clone();
let h2 = handle.clone();
let a = tokio::spawn(async move {
let _ = h1.transition("a", TransitionInput::default()).await;
});
let b = tokio::spawn(async move {
let _ = h2.transition("b", TransitionInput::default()).await;
});
let start = Instant::now();
let _ = tokio::join!(a, b);
let elapsed = start.elapsed();
assert_eq!(enters.load(Ordering::SeqCst), 2);
assert!(
elapsed >= Duration::from_millis(150),
"two serialized 80ms transitions must take >= 150ms; took {elapsed:?}"
);
}
#[tokio::test]
async fn actor_command_queue_is_bounded() {
let (actor, _enters, _exits) = slow_actor(200);
let handle = ActorHandle::spawn(actor, 1);
let h1 = handle.clone();
let h2 = handle.clone();
tokio::spawn(async move {
let _ = h1.transition("a", TransitionInput::default()).await;
});
tokio::spawn(async move {
let _ = h2.transition("b", TransitionInput::default()).await;
});
tokio::time::sleep(Duration::from_millis(40)).await;
let result = handle.try_transition("c", TransitionInput::default());
match result {
Err(TransitionError::Busy | TransitionError::BackpressureRequired) => {}
Err(other) => panic!("expected Busy/BackpressureRequired, got {other:?}"),
Ok(_) => panic!("expected try_transition to reject when full"),
}
}
#[tokio::test]
async fn different_actors_run_independently() {
let (a_actor, _ae, _ax) = slow_actor(200);
let (b_actor, _be, _bx) = slow_actor(0);
let h_a = ActorHandle::spawn(a_actor, 4);
let h_b = ActorHandle::spawn(b_actor, 4);
let a_run = tokio::spawn(async move {
let _ = h_a.transition("slow", TransitionInput::default()).await;
});
let start = Instant::now();
let _ = h_b.transition("fast", TransitionInput::default()).await;
let elapsed = start.elapsed();
assert!(
elapsed < Duration::from_millis(100),
"fast actor should not wait on slow actor; took {elapsed:?}"
);
let _ = a_run.await;
}