razor_stream/server/
task.rs

1//! Trait for server-side task, and predefined task structures
2//!
//! Two pre-defined structs implement [ServerTaskDecode] and [ServerTaskEncode]:
//! - [ServerTaskVariant] is for the case where a request struct is mapped to a response struct
//! - [ServerTaskVariantFull] is for the case where the request and response messages are held in the same struct
6//!
7//! Provided macros:
8//!
9//! - [`#[server_task_enum]`](macro@server_task_enum): For defining a server-side RPC task enum.
10
11pub use razor_stream_macros::server_task_enum;
12
13use crate::proto::{RpcAction, RpcActionOwned};
14use crate::{Codec, error::*};
15use io_buffer::Buffer;
16use serde::{Deserialize, Serialize};
17use std::fmt;
18
19/// Sum up trait for server response task
20pub trait ServerTaskResp: ServerTaskEncode + Send + Sized + Unpin + 'static + fmt::Debug {}
21
22/// How to decode a server request
23pub trait ServerTaskDecode<R: Send + Unpin + 'static>: Send + Sized + Unpin + 'static {
24    fn decode_req<'a, C: Codec>(
25        codec: &'a C, action: RpcAction<'a>, seq: u64, req: &'a [u8], blob: Option<Buffer>,
26        noti: RespNoti<R>,
27    ) -> Result<Self, ()>;
28}
29
30/// How to encode a server response
31///
32/// For a server task with any type of buffer, user can always return u8 slice, so the framework
33/// don't need to known the type, but this requires reference and lifetime to the task.
34/// for the returning EncodedErr, it's possible generated during the encode,
35/// Otherwise when existing EncodedErr held in `res` field, the user need to take the res field out of the task.
36pub trait ServerTaskEncode: Send + 'static + Unpin {
37    fn encode_resp<'a, 'b, C: Codec>(
38        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
39    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>);
40}
41
42/// How to notify Rpc framework when a task is done
43///
44/// This is not mandatory for the framework, this a guideline,
45/// You can skip this as long as you send the result back to RespNoti.
46pub trait ServerTaskDone<T: Send + 'static, E: RpcErrCodec>: Sized + 'static {
47    /// Should implement for enum delegation, not intended for user call
48    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T>;
49
50    /// For users, set the result in the task and send it back
51    #[inline]
52    fn set_result(mut self, res: Result<(), E>)
53    where
54        T: std::convert::From<Self>,
55    {
56        // NOTE: To allow a trait to consume self, must require Sized
57        let noti = self._set_result(res);
58        let parent: T = self.into();
59        noti.done(parent);
60    }
61}
62
63/// Get RpcAction from a enum task, or a sub-type that fits multiple RpcActions
64pub trait ServerTaskAction {
65    fn get_action<'a>(&'a self) -> RpcAction<'a>;
66}
67
68/// A container that impl ServerTaskResp to show an example,
69/// presuming you have a different types to represent Request and Response.
70/// You can write your customize version.
71#[allow(dead_code)]
72pub struct ServerTaskVariant<T, M, E>
73where
74    T: Send + Unpin + 'static,
75    M: Send + Unpin + 'static,
76    E: RpcErrCodec,
77{
78    pub seq: u64,
79    pub action: RpcActionOwned,
80    pub msg: M,
81    pub blob: Option<Buffer>,
82    pub res: Option<Result<(), E>>,
83    pub noti: Option<RespNoti<T>>,
84}
85
86impl<T, M, E> fmt::Debug for ServerTaskVariant<T, M, E>
87where
88    T: Send + Unpin + 'static,
89    M: fmt::Debug + Send + Unpin + 'static,
90    E: RpcErrCodec + fmt::Display,
91{
92    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
93        write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.msg)?;
94        match self.res.as_ref() {
95            Some(Ok(())) => {
96                write!(f, " ok")
97            }
98            Some(Err(e)) => {
99                write!(f, " err: {}", e)
100            }
101            _ => Ok(()),
102        }
103    }
104}
105
106impl<T, M, E> ServerTaskDone<T, E> for ServerTaskVariant<T, M, E>
107where
108    T: Send + Unpin + 'static,
109    M: Send + Unpin + 'static,
110    E: RpcErrCodec,
111{
112    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
113        self.res.replace(res);
114        return self.noti.take().unwrap();
115    }
116}
117
118impl<T, M, E> ServerTaskDecode<T> for ServerTaskVariant<T, M, E>
119where
120    T: Send + Unpin + 'static,
121    M: for<'b> Deserialize<'b> + Send + Unpin + 'static,
122    E: RpcErrCodec,
123{
124    fn decode_req<'a, C: Codec>(
125        codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
126        noti: RespNoti<T>,
127    ) -> Result<Self, ()> {
128        let req = codec.decode(msg)?;
129        Ok(Self { seq, action: action.into(), msg: req, blob, res: None, noti: Some(noti) })
130    }
131}
132
133impl<T, M, E> ServerTaskAction for ServerTaskVariant<T, M, E>
134where
135    T: Send + Unpin + 'static,
136    M: Send + Unpin + 'static,
137    E: RpcErrCodec,
138{
139    fn get_action<'a>(&'a self) -> RpcAction<'a> {
140        self.action.to_action()
141    }
142}
143
144impl<T, M, E> ServerTaskEncode for ServerTaskVariant<T, M, E>
145where
146    T: Send + Unpin + 'static,
147    M: Serialize + Send + Unpin + 'static,
148    E: RpcErrCodec,
149{
150    #[inline]
151    fn encode_resp<'a, 'b, C: Codec>(
152        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
153    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
154        if let Some(res) = self.res.as_ref() {
155            match res {
156                Ok(_) => match codec.encode_into(&self.msg, buf) {
157                    Err(_) => {
158                        return (self.seq, Err(RpcIntErr::Encode.into()));
159                    }
160                    Ok(msg_len) => {
161                        return (self.seq, Ok((msg_len, self.blob.as_deref())));
162                    }
163                },
164                Err(e) => {
165                    return (self.seq, Err(e.encode::<C>(codec)));
166                }
167            }
168        } else {
169            panic!("no result when encode_resp");
170        }
171    }
172}
173
174/// A container that impl ServerTaskResp to show an example,
175/// presuming you have a type to carry both Request and Response.
176/// You can write your customize version.
177#[allow(dead_code)]
178pub struct ServerTaskVariantFull<T, R, P, E>
179where
180    T: Send + Unpin + 'static,
181    R: Send + Unpin + 'static,
182    P: Send + Unpin + 'static,
183    E: RpcErrCodec,
184{
185    pub seq: u64,
186    pub action: RpcActionOwned,
187    pub req: R,
188    pub req_blob: Option<Buffer>,
189    pub resp: Option<P>,
190    pub resp_blob: Option<Buffer>,
191    pub res: Option<Result<(), E>>,
192    noti: Option<RespNoti<T>>,
193}
194
195impl<T, R, P, E> fmt::Debug for ServerTaskVariantFull<T, R, P, E>
196where
197    T: Send + Unpin + 'static,
198    R: Send + Unpin + 'static + fmt::Debug,
199    P: Send + Unpin + 'static,
200    E: RpcErrCodec + fmt::Display,
201{
202    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
203        write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.req)?;
204        match self.res.as_ref() {
205            Some(Ok(())) => write!(f, " ok"),
206            Some(Err(e)) => write!(f, " err: {}", e),
207            _ => Ok(()),
208        }
209    }
210}
211
212impl<T, R, P, E> ServerTaskDone<T, E> for ServerTaskVariantFull<T, R, P, E>
213where
214    T: Send + Unpin + 'static,
215    R: Send + Unpin + 'static,
216    P: Send + Unpin + 'static,
217    E: RpcErrCodec,
218{
219    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
220        self.res.replace(res);
221        return self.noti.take().unwrap();
222    }
223}
224
225impl<T, R, P, E> ServerTaskDecode<T> for ServerTaskVariantFull<T, R, P, E>
226where
227    T: Send + Unpin + 'static,
228    R: for<'b> Deserialize<'b> + Send + Unpin + 'static,
229    P: Send + Unpin + 'static,
230    E: RpcErrCodec,
231{
232    fn decode_req<'a, C: Codec>(
233        codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
234        noti: RespNoti<T>,
235    ) -> Result<Self, ()> {
236        let req = codec.decode(msg)?;
237        Ok(Self {
238            seq,
239            action: action.into(),
240            req,
241            req_blob: blob,
242            res: None,
243            resp: None,
244            resp_blob: None,
245            noti: Some(noti),
246        })
247    }
248}
249
250impl<T, R, P, E> ServerTaskAction for ServerTaskVariantFull<T, R, P, E>
251where
252    T: Send + Unpin + 'static,
253    R: Send + Unpin + 'static,
254    P: Send + Unpin + 'static,
255    E: RpcErrCodec,
256{
257    fn get_action<'a>(&'a self) -> RpcAction<'a> {
258        self.action.to_action()
259    }
260}
261
262impl<T, R, P, E> ServerTaskEncode for ServerTaskVariantFull<T, R, P, E>
263where
264    T: Send + Unpin + 'static,
265    R: Send + Unpin + 'static,
266    P: Send + Unpin + 'static + Serialize,
267    E: RpcErrCodec,
268{
269    #[inline]
270    fn encode_resp<'a, 'b, C: Codec>(
271        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
272    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
273        if let Some(res) = self.res.as_ref() {
274            match res {
275                Ok(_) => {
276                    if let Some(resp) = self.resp.as_ref() {
277                        match codec.encode_into(resp, buf) {
278                            Err(_) => {
279                                return (self.seq, Err(RpcIntErr::Encode.into()));
280                            }
281                            Ok(msg_len) => {
282                                return (self.seq, Ok((msg_len, self.resp_blob.as_deref())));
283                            }
284                        }
285                    } else {
286                        // empty response
287                        return (self.seq, Ok((0, self.resp_blob.as_deref())));
288                    }
289                }
290                Err(e) => {
291                    return (self.seq, Err(e.encode::<C>(codec)));
292                }
293            }
294        } else {
295            panic!("no result when encode_resp");
296        }
297    }
298}
299
300/// A writer channel to send response to the server framework.
301///
302/// It can be cloned anywhere.
303/// The user doesn't need to call it directly.
304pub struct RespNoti<T: Send + 'static>(
305    pub(crate) crossfire::MTx<Result<T, (u64, Option<RpcIntErr>)>>,
306);
307
308impl<T: Send + 'static> Clone for RespNoti<T> {
309    #[inline]
310    fn clone(&self) -> Self {
311        Self(self.0.clone())
312    }
313}
314
315impl<T: Send + 'static> RespNoti<T> {
316    pub fn new(tx: crossfire::MTx<Result<T, (u64, Option<RpcIntErr>)>>) -> Self {
317        Self(tx)
318    }
319
320    #[inline]
321    pub fn done(self, task: T) {
322        let _ = self.0.send(Ok(task));
323    }
324
325    #[inline]
326    pub(crate) fn send_err(&self, seq: u64, err: Option<RpcIntErr>) -> Result<(), ()> {
327        if self.0.send(Err((seq, err))).is_err() { return Err(()) } else { Ok(()) }
328    }
329}