use crate::{
error::{JobError, WorkerError},
job::{Job, JobRequestWrapper, JobStreamResult},
request::JobRequest,
response::JobResult,
worker::envelope::{Envelope, EnvelopeProxy, SyncEnvelopeProxy, ToEnvelope},
};
use futures::{Future, Stream, StreamExt};
use std::{
fmt::Debug,
ops::{Deref, DerefMut},
time::Duration,
};
use tokio::{
sync::{
mpsc::{self, Receiver},
oneshot::{self, error::RecvError},
},
task::{self, JoinHandle},
time,
};
use tower::{Service, ServiceExt};
use tracing::Instrument;
use self::monitor::WorkerManagement;
#[cfg(feature = "broker")]
use {deadlock::WorkerId, std::any::type_name};
#[cfg(feature = "broker")]
pub mod broker;
#[cfg(feature = "broker")]
mod deadlock;
mod envelope;
mod monitor;
/// Convenience re-exports of this module's actor, address and monitor types.
pub mod prelude {
pub use super::{
monitor::{Monitor, WorkerEvent, WorkerListener, WorkerMessage},
Actor, Addr, AlwaysAddr, Context, ContextHandler, Handler, Message, Recipient, Worker,
WorkerStatus,
};
}
/// Status a worker reports in reply to a `WorkerManagement` message.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub enum WorkerStatus {
/// The only status defined so far; returned by the default `Worker::manage`.
Ok,
}
/// A job consumer: exposes a stream of jobs and processes each one through a
/// tower [`Service`].
#[async_trait::async_trait]
pub trait Worker: Send + Sized + 'static {
    /// Job type this worker consumes.
    type Job: Job;
    /// Service that turns a [`JobRequest`] into a [`JobResult`].
    type Service: Service<
            JobRequest<Self::Job>,
            Response = JobResult,
            Error = JobError,
            Future = Self::Future,
        > + Send;
    /// Future returned by [`Worker::Service`] calls.
    type Future: Future<Output = Result<JobResult, JobError>> + Send;

    /// Hook invoked when the worker starts; does nothing by default.
    async fn on_start(&mut self, _ctx: &mut Context<Self>) {}

    /// Hook invoked when the worker stops; does nothing by default.
    async fn on_stop(&mut self, _ctx: &mut Context<Self>) {}

    /// Returns the stream of jobs this worker should process.
    fn consume(&mut self) -> JobStreamResult<Self::Job>;

    /// Returns the service used to execute jobs.
    fn service(&mut self) -> &mut Self::Service;

    /// Waits until the service is ready, then runs `job` through it.
    ///
    /// # Errors
    ///
    /// Propagates any [`JobError`] from the readiness check or from the call.
    async fn handle_job(&mut self, job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        self.service().ready().await?.call(job).await
    }

    /// Handles a management message; the default reports [`WorkerStatus::Ok`].
    async fn manage(&mut self, _msg: WorkerManagement) -> Result<WorkerStatus, WorkerError> {
        Ok(WorkerStatus::Ok)
    }
}
/// Handle used to send messages to a running actor of type `A`.
#[derive(Debug)]
pub struct Addr<A: Actor> {
// Sending half of the actor's mailbox channel.
sender: mpsc::Sender<Envelope<A>>,
// Identifier handed to the `deadlock` module around `send` calls.
#[cfg(feature = "broker")]
actor_id: WorkerId,
}
// Implemented by hand: `#[derive(Clone)]` would add an `A: Clone` bound that
// an address does not need.
impl<A: Actor> Clone for Addr<A> {
    /// Returns another handle to the same mailbox.
    fn clone(&self) -> Self {
        let sender = self.sender.clone();
        Self {
            sender,
            #[cfg(feature = "broker")]
            actor_id: self.actor_id,
        }
    }
}
impl<W: Actor> Addr<W> {
    /// Wraps the sending half of an actor's mailbox.
    fn new(sender: mpsc::Sender<Envelope<W>>) -> Self {
        Self {
            sender,
            #[cfg(feature = "broker")]
            actor_id: WorkerId::new(Some(type_name::<W>())),
        }
    }

