use anyhow::Result;
use flarch::{
broker::{Broker, SubsystemHandler},
platform_async_trait,
tasks::spawn_local,
};
use tokio::sync::watch;
/// Input messages understood by the [`Simple`] handler.
#[derive(Debug, Clone, PartialEq)]
pub enum SimpleIn {
    /// Asks the handler to increment its internal counter by one.
    Count,
}
/// Output messages emitted by the [`Simple`] handler.
#[derive(Debug, Clone, PartialEq)]
pub enum SimpleOut {
    /// Carries the current counter value; the handler emits it whenever the
    /// counter is even after an increment.
    Sum(usize),
}
/// Broker specialised to the message types of this module: it accepts
/// [`SimpleIn`] messages and emits [`SimpleOut`] messages.
pub type SimpleBroker = Broker<SimpleIn, SimpleOut>;
/// Message handler that counts the [`SimpleIn::Count`] messages it receives.
pub struct Simple {
    /// Number of `Count` messages processed so far.
    counter: usize,
}
/// Handle returned by [`Simple::start`]: the broker together with a watch
/// channel that mirrors the last sum the broker emitted.
pub struct SimpleState {
    /// Latest value received in a [`SimpleOut::Sum`] message; starts at `0`.
    pub state: watch::Receiver<usize>,
    /// Broker used to send [`SimpleIn`] messages to the handler.
    pub broker: SimpleBroker,
}
impl Simple {
    /// Creates a broker with a [`Simple`] handler and mirrors its output into
    /// a watch channel.
    ///
    /// A background task is spawned that listens on an output tap of the
    /// broker and stores the value of every [`SimpleOut::Sum`] in the returned
    /// [`SimpleState::state`], which is initialised to `0`.
    ///
    /// # Errors
    ///
    /// Returns an error if creating the broker or installing the output tap
    /// fails.
    pub async fn start() -> Result<SimpleState> {
        let (tx, state) = watch::channel(0);
        let mut state = SimpleState {
            state,
            broker: Self::broker().await?,
        };
        let mut tap = state.broker.get_tap_out().await?.0;
        spawn_local(async move {
            while let Some(msg) = tap.recv().await {
                match msg {
                    SimpleOut::Sum(sum) => {
                        // `send` fails only once every receiver has been
                        // dropped. Nobody can observe the state any more, so
                        // stop forwarding instead of panicking in the task.
                        if tx.send(sum).is_err() {
                            break;
                        }
                    }
                }
            }
        });
        Ok(state)
    }

    /// Creates a new broker driven by a [`Simple`] handler whose counter
    /// starts at zero.
    ///
    /// # Errors
    ///
    /// Returns an error if the broker cannot be created with the handler.
    pub async fn broker() -> Result<SimpleBroker> {
        let broker = Broker::new_with_handler(Box::new(Simple { counter: 0 })).await?;
        Ok(broker.0)
    }

    /// Handles a single input message.
    ///
    /// Every [`SimpleIn::Count`] increments the counter. A
    /// [`SimpleOut::Sum`] with the new counter value is returned when that
    /// value is even; otherwise `None` is returned.
    fn process_msg(&mut self, msg: SimpleIn) -> Option<SimpleOut> {
        match msg {
            SimpleIn::Count => {
                self.counter += 1;
                (self.counter % 2 == 0).then(|| SimpleOut::Sum(self.counter))
            }
        }
    }
}
#[platform_async_trait()]
impl SubsystemHandler<SimpleIn, SimpleOut> for Simple {
    /// Processes a batch of input messages in order and returns the replies
    /// produced by the messages that generated one.
    async fn messages(&mut self, msgs: Vec<SimpleIn>) -> Vec<SimpleOut> {
        let mut replies = Vec::new();
        for incoming in msgs {
            // Messages that produce no reply are simply skipped.
            if let Some(reply) = self.process_msg(incoming) {
                replies.push(reply);
            }
        }
        replies
    }
}
#[cfg(test)]
mod test {
    use super::*;

    /// Two `Count` messages must result in a `Sum(2)` on the output tap.
    #[tokio::test]
    async fn test_update() -> Result<()> {
        let mut broker = Simple::broker().await?;
        let mut tap = broker.get_tap_out().await?;
        broker.emit_msg_in(SimpleIn::Count)?;
        broker.emit_msg_in(SimpleIn::Count)?;
        assert_eq!(Some(SimpleOut::Sum(2)), tap.0.recv().await);
        Ok(())
    }

    /// The watch channel returned by `Simple::start` must follow the sums
    /// emitted by the broker.
    #[tokio::test]
    async fn test_state() -> Result<()> {
        let mut state = Simple::start().await?;
        state.broker.emit_msg_in(SimpleIn::Count)?;
        // An odd counter produces no `Sum`, so the state keeps its initial value.
        assert_eq!(0, *state.state.borrow());
        state.broker.emit_msg_in(SimpleIn::Count)?;
        // Wait on the watch channel itself: a message arriving on a separate
        // tap does not guarantee that the forwarding task has already run.
        state.state.changed().await?;
        assert_eq!(2, *state.state.borrow());
        Ok(())
    }
}