msg_channel 0.1.0-beat.3

message channel
Documentation
use std::future::Future;
use std::time::Duration;

use tokio::join;
use tokio::sync::{mpsc, oneshot};

use msg_channel::*;

/// Handler registered as `TestMsgSet::Handler`.
///
/// A unit struct with no state; the `Handle*` trait implementations in this
/// file are what give it behavior.
#[derive(Debug, Default)]
pub struct MsgHandler;

impl MsgHandler {
    /// No-op method taking `&mut self`.
    ///
    /// `main` calls it from its `select!` branches, which shows the handler
    /// can still be borrowed mutably there.
    pub fn test(&mut self) {}
}

/// Synchronous handling of `SyncMsgA` with exclusive (`&mut self`) access.
impl HandleSync<SyncMsgA> for MsgHandler {
    type Replay = ();

    /// Prints the name of the received message type; the message carries no data.
    fn handle(&mut self, _msg: SyncMsgA) -> Self::Replay {
        // Print the exact type name, as every other handler in this file does,
        // so this output is not ambiguous next to the `SyncMsgB` handler's.
        println!("SyncMsgA");
    }
}
/// Synchronous handling of `SyncMsgB` with exclusive (`&mut self`) access.
impl HandleSync<SyncMsgB> for MsgHandler {
    type Replay = ();

    /// Prints the message type name; the unit message itself is ignored.
    fn handle(&mut self, _: SyncMsgB) -> Self::Replay {
        println!("SyncMsgB");
    }
}
/// Asynchronous handling of `AsyncMsg` with exclusive (`&mut self`) access.
impl HandleAsync<AsyncMsg> for MsgHandler {
    type Replay = ();

    /// Waits two seconds on the tokio timer, then prints the message type name.
    async fn handle(&mut self, _: AsyncMsg) -> Self::Replay {
        let delay = Duration::from_secs(2);
        tokio::time::sleep(delay).await;
        println!("AsyncMsg");
    }
}
/// Asynchronous handling of `AsyncConcurrentMsgA` through a shared (`&self`) borrow.
impl HandleAsyncConcurrent<AsyncConcurrentMsgA> for MsgHandler {
    type Replay = ();

    /// Waits two seconds on the tokio timer, then prints the message type name.
    async fn handle(&self, _: AsyncConcurrentMsgA) -> Self::Replay {
        let delay = Duration::from_secs(2);
        tokio::time::sleep(delay).await;
        println!("AsyncConcurrentMsgA");
    }
}
/// Asynchronous handling of `AsyncConcurrentMsgB` through a shared (`&self`) borrow.
impl HandleAsyncConcurrent<AsyncConcurrentMsgB> for MsgHandler {
    type Replay = ();

    /// Waits one second on the tokio timer, then prints the message type name.
    async fn handle(&self, _: AsyncConcurrentMsgB) -> Self::Replay {
        let delay = Duration::from_secs(1);
        tokio::time::sleep(delay).await;
        println!("AsyncConcurrentMsgB");
    }
}

/// Synchronous handling of `SyncConcurrentMsgA` through a shared (`&self`) borrow.
impl HandleSyncConcurrent<SyncConcurrentMsgA> for MsgHandler {
    type Replay = ();

    /// Prints the message type name; the unit message itself is ignored.
    fn handle(&self, _: SyncConcurrentMsgA) -> Self::Replay {
        println!("SyncConcurrentMsgA");
    }
}
/// Synchronous handling of `SyncConcurrentMsgB` through a shared (`&self`) borrow.
impl HandleSyncConcurrent<SyncConcurrentMsgB> for MsgHandler {
    type Replay = ();

    /// Prints the message type name; the unit message itself is ignored.
    fn handle(&self, _: SyncConcurrentMsgB) -> Self::Replay {
        println!("SyncConcurrentMsgB");
    }
}

/// Message handled through `HandleSync` (listed in `TestMsgSet::Sync`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncMsgA;
/// Message handled through `HandleSync` (listed in `TestMsgSet::Sync`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncMsgB;
/// Message handled through `HandleAsync` (listed in `TestMsgSet::Async`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AsyncMsg;
/// Message handled through `HandleSyncConcurrent` (listed in `TestMsgSet::SyncConcurrent`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncConcurrentMsgA;
/// Message handled through `HandleSyncConcurrent` (listed in `TestMsgSet::SyncConcurrent`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SyncConcurrentMsgB;
/// Message handled through `HandleAsyncConcurrent` (listed in `TestMsgSet::AsyncConcurrent`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AsyncConcurrentMsgA;
/// Message handled through `HandleAsyncConcurrent` (listed in `TestMsgSet::AsyncConcurrent`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct AsyncConcurrentMsgB;
/// Marker type implementing `MessageSet` for the messages handled by `MsgHandler`.
pub struct TestMsgSet;
/// Marker type implementing `MessageSet` for the messages handled by `MsgHandler2`.
pub struct TestMsgSet2;