    /// Sends `message` to the actor and waits for the handler's reply.
    ///
    /// # Errors
    ///
    /// Returns [`RecvError`] when no reply arrives: the mailbox is closed, or
    /// the reply sender was dropped without a value being sent.
    // `let_and_return`: without the `broker` feature nothing sits between the
    // `let result` binding and the return.
    #[allow(clippy::let_and_return)]
    pub async fn send<M>(&self, message: M) -> Result<M::Result, RecvError>
    where
        M: Message + Send + 'static,
        M::Result: Send,
        W: ContextHandler<M>,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = SyncEnvelopeProxy::pack(message, Some(reply_tx));
        #[cfg(feature = "broker")]
        let from_actor_id_option = deadlock::task_local_actor_id();
        #[cfg(feature = "broker")]
        if let Some(from_actor_id) = from_actor_id_option {
            deadlock::r#in(self.actor_id, from_actor_id).await;
        }
        // `mpsc::Sender::send` waits for capacity and fails only when the
        // mailbox is closed. Dropping the failed result right away also drops
        // the rejected envelope together with the reply sender, so the await
        // below resolves to `Err(RecvError)` instead of panicking here.
        drop(self.sender.send(envelope).await);
        let result = reply_rx.await;
        // Runs on the failure path too, so every `in` is paired with an `out`.
        #[cfg(feature = "broker")]
        if let Some(from_actor_id) = from_actor_id_option {
            deadlock::out(self.actor_id, from_actor_id).await;
        }
        result
    }

    /// Sends `message` and waits for the reply, mapping every failure to `None`.
    ///
    /// Returns `None` when the mailbox is closed or when no reply was sent.
    pub async fn do_send<M>(&self, message: M) -> Option<M::Result>
    where
        M: Message + Send + 'static,
        M::Result: Send,
        W: ContextHandler<M>,
    {
        let (reply_tx, reply_rx) = oneshot::channel();
        let envelope = SyncEnvelopeProxy::pack(message, Some(reply_tx));
        // The failed result must be dropped before awaiting the reply: a
        // `SendError` carries the rejected envelope, and keeping it alive in a
        // named binding would keep the reply sender alive as well, leaving the
        // await below pending forever once the mailbox is closed.
        drop(self.sender.send(envelope).await);
        reply_rx.await.ok()
    }

    /// Returns a type-erased handle that can only send messages of type `M`.
    pub fn recipient<M>(&self) -> Recipient<M>
    where
        M: Message + Send + 'static,
        W: ContextHandler<M>,
        M::Result: Send,
    {
        Recipient(Box::new(self.clone()))
    }

    /// Converts this address into an [`AlwaysAddr`], whose `send` panics
    /// instead of returning an error.
    pub fn expect_running(self) -> AlwaysAddr<W> {
        AlwaysAddr(self)
    }
}
/// Address wrapper whose `send` returns the reply directly and panics when no
/// reply can be obtained.
#[derive(Debug)]
pub struct AlwaysAddr<A: Actor>(Addr<A>);
impl<A: Actor> Clone for AlwaysAddr<A> {
    /// Clones the wrapped address.
    fn clone(&self) -> Self {
        let Self(inner) = self;
        Self(inner.clone())
    }
}
impl<A: Actor> Deref for AlwaysAddr<A> {
    type Target = Addr<A>;

