1pub use razor_stream_macros::server_task_enum;
12
13use crate::proto::{RpcAction, RpcActionOwned};
14use crate::{Codec, error::*};
15use crossfire::*;
16use io_buffer::Buffer;
17use serde::{Deserialize, Serialize};
18use std::fmt;
19
20pub trait ServerTaskResp: ServerTaskEncode + Send + Sized + 'static + fmt::Debug {}
22
23pub trait ServerTaskDecode<R: Send + 'static>: Send + Sized + 'static {
25 fn decode_req<'a, C: Codec>(
26 codec: &'a C, action: RpcAction<'a>, seq: u64, req: &'a [u8], blob: Option<Buffer>,
27 noti: RespNoti<R>,
28 ) -> Result<Self, ()>;
29}
30
31pub trait ServerTaskEncode: Send + 'static {
38 fn encode_resp<'a, 'b, C: Codec>(
39 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
40 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>);
41}
42
43pub trait ServerTaskDone<T, E>: Sized + 'static
48where
49 T: Send + 'static,
50 E: RpcErrCodec,
51{
52 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T>;
54
55 #[inline]
57 fn set_result(mut self, res: Result<(), E>)
58 where
59 T: std::convert::From<Self>,
60 {
61 let noti = self._set_result(res);
63 let parent: T = self.into();
64 noti.done(parent);
65 }
66}
67
68pub trait ServerTaskAction {
70 fn get_action<'a>(&'a self) -> RpcAction<'a>;
71}
72
73#[allow(dead_code)]
77pub struct ServerTaskVariant<T, M, E>
78where
79 T: Send + 'static,
80 M: Send + 'static,
81 E: RpcErrCodec,
82{
83 pub seq: u64,
84 pub action: RpcActionOwned,
85 pub msg: M,
86 pub blob: Option<Buffer>,
87 pub res: Option<Result<(), E>>,
88 pub noti: Option<RespNoti<T>>,
89}
90
91impl<T, M, E> fmt::Debug for ServerTaskVariant<T, M, E>
92where
93 T: Send + 'static,
94 M: fmt::Debug + Send + 'static,
95 E: RpcErrCodec + fmt::Display,
96{
97 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98 write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.msg)?;
99 match self.res.as_ref() {
100 Some(Ok(())) => {
101 write!(f, " ok")
102 }
103 Some(Err(e)) => {
104 write!(f, " err: {}", e)
105 }
106 _ => Ok(()),
107 }
108 }
109}
110
111impl<T, M, E> ServerTaskDone<T, E> for ServerTaskVariant<T, M, E>
112where
113 T: Send + 'static,
114 M: Send + 'static,
115 E: RpcErrCodec,
116{
117 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
118 self.res.replace(res);
119 return self.noti.take().unwrap();
120 }
121}
122
123impl<T, M, E> ServerTaskDecode<T> for ServerTaskVariant<T, M, E>
124where
125 T: Send + 'static,
126 M: for<'b> Deserialize<'b> + Send + 'static,
127 E: RpcErrCodec,
128{
129 fn decode_req<'a, C: Codec>(
130 codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
131 noti: RespNoti<T>,
132 ) -> Result<Self, ()> {
133 let req = codec.decode(msg)?;
134 Ok(Self { seq, action: action.into(), msg: req, blob, res: None, noti: Some(noti) })
135 }
136}
137
138impl<T, M, E> ServerTaskAction for ServerTaskVariant<T, M, E>
139where
140 T: Send + 'static,
141 M: Send + 'static,
142 E: RpcErrCodec,
143{
144 fn get_action<'a>(&'a self) -> RpcAction<'a> {
145 self.action.to_action()
146 }
147}
148
149impl<T, M, E> ServerTaskEncode for ServerTaskVariant<T, M, E>
150where
151 T: Send + 'static,
152 M: Serialize + Send + 'static,
153 E: RpcErrCodec,
154{
155 #[inline]
156 fn encode_resp<'a, 'b, C: Codec>(
157 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
158 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
159 if let Some(res) = self.res.as_ref() {
160 match res {
161 Ok(_) => match codec.encode_into(&self.msg, buf) {
162 Err(_) => {
163 return (self.seq, Err(RpcIntErr::Encode.into()));
164 }
165 Ok(msg_len) => {
166 return (self.seq, Ok((msg_len, self.blob.as_deref())));
167 }
168 },
169 Err(e) => {
170 return (self.seq, Err(e.encode::<C>(codec)));
171 }
172 }
173 } else {
174 panic!("no result when encode_resp");
175 }
176 }
177}
178
179#[allow(dead_code)]
183pub struct ServerTaskVariantFull<T, R, P, E>
184where
185 T: Send + 'static,
186 R: Send + 'static,
187 P: Send + 'static,
188 E: RpcErrCodec,
189{
190 pub seq: u64,
191 pub action: RpcActionOwned,
192 pub req: R,
193 pub req_blob: Option<Buffer>,
194 pub resp: Option<P>,
195 pub resp_blob: Option<Buffer>,
196 pub res: Option<Result<(), E>>,
197 noti: Option<RespNoti<T>>,
198}
199
200impl<T, R, P, E> fmt::Debug for ServerTaskVariantFull<T, R, P, E>
201where
202 T: Send + 'static,
203 R: Send + 'static + fmt::Debug,
204 P: Send + 'static,
205 E: RpcErrCodec + fmt::Display,
206{
207 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208 write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.req)?;
209 match self.res.as_ref() {
210 Some(Ok(())) => write!(f, " ok"),
211 Some(Err(e)) => write!(f, " err: {}", e),
212 _ => Ok(()),
213 }
214 }
215}
216
217impl<T, R, P, E> ServerTaskDone<T, E> for ServerTaskVariantFull<T, R, P, E>
218where
219 T: Send + 'static,
220 R: Send + 'static,
221 P: Send + 'static,
222 E: RpcErrCodec,
223{
224 fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
225 self.res.replace(res);
226 return self.noti.take().unwrap();
227 }
228}
229
230impl<T, R, P, E> ServerTaskDecode<T> for ServerTaskVariantFull<T, R, P, E>
231where
232 T: Send + 'static,
233 R: for<'b> Deserialize<'b> + Send + 'static,
234 P: Send + 'static,
235 E: RpcErrCodec,
236{
237 fn decode_req<'a, C: Codec>(
238 codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
239 noti: RespNoti<T>,
240 ) -> Result<Self, ()> {
241 let req = codec.decode(msg)?;
242 Ok(Self {
243 seq,
244 action: action.into(),
245 req,
246 req_blob: blob,
247 res: None,
248 resp: None,
249 resp_blob: None,
250 noti: Some(noti),
251 })
252 }
253}
254
255impl<T, R, P, E> ServerTaskAction for ServerTaskVariantFull<T, R, P, E>
256where
257 T: Send + 'static,
258 R: Send + 'static,
259 P: Send + 'static,
260 E: RpcErrCodec,
261{
262 fn get_action<'a>(&'a self) -> RpcAction<'a> {
263 self.action.to_action()
264 }
265}
266
267impl<T, R, P, E> ServerTaskEncode for ServerTaskVariantFull<T, R, P, E>
268where
269 T: Send + 'static,
270 R: Send + 'static,
271 P: Send + 'static + Serialize,
272 E: RpcErrCodec,
273{
274 #[inline]
275 fn encode_resp<'a, 'b, C: Codec>(
276 &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
277 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
278 if let Some(res) = self.res.as_ref() {
279 match res {
280 Ok(_) => {
281 if let Some(resp) = self.resp.as_ref() {
282 match codec.encode_into(resp, buf) {
283 Err(_) => {
284 return (self.seq, Err(RpcIntErr::Encode.into()));
285 }
286 Ok(msg_len) => {
287 return (self.seq, Ok((msg_len, self.resp_blob.as_deref())));
288 }
289 }
290 } else {
291 return (self.seq, Ok((0, self.resp_blob.as_deref())));
293 }
294 }
295 Err(e) => {
296 return (self.seq, Err(e.encode::<C>(codec)));
297 }
298 }
299 } else {
300 panic!("no result when encode_resp");
301 }
302 }
303}
304
305pub struct RespNoti<T: Send + 'static>(
310 pub(crate) MTx<mpsc::List<Result<T, (u64, Option<RpcIntErr>)>>>,
311);
312
313impl<T: Send + 'static> Clone for RespNoti<T> {
314 #[inline]
315 fn clone(&self) -> Self {
316 Self(self.0.clone())
317 }
318}
319
320impl<T: Send + 'static> RespNoti<T> {
321 pub fn new(tx: MTx<mpsc::List<Result<T, (u64, Option<RpcIntErr>)>>>) -> Self {
322 Self(tx)
323 }
324
325 #[inline]
326 pub fn done(self, task: T) {
327 let _ = self.0.send(Ok(task));
328 }
329
330 #[inline]
331 pub(crate) fn send_err(&self, seq: u64, err: Option<RpcIntErr>) -> Result<(), ()> {
332 if self.0.send(Err((seq, err))).is_err() { return Err(()) } else { Ok(()) }
333 }
334}