abcgen 0.4.0

A procedural macro to generate boilerplate code for objects implementing the 'Actor' pattern
Documentation
#[allow(unused)]
mod hello_world_actor {
    use abcgen::*;
    #[events]
    #[derive(Debug, Clone)]
    pub enum HelloWorldActorEvent {
        SomeoneAskedMyName(String),
        Message(String),
    }
    #[doc = " Some errors that we want to return in our actor's handler methods"]
    #[derive(thiserror :: Error, Debug)]
    pub enum HelloWorldError {
        #[error("Actor already stopped")]
        AlreadyStopped,
        #[error("HelloWorldErrors 1")]
        Error1,
        #[error("HelloWorldErrors 2")]
        Error2,
    }
    #[doc = " It is useful to implement the From<AbcgenError> for the errors"]
    #[doc = " that can be returned by the actor's handler methods so that"]
    #[doc = " the proxy can return a flat Result<V, E> instead"]
    #[doc = " of a nested Result<Result<V, E>, AbcgenError>"]
    impl From<AbcgenError> for HelloWorldError {
        fn from(_: abcgen::AbcgenError) -> Self {
            HelloWorldError::AlreadyStopped
        }
    }
    #[derive(thiserror :: Error, Debug)]
    pub enum SomeOtherError {
        #[error("SomeOtherErrors 1")]
        Error1,
        #[error("SomeOtherErrors 2")]
        Error2,
    }
    #[actor]
    pub struct HelloWorldActor {
        pub event_sender: Option<EventSender>,
    }
    impl HelloWorldActor {
        #[doc = " The following function *must* be implemented by the user and is called by the run function"]
        async fn start(&mut self, task_sender: TaskSender, event_sender: EventSender) {
            self.event_sender = Some(event_sender);
            println!("Hello, World!");
            tokio::spawn(async move {
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                send_task ! (task_sender (this) => { this . still_here () . await ; });
            });
        }
        #[doc = " The following function must be implemented by the user and is called befor termination"]
        async fn shutdown(&mut self) {
            println!("Goodbye, World!");
        }
        #[doc = " following function is meant to handle a message because it is marked with `#[message_handler]`"]
        #[doc = " abcgen generates a message for it"]
        #[doc = " that should have the following signature:"]
        #[doc = " ```"]
        #[doc = " HelloWorldActorMessage::TellMeYourName({caller: String, respond_to: tokio::sync::oneshot::Sender<Result<String, HelloWorldError>>})"]
        #[doc = " ```"]
        #[doc = " A specular function is generated on the proxy that can be called to send the message and receive the response."]
        #[doc = " In this case the fuction of the proxy will return the same `Result<String, HelloWorldError>` because there is"]
        #[doc = " a conversion `From<AbcgenError>` for HelloWorldError otherwise it would return a nested `Result<Result<String, HelloWorldError>, AbcgenError>`"]
        #[doc = ""]
        #[message_handler]
        async fn tell_me_your_name(&mut self, caller: String) -> Result<String, HelloWorldError> {
            self.event_sender
                .as_ref()
                .unwrap()
                .send(HelloWorldActorEvent::SomeoneAskedMyName(caller.clone()))
                .unwrap();
            println!("Hello {}, I am HelloWorldActor", caller);
            Ok("HelloWorldActor".to_string())
        }
        #[doc = " The following function is meant to handle a message because it is marked with `#[message_handler]`"]
        #[doc = " In this case the fuction generated on the proxy will return a nested `Result<Result<(), SomeOtherError>, AbcgenError>`"]
        #[message_handler]
        async fn do_that(&mut self) -> Result<(), SomeOtherError> {
            println!("do_that called");
            Ok(())
        }
        #[doc = " The following function can be enqueued as a task to executed in the actor's task"]
        fn still_here(&mut self) -> PinnedFuture<()> {
            Box::pin(async {
                self.event_sender
                    .as_ref()
                    .unwrap()
                    .send(HelloWorldActorEvent::Message(
                        "Hello world again, I'm still here.".to_string(),
                    ))
                    .unwrap();
            })
        }
    }
    type EventSender = tokio::sync::broadcast::Sender<HelloWorldActorEvent>;
    pub type TaskSender = tokio::sync::mpsc::Sender<Task<HelloWorldActor>>;
    #[derive(Debug)]
    pub enum HelloWorldActorMessage {
        TellMeYourName {
            caller: String,
            respond_to: tokio::sync::oneshot::Sender<Result<String, HelloWorldError>>,
        },
        DoThat {
            respond_to: tokio::sync::oneshot::Sender<Result<(), SomeOtherError>>,
        },
    }
    impl HelloWorldActor {
        pub fn run(self) -> HelloWorldActorProxy {
            let (msg_sender, mut msg_receiver) = tokio::sync::mpsc::channel(10usize);
            let (event_sender, _) =
                tokio::sync::broadcast::channel::<HelloWorldActorEvent>(10usize);
            let event_sender_clone = event_sender.clone();
            let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
            let (task_sender, mut task_receiver) =
                tokio::sync::mpsc::channel::<Task<HelloWorldActor>>(10usize);
            tokio::spawn(async move {
                let mut actor = self;
                actor.start(task_sender, event_sender_clone).await;
                tokio::select! { _ = actor . select_receivers (& mut msg_receiver , & mut task_receiver) => { log :: debug ! ("(abcgen) all proxies dropped") ; } _ = stop_receiver => { log :: debug ! ("(abcgen) stop signal received") ; } }
                actor.shutdown().await;
            });
            let proxy = HelloWorldActorProxy {
                message_sender: msg_sender,
                stop_signal: Some(stop_sender),
                events: event_sender,
            };
            proxy
        }
        async fn select_receivers(
            &mut self,
            msg_receiver: &mut tokio::sync::mpsc::Receiver<HelloWorldActorMessage>,
            task_receiver: &mut tokio::sync::mpsc::Receiver<Task<HelloWorldActor>>,
        ) {
            loop {
                tokio::select! { msg = msg_receiver . recv () => { match msg { Some (msg) => { self . dispatch (msg) . await ; } None => { break ; } } } , task = task_receiver . recv () => { if let Some (task) = task { task (self) . await ; } } }
            }
        }
        async fn dispatch(&mut self, message: HelloWorldActorMessage) {
            match message {
                HelloWorldActorMessage::TellMeYourName { caller, respond_to } => {
                    let result = self.tell_me_your_name(caller).await;
                    respond_to.send(result).unwrap();
                }
                HelloWorldActorMessage::DoThat { respond_to } => {
                    let result = self.do_that().await;
                    respond_to.send(result).unwrap();
                }
            }
        }
    }
    pub struct HelloWorldActorProxy {
        message_sender: tokio::sync::mpsc::Sender<HelloWorldActorMessage>,
        events: tokio::sync::broadcast::Sender<HelloWorldActorEvent>,
        stop_signal: std::option::Option<tokio::sync::oneshot::Sender<()>>,
    }
    impl HelloWorldActorProxy {
        pub fn is_running(&self) -> bool {
            match self.stop_signal.as_ref() {
                Some(s) => !s.is_closed(),
                None => false,
            }
        }
        pub fn stop(&mut self) -> Result<(), AbcgenError> {
            match self.stop_signal.take() {
                Some(tx) => tx.send(()).map_err(|_e: ()| AbcgenError::ActorShutDown),
                None => Err(AbcgenError::ActorShutDown),
            }
        }
        pub async fn stop_and_wait(&mut self) -> Result<(), AbcgenError> {
            self.stop()?;
            while self.is_running() {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            Ok(())
        }
        pub fn get_events(&self) -> tokio::sync::broadcast::Receiver<HelloWorldActorEvent> {
            self.events.subscribe()
        }
        pub async fn tell_me_your_name(&self, caller: String) -> Result<String, HelloWorldError> {
            let (tx, rx) = tokio::sync::oneshot::channel();
            let msg = HelloWorldActorMessage::TellMeYourName {
                caller,
                respond_to: tx,
            };
            let send_res = self.message_sender.send(msg).await;
            match send_res {
                Ok(_) => rx
                    .await
                    .unwrap_or_else(|e| Err(AbcgenError::ActorShutDown.into())),
                Err(e) => Err(AbcgenError::ActorShutDown.into()),
            }
        }
        pub async fn do_that(&self) -> Result<Result<(), SomeOtherError>, AbcgenError> {
            let (tx, rx) = tokio::sync::oneshot::channel();
            let msg = HelloWorldActorMessage::DoThat { respond_to: tx };
            let send_res = self.message_sender.send(msg).await;
            match send_res {
                Ok(_) => rx.await.map_err(|e| AbcgenError::ActorShutDown),
                Err(e) => Err(AbcgenError::ActorShutDown),
            }
        }
    }
}
use abcgen::AbcgenError;
use hello_world_actor::{HelloWorldActor, HelloWorldActorEvent, SomeOtherError};
#[tokio::main]
async fn main() {
    let actor = HelloWorldActor { event_sender: None };
    let proxy = actor.run();
    let mut events_rx = proxy.get_events().resubscribe();
    tokio::spawn(async move {
        while let Ok(event) = events_rx.recv().await {
            match event {
                HelloWorldActorEvent::SomeoneAskedMyName(name) => {
                    println!("{} asked my name", name);
                }
                HelloWorldActorEvent::Message(msg) => {
                    println!("Actor said: \"{}\"", msg);
                }
            }
        }
    });
    let do_that_res: Result<Result<(), SomeOtherError>, AbcgenError> = proxy.do_that().await;
    let thename = proxy.tell_me_your_name("Alice".to_string()).await.unwrap();
    println!("The actor replied with name: \"{}\"", thename);
    match do_that_res {
        Ok(Ok(_)) => println!("do_that succeeded"),
        Ok(Err(e)) => println!("do_that failed: {:?}", e),
        Err(e) => println!("do_that failed: {:?}", e),
    }
    tokio::time::sleep(std::time::Duration::from_secs(2)).await;
}