// Declares which messages `MsgHandler` handles, grouped by the four
// categories of `MessageSet`. Every type listed here has the matching
// `Handle*` implementation on `MsgHandler` in this file.
// NOTE(review): `#[msg_set]` presumably generates the `TestMsgSet*Variant` and
// `TestMsgSet*ReplayVariant` enums used in `main` — confirm against the macro.
#[msg_set]
impl MessageSet for TestMsgSet {
    type Handler = MsgHandler;
    // Handled via `HandleAsync` (`&mut self`, async).
    type Async = (AsyncMsg,);
    // Handled via `HandleSync` (`&mut self`).
    type Sync = (SyncMsgA, SyncMsgB);
    // Handled via `HandleAsyncConcurrent` (`&self`, async).
    type AsyncConcurrent = (AsyncConcurrentMsgA, AsyncConcurrentMsgB);
    // Handled via `HandleSyncConcurrent` (`&self`).
    type SyncConcurrent = (SyncConcurrentMsgA, SyncConcurrentMsgB);
}

/// Handler registered as `TestMsgSet2::Handler`.
///
/// It forwards each wrapped item it receives to a `TestMsgSet` channel.
pub struct MsgHandler2 {
    /// Sender the `HandleAsync` implementation uses to forward `TestMsgSet` items.
    x: MessageSetSender<TestMsgSet>,
}

/// Newtype around a `TestMsgSet` item, so that a whole message set can be
/// listed as a single message type in `TestMsgSet2`.
pub struct TestMsgSetMsg(MessageSetItem<TestMsgSet>);

impl<T> From<T> for TestMsgSetMsg
where
    T: Into<MessageSetItem<TestMsgSet>>,
{
    fn from(value: T) -> Self {
        Self(value.into())
    }
}

// Message set whose only message is `TestMsgSetMsg`, handled asynchronously
// by `MsgHandler2`; the other three categories are left empty.
#[msg_set]
impl MessageSet for TestMsgSet2 {
    type Handler = MsgHandler2;
    type Async = (TestMsgSetMsg,);
    type Sync = ();
    type AsyncConcurrent = ();
    type SyncConcurrent = ();
}

impl HandleAsync<TestMsgSetMsg> for MsgHandler2 {
    type Replay = Result<
        MessageSetReplayItem<TestMsgSet>,
        mpsc::error::SendError<(
            MessageSetItem<TestMsgSet>,
            oneshot::Sender<MessageSetReplayItem<TestMsgSet>>,
        )>,
    >;

    async fn handle(&mut self, _msg: TestMsgSetMsg) -> Self::Replay {
        Ok(self.x.send(_msg.0)?.await)
    }
}

/// Demo entry point: drives a `TestMsgSet` channel end to end.
///
/// A spawned task sends a sequence of messages and awaits each reply, while
/// this task loops on `handle_next`, dispatching each message to a
/// `MsgHandler`.
///
/// # Errors
///
/// Returns an error if `handle_next` fails, if the sender task failed to send
/// a message, or if the sender task panicked or was cancelled.
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
    let (sender, mut handler) = msg_channel::<TestMsgSet>();
    // Keep the join handle: dropping it would detach the task and silently
    // discard any error produced by the `?` operators inside it.
    let sender_task = tokio::spawn(async move {
        // Plain messages, sent one at a time; each reply is awaited before
        // the next message is sent.
        let _r = sender.send(SyncMsgA)?.await;
        let _r = sender.send(SyncMsgB)?.await;
        let _r = sender.send(AsyncMsg)?.await;
        // Both messages are sent first, then their replies are awaited together.
        join!(
            sender.send(SyncConcurrentMsgA)?,
            sender.send(SyncConcurrentMsgB)?
        );
        // Sending a category-level variant yields the matching reply variant.
        let r = sender
            .send(TestMsgSetSyncVariant::SyncMsgA(SyncMsgA))?
            .await;
        match r {
            TestMsgSetSyncReplayVariant::SyncMsgA(_) => {}
            TestMsgSetSyncReplayVariant::SyncMsgB(_) => {}
        }
        let r = sender
            .send(TestMsgSetAsyncConcurrentVariant::AsyncConcurrentMsgB(
                AsyncConcurrentMsgB,
            ))?
            .await;
        match r {
            TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgA(_) => {}
            TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgB(_) => {}
        }
        Ok::<(), color_eyre::Report>(())
    });
    let mut msg_handler = MsgHandler;
    loop {
        tokio::select! {
            // Never completes; it is here to show that `msg_handler` can be
            // used mutably in more than one branch.
            _ = std::future::pending() => {
                msg_handler.test();
            }
            result = async {
                handler.handle_next(&mut msg_handler).await
            } => {
                if let Some(_result) = result? {
                    msg_handler.test();
                } else {
                    // NOTE(review): assumes `None` means the channel is closed,
                    // i.e. the sender task has dropped its sender — confirm.
                    break;
                }
            }
        }
    }
    // The outer `?` reports a panic or cancellation of the sender task; the
    // resulting inner `Result` carries any send error from the task.
    sender_task.await?
}