use crate::close::AsyncClose;
use std::ops::Deref;
use std::sync::Arc;
use tokio::task::JoinHandle;
/// Shared handle to the task that runs an actor.
///
/// Every clone shares a single [`JoinHandle`] through an [`Arc`]. The task is
/// awaited only by the clone that is consumed last, so earlier clones can be
/// released without blocking on the actor.
#[derive(Debug, Clone)]
pub struct ActorHandle {
    join_handle: Arc<JoinHandle<()>>,
}

impl ActorHandle {
    /// Wraps `join_handle` so that it can be shared between clones.
    fn new(join_handle: JoinHandle<()>) -> Self {
        Self {
            join_handle: Arc::new(join_handle),
        }
    }

    /// Consumes this handle and, if it was the last clone, waits for the task to finish.
    ///
    /// Clones that are not the last one return immediately after giving up
    /// their share of the handle.
    ///
    /// # Panics
    ///
    /// Panics if the awaited task panicked or was cancelled.
    async fn wait_for_completion(self) {
        // `Arc::into_inner` atomically gives up this reference and yields the
        // inner value to exactly one caller, even when several clones are
        // released concurrently. Checking uniqueness with `Arc::get_mut` and
        // dropping afterwards would let two concurrent callers both observe a
        // shared handle and both skip the wait.
        if let Some(join_handle) = Arc::into_inner(self.join_handle) {
            join_handle
                .await
                .expect("actor task should finish without panicking or being cancelled");
        }
    }
}
/// Sending side of an actor: a message sender paired with a handle to the
/// task that was spawned for the actor.
///
/// Cloning an `Outbox` clones the sender and shares the same [`ActorHandle`].
#[derive(Debug, Clone)]
pub struct Outbox<S> {
    /// Sender used to deliver messages; exposed through `Deref`.
    sender: S,
    /// Shared handle to the spawned task.
    handle: ActorHandle,
}

impl<S> Outbox<S> {
    /// Builds an outbox from a `sender` and the `join_handle` of the spawned task.
    pub fn new(sender: S, join_handle: JoinHandle<()>) -> Self {
        let handle = ActorHandle::new(join_handle);
        Outbox { sender, handle }
    }
}
impl<S: AsyncClose> AsyncClose for Outbox<S> {
    /// Closes the wrapped sender through its own `AsyncClose` implementation,
    /// then hands the task handle over to `wait_for_completion`.
    ///
    /// The sender is always closed first so that the wait on the task, if one
    /// happens, starts only after this outbox has released its sender.
    async fn close(self) {
        let Self { sender, handle } = self;
        sender.close().await;
        handle.wait_for_completion().await;
    }
}
/// Lets an `Outbox<S>` be used wherever a `&S` is expected, so the sender's
/// own methods can be called directly on the outbox.
impl<S> Deref for Outbox<S> {
    type Target = S;

    /// Borrows the wrapped sender.
    fn deref(&self) -> &S {
        let Self { sender, .. } = self;
        sender
    }
}
/// An [`Outbox`] whose sender is a bounded `tokio::sync::mpsc::Sender<T>`.
pub type BoundedOutbox<T> = Outbox<tokio::sync::mpsc::Sender<T>>;
/// An [`Outbox`] whose sender is an unbounded `tokio::sync::mpsc::UnboundedSender<T>`.
pub type UnboundedOutbox<T> = Outbox<tokio::sync::mpsc::UnboundedSender<T>>;
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Actor;
    use futures::{Stream, StreamExt};
    use tokio::sync::{mpsc, oneshot};
    use tokio_stream::wrappers::UnboundedReceiverStream;

    /// Actor that signals when it starts running and when its inbox has been
    /// fully drained.
    struct LifecycleActor {
        start_tx: oneshot::Sender<()>,
        exit_tx: oneshot::Sender<()>,
    }

    impl Actor for LifecycleActor {
        type Message = u32;

        async fn run(self, inbox: impl Stream<Item = Self::Message> + Send) {
            let Self { start_tx, exit_tx } = self;
            start_tx.send(()).unwrap();
            // Discard every message; this completes once the inbox stream ends.
            inbox.for_each(|_| async {}).await;
            // The test may already have dropped the exit receiver; that is fine.
            let _ = exit_tx.send(());
        }
    }

    /// Spawns a `LifecycleActor` and returns its outbox together with the
    /// receivers for its "started" and "exited" signals.
    fn launch_controlled_actor() -> (
        UnboundedOutbox<u32>,
        oneshot::Receiver<()>,
        oneshot::Receiver<()>,
    ) {
        let (start_tx, start_rx) = oneshot::channel();
        let (exit_tx, exit_rx) = oneshot::channel();
        let (sender, receiver) = mpsc::unbounded_channel();

        let actor = LifecycleActor { start_tx, exit_tx };
        let inbox = UnboundedReceiverStream::new(receiver);
        let task = tokio::spawn(actor.run(inbox));

        (Outbox::new(sender, task), start_rx, exit_rx)
    }

    #[tokio::test]
    async fn test_outbox_deref_to_sender() {
        let (outbox, started, _) = launch_controlled_actor();
        started.await.unwrap();

        // `send` is reached through `Deref` to the underlying sender.
        outbox.send(42).unwrap();
        outbox.close().await;
    }

    #[tokio::test]
    async fn test_single_outbox_waits_for_completion() {
        let (outbox, started, exited) = launch_controlled_actor();
        started.await.unwrap();

        outbox.close().await;

        assert!(exited.await.is_ok());
    }

    #[tokio::test]
    async fn test_cloned_outbox_only_last_one_waits() {
        let (outbox, started, mut exited) = launch_controlled_actor();
        started.await.unwrap();

        let first = outbox.clone();
        let second = outbox;

        // Closing one clone must not end the actor while another is alive.
        first.close().await;
        assert!(exited.try_recv().is_err());

        second.close().await;
        assert!(
            exited.await.is_ok(),
            "Second close should not wait indefinitely if Actor is already finished."
        );
    }

    #[tokio::test]
    async fn test_outbox_release_closes_channel() {
        let (sender, mut receiver) = mpsc::unbounded_channel::<i32>();
        let outbox = Outbox::new(sender, tokio::spawn(async {}));

        outbox.send(1).unwrap();
        outbox.close().await;

        // The queued message is still delivered, then the channel reports closed.
        assert_eq!(receiver.recv().await, Some(1));
        assert_eq!(receiver.recv().await, None);
    }
}