use std::fmt::Display;
use tokio::sync::{mpsc, oneshot};
use crate::messaging::{Message, MsgError, MsgResult, Reply};
use super::Actor;
pub async fn spawn<I, O, E>(
mut actor: impl Actor<I, O, E> + Send + 'static,
) -> Result<LocalHandle<I, O, E>, Error>
where
I: Send + 'static,
O: Send + 'static,
E: Send + 'static,
{
let (sender, mut receiver) =
mpsc::channel::<(Message<I>, oneshot::Sender<MsgResult<O, E>>)>(1024);
let (conf_sender, conf_receiver) = oneshot::channel::<Result<(), Error>>();
tokio::spawn(async move {
let _ = conf_sender.send(Ok(()));
while let Some((msg, sender)) = receiver.recv().await {
match msg {
Message::Task(input) => {
let result = match actor.handler(input).await {
Ok(res) => Ok(Reply::Task(res)),
Err(err) => Err(MsgError::Task(err)),
};
try_send_reply(sender, result);
}
Message::TaskMut(input) => {
let result = match actor.handler_mut(input).await {
Ok(Some(res)) => Ok(Reply::Task(res)),
Ok(None) => Ok(Reply::Accepted),
Err(err) => Err(MsgError::Task(err)),
};
try_send_reply(sender, result);
}
Message::Ping => {
try_send_reply(sender, Ok(Reply::Accepted));
}
Message::Stop => {
try_send_reply(sender, Ok(Reply::Accepted));
break;
}
}
}
});
conf_receiver
.await
.map_err(|e| Error::Spawn(e.to_string()))??;
Ok(LocalHandle { sender })
}
fn try_send_reply<O, E>(sender: oneshot::Sender<MsgResult<O, E>>, reply: MsgResult<O, E>) {
if sender.send(reply).is_err() {
tracing::error!("local: failed to send reply");
}
}
#[derive(Debug, Clone)]
#[allow(clippy::type_complexity)]
pub struct LocalHandle<I, O, E> {
sender: mpsc::Sender<(Message<I>, oneshot::Sender<MsgResult<O, E>>)>,
}
impl<I, O, E> LocalHandle<I, O, E> {
pub async fn send(&self, msg: Message<I>) -> MsgResult<O, E> {
let (sender, receiver) = oneshot::channel();
self.sender
.send((msg, sender))
.await
.map_err(|e| MsgError::Send(e.to_string()))?;
receiver.await.map_err(|e| MsgError::Recv(e.to_string()))?
}
pub fn blocking_send(&self, msg: Message<I>) -> MsgResult<O, E> {
let (sender, receiver) = oneshot::channel();
self.sender
.blocking_send((msg, sender))
.map_err(|e| MsgError::Send(e.to_string()))?;
receiver
.blocking_recv()
.map_err(|e| MsgError::Recv(e.to_string()))?
}
}
#[allow(missing_docs)]
#[derive(Debug)]
pub enum Error {
Spawn(String),
}
impl Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Spawn(ctx) => write!(f, "failed to spawn actor: {ctx}"),
}
}
}
impl std::error::Error for Error {}
#[cfg(test)]
mod tests {
use std::time::Duration;
use tokio::{runtime::Runtime, sync::oneshot};
use crate::{
actors::tests::Mult,
messaging::{Message, Reply},
};
#[tokio::test]
async fn spawning_and_messaging() {
let mult = Mult { a: 2 };
let handle = super::spawn(mult).await.unwrap();
let reply = handle.send(Message::Task(15)).await.unwrap();
assert!(matches!(reply, Reply::Task(30)));
}
#[tokio::test]
async fn ping() {
let mult = Mult { a: 2 };
let handle = super::spawn(mult).await.unwrap();
let reply = handle.send(Message::Ping).await.unwrap();
assert!(matches!(reply, Reply::Accepted));
}
#[tokio::test]
async fn stop() {
let mult = Mult { a: 2 };
let handle = super::spawn(mult).await.unwrap();
let reply = handle.send(Message::Stop).await.unwrap();
assert!(matches!(reply, Reply::Accepted));
let _ = tokio::time::sleep(Duration::from_millis(10)).await;
handle.send(Message::Ping).await.unwrap_err();
}
#[test]
fn blocking_send() {
let rt = Runtime::new().unwrap();
let (handler_sender, handler_receiver) = oneshot::channel();
let (compl_sender, compl_receiver) = oneshot::channel();
std::thread::spawn(move || {
rt.block_on(async move {
let mult = Mult { a: 2 };
let handle = super::spawn(mult).await.unwrap();
handler_sender.send(handle).unwrap();
let _ = compl_receiver.await;
});
});
let handle = handler_receiver.blocking_recv().unwrap();
let reply = handle.blocking_send(Message::Task(15)).unwrap();
assert!(matches!(reply, Reply::Task(30)));
compl_sender.send(()).unwrap();
}
}