use std::future::Future;
use std::time::Duration;
use tokio::join;
use tokio::sync::{mpsc, oneshot};
use msg_channel::*;
pub struct MsgHandler;
impl MsgHandler {
pub fn test(&mut self) {}
}
impl HandleSync<SyncMsgA> for MsgHandler {
type Replay = ();
fn handle(&mut self, _msg: SyncMsgA) -> Self::Replay {
println!("SyncMsg");
}
}
impl HandleSync<SyncMsgB> for MsgHandler {
type Replay = ();
fn handle(&mut self, _msg: SyncMsgB) -> Self::Replay {
println!("SyncMsgB");
}
}
impl HandleAsync<AsyncMsg> for MsgHandler {
type Replay = ();
async fn handle(&mut self, _msg: AsyncMsg) -> Self::Replay {
tokio::time::sleep(Duration::from_secs(2)).await;
println!("AsyncMsg")
}
}
impl HandleAsyncConcurrent<AsyncConcurrentMsgA> for MsgHandler {
type Replay = ();
async fn handle(&self, _msg: AsyncConcurrentMsgA) -> Self::Replay {
tokio::time::sleep(Duration::from_secs(2)).await;
println!("AsyncConcurrentMsgA")
}
}
impl HandleAsyncConcurrent<AsyncConcurrentMsgB> for MsgHandler {
type Replay = ();
async fn handle(&self, _msg: AsyncConcurrentMsgB) -> Self::Replay {
tokio::time::sleep(Duration::from_secs(1)).await;
println!("AsyncConcurrentMsgB")
}
}
impl HandleSyncConcurrent<SyncConcurrentMsgA> for MsgHandler {
type Replay = ();
fn handle(&self, _msg: SyncConcurrentMsgA) -> Self::Replay {
println!("SyncConcurrentMsgA")
}
}
impl HandleSyncConcurrent<SyncConcurrentMsgB> for MsgHandler {
type Replay = ();
fn handle(&self, _msg: SyncConcurrentMsgB) -> Self::Replay {
println!("SyncConcurrentMsgB")
}
}
pub struct SyncMsgA;
pub struct SyncMsgB;
pub struct AsyncMsg;
pub struct SyncConcurrentMsgA;
pub struct SyncConcurrentMsgB;
pub struct AsyncConcurrentMsgA;
pub struct AsyncConcurrentMsgB;
pub struct TestMsgSet;
pub struct TestMsgSet2;
#[msg_set]
impl MessageSet for TestMsgSet {
type Handler = MsgHandler;
type Async = (AsyncMsg,);
type Sync = (SyncMsgA, SyncMsgB);
type AsyncConcurrent = (AsyncConcurrentMsgA, AsyncConcurrentMsgB);
type SyncConcurrent = (SyncConcurrentMsgA, SyncConcurrentMsgB);
}
pub struct MsgHandler2 {
x: MessageSetSender<TestMsgSet>,
}
pub struct TestMsgSetMsg(MessageSetItem<TestMsgSet>);
impl<T> From<T> for TestMsgSetMsg
where
T: Into<MessageSetItem<TestMsgSet>>,
{
fn from(value: T) -> Self {
Self(value.into())
}
}
#[msg_set]
impl MessageSet for TestMsgSet2 {
type Handler = MsgHandler2;
type Async = (TestMsgSetMsg,);
type Sync = ();
type AsyncConcurrent = ();
type SyncConcurrent = ();
}
impl HandleAsync<TestMsgSetMsg> for MsgHandler2 {
type Replay = Result<
MessageSetReplayItem<TestMsgSet>,
mpsc::error::SendError<(
MessageSetItem<TestMsgSet>,
oneshot::Sender<MessageSetReplayItem<TestMsgSet>>,
)>,
>;
async fn handle(&mut self, _msg: TestMsgSetMsg) -> Self::Replay {
Ok(self.x.send(_msg.0)?.await)
}
}
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
let (sender, mut handler) = msg_channel::<TestMsgSet>();
tokio::spawn(async move {
let _r = sender.send(SyncMsgA)?.await;
let _r = sender.send(SyncMsgB)?.await;
let _r = sender.send(AsyncMsg)?.await;
join!(
sender.send(SyncConcurrentMsgA)?,
sender.send(SyncConcurrentMsgB)?
);
let r = sender
.send(TestMsgSetSyncVariant::SyncMsgA(SyncMsgA))?
.await;
match r {
TestMsgSetSyncReplayVariant::SyncMsgA(_) => {}
TestMsgSetSyncReplayVariant::SyncMsgB(_) => {}
}
let r = sender
.send(TestMsgSetAsyncConcurrentVariant::AsyncConcurrentMsgB(
AsyncConcurrentMsgB,
))?
.await;
match r {
TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgA(_) => {}
TestMsgSetAsyncConcurrentReplayVariant::AsyncConcurrentMsgB(_) => {}
}
Ok::<(), color_eyre::Report>(())
});
let mut msg_handler = MsgHandler;
loop {
tokio::select! {
_ = std::future::pending() => {
msg_handler.test();
}
result = async {
handler.handle_next(&mut msg_handler).await
} => {
if let Some(_result) = result? {
msg_handler.test();
}else{
break;
}
}
}
}
Ok(())
}