use crate::bus::{BusError, BusEvent};
use crate::worker::identity::{IdentityCommon, IdentityOfRx, IdentityOfTx};
use crate::worker::WorkerId;
use crate::Event;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::{interval_at, Instant, Interval};
pub enum IdentitySignal {
Event(BusEvent),
Tick,
}
pub struct IdentityOfInterval {
pub(crate) id: IdentityOfRx,
pub(crate) interval: Interval,
}
impl IdentityOfInterval {
pub fn new(id: IdentityOfRx, duration: Duration) -> Self {
Self {
id,
interval: interval_at(Instant::now() + duration, duration),
}
}
pub fn tx(&self) -> IdentityOfTx {
self.id.tx()
}
pub fn worker_id(&self) -> WorkerId {
self.id.id.clone()
}
pub fn interval_mut(&mut self) -> &mut Interval {
&mut self.interval
}
pub async fn recv_signal(&mut self) -> Result<IdentitySignal, BusError> {
tokio::select! {
event = self.id.rx_event.recv() => {
event
.map(IdentitySignal::Event)
.ok_or_else(|| BusError::channel_closed("worker_recv_signal", Some(self.id.id.name())))
}
_ = self.interval.tick() => Ok(IdentitySignal::Tick),
}
}
pub async fn recv<T: Event>(&mut self) -> Result<Arc<T>, BusError> {
self.id.recv::<T>().await
}
pub fn try_recv<T: Event>(&mut self) -> Result<Option<Arc<T>>, BusError> {
self.id.try_recv::<T>()
}
pub async fn subscribe<T: Event + 'static>(&self) -> Result<(), BusError> {
self.id.subscribe::<T>().await
}
pub async fn subscribe_with_key<T: Event + 'static>(
&self,
key: impl Into<String>,
) -> Result<(), BusError> {
self.id.subscribe_with_key::<T>(key).await
}
pub async fn dispatch_event<T: Event>(&self, event: T) -> Result<(), BusError> {
self.id.dispatch_event(event).await
}
pub async fn dispatch_with_key<T: Event>(
&self,
key: impl Into<String>,
event: T,
) -> Result<(), BusError> {
self.id.dispatch_with_key(key, event).await
}
pub fn into_inner(self) -> IdentityOfRx {
self.id
}
}
impl From<(IdentityCommon, Duration)> for IdentityOfInterval {
fn from(value: (IdentityCommon, Duration)) -> Self {
let (common, duration) = value;
let id: IdentityOfRx = common.into();
Self::new(id, duration)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bus::BusData;
use tokio::sync::mpsc;
#[derive(Debug)]
struct Ping;
impl Event for Ping {
fn name() -> &'static str {
"Ping"
}
}
#[tokio::test]
async fn recv_signal_can_receive_event() {
let (tx_data, _rx_data) = mpsc::unbounded_channel::<BusData>();
let (tx_event, rx_event) = mpsc::channel::<BusEvent>(1);
let id = WorkerId::init("interval-worker-event".to_string());
let identity = IdentityOfRx {
id,
rx_event,
tx_data,
};
let mut interval_id = IdentityOfInterval::new(identity, Duration::from_secs(5));
tx_event.send(BusEvent::new(Ping)).await.unwrap();
let signal = interval_id.recv_signal().await.unwrap();
match signal {
IdentitySignal::Event(event) => assert_eq!(event.type_name(), "Ping"),
IdentitySignal::Tick => panic!("expected event"),
}
}
#[tokio::test]
async fn recv_signal_can_receive_tick() {
let (tx_data, _rx_data) = mpsc::unbounded_channel::<BusData>();
let (_tx_event, rx_event) = mpsc::channel::<BusEvent>(1);
let id = WorkerId::init("interval-worker-tick".to_string());
let identity = IdentityOfRx {
id,
rx_event,
tx_data,
};
let mut interval_id = IdentityOfInterval::new(identity, Duration::from_millis(10));
let signal = interval_id.recv_signal().await.unwrap();
match signal {
IdentitySignal::Tick => {}
IdentitySignal::Event(_) => panic!("expected tick"),
}
}
}