mm1_server/
lib.rs

1use std::any::TypeId;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::ops::ControlFlow;
5
6mod behaviour;
7mod handler;
8mod outcome;
9
10pub use behaviour::{OnMessage, OnRequest};
11use eyre::Context;
12use mm1_common::log::warn;
13use mm1_common::types::AnyError;
14use mm1_core::context::Messaging;
15use mm1_core::tracing::WithTraceIdExt;
16pub use outcome::Outcome;
17
18pub fn new<Ctx>() -> Server<Ctx, (), ()> {
19    Server {
20        behaviour: (),
21        pd:        Default::default(),
22    }
23}
24
/// A type-level server builder.
///
/// * `Ctx` — the context type the server will run against (marker only).
/// * `B` — the behaviour value that is handed to every handler.
/// * `H` — the type-level list of registered handlers (marker only).
///
/// `Ctx` and `H` never exist as values inside the struct; they are tracked
/// through `PhantomData` only.
pub struct Server<Ctx, B, H> {
    behaviour: B,
    pd:        PhantomData<(Ctx, H)>,
}

// The trait impls below are written by hand instead of derived: `#[derive]`
// would add `Ctx: Trait` and `H: Trait` bounds even though those parameters
// only appear inside `PhantomData`, making the impls unavailable for contexts
// or handler lists that are not themselves `Clone`/`Copy`/`Debug`.

impl<Ctx, B, H> Clone for Server<Ctx, B, H>
where
    B: Clone,
{
    fn clone(&self) -> Self {
        Server {
            behaviour: self.behaviour.clone(),
            pd:        PhantomData,
        }
    }
}

impl<Ctx, B, H> Copy for Server<Ctx, B, H> where B: Copy {}

impl<Ctx, B, H> std::fmt::Debug for Server<Ctx, B, H>
where
    B: std::fmt::Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same field layout as the derived implementation would print.
        f.debug_struct("Server")
            .field("behaviour", &self.behaviour)
            .field("pd", &self.pd)
            .finish()
    }
}
30pub type AppendReq<H, Rq, Rs> = (H, handler::Req<Rq, Rs>);
31pub type AppendMsg<H, M> = (H, handler::Msg<M>);
32
33impl<Ctx> Server<Ctx, (), ()> {
34    pub fn behaviour<S>(self, behaviour: S) -> Server<Ctx, S, ()> {
35        let Self { behaviour: _, pd } = self;
36        Server { behaviour, pd }
37    }
38}
39
40impl<Ctx, B, H> Server<Ctx, B, H> {
41    pub fn msg<M>(self) -> Server<Ctx, B, AppendMsg<H, M>>
42    where
43        B: behaviour::OnMessage<Ctx, M>,
44    {
45        let Self {
46            behaviour: state,
47            pd: _,
48        } = self;
49        Server {
50            behaviour: state,
51            pd:        Default::default(),
52        }
53    }
54
55    pub fn req<Rq>(self) -> Server<Ctx, B, AppendReq<H, Rq, B::Rs>>
56    where
57        B: behaviour::OnRequest<Ctx, Rq>,
58    {
59        let Self {
60            behaviour: state,
61            pd: _,
62        } = self;
63        Server {
64            behaviour: state,
65            pd:        Default::default(),
66        }
67    }
68
69    pub async fn run(self, ctx: &mut Ctx) -> Result<B, AnyError>
70    where
71        Ctx: Messaging,
72        H: handler::Register<Ctx, B>,
73    {
74        let Self { mut behaviour, .. } = self;
75        let mut handlers = HashMap::<TypeId, &dyn handler::ErasedHandler<Ctx, B>>::new();
76
77        H::register(&mut handlers);
78
79        loop {
80            let envelope = ctx.recv().await.wrap_err("ctx.recv")?;
81            let trace_id = envelope.header().trace_id();
82            let msg_type_id = envelope.tid();
83            let Some(handler) = handlers.get(&msg_type_id) else {
84                trace_id.scope_sync(|| warn!(unexpected = ?envelope, "unexpected"));
85                continue
86            };
87            match handler
88                .handle(ctx, &mut behaviour, envelope)
89                .with_trace_id(trace_id)
90                .await
91                .wrap_err("handler.handle")?
92            {
93                ControlFlow::Break(()) => break,
94                ControlFlow::Continue(()) => (),
95            }
96        }
97
98        Ok(behaviour)
99    }
100}