1use std::any::TypeId;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::ops::ControlFlow;
5
6mod behaviour;
7mod handler;
8
9pub use behaviour::{OnMessage, OnRequest, Outcome};
10use eyre::Context;
11use mm1_common::log::warn;
12use mm1_common::types::AnyError;
13use mm1_core::context::Messaging;
14use mm1_core::tracing::WithTraceIdExt;
15
16pub fn new<Ctx>() -> Server<Ctx, (), ()> {
17 Server {
18 behaviour: (),
19 pd: Default::default(),
20 }
21}
22
23#[derive(Debug, Clone, Copy)]
24pub struct Server<Ctx, B, H> {
25 behaviour: B,
26 pd: PhantomData<(Ctx, H)>,
27}
28pub type AppendReq<H, Rq, Rs> = (H, handler::Req<Rq, Rs>);
29pub type AppendMsg<H, M> = (H, handler::Msg<M>);
30
31impl<Ctx> Server<Ctx, (), ()> {
32 pub fn behaviour<S>(self, behaviour: S) -> Server<Ctx, S, ()> {
33 let Self { behaviour: _, pd } = self;
34 Server { behaviour, pd }
35 }
36}
37
38impl<Ctx, B, H> Server<Ctx, B, H> {
39 pub fn msg<M>(self) -> Server<Ctx, B, AppendMsg<H, M>>
40 where
41 B: behaviour::OnMessage<Ctx, M>,
42 {
43 let Self {
44 behaviour: state,
45 pd: _,
46 } = self;
47 Server {
48 behaviour: state,
49 pd: Default::default(),
50 }
51 }
52
53 pub fn req<Rq>(self) -> Server<Ctx, B, AppendReq<H, Rq, B::Rs>>
54 where
55 B: behaviour::OnRequest<Ctx, Rq>,
56 {
57 let Self {
58 behaviour: state,
59 pd: _,
60 } = self;
61 Server {
62 behaviour: state,
63 pd: Default::default(),
64 }
65 }
66
67 pub async fn run(self, ctx: &mut Ctx) -> Result<B, AnyError>
68 where
69 Ctx: Messaging,
70 H: handler::Register<Ctx, B>,
71 {
72 let Self { mut behaviour, .. } = self;
73 let mut handlers = HashMap::<TypeId, &dyn handler::ErasedHandler<Ctx, B>>::new();
74
75 H::register(&mut handlers);
76
77 loop {
78 let envelope = ctx.recv().await.wrap_err("ctx.recv")?;
79 let trace_id = envelope.header().trace_id();
80 let msg_type_id = envelope.tid();
81 let Some(handler) = handlers.get(&msg_type_id) else {
82 trace_id.scope_sync(|| warn!(unexpected = ?envelope, "unexpected"));
83 continue
84 };
85 match handler
86 .handle(ctx, &mut behaviour, envelope)
87 .with_trace_id(trace_id)
88 .await
89 .wrap_err("handler.handle")?
90 {
91 ControlFlow::Break(()) => break,
92 ControlFlow::Continue(()) => (),
93 }
94 }
95
96 Ok(behaviour)
97 }
98}