use std::{
fmt,
sync::Arc,
time::{Duration, Instant},
};
use tokio::sync::mpsc::{Sender, UnboundedReceiver, unbounded_channel};
use crate::{
ActorId, Envelope, Event, EventId, IntoEnvelope, Supervisor, Topic,
monitoring::MonitorHandle,
monitors::ActorMonitor,
testing::{
ActorSpy, EventChain, EventCollector, EventEntry, EventMatcher, EventQuery, EventRecords,
EventSpy, TopicSpy, expectation::Expectation,
},
};
pub struct Harness<E: Event, T: Topic<E>> {
pub(super) snapshot: Vec<EventEntry<E, T>>,
records: EventRecords<E, T>,
monitor_handle: MonitorHandle<E, T>,
pub(super) receiver: UnboundedReceiver<EventEntry<E, T>>,
actor_sender: Sender<Arc<Envelope<E>>>,
actor_monitor: ActorMonitor,
}
impl<E: Event, T: Topic<E>> fmt::Debug for Harness<E, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Harness")
.field("snapshot", &self.snapshot.len())
.field("records", &self.records.len())
.finish_non_exhaustive()
}
}
impl<E: Event, T: Topic<E>> Harness<E, T> {
pub async fn new(supervisor: &mut Supervisor<E, T>) -> Self {
let (tx, rx) = unbounded_channel();
let monitor = EventCollector::new(tx);
let monitor_handle = supervisor.monitors().add(monitor).await;
let actor_monitor = ActorMonitor::new();
supervisor.monitors().add(actor_monitor.clone()).await;
Self {
snapshot: Vec::new(),
records: Arc::new(Vec::new()),
monitor_handle,
receiver: rx,
actor_sender: supervisor.sender.clone(),
actor_monitor,
}
}
pub async fn record(&self) {
self.monitor_handle.resume().await;
}
pub async fn settle(&mut self) {
self.drain_until_quiet(Self::DEFAULT_SETTLE_WINDOW, Self::DEFAULT_MAX_SETTLE)
.await;
self.freeze().await;
}
pub fn settle_on<F>(&mut self, condition: F) -> Expectation<'_, E, T, F>
where
F: Fn(EventQuery<E, T>) -> bool,
{
Expectation::new(self, condition)
}
pub fn settle_on_event<M>(
&mut self,
matcher: M,
) -> Expectation<'_, E, T, impl Fn(EventQuery<E, T>) -> bool>
where
M: Into<EventMatcher<E, T>>,
{
let matcher = matcher.into();
self.settle_on(move |events| events.any(|entry| matcher.matches(entry)))
}
pub fn reset(&mut self) {
self.snapshot.clear();
self.records = Arc::new(Vec::new());
while let Ok(_entry) = self.receiver.try_recv() {}
}
pub const DEFAULT_SETTLE_WINDOW: Duration = Duration::from_millis(1);
pub const DEFAULT_MAX_SETTLE: Duration = Duration::from_millis(10);
pub(super) async fn drain_until_quiet(
&mut self,
settle_window: Duration,
max_settle: Duration,
) {
let deadline = Instant::now() + max_settle;
loop {
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
break;
}
let timeout = settle_window.min(remaining);
match tokio::time::timeout(timeout, self.receiver.recv()).await {
Ok(Some(entry)) => self.snapshot.push(entry),
Ok(None) => break, Err(_) => break, }
}
}
pub(super) async fn freeze(&mut self) {
self.monitor_handle.pause().await;
self.records = Arc::new(std::mem::take(&mut self.snapshot));
}
pub async fn send_as<IE: Into<IntoEnvelope<E>>>(
&self,
actor_id: &ActorId,
into_envelope: IE,
) -> crate::Result<EventId> {
let envelope = into_envelope.into().with_actor_id(actor_id.clone()).build();
let id = envelope.id();
self.actor_sender.send(envelope.into()).await?;
Ok(id)
}
pub fn events(&self) -> EventQuery<E, T> {
EventQuery::new(self.records.clone())
}
pub fn event(&self, id: EventId) -> EventSpy<E, T> {
EventSpy::new(self.records.clone(), id)
}
pub fn actor(&self, actor: &ActorId) -> ActorSpy<E, T> {
ActorSpy::new(
self.records.clone(),
actor.clone(),
self.actor_monitor.clone(),
)
}
pub fn topic(&self, topic: T) -> TopicSpy<E, T> {
TopicSpy::new(self.records.clone(), topic)
}
pub fn chain(&self, id: EventId) -> EventChain<E, T> {
EventChain::new(self.records.clone(), id)
}
pub fn dump(&self) {
if self.records.is_empty() {
println!("(no events recorded)");
return;
}
println!("Recorded events ({} deliveries):", self.records.len());
for (i, entry) in self.records.iter().enumerate() {
println!(
" {}: [{}] --> [{}] (id: {})",
i,
entry.sender(),
entry.receiver(),
entry.id(),
);
}
}
pub fn event_count(&self) -> usize {
self.records.len()
}
}