Skip to main content

razor_stream/server/
task.rs

1//! Trait for server-side task, and predefined task structures
2//!
3//! There are pre-defined structs that impl [ServerTaskDecode] and [ServerTaskResp]:
4//! - [ServerTaskVariant] is for the situation to map a request struct to a response struct
5//! - [ServerTaskVariantFull] is for the situation of holding the request and response msg in the same struct
6//!
7//! Provided macros:
8//!
9//! - [`#[server_task_enum]`](macro@server_task_enum): For defining a server-side RPC task enum.
10
11pub use razor_stream_macros::server_task_enum;
12
13use crate::proto::{RpcAction, RpcActionOwned};
14use crate::{Codec, error::*};
15use crossfire::*;
16use io_buffer::Buffer;
17use serde::{Deserialize, Serialize};
18use std::fmt;
19
20/// Sum up trait for server response task
21pub trait ServerTaskResp: ServerTaskEncode + Send + Sized + 'static + fmt::Debug {}
22
23/// How to decode a server request
24pub trait ServerTaskDecode<R: Send + 'static>: Send + Sized + 'static {
25    fn decode_req<'a, C: Codec>(
26        codec: &'a C, action: RpcAction<'a>, seq: u64, req: &'a [u8], blob: Option<Buffer>,
27        noti: RespNoti<R>,
28    ) -> Result<Self, ()>;
29}
30
31/// How to encode a server response
32///
33/// For a server task with any type of buffer, user can always return u8 slice, so the framework
34/// don't need to known the type, but this requires reference and lifetime to the task.
35/// for the returning EncodedErr, it's possible generated during the encode,
36/// Otherwise when existing EncodedErr held in `res` field, the user need to take the res field out of the task.
37pub trait ServerTaskEncode: Send + 'static {
38    fn encode_resp<'a, 'b, C: Codec>(
39        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
40    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>);
41}
42
43/// How to notify Rpc framework when a task is done
44///
45/// This is not mandatory for the framework, this a guideline,
46/// You can skip this as long as you send the result back to RespNoti.
47pub trait ServerTaskDone<T, E>: Sized + 'static
48where
49    T: Send + 'static,
50    E: RpcErrCodec,
51{
52    /// Should implement for enum delegation, not intended for user call
53    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T>;
54
55    /// For users, set the result in the task and send it back
56    #[inline]
57    fn set_result(mut self, res: Result<(), E>)
58    where
59        T: std::convert::From<Self>,
60    {
61        // NOTE: To allow a trait to consume self, must require Sized
62        let noti = self._set_result(res);
63        let parent: T = self.into();
64        noti.done(parent);
65    }
66}
67
68/// Get RpcAction from a enum task, or a sub-type that fits multiple RpcActions
69pub trait ServerTaskAction {
70    fn get_action<'a>(&'a self) -> RpcAction<'a>;
71}
72
73/// A container that impl ServerTaskResp to show an example,
74/// presuming you have a different types to represent Request and Response.
75/// You can write your customize version.
76#[allow(dead_code)]
77pub struct ServerTaskVariant<T, M, E>
78where
79    T: Send + 'static,
80    M: Send + 'static,
81    E: RpcErrCodec,
82{
83    pub seq: u64,
84    pub action: RpcActionOwned,
85    pub msg: M,
86    pub blob: Option<Buffer>,
87    pub res: Option<Result<(), E>>,
88    pub noti: Option<RespNoti<T>>,
89}
90
91impl<T, M, E> fmt::Debug for ServerTaskVariant<T, M, E>
92where
93    T: Send + 'static,
94    M: fmt::Debug + Send + 'static,
95    E: RpcErrCodec + fmt::Display,
96{
97    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
98        write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.msg)?;
99        match self.res.as_ref() {
100            Some(Ok(())) => {
101                write!(f, " ok")
102            }
103            Some(Err(e)) => {
104                write!(f, " err: {}", e)
105            }
106            _ => Ok(()),
107        }
108    }
109}
110
111impl<T, M, E> ServerTaskDone<T, E> for ServerTaskVariant<T, M, E>
112where
113    T: Send + 'static,
114    M: Send + 'static,
115    E: RpcErrCodec,
116{
117    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
118        self.res.replace(res);
119        return self.noti.take().unwrap();
120    }
121}
122
123impl<T, M, E> ServerTaskDecode<T> for ServerTaskVariant<T, M, E>
124where
125    T: Send + 'static,
126    M: for<'b> Deserialize<'b> + Send + 'static,
127    E: RpcErrCodec,
128{
129    fn decode_req<'a, C: Codec>(
130        codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
131        noti: RespNoti<T>,
132    ) -> Result<Self, ()> {
133        let req = codec.decode(msg)?;
134        Ok(Self { seq, action: action.into(), msg: req, blob, res: None, noti: Some(noti) })
135    }
136}
137
138impl<T, M, E> ServerTaskAction for ServerTaskVariant<T, M, E>
139where
140    T: Send + 'static,
141    M: Send + 'static,
142    E: RpcErrCodec,
143{
144    fn get_action<'a>(&'a self) -> RpcAction<'a> {
145        self.action.to_action()
146    }
147}
148
149impl<T, M, E> ServerTaskEncode for ServerTaskVariant<T, M, E>
150where
151    T: Send + 'static,
152    M: Serialize + Send + 'static,
153    E: RpcErrCodec,
154{
155    #[inline]
156    fn encode_resp<'a, 'b, C: Codec>(
157        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
158    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
159        if let Some(res) = self.res.as_ref() {
160            match res {
161                Ok(_) => match codec.encode_into(&self.msg, buf) {
162                    Err(_) => {
163                        return (self.seq, Err(RpcIntErr::Encode.into()));
164                    }
165                    Ok(msg_len) => {
166                        return (self.seq, Ok((msg_len, self.blob.as_deref())));
167                    }
168                },
169                Err(e) => {
170                    return (self.seq, Err(e.encode::<C>(codec)));
171                }
172            }
173        } else {
174            panic!("no result when encode_resp");
175        }
176    }
177}
178
179/// A container that impl ServerTaskResp to show an example,
180/// presuming you have a type to carry both Request and Response.
181/// You can write your customize version.
182#[allow(dead_code)]
183pub struct ServerTaskVariantFull<T, R, P, E>
184where
185    T: Send + 'static,
186    R: Send + 'static,
187    P: Send + 'static,
188    E: RpcErrCodec,
189{
190    pub seq: u64,
191    pub action: RpcActionOwned,
192    pub req: R,
193    pub req_blob: Option<Buffer>,
194    pub resp: Option<P>,
195    pub resp_blob: Option<Buffer>,
196    pub res: Option<Result<(), E>>,
197    noti: Option<RespNoti<T>>,
198}
199
200impl<T, R, P, E> fmt::Debug for ServerTaskVariantFull<T, R, P, E>
201where
202    T: Send + 'static,
203    R: Send + 'static + fmt::Debug,
204    P: Send + 'static,
205    E: RpcErrCodec + fmt::Display,
206{
207    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
208        write!(f, "task seq={} action={:?} {:?}", self.seq, self.action, self.req)?;
209        match self.res.as_ref() {
210            Some(Ok(())) => write!(f, " ok"),
211            Some(Err(e)) => write!(f, " err: {}", e),
212            _ => Ok(()),
213        }
214    }
215}
216
217impl<T, R, P, E> ServerTaskDone<T, E> for ServerTaskVariantFull<T, R, P, E>
218where
219    T: Send + 'static,
220    R: Send + 'static,
221    P: Send + 'static,
222    E: RpcErrCodec,
223{
224    fn _set_result(&mut self, res: Result<(), E>) -> RespNoti<T> {
225        self.res.replace(res);
226        return self.noti.take().unwrap();
227    }
228}
229
230impl<T, R, P, E> ServerTaskDecode<T> for ServerTaskVariantFull<T, R, P, E>
231where
232    T: Send + 'static,
233    R: for<'b> Deserialize<'b> + Send + 'static,
234    P: Send + 'static,
235    E: RpcErrCodec,
236{
237    fn decode_req<'a, C: Codec>(
238        codec: &'a C, action: RpcAction<'a>, seq: u64, msg: &'a [u8], blob: Option<Buffer>,
239        noti: RespNoti<T>,
240    ) -> Result<Self, ()> {
241        let req = codec.decode(msg)?;
242        Ok(Self {
243            seq,
244            action: action.into(),
245            req,
246            req_blob: blob,
247            res: None,
248            resp: None,
249            resp_blob: None,
250            noti: Some(noti),
251        })
252    }
253}
254
255impl<T, R, P, E> ServerTaskAction for ServerTaskVariantFull<T, R, P, E>
256where
257    T: Send + 'static,
258    R: Send + 'static,
259    P: Send + 'static,
260    E: RpcErrCodec,
261{
262    fn get_action<'a>(&'a self) -> RpcAction<'a> {
263        self.action.to_action()
264    }
265}
266
267impl<T, R, P, E> ServerTaskEncode for ServerTaskVariantFull<T, R, P, E>
268where
269    T: Send + 'static,
270    R: Send + 'static,
271    P: Send + 'static + Serialize,
272    E: RpcErrCodec,
273{
274    #[inline]
275    fn encode_resp<'a, 'b, C: Codec>(
276        &'a mut self, codec: &'b C, buf: &'b mut Vec<u8>,
277    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
278        if let Some(res) = self.res.as_ref() {
279            match res {
280                Ok(_) => {
281                    if let Some(resp) = self.resp.as_ref() {
282                        match codec.encode_into(resp, buf) {
283                            Err(_) => {
284                                return (self.seq, Err(RpcIntErr::Encode.into()));
285                            }
286                            Ok(msg_len) => {
287                                return (self.seq, Ok((msg_len, self.resp_blob.as_deref())));
288                            }
289                        }
290                    } else {
291                        // empty response
292                        return (self.seq, Ok((0, self.resp_blob.as_deref())));
293                    }
294                }
295                Err(e) => {
296                    return (self.seq, Err(e.encode::<C>(codec)));
297                }
298            }
299        } else {
300            panic!("no result when encode_resp");
301        }
302    }
303}
304
305/// A writer channel to send response to the server framework.
306///
307/// It can be cloned anywhere.
308/// The user doesn't need to call it directly.
309pub struct RespNoti<T: Send + 'static>(
310    pub(crate) MTx<mpsc::List<Result<T, (u64, Option<RpcIntErr>)>>>,
311);
312
313impl<T: Send + 'static> Clone for RespNoti<T> {
314    #[inline]
315    fn clone(&self) -> Self {
316        Self(self.0.clone())
317    }
318}
319
320impl<T: Send + 'static> RespNoti<T> {
321    pub fn new(tx: MTx<mpsc::List<Result<T, (u64, Option<RpcIntErr>)>>>) -> Self {
322        Self(tx)
323    }
324
325    #[inline]
326    pub fn done(self, task: T) {
327        let _ = self.0.send(Ok(task));
328    }
329
330    #[inline]
331    pub(crate) fn send_err(&self, seq: u64, err: Option<RpcIntErr>) -> Result<(), ()> {
332        if self.0.send(Err((seq, err))).is_err() { return Err(()) } else { Ok(()) }
333    }
334}