use crate::actor::*;
use crate::util::HashableHashMap;
use std::borrow::Cow;
use std::fmt::Debug;
use std::time::Duration;
use std::ops::Range;
use std::hash::Hash;
#[derive(Clone)]
pub struct ActorWrapper<A: Actor> {
pub resend_interval: Range<Duration>,
pub wrapped_actor: A,
}
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
#[derive(serde::Serialize, serde::Deserialize)]
pub enum MsgWrapper<Msg> {
Deliver(Sequencer, Msg),
Ack(Sequencer),
}
pub type Sequencer = u64;
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct StateWrapper<Msg, State> {
next_send_seq: Sequencer,
msgs_pending_ack: HashableHashMap<Sequencer, (Id, Msg)>,
last_delivered_seqs: HashableHashMap<Id, Sequencer>,
wrapped_state: State,
}
impl<A: Actor> ActorWrapper<A> {
pub fn with_default_timeout(wrapped_actor: A) -> Self {
Self {
resend_interval: Duration::from_secs(1)..Duration::from_secs(2),
wrapped_actor,
}
}
}
impl<A: Actor> Actor for ActorWrapper<A>
where A::Msg: Hash
{
type Msg = MsgWrapper<A::Msg>;
type State = StateWrapper<A::Msg, A::State>;
fn on_start(&self, id: Id, o: &mut Out<Self>) -> Self::State {
o.set_timer(self.resend_interval.clone());
let mut wrapped_out = Out::new();
let mut state = StateWrapper {
next_send_seq: 1,
msgs_pending_ack: Default::default(),
last_delivered_seqs: Default::default(),
wrapped_state: self.wrapped_actor.on_start(id, &mut wrapped_out),
};
process_output(&mut state, wrapped_out, o);
state
}
fn on_msg(&self, id: Id, state: &mut Cow<Self::State>, src: Id, msg: Self::Msg, o: &mut Out<Self>) {
match msg {
MsgWrapper::Deliver(seq, wrapped_msg) => {
o.send(src, MsgWrapper::Ack(seq));
if seq <= *state.last_delivered_seqs.get(&src).unwrap_or(&0) { return }
let mut wrapped_state = Cow::Borrowed(&state.wrapped_state);
let mut wrapped_out = Out::new();
self.wrapped_actor.on_msg(
id, &mut wrapped_state, src, wrapped_msg, &mut wrapped_out);
if is_no_op(&wrapped_state, &wrapped_out) { return }
if let Cow::Owned(wrapped_state) = wrapped_state {
*state = Cow::Owned(StateWrapper {
next_send_seq: state.next_send_seq,
msgs_pending_ack: state.msgs_pending_ack.clone(),
last_delivered_seqs: state.last_delivered_seqs.clone(),
wrapped_state,
});
}
state.to_mut().last_delivered_seqs.insert(src, seq);
process_output(state.to_mut(), wrapped_out, o);
},
MsgWrapper::Ack(seq) => {
state.to_mut().msgs_pending_ack.remove(&seq);
},
}
}
fn on_timeout(&self, _id: Id, state: &mut std::borrow::Cow<Self::State>, o: &mut Out<Self>) {
o.set_timer(self.resend_interval.clone());
for (seq, (dst, msg)) in &state.msgs_pending_ack {
o.send(*dst, MsgWrapper::Deliver(*seq, msg.clone()));
}
}
}
fn process_output<A: Actor>(state: &mut StateWrapper<A::Msg, A::State>, wrapped_out: Out<A>, o: &mut Out<ActorWrapper<A>>)
where A::Msg: Hash
{
for command in wrapped_out {
match command {
Command::CancelTimer => {
todo!("CancelTimer is not supported at this time");
},
Command::SetTimer(_) => {
todo!("SetTimer is not supported at this time");
},
Command::Send(dst, inner_msg) => {
o.send(dst, MsgWrapper::Deliver(state.next_send_seq, inner_msg.clone()));
state.msgs_pending_ack.insert(state.next_send_seq, (dst, inner_msg));
state.next_send_seq += 1;
},
}
}
}
#[cfg(test)]
mod test {
use std::borrow::Cow;
use crate::{Checker, Expectation, Model};
use crate::actor::{Actor, Id, Out};
use crate::actor::ordered_reliable_link::{ActorWrapper, MsgWrapper};
use crate::actor::{ActorModel, ActorModelAction, LossyNetwork, Network};
pub enum TestActor {
Sender { receiver_id: Id },
Receiver,
}
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct Received(Vec<(Id, TestMsg)>);
#[derive(Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct TestMsg(u64);
impl Actor for TestActor {
type Msg = TestMsg;
type State = Received;
fn on_start(&self, _id: Id, o: &mut Out<Self>) -> Self::State {
if let TestActor::Sender { receiver_id } = self {
o.send(*receiver_id, TestMsg(42));
o.send(*receiver_id, TestMsg(43));
}
Received(Vec::new())
}
fn on_msg(&self, _id: Id, received: &mut Cow<Self::State>, src: Id, msg: Self::Msg, _o: &mut Out<Self>) {
received.to_mut().0.push((src, msg));
}
}
fn model() -> ActorModel<ActorWrapper<TestActor>> {
ActorModel::new((), ())
.actor(
ActorWrapper::with_default_timeout(
TestActor::Sender { receiver_id: Id::from(1) }))
.actor(
ActorWrapper::with_default_timeout(
TestActor::Receiver))
.init_network(Network::new_unordered_duplicating([]))
.lossy_network(LossyNetwork::Yes)
.property(Expectation::Always, "no redelivery", |_, state| {
let received = &state.actor_states[1].wrapped_state.0;
received.iter().filter(|(_, TestMsg(v))| *v == 42).count() < 2
&& received.iter().filter(|(_, TestMsg(v))| *v == 43).count() < 2
})
.property(Expectation::Always, "ordered", |_, state| {
state.actor_states[1].wrapped_state.0.iter()
.map(|(_, TestMsg(v))| *v)
.fold((true, 0), |(acc, last), next| (acc && last <= next, next))
.0
})
.property(Expectation::Sometimes, "delivered", |_, state| {
state.actor_states[1].wrapped_state.0 == vec![
(Id::from(0), TestMsg(42)),
(Id::from(0), TestMsg(43)),
]
})
.within_boundary(|_, state| {
state.network.len() < 4
})
}
#[test]
fn messages_are_not_delivered_twice() {
model().checker().spawn_bfs().join()
.assert_no_discovery("no redelivery");
}
#[test]
fn messages_are_delivered_in_order() {
model().checker().spawn_bfs().join()
.assert_no_discovery("ordered");
}
#[test]
fn messages_are_eventually_delivered() {
let checker = model().checker().spawn_bfs().join();
checker.assert_discovery("delivered", vec![
ActorModelAction::Deliver {
src: Id(0),
dst: Id(1),
msg: MsgWrapper::Deliver(1, TestMsg(42)),
},
ActorModelAction::Deliver {
src: Id(0),
dst: Id(1),
msg: MsgWrapper::Deliver(2, TestMsg(43)),
},
]);
}
}