abcgen 0.4.0

A procedural macro to generate boilerplate code for objects implementing the 'Actor' pattern
Documentation
#![doc = " Illustrates the functionalities by printing messages to the console."]
#![allow(unused)]
use abcgen::actor_module;
use my_actor_module::{MyActor, MyActorEvent};
use std::sync::{atomic::AtomicBool, Arc};
mod my_actor_module {
    use abcgen::*;
    use std::sync::{atomic::AtomicBool, Arc};
    #[events]
    #[derive(Debug, Clone)]
    pub enum MyActorEvent {
        Event1,
        Event2,
    }
    #[actor]
    pub struct MyActor {
        pub(crate) termination_requested: Arc<AtomicBool>,
        pub(crate) internal_task: Option<tokio::task::JoinHandle<()>>,
    }
    impl MyActor {
        pub async fn start(&mut self, task_sender: TaskSender, event_sender: EventSender) {
            log::info!("Starting");
            let term_req = self.termination_requested.clone();
            let internal_task = tokio::spawn(async move {
                send_task ! (task_sender (this) => { this . dummy_task () . await ; });
                send_task ! (task_sender (this) => { log :: info ! ("Executing a closure task") ; this . dummy_task () . await ; });
                while !term_req.load(std::sync::atomic::Ordering::Relaxed) {
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    log::info!("Sending ThisHappend");
                    let _ = event_sender.send(MyActorEvent::Event1);
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                    log::info!("Sending ThatHappend");
                    let _ = event_sender.send(MyActorEvent::Event2);
                }
                log::info!("Event sender closed");
            });
            self.internal_task = Some(internal_task);
        }
        pub async fn shutdown(&mut self) {
            log::info!("Shutting down");
            self.termination_requested
                .store(true, std::sync::atomic::Ordering::Relaxed);
        }
        pub async fn dummy_task<'b>(&'b mut self) {
            log::info!("Dummy task executed");
        }
        pub fn dummy_task_2(&mut self) -> PinnedFuture<'_, ()> {
            Box::pin(async {
                log::info!("Dummy dummy task 2 executed");
                self.dummy_task().await;
            })
        }
        #[message_handler]
        async fn do_this(&mut self, par1: i32, par2: String) {
            log::info!("do_this called: par1={}, par2={}", par1, par2);
        }
        #[message_handler]
        async fn do_that(&mut self) -> Result<(), ()> {
            log::info!("do_that called");
            Ok(())
        }
        #[message_handler]
        async fn get_that(&mut self, name: String) -> i32 {
            log::info!("get_that called: name={}", name);
            42
        }
    }
    type EventSender = tokio::sync::broadcast::Sender<MyActorEvent>;
    pub type TaskSender = tokio::sync::mpsc::Sender<Task<MyActor>>;
    #[derive(Debug)]
    pub enum MyActorMessage {
        DoThis {
            par1: i32,
            par2: String,
        },
        DoThat {
            respond_to: tokio::sync::oneshot::Sender<Result<(), ()>>,
        },
        GetThat {
            name: String,
            respond_to: tokio::sync::oneshot::Sender<i32>,
        },
    }
    impl MyActor {
        pub fn run(self) -> MyActorProxy {
            let (msg_sender, mut msg_receiver) = tokio::sync::mpsc::channel(10usize);
            let (event_sender, _) = tokio::sync::broadcast::channel::<MyActorEvent>(10usize);
            let event_sender_clone = event_sender.clone();
            let (stop_sender, stop_receiver) = tokio::sync::oneshot::channel::<()>();
            let (task_sender, mut task_receiver) =
                tokio::sync::mpsc::channel::<Task<MyActor>>(10usize);
            tokio::spawn(async move {
                let mut actor = self;
                actor.start(task_sender, event_sender_clone).await;
                tokio::select! { _ = actor . select_receivers (& mut msg_receiver , & mut task_receiver) => { log :: debug ! ("(abcgen) all proxies dropped") ; } _ = stop_receiver => { log :: debug ! ("(abcgen) stop signal received") ; } }
                actor.shutdown().await;
            });
            let proxy = MyActorProxy {
                message_sender: msg_sender,
                stop_signal: Some(stop_sender),
                events: event_sender,
            };
            proxy
        }
        async fn select_receivers(
            &mut self,
            msg_receiver: &mut tokio::sync::mpsc::Receiver<MyActorMessage>,
            task_receiver: &mut tokio::sync::mpsc::Receiver<Task<MyActor>>,
        ) {
            loop {
                tokio::select! { msg = msg_receiver . recv () => { match msg { Some (msg) => { self . dispatch (msg) . await ; } None => { break ; } } } , task = task_receiver . recv () => { if let Some (task) = task { task (self) . await ; } } }
            }
        }
        async fn dispatch(&mut self, message: MyActorMessage) {
            match message {
                MyActorMessage::DoThis { par1, par2 } => {
                    self.do_this(par1, par2).await;
                }
                MyActorMessage::DoThat { respond_to } => {
                    let result = self.do_that().await;
                    respond_to.send(result).unwrap();
                }
                MyActorMessage::GetThat { name, respond_to } => {
                    let result = self.get_that(name).await;
                    respond_to.send(result).unwrap();
                }
            }
        }
    }
    pub struct MyActorProxy {
        message_sender: tokio::sync::mpsc::Sender<MyActorMessage>,
        events: tokio::sync::broadcast::Sender<MyActorEvent>,
        stop_signal: std::option::Option<tokio::sync::oneshot::Sender<()>>,
    }
    impl MyActorProxy {
        pub fn is_running(&self) -> bool {
            match self.stop_signal.as_ref() {
                Some(s) => !s.is_closed(),
                None => false,
            }
        }
        pub fn stop(&mut self) -> Result<(), AbcgenError> {
            match self.stop_signal.take() {
                Some(tx) => tx.send(()).map_err(|_e: ()| AbcgenError::ActorShutDown),
                None => Err(AbcgenError::ActorShutDown),
            }
        }
        pub async fn stop_and_wait(&mut self) -> Result<(), AbcgenError> {
            self.stop()?;
            while self.is_running() {
                tokio::time::sleep(std::time::Duration::from_millis(100)).await;
            }
            Ok(())
        }
        pub fn get_events(&self) -> tokio::sync::broadcast::Receiver<MyActorEvent> {
            self.events.subscribe()
        }
        pub async fn do_this(&self, par1: i32, par2: String) -> Result<(), AbcgenError> {
            let msg = MyActorMessage::DoThis { par1, par2 };
            let send_res = self
                .message_sender
                .send(msg)
                .await
                .map_err(|e| AbcgenError::ActorShutDown);
            send_res
        }
        pub async fn do_that(&self) -> Result<Result<(), ()>, AbcgenError> {
            let (tx, rx) = tokio::sync::oneshot::channel();
            let msg = MyActorMessage::DoThat { respond_to: tx };
            let send_res = self.message_sender.send(msg).await;
            match send_res {
                Ok(_) => rx.await.map_err(|e| AbcgenError::ActorShutDown),
                Err(e) => Err(AbcgenError::ActorShutDown),
            }
        }
        pub async fn get_that(&self, name: String) -> Result<i32, AbcgenError> {
            let (tx, rx) = tokio::sync::oneshot::channel();
            let msg = MyActorMessage::GetThat {
                name,
                respond_to: tx,
            };
            let send_res = self.message_sender.send(msg).await;
            match send_res {
                Ok(_) => rx.await.map_err(|e| AbcgenError::ActorShutDown),
                Err(e) => Err(AbcgenError::ActorShutDown),
            }
        }
    }
}
#[tokio::main]
async fn main() {
    env_logger::builder()
        .format_timestamp_millis()
        .format_module_path(false)
        .filter_level(log::LevelFilter::Debug)
        .init();
    let actor: MyActor = MyActor {
        termination_requested: Arc::new(AtomicBool::new(false)),
        internal_task: None,
    };
    let mut proxy = actor.run();
    let mut events = proxy.get_events();
    tokio::spawn(async move {
        loop {
            match events.recv().await {
                Ok(event) => {
                    log::info!("Event received: {:?}", event);
                }
                Err(e) => {
                    match e {
                        tokio::sync::broadcast::error::RecvError::Closed => {
                            log::info!("Event channel closed");
                            break;
                        }
                        _ => {}
                    }
                    log::error!("Error receiving event: {:?}", e);
                    break;
                }
            }
        }
    });
    let res = proxy.get_that("test".to_string()).await;
    log::info!("get_that result: {:#?}", res);
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    proxy.do_this(1, "test".to_string()).await.unwrap();
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    proxy.do_that().await.unwrap();
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    proxy.stop_and_wait().await.unwrap();
    log::info!("Wait before terminating the application.");
    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
    log::info!("Terminating the application.");
}