1use crate::proto::{RequestInitState, RequestLazy, RequestServerConfig, Response, ResponseGen};
2use modrpc::RoleSetup;
3
4use crate::request_tracker::{RequestTracker, get_request_tracker};
5
6pub struct RequestServer<Req, Resp> {
7 name: &'static str,
8 worker_id: u16,
9 hooks: crate::RequestServerHooks<Req, Resp>,
10 tracker: RequestTracker,
11 tracing_enabled: bool,
12}
13
14pub struct RequestServerBuilder<Req, Resp> {
15 pub name: &'static str,
16 pub hooks: crate::RequestServerHooks<Req, Resp>,
17 pub stubs: crate::RequestServerStubs<Req, Resp>,
18 pub init: RequestInitState,
19}
20
21impl<Req: mproto::Owned, Resp: mproto::Owned> RequestServerBuilder<Req, Resp> {
22 pub fn new(
23 name: &'static str,
24 hooks: crate::RequestServerHooks<Req, Resp>,
25 stubs: crate::RequestServerStubs<Req, Resp>,
26 _config: &RequestServerConfig,
27 init: RequestInitState,
28 ) -> Self {
29 Self {
30 name,
31 hooks,
32 stubs,
33 init,
34 }
35 }
36
37 pub fn create_handle(&self, setup: &RoleSetup) -> RequestServer<Req, Resp> {
38 let worker_id = setup.worker_id();
39 let tracker = get_request_tracker(setup);
40
41 RequestServer {
42 name: self.name,
43 worker_id,
44 hooks: self.hooks.clone(),
45 tracker,
46 tracing_enabled: true,
47 }
48 }
49
50 pub fn build_shared(self ) {
51 }
57
58 pub fn build_replier(
59 self,
60 setup: &RoleSetup,
61 mut handler: impl AsyncFnMut(RequestContext<Resp>, Req::Lazy<'_>) + 'static,
62 ) {
63 let mut response_tx: modrpc::EventTx<Response<Resp>> = self.hooks.response;
64 self.stubs
65 .request
66 .queued(
67 setup,
68 async move |source: modrpc::EndpointAddr, request: RequestLazy<Req>| {
69 let Ok(request_id) = request.request_id() else {
70 return;
71 };
72 let Ok(requester_worker) = request.worker() else {
73 return;
74 };
75 let Ok(payload) = request.payload() else {
76 return;
77 };
78
79 handler(
80 RequestContext {
81 source,
82 reply: ResponseSender {
83 response_event_sender: &mut response_tx,
84 request_id,
85 source: source,
86 requester_worker,
87 },
88 },
89 payload,
90 )
91 .await;
92 },
93 )
94 .load_balance();
95 }
96
97 pub fn build(
98 self,
99 setup: &RoleSetup,
100 mut handler: impl AsyncFnMut(modrpc::EndpointAddr, Req::Lazy<'_>) -> Resp + 'static,
101 ) {
102 let response_tx: modrpc::EventTx<Response<Resp>> = self.hooks.response;
103 self.stubs
104 .request
105 .queued(
106 setup,
107 async move |source: modrpc::EndpointAddr, request: RequestLazy<Req>| {
108 let Ok(request_id) = request.request_id() else {
109 return;
110 };
111 let Ok(requester_worker) = request.worker() else {
112 return;
113 };
114 let Ok(request_payload) = request.payload() else {
115 return;
116 };
117
118 let response = handler(source, request_payload).await;
119 response_tx
120 .send(Response {
121 request_id,
122 requester: source.endpoint,
123 requester_worker,
124 payload: response,
125 })
126 .await;
127 },
128 )
129 .load_balance();
130 }
131
132 pub fn build_proxied(self, setup: &RoleSetup) {
133 self.stubs.request.proxy_load_balance(setup);
134 }
135}
136
137impl<Req, Resp> Clone for RequestServer<Req, Resp> {
138 fn clone(&self) -> Self {
139 Self {
140 name: self.name,
141 worker_id: self.worker_id,
142 hooks: self.hooks.clone(),
143 tracker: self.tracker.clone(),
144 tracing_enabled: self.tracing_enabled,
145 }
146 }
147}
148
149pub struct RequestContext<'a, R> {
150 pub source: modrpc::EndpointAddr,
151 pub reply: ResponseSender<'a, R>,
152}
153
154pub struct ResponseSender<'a, T> {
155 pub response_event_sender: &'a mut modrpc::EventTx<Response<T>>,
156 pub request_id: u32,
157 pub source: modrpc::EndpointAddr,
158 pub requester_worker: u16,
159}
160
161impl<T: mproto::Owned> ResponseSender<'_, T> {
162 #[inline]
163 pub async fn send(&mut self, response: impl mproto::Encode + mproto::Compatible<T>) {
164 self.response_event_sender
165 .send(ResponseGen {
166 request_id: self.request_id,
167 requester: self.source.endpoint,
168 requester_worker: self.requester_worker,
169 payload: response,
170 })
171 .await;
172 }
173}
174
175impl<O: mproto::Owned, E: mproto::Owned> ResponseSender<'_, Result<O, E>> {
178 #[inline]
179 pub async fn send_ok(&mut self, response: impl mproto::Encode + mproto::Compatible<O>) {
180 self.response_event_sender
181 .send(ResponseGen {
182 request_id: self.request_id,
183 requester: self.source.endpoint,
184 requester_worker: self.requester_worker,
185 payload: Ok::<_, E>(response),
186 })
187 .await;
188 }
189
190 #[inline]
191 pub async fn send_err(&mut self, response: impl mproto::Encode + mproto::Compatible<E>) {
192 self.response_event_sender
193 .send(ResponseGen {
194 request_id: self.request_id,
195 requester: self.source.endpoint,
196 requester_worker: self.requester_worker,
197 payload: Err::<O, _>(response),
198 })
199 .await;
200 }
201}