1pub use razor_stream_macros::server_task_enum;
12
13use crate::proto::{RpcAction, RpcActionOwned};
14use crate::{Codec, error::*};
15use io_buffer::Buffer;
16use serde::{Deserialize, Serialize};
17use std::fmt;
18
19pub trait ServerTaskResp: ServerTaskEncode + Send + Sized + Unpin + 'static + fmt::Debug {}
21
22pub trait ServerTaskDecode<R: Send + Unpin + 'static>: Send + Sized + Unpin + 'static {
24 fn decode_req<'a, C: Codec>(
25 codec: &'a C, action: RpcAction<'a>, seq: u64, req: &'a [u8], blob: Option<Buffer>,
26 noti: RespNoti<R>,
27 ) -> Result<Self, ()>;
28}
29
30pub trait ServerTaskEncode: Send + 'static + Unpin {
37 fn encode_resp<'a, 'b, C: Codec>(
38 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
39 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>);
40}
41
42pub trait ServerTaskDone<T: Send + 'static, E: RpcErrCodec>: Sized + 'static {
47 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T>;
49
50 #[inline]
52 fn set_result(mut self, res: Result<(), E>)
53 where
54 T: std::convert::From<Self>,
55 {
56 let noti = self._set_result(res);
58 let parent: T = self.into();
59 noti.done(parent);
60 }
61}
62
63pub trait ServerTaskAction {
65 fn get_action<'a>(&'a self) -> RpcAction<'a>;
66}
67
68#[allow(dead_code)]
72pub struct ServerTaskVariant<T, M, E>
73where
74 T: Send + Unpin + 'static,
75 M: Send + Unpin + 'static,
76 E: RpcErrCodec,
77{
78 pub seq: u64,
79 pub action: RpcActionOwned,
80 pub msg: M,
81 pub blob: Option<Buffer>,
82 pub res: Option<Result<(), E>>,
83 pub noti: Option<RespNoti<T>>,
84}
85
86impl<T, M, E> fmt::Debug for ServerTaskVariant<T, M, E>
87where
88 T: Send + Unpin + 'static,
89 M: fmt::Debug + Send + Unpin + 'static,
90 E: RpcErrCodec + fmt::Display,
91{
92 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93 write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.msg)?;
94 match self.res.as_ref() {
95 Some(Ok(())) => {
96 write!(f, " ok")
97 }
98 Some(Err(e)) => {
99 write!(f, " err: {}", e)
100 }
101 _ => Ok(()),
102 }
103 }
104}
105
106impl<T, M, E> ServerTaskDone<T, E> for ServerTaskVariant<T, M, E>
107where
108 T: Send + Unpin + 'static,
109 M: Send + Unpin + 'static,
110 E: RpcErrCodec,
111{
112 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
113 self.res.replace(res);
114 return self.noti.take().unwrap();
115 }
116}
117
118impl<T, M, E> ServerTaskDecode<T> for ServerTaskVariant<T, M, E>
119where
120 T: Send + Unpin + 'static,
121 M: for<'b> Deserialize<'b> + Send + Unpin + 'static,
122 E: RpcErrCodec,
123{
124 fn decode_req<'a, C: Codec>(
125 codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
126 noti: RespNoti<T>,
127 ) -> Result<Self, ()> {
128 let req = codec.decode(msg)?;
129 Ok(Self { seq, action: action.into(), msg: req, blob, res: None, noti: Some(noti) })
130 }
131}
132
133impl<T, M, E> ServerTaskAction for ServerTaskVariant<T, M, E>
134where
135 T: Send + Unpin + 'static,
136 M: Send + Unpin + 'static,
137 E: RpcErrCodec,
138{
139 fn get_action<'a>(&'a self) -> RpcAction<'a> {
140 self.action.to_action()
141 }
142}
143
144impl<T, M, E> ServerTaskEncode for ServerTaskVariant<T, M, E>
145where
146 T: Send + Unpin + 'static,
147 M: Serialize + Send + Unpin + 'static,
148 E: RpcErrCodec,
149{
150 #[inline]
151 fn encode_resp<'a, 'b, C: Codec>(
152 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
153 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
154 if let Some(res) = self.res.as_ref() {
155 match res {
156 Ok(_) => match codec.encode_into(&self.msg, buf) {
157 Err(_) => {
158 return (self.seq, Err(RpcIntErr::Encode.into()));
159 }
160 Ok(msg_len) => {
161 return (self.seq, Ok((msg_len, self.blob.as_deref())));
162 }
163 },
164 Err(e) => {
165 return (self.seq, Err(e.encode::<C>(codec)));
166 }
167 }
168 } else {
169 panic!("no result when encode_resp");
170 }
171 }
172}
173
174#[allow(dead_code)]
178pub struct ServerTaskVariantFull<T, R, P, E>
179where
180 T: Send + Unpin + 'static,
181 R: Send + Unpin + 'static,
182 P: Send + Unpin + 'static,
183 E: RpcErrCodec,
184{
185 pub seq: u64,
186 pub action: RpcActionOwned,
187 pub req: R,
188 pub req_blob: Option<Buffer>,
189 pub resp: Option<P>,
190 pub resp_blob: Option<Buffer>,
191 pub res: Option<Result<(), E>>,
192 noti: Option<RespNoti<T>>,
193}
194
195impl<T, R, P, E> fmt::Debug for ServerTaskVariantFull<T, R, P, E>
196where
197 T: Send + Unpin + 'static,
198 R: Send + Unpin + 'static + fmt::Debug,
199 P: Send + Unpin + 'static,
200 E: RpcErrCodec + fmt::Display,
201{
202 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
203 write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.req)?;
204 match self.res.as_ref() {
205 Some(Ok(())) => write!(f, " ok"),
206 Some(Err(e)) => write!(f, " err: {}", e),
207 _ => Ok(()),
208 }
209 }
210}
211
212impl<T, R, P, E> ServerTaskDone<T, E> for ServerTaskVariantFull<T, R, P, E>
213where
214 T: Send + Unpin + 'static,
215 R: Send + Unpin + 'static,
216 P: Send + Unpin + 'static,
217 E: RpcErrCodec,
218{
219 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
220 self.res.replace(res);
221 return self.noti.take().unwrap();
222 }
223}
224
225impl<T, R, P, E> ServerTaskDecode<T> for ServerTaskVariantFull<T, R, P, E>
226where
227 T: Send + Unpin + 'static,
228 R: for<'b> Deserialize<'b> + Send + Unpin + 'static,
229 P: Send + Unpin + 'static,
230 E: RpcErrCodec,
231{
232 fn decode_req<'a, C: Codec>(
233 codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
234 noti: RespNoti<T>,
235 ) -> Result<Self, ()> {
236 let req = codec.decode(msg)?;
237 Ok(Self {
238 seq,
239 action: action.into(),
240 req,
241 req_blob: blob,
242 res: None,
243 resp: None,
244 resp_blob: None,
245 noti: Some(noti),
246 })
247 }
248}
249
250impl<T, R, P, E> ServerTaskAction for ServerTaskVariantFull<T, R, P, E>
251where
252 T: Send + Unpin + 'static,
253 R: Send + Unpin + 'static,
254 P: Send + Unpin + 'static,
255 E: RpcErrCodec,
256{
257 fn get_action<'a>(&'a self) -> RpcAction<'a> {
258 self.action.to_action()
259 }
260}
261
262impl<T, R, P, E> ServerTaskEncode for ServerTaskVariantFull<T, R, P, E>
263where
264 T: Send + Unpin + 'static,
265 R: Send + Unpin + 'static,
266 P: Send + Unpin + 'static + Serialize,
267 E: RpcErrCodec,
268{
269 #[inline]
270 fn encode_resp<'a, 'b, C: Codec>(
271 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
272 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
273 if let Some(res) = self.res.as_ref() {
274 match res {
275 Ok(_) => {
276 if let Some(resp) = self.resp.as_ref() {
277 match codec.encode_into(resp, buf) {
278 Err(_) => {
279 return (self.seq, Err(RpcIntErr::Encode.into()));
280 }
281 Ok(msg_len) => {
282 return (self.seq, Ok((msg_len, self.resp_blob.as_deref())));
283 }
284 }
285 } else {
286 return (self.seq, Ok((0, self.resp_blob.as_deref())));
288 }
289 }
290 Err(e) => {
291 return (self.seq, Err(e.encode::<C>(codec)));
292 }
293 }
294 } else {
295 panic!("no result when encode_resp");
296 }
297 }
298}
299
300pub struct RespNoti<T: Send + 'static>(
305 pub(crate) crossfire::MTx<Result<T, (u64, Option<RpcIntErr>)>>,
306);
307
308impl<T: Send + 'static> Clone for RespNoti<T> {
309 #[inline]
310 fn clone(&self) -> Self {
311 Self(self.0.clone())
312 }
313}
314
315impl<T: Send + 'static> RespNoti<T> {
316 pub fn new(tx: crossfire::MTx<Result<T, (u64, Option<RpcIntErr>)>>) -> Self {
317 Self(tx)
318 }
319
320 #[inline]
321 pub fn done(self, task: T) {
322 let _ = self.0.send(Ok(task));
323 }
324
325 #[inline]
326 pub(crate) fn send_err(&self, seq: u64, err: Option<RpcIntErr>) -> Result<(), ()> {
327 if self.0.send(Err((seq, err))).is_err() { return Err(()) } else { Ok(()) }
328 }
329}