use crate::{
config::{Channel, Config},
nats::Conn,
};
use super::{ChannelSupervisor, Error};
use act_zero::*;
use async_nats::Message;
use async_trait::async_trait;
use futures::StreamExt as _;
use log::{debug, error, info};
use std::sync::Arc;
#[async_trait]
impl Actor for Pong {
    /// Startup hook: dispatches a `listen` call to this actor through `send!`.
    ///
    /// The address is cloned first because one handle is used as the receiver
    /// of the call while the other is moved into it as the argument.
    async fn started(&mut self, pid: Addr<Self>) -> ActorResult<()> {
        let receiver = pid.clone();
        send!(receiver.listen(pid));
        Produces::ok(())
    }

    /// Error hook: logs the error and returns `false`.
    ///
    /// NOTE(review): per act_zero's `Actor::error` contract, `false` should mean
    /// "do not stop the actor" — confirm against the crate version in use.
    async fn error(&mut self, error: ActorError) -> bool {
        error!("Pong Actor Error: {:?}", error);
        false
    }
}
/// Actor that receives messages from the PONG channel subscription.
// NOTE(review): `config` and `parent` are stored by `new` but never read in the
// visible part of this file, which is presumably why `dead_code` is allowed here.
#[allow(dead_code)]
pub(crate) struct Pong {
/// Shared handle to the `Config` passed to `new`.
config: Arc<Config>,
/// Subscription that `listen` pulls incoming PONG messages from.
channel: async_nats::Subscriber,
/// Weak address of the `ChannelSupervisor` passed to `new`.
parent: WeakAddr<ChannelSupervisor>,
}
impl Pong {
    /// Creates a `Pong` actor subscribed to the PONG channel.
    ///
    /// The subject is obtained from `Channel::Pong.channel_to_string(config)` and
    /// the subscription is opened on `conn`. `parent` and a cloned handle to
    /// `config` are stored on the returned value.
    ///
    /// # Panics
    ///
    /// Panics if the subscription cannot be established. The signature offers no
    /// way to report the failure, so the panic carries an explicit message
    /// instead of the bare `unwrap` output.
    pub(crate) async fn new(
        parent: WeakAddr<ChannelSupervisor>,
        config: &Arc<Config>,
        conn: &Conn,
    ) -> Self {
        let subject = Channel::Pong.channel_to_string(config);
        let channel = conn
            .subscribe(&subject)
            .await
            .expect("failed to subscribe to the PONG channel");

        Self {
            parent,
            channel,
            config: Arc::clone(config),
        }
    }

    /// Pulls messages from the subscription until the stream ends, passing each
    /// one to `handle_message` and logging the outcome.
    ///
    /// A handler error is logged and does not stop the loop. `_pid` is unused.
    pub(crate) async fn listen(&mut self, _pid: Addr<Self>) {
        info!("Listening for PONG messages");

        while let Some(msg) = self.channel.next().await {
            match self.handle_message(msg).await {
                Ok(()) => debug!("Successfully handled PONG message"),
                Err(e) => error!("Unable to handle PONG message: {}", e),
            }
        }

        // The stream yielded `None`: no further PONG messages will arrive.
        // Record it so the end of listening is visible in the logs.
        info!("Stopped listening for PONG messages");
    }

    /// Handles a single PONG message.
    ///
    /// Currently a placeholder: the message is ignored and `Ok(())` is returned.
    async fn handle_message(&self, _msg: Message) -> Result<(), Error> {
        Ok(())
    }
}