std_modrpc/role_impls/
request_server.rs

1use crate::proto::{RequestInitState, RequestLazy, RequestServerConfig, Response, ResponseGen};
2use modrpc::RoleSetup;
3
4use crate::request_tracker::{RequestTracker, get_request_tracker};
5
6pub struct RequestServer<Req, Resp> {
7    name: &'static str,
8    worker_id: u16,
9    hooks: crate::RequestServerHooks<Req, Resp>,
10    tracker: RequestTracker,
11    tracing_enabled: bool,
12}
13
14pub struct RequestServerBuilder<Req, Resp> {
15    pub name: &'static str,
16    pub hooks: crate::RequestServerHooks<Req, Resp>,
17    pub stubs: crate::RequestServerStubs<Req, Resp>,
18    pub init: RequestInitState,
19}
20
21impl<Req: mproto::Owned, Resp: mproto::Owned> RequestServerBuilder<Req, Resp> {
22    pub fn new(
23        name: &'static str,
24        hooks: crate::RequestServerHooks<Req, Resp>,
25        stubs: crate::RequestServerStubs<Req, Resp>,
26        _config: &RequestServerConfig,
27        init: RequestInitState,
28    ) -> Self {
29        Self {
30            name,
31            hooks,
32            stubs,
33            init,
34        }
35    }
36
37    pub fn create_handle(&self, setup: &RoleSetup) -> RequestServer<Req, Resp> {
38        let worker_id = setup.worker_id();
39        let tracker = get_request_tracker(setup);
40
41        RequestServer {
42            name: self.name,
43            worker_id,
44            hooks: self.hooks.clone(),
45            tracker,
46            tracing_enabled: true,
47        }
48    }
49
50    pub fn build_shared(self /* todo */) {
51        // TODO insert self.hooks.response into the shared state's map of plane_id -> response_tx
52        // we'll need to change modrpc::PacketProcessor to lookup handlers based on
53        // (infra_id, topic) instead of (plane_id, topic) (and add infra_id to TransmitPacket).
54        // In PacketProcessor, if infra_id is non-zero, we'll look up the handler based on
55        // infra_id. Otherwise we'll look up based on plane_id.
56    }
57
58    pub fn build_replier(
59        self,
60        setup: &RoleSetup,
61        mut handler: impl AsyncFnMut(RequestContext<Resp>, Req::Lazy<'_>) + 'static,
62    ) {
63        let mut response_tx: modrpc::EventTx<Response<Resp>> = self.hooks.response;
64        self.stubs
65            .request
66            .queued(
67                setup,
68                async move |source: modrpc::EndpointAddr, request: RequestLazy<Req>| {
69                    let Ok(request_id) = request.request_id() else {
70                        return;
71                    };
72                    let Ok(requester_worker) = request.worker() else {
73                        return;
74                    };
75                    let Ok(payload) = request.payload() else {
76                        return;
77                    };
78
79                    handler(
80                        RequestContext {
81                            source,
82                            reply: ResponseSender {
83                                response_event_sender: &mut response_tx,
84                                request_id,
85                                source: source,
86                                requester_worker,
87                            },
88                        },
89                        payload,
90                    )
91                    .await;
92                },
93            )
94            .load_balance();
95    }
96
97    pub fn build(
98        self,
99        setup: &RoleSetup,
100        mut handler: impl AsyncFnMut(modrpc::EndpointAddr, Req::Lazy<'_>) -> Resp + 'static,
101    ) {
102        let response_tx: modrpc::EventTx<Response<Resp>> = self.hooks.response;
103        self.stubs
104            .request
105            .queued(
106                setup,
107                async move |source: modrpc::EndpointAddr, request: RequestLazy<Req>| {
108                    let Ok(request_id) = request.request_id() else {
109                        return;
110                    };
111                    let Ok(requester_worker) = request.worker() else {
112                        return;
113                    };
114                    let Ok(request_payload) = request.payload() else {
115                        return;
116                    };
117
118                    let response = handler(source, request_payload).await;
119                    response_tx
120                        .send(Response {
121                            request_id,
122                            requester: source.endpoint,
123                            requester_worker,
124                            payload: response,
125                        })
126                        .await;
127                },
128            )
129            .load_balance();
130    }
131
132    pub fn build_proxied(self, setup: &RoleSetup) {
133        self.stubs.request.proxy_load_balance(setup);
134    }
135}
136
137impl<Req, Resp> Clone for RequestServer<Req, Resp> {
138    fn clone(&self) -> Self {
139        Self {
140            name: self.name,
141            worker_id: self.worker_id,
142            hooks: self.hooks.clone(),
143            tracker: self.tracker.clone(),
144            tracing_enabled: self.tracing_enabled,
145        }
146    }
147}
148
149pub struct RequestContext<'a, R> {
150    pub source: modrpc::EndpointAddr,
151    pub reply: ResponseSender<'a, R>,
152}
153
154pub struct ResponseSender<'a, T> {
155    pub response_event_sender: &'a mut modrpc::EventTx<Response<T>>,
156    pub request_id: u32,
157    pub source: modrpc::EndpointAddr,
158    pub requester_worker: u16,
159}
160
161impl<T: mproto::Owned> ResponseSender<'_, T> {
162    #[inline]
163    pub async fn send(&mut self, response: impl mproto::Encode + mproto::Compatible<T>) {
164        self.response_event_sender
165            .send(ResponseGen {
166                request_id: self.request_id,
167                requester: self.source.endpoint,
168                requester_worker: self.requester_worker,
169                payload: response,
170            })
171            .await;
172    }
173}
174
175// Helpers to play nice with type inference for the very common situation where the response type
176// is a `Result`.
177impl<O: mproto::Owned, E: mproto::Owned> ResponseSender<'_, Result<O, E>> {
178    #[inline]
179    pub async fn send_ok(&mut self, response: impl mproto::Encode + mproto::Compatible<O>) {
180        self.response_event_sender
181            .send(ResponseGen {
182                request_id: self.request_id,
183                requester: self.source.endpoint,
184                requester_worker: self.requester_worker,
185                payload: Ok::<_, E>(response),
186            })
187            .await;
188    }
189
190    #[inline]
191    pub async fn send_err(&mut self, response: impl mproto::Encode + mproto::Compatible<E>) {
192        self.response_event_sender
193            .send(ResponseGen {
194                request_id: self.request_id,
195                requester: self.source.endpoint,
196                requester_worker: self.requester_worker,
197                payload: Err::<O, _>(response),
198            })
199            .await;
200    }
201}