    /// Borrows the wrapped address.
    fn deref(&self) -> &Self::Target {
        let Self(inner) = self;
        inner
    }
}
impl<A: Actor> DerefMut for AlwaysAddr<A> {
    /// Mutably borrows the wrapped address.
    fn deref_mut(&mut self) -> &mut Self::Target {
        let Self(inner) = self;
        inner
    }
}
impl<A: Actor> AlwaysAddr<A> {
    /// Sends `message` through the wrapped address and returns the reply.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped [`Addr::send`] returns an error.
    #[allow(clippy::expect_used)]
    pub async fn send<M>(&self, message: M) -> M::Result
    where
        M: Message + Send + 'static,
        M::Result: Send,
        A: ContextHandler<M>,
    {
        let reply = self.0.send(message).await;
        reply.expect("Failed to get response from actor. It should have never failed!")
    }
}
/// Handle that can send messages of type `M`; the concrete actor type is
/// hidden behind a boxed `Sender<M>`.
#[allow(missing_debug_implementations)]
pub struct Recipient<M: Message>(Box<dyn Sender<M> + Sync + Send + 'static>);
// Written by hand because the boxed sender offers nothing to print.
impl<M> std::fmt::Debug for Recipient<M>
where
    M: Message + Send,
{
    /// Writes a fixed placeholder instead of the boxed sender's contents.
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.write_str("Recipient { /* omitted */ }")
    }
}
impl<M> Recipient<M>
where
    M: Message + Send,
    M::Result: Send,
{
    /// Forwards `m` to the boxed sender and returns its reply, if any.
    pub async fn send(&self, m: M) -> Option<M::Result> {
        let Self(inner) = self;
        inner.send(m).await
    }
}
/// Private abstraction over "something that accepts messages of type `M`";
/// it is the trait object stored inside `Recipient`.
#[async_trait::async_trait]
trait Sender<M: Message> {
/// Sends `m` and returns the reply, or `None` when no reply was obtained.
async fn send(&self, m: M) -> Option<M::Result>;
}
#[async_trait::async_trait]
impl<A, M> Sender<M> for Addr<A>
where
M: Message + Send + 'static,
A: ContextHandler<M>,
M::Result: Send,
{
async fn send(&self, m: M) -> Option<M::Result> {
self.do_send(m).await
}
}
#[async_trait::async_trait]
impl<T: 'static, W: 'static> Actor for W
where
W: Worker<Job = T> + Handler<JobRequestWrapper<T>>,
T: Send,
{
async fn on_start(&mut self, ctx: &mut Context<Self>) {
<W as Worker>::on_start(self, ctx).await;
let jobs = self
.consume()
.then(|r| async move {
match r {
Ok(Some(job)) => Ok(Some(job)),
_ => Ok(None),
}
});
let stream = jobs.map(|c| JobRequestWrapper(c));
ctx.notify_with(Box::pin(stream));
}
async fn on_stop(&mut self, ctx: &mut Context<Self>) {
<W as Worker>::on_stop(self, ctx).await;
}
}
// A management message is answered with the worker's status or a `WorkerError`.
impl Message for WorkerManagement {
type Result = Result<WorkerStatus, WorkerError>;
}
#[async_trait::async_trait]
impl<W> Handler<WorkerManagement> for W
where
W: Worker + Actor,
{
type Result = Result<WorkerStatus, WorkerError>;
async fn handle(&mut self, msg: WorkerManagement) -> Result<WorkerStatus, WorkerError> {
self.manage(msg).await
}
}
// Handling a wrapped job request yields `()` on success or an `anyhow` error.
impl<T> Message for JobRequestWrapper<T> {
type Result = anyhow::Result<()>;
}
#[async_trait::async_trait]
impl<T, W> Handler<JobRequestWrapper<T>> for W
where
W: Worker<Job = T> + 'static,
T: Job + Send + 'static,
{
type Result = anyhow::Result<()>;
async fn handle(&mut self, job: JobRequestWrapper<T>) -> Self::Result {
match job.0 {
Ok(Some(job)) => {
self.handle_job(job).await?;
Ok(())
}
Ok(None) => Ok(()),
Err(e) => Err(e.into()),
}
}
}
/// A unit of state that processes the messages in its mailbox one at a time
/// on its own task.
#[async_trait::async_trait]
pub trait Actor: Send + Sized + 'static {
    /// Number of envelopes the mailbox channel can buffer; 100 unless overridden.
    fn mailbox_capacity(&self) -> usize {
        100
    }

    /// Hook invoked before the first message is processed; no-op by default.
    async fn on_start(&mut self, _ctx: &mut Context<Self>) {}

    /// Hook invoked after the mailbox loop ends; no-op by default.
    async fn on_stop(&mut self, _ctx: &mut Context<Self>) {}

    /// Creates the mailbox for this actor without starting it.
    fn preinit(self) -> InitializedActor<Self> {
        // Read the capacity first: `self` is moved into the constructor.
        let capacity = self.mailbox_capacity();
        InitializedActor::new(self, capacity)
    }

    /// Same as [`Actor::preinit`], applied to `Self::default()`.
    fn preinit_default() -> InitializedActor<Self>
    where
        Self: Default,
    {
        Self::preinit(Self::default())
    }

    /// Creates the mailbox, spawns the actor and returns its address.
    async fn start(self) -> Addr<Self> {
        let initialized = self.preinit();
        initialized.start().await
    }

    /// Same as [`Actor::start`], applied to `Self::default()`.
    async fn start_default() -> Addr<Self>
    where
        Self: Default,
    {
        let actor = Self::default();
        actor.start().await
    }
}
/// An actor paired with its mailbox but not yet running; `start` spawns it.
#[derive(Debug)]
pub struct InitializedActor<A: Actor> {
/// Address that delivers into this actor's mailbox.
pub address: Addr<A>,
/// The actor value that will process the messages.
pub actor: A,
// Receiving half of the mailbox channel, consumed by `start`.
receiver: Receiver<Envelope<A>>,
}
impl<A: Actor> InitializedActor<A> {
    /// Creates a mailbox channel of `mailbox_capacity` envelopes and pairs
    /// `actor` with the address that feeds it.
    pub fn new(actor: A, mailbox_capacity: usize) -> Self {
        let (mailbox_tx, mailbox_rx) = mpsc::channel(mailbox_capacity);
        Self {
            actor,
            address: Addr::new(mailbox_tx),
            receiver: mailbox_rx,
        }
    }

