1use std::any::TypeId;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::ops::ControlFlow;
5
6mod behaviour;
7mod handler;
8mod outcome;
9
10pub use behaviour::{OnMessage, OnRequest};
11use eyre::Context;
12use mm1_common::log::warn;
13use mm1_common::types::AnyError;
14use mm1_core::context::Messaging;
15use mm1_core::tracing::WithTraceIdExt;
16pub use outcome::Outcome;
17
18pub fn new<Ctx>() -> Server<Ctx, (), ()> {
19 Server {
20 behaviour: (),
21 pd: Default::default(),
22 }
23}
24
25#[derive(Debug, Clone, Copy)]
26pub struct Server<Ctx, B, H> {
27 behaviour: B,
28 pd: PhantomData<(Ctx, H)>,
29}
30pub type AppendReq<H, Rq, Rs> = (H, handler::Req<Rq, Rs>);
31pub type AppendMsg<H, M> = (H, handler::Msg<M>);
32
33impl<Ctx> Server<Ctx, (), ()> {
34 pub fn behaviour<S>(self, behaviour: S) -> Server<Ctx, S, ()> {
35 let Self { behaviour: _, pd } = self;
36 Server { behaviour, pd }
37 }
38}
39
40impl<Ctx, B, H> Server<Ctx, B, H> {
41 pub fn msg<M>(self) -> Server<Ctx, B, AppendMsg<H, M>>
42 where
43 B: behaviour::OnMessage<Ctx, M>,
44 {
45 let Self {
46 behaviour: state,
47 pd: _,
48 } = self;
49 Server {
50 behaviour: state,
51 pd: Default::default(),
52 }
53 }
54
55 pub fn req<Rq>(self) -> Server<Ctx, B, AppendReq<H, Rq, B::Rs>>
56 where
57 B: behaviour::OnRequest<Ctx, Rq>,
58 {
59 let Self {
60 behaviour: state,
61 pd: _,
62 } = self;
63 Server {
64 behaviour: state,
65 pd: Default::default(),
66 }
67 }
68
69 pub async fn run(self, ctx: &mut Ctx) -> Result<B, AnyError>
70 where
71 Ctx: Messaging,
72 H: handler::Register<Ctx, B>,
73 {
74 let Self { mut behaviour, .. } = self;
75 let mut handlers = HashMap::<TypeId, &dyn handler::ErasedHandler<Ctx, B>>::new();
76
77 H::register(&mut handlers);
78
79 loop {
80 let envelope = ctx.recv().await.wrap_err("ctx.recv")?;
81 let trace_id = envelope.header().trace_id();
82 let msg_type_id = envelope.tid();
83 let Some(handler) = handlers.get(&msg_type_id) else {
84 trace_id.scope_sync(|| warn!(unexpected = ?envelope, "unexpected"));
85 continue
86 };
87 match handler
88 .handle(ctx, &mut behaviour, envelope)
89 .with_trace_id(trace_id)
90 .await
91 .wrap_err("handler.handle")?
92 {
93 ControlFlow::Break(()) => break,
94 ControlFlow::Continue(()) => (),
95 }
96 }
97
98 Ok(behaviour)
99 }
100}