mm1_server/
lib.rs

1use std::any::TypeId;
2use std::collections::HashMap;
3use std::marker::PhantomData;
4use std::ops::ControlFlow;
5
6mod behaviour;
7mod handler;
8
9pub use behaviour::{OnMessage, OnRequest, Outcome};
10use eyre::Context;
11use mm1_common::log::warn;
12use mm1_common::types::AnyError;
13use mm1_core::context::Messaging;
14use mm1_core::tracing::WithTraceIdExt;
15
16pub fn new<Ctx>() -> Server<Ctx, (), ()> {
17    Server {
18        behaviour: (),
19        pd:        Default::default(),
20    }
21}
22
23#[derive(Debug, Clone, Copy)]
24pub struct Server<Ctx, B, H> {
25    behaviour: B,
26    pd:        PhantomData<(Ctx, H)>,
27}
28pub type AppendReq<H, Rq, Rs> = (H, handler::Req<Rq, Rs>);
29pub type AppendMsg<H, M> = (H, handler::Msg<M>);
30
31impl<Ctx> Server<Ctx, (), ()> {
32    pub fn behaviour<S>(self, behaviour: S) -> Server<Ctx, S, ()> {
33        let Self { behaviour: _, pd } = self;
34        Server { behaviour, pd }
35    }
36}
37
38impl<Ctx, B, H> Server<Ctx, B, H> {
39    pub fn msg<M>(self) -> Server<Ctx, B, AppendMsg<H, M>>
40    where
41        B: behaviour::OnMessage<Ctx, M>,
42    {
43        let Self {
44            behaviour: state,
45            pd: _,
46        } = self;
47        Server {
48            behaviour: state,
49            pd:        Default::default(),
50        }
51    }
52
53    pub fn req<Rq>(self) -> Server<Ctx, B, AppendReq<H, Rq, B::Rs>>
54    where
55        B: behaviour::OnRequest<Ctx, Rq>,
56    {
57        let Self {
58            behaviour: state,
59            pd: _,
60        } = self;
61        Server {
62            behaviour: state,
63            pd:        Default::default(),
64        }
65    }
66
67    pub async fn run(self, ctx: &mut Ctx) -> Result<B, AnyError>
68    where
69        Ctx: Messaging,
70        H: handler::Register<Ctx, B>,
71    {
72        let Self { mut behaviour, .. } = self;
73        let mut handlers = HashMap::<TypeId, &dyn handler::ErasedHandler<Ctx, B>>::new();
74
75        H::register(&mut handlers);
76
77        loop {
78            let envelope = ctx.recv().await.wrap_err("ctx.recv")?;
79            let trace_id = envelope.header().trace_id();
80            let msg_type_id = envelope.tid();
81            let Some(handler) = handlers.get(&msg_type_id) else {
82                trace_id.scope_sync(|| warn!("unexpected: {:?}", envelope));
83                continue
84            };
85            match handler
86                .handle(ctx, &mut behaviour, envelope)
87                .with_trace_id(trace_id)
88                .await
89                .wrap_err("handler.handle")?
90            {
91                ControlFlow::Break(()) => break,
92                ControlFlow::Continue(()) => (),
93            }
94        }
95
96        Ok(behaviour)
97    }
98}