    /// Spawns the actor's task and returns its address.
    ///
    /// The task calls `on_start`, handles envelopes until the mailbox yields
    /// `None`, logs that the actor stopped and finally calls `on_stop`.
    pub async fn start(self) -> Addr<A> {
        let Self {
            address,
            mut actor,
            receiver: mut mailbox,
        } = self;
        // The task stores its own `JoinHandle` in its `Context`, but that
        // handle only exists once the task is spawned, so it is delivered
        // through a oneshot channel afterwards.
        let (handle_tx, handle_rx) = oneshot::channel();
        let task_addr = address.clone();
        let run = async move {
            #[allow(clippy::expect_used)]
            let own_handle = handle_rx
                .await
                .expect("Unreachable as the message is always sent.");
            let mut ctx = Context::new(task_addr.clone(), own_handle);
            actor.on_start(&mut ctx).await;
            while let Some(Envelope(mut boxed)) = mailbox.recv().await {
                EnvelopeProxy::handle(&mut *boxed, &mut actor, &mut ctx).await;
            }
            tracing::error!(actor = std::any::type_name::<A>(), "actor stopped");
            actor.on_stop(&mut ctx).await;
        }
        .in_current_span();
        #[cfg(not(feature = "broker"))]
        let spawned = task::spawn(run);
        #[cfg(feature = "broker")]
        let spawned = deadlock::spawn_task_with_actor_id(address.actor_id, run);
        let _error = handle_tx.send(spawned);
        address
    }
}
/// A value that can be sent to an actor.
pub trait Message {
/// Type of the reply delivered back to the sender.
type Result: 'static;
}
/// Message handler that also receives the actor's `Context`.
#[async_trait::async_trait]
pub trait ContextHandler<M: Message>: Actor {
/// Value returned by the handler; it must know how to deliver the reply for `M`.
type Result: MessageResponse<M>;
/// Processes `msg` with access to the actor's context.
async fn handle(&mut self, ctx: &mut Context<Self>, msg: M) -> Self::Result;
}
/// Message handler that does not need the actor's `Context`.
#[async_trait::async_trait]
pub trait Handler<M: Message>: Actor {
/// Value returned by the handler; it must know how to deliver the reply for `M`.
type Result: MessageResponse<M>;
/// Processes `msg`.
async fn handle(&mut self, msg: M) -> Self::Result;
}
// Every plain `Handler` is also a `ContextHandler` that ignores the context.
#[async_trait::async_trait]
impl<M, S> ContextHandler<M> for S
where
    M: Message + Send + 'static,
    S: Handler<M>,
{
    type Result = S::Result;

    /// Drops the context argument and delegates to [`Handler::handle`].
    async fn handle(&mut self, _: &mut Context<Self>, msg: M) -> Self::Result {
        <S as Handler<M>>::handle(self, msg).await
    }
}
/// A handler return value that can deliver the reply for message type `M`.
#[async_trait::async_trait]
pub trait MessageResponse<M: Message>: Send {
/// Consumes the value and is given the `sender` through which the reply is delivered.
async fn handle(self, sender: oneshot::Sender<M::Result>);
}
// The message's own result type is its simplest response: it is sent as is.
#[async_trait::async_trait]
impl<M> MessageResponse<M> for M::Result
where
    M: Message,
    M::Result: Send,
{
    /// Sends `self` as the reply; a send failure is ignored.
    async fn handle(self, sender: oneshot::Sender<M::Result>) {
        let _unsent = sender.send(self);
    }
}
/// Per-actor state passed to `on_start`, `on_stop` and to envelope handling.
#[derive(Debug)]
pub struct Context<W: Actor> {
// Address of the actor this context belongs to.
addr: Addr<W>,
// Join handle of the task running the actor; stored but not read in this file.
#[allow(dead_code)]
handle: JoinHandle<()>,
}
impl<W: Actor> Context<W> {
    /// Builds a context from the actor's address and its task handle.
    pub fn new(addr: Addr<W>, handle: JoinHandle<()>) -> Self {
        Self { addr, handle }
    }

    /// Returns a clone of the actor's own address.
    pub fn addr(&self) -> Addr<W> {
        self.addr.clone()
    }

    /// Returns a [`Recipient`] for messages of type `M` addressed to this actor.
    pub fn recipient<M>(&self) -> Recipient<M>
    where
        M: Message + Send + 'static,
        W: ContextHandler<M>,
        M::Result: Send,
    {
        let own_addr = self.addr();
        own_addr.recipient()
    }

    /// Sends `message` to this actor from a detached task.
    pub fn notify<M>(&self, message: M)
    where
        M: Message + Send + 'static,
        W: ContextHandler<M>,
        M::Result: Send,
    {
        let target = self.addr();
        let delivery = async move { target.do_send(message).await }.in_current_span();
        drop(task::spawn(delivery));
    }

    /// Sends `message` to this actor from a detached task after waiting `later`.
    pub fn notify_later<M>(&self, message: M, later: Duration)
    where
        M: Message + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let target = self.addr();
        let delayed = async move {
            time::sleep(later).await;
            target.do_send(message).await
        }
        .in_current_span();
        drop(task::spawn(delayed));
    }

    /// Spawns a detached task that loops forever: sleep for `every`, then send
    /// `M::default()` to this actor.
    pub fn notify_every<M>(&self, every: Duration)
    where
        M: Message + Default + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let target = self.addr();
        let ticker = async move {
            loop {
                time::sleep(every).await;
                let _res = target.do_send(M::default()).await;
            }
        }
        .in_current_span();
        drop(task::spawn(ticker));
    }

    /// Spawns a detached task that sends every item of `stream` to this actor,
    /// one after another, until the stream ends.
    pub fn notify_with<M, S>(&self, mut stream: S)
    where
        M: Message + Send + 'static,
        S: Stream<Item = M> + Unpin + Send + 'static,
        W: Handler<M>,
        M::Result: Send,
    {
        let target = self.addr();
        let forward = async move {
            while let Some(message) = stream.next().await {
                target.do_send(message).await;
            }
        }
        .in_current_span();
        drop(task::spawn(forward));
    }
}
#[cfg(test)]
mod tests {
    use std::fmt::Debug;
    use futures::Future;
    use tower::{service_fn, Service, ServiceExt};
    use crate::worker::*;

    /// Starts an actor backed by a tower service and checks that a message
    /// sent to its address is answered with `Ok`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_simple_actor() {
        struct Job(String);

        impl Message for Job {
            type Result = anyhow::Result<()>;
        }

        #[derive(Debug, thiserror::Error)]
        enum HandlerError {
            #[error("any error")]
            #[allow(dead_code)]
            Any,
        }

        // Service body: accepts every job.
        async fn accept(_msg: Job) -> Result<(), HandlerError> {
            Ok(())
        }

        struct TowerActor<S> {
            service: S,
        }

        #[async_trait::async_trait]
        impl<S, F> Actor for TowerActor<S>
        where
            S: Service<Job, Response = (), Error = HandlerError, Future = F> + Send + 'static,
            F: Future<Output = Result<(), HandlerError>> + Send,
        {
            // Queues three jobs for the actor as soon as it starts.
            async fn on_start(&mut self, ctx: &mut Context<Self>) {
                use futures::stream;
                let initial_jobs = stream::iter(vec![
                    Job("17".to_string()),
                    Job("18".to_string()),
                    Job("19".to_string()),
                ]);
                ctx.notify_with(initial_jobs);
            }
        }

        #[async_trait::async_trait]
        impl<S, F> Handler<Job> for TowerActor<S>
        where
            S: Service<Job, Response = (), Error = HandlerError, Future = F> + Send + 'static,
            F: Future<Output = Result<(), HandlerError>> + Send,
        {
            type Result = anyhow::Result<()>;

            async fn handle(&mut self, msg: Job) -> Self::Result {
                let ready_service = self.service.ready().await?;
                ready_service.call(msg).await?;
                Ok(())
            }
        }

        let actor = TowerActor {
            service: service_fn(accept),
        };
        let address = actor.start().await;
        let reply = address.send(Job("0".to_string())).await.unwrap();
        assert!(reply.is_ok());
    }
}