Skip to main content

razor_stream/server/
mod.rs

1//! This module contains traits defined for the server-side
2//!
3
4use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
24/// General config for server-side
25#[derive(Clone)]
26pub struct ServerConfig {
27    /// socket read timeout
28    pub read_timeout: Duration,
29    /// Socket write timeout
30    pub write_timeout: Duration,
31    /// Socket idle time to be close.
32    pub idle_timeout: Duration,
33    /// wait for all connections to be close with a timeout
34    pub server_close_wait: Duration,
35    /// In bytes. when non-zero, overwrite the default DEFAULT_BUF_SIZE of transport
36    pub stream_buf_size: usize,
37}
38
39impl Default for ServerConfig {
40    fn default() -> Self {
41        Self {
42            read_timeout: Duration::from_secs(5),
43            write_timeout: Duration::from_secs(5),
44            idle_timeout: Duration::from_secs(120),
45            server_close_wait: Duration::from_secs(90),
46            stream_buf_size: 0,
47        }
48    }
49}
50
51/// A central hub defined by the user for the server-side, to define the customizable plugin.
52///
53/// # NOTE
54///
55/// If you choose implement this trait rather than use [ServerDefault]
56/// We recommend your implementation to Deref<Target=orb::AsyncRuntime>
57/// then the blanket trait in `orb` crate will automatically impl AsyncRuntime on your ClientFacts type.
58/// Refer to the code of [ServerDefault] for example.
59pub trait ServerFacts: orb::AsyncRuntime + Sync + Send + 'static + Sized {
60    /// You should keep ServerConfig inside, get_config() will return the reference.
61    fn get_config(&self) -> &ServerConfig;
62
63    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
64    fn new_logger(&self) -> Arc<LogFilter>;
65}
66
67/// This trait is for server-side transport layer protocol.
68///
69/// The implementation can be found on:
70///
71/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
72pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
73    type Listener: AsyncListener;
74
75    fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
76
77    /// The implementation is expected to store the conn_count until dropped
78    fn new_conn(
79        stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
80    ) -> Self;
81
82    /// Read a request from the socket
83    fn read_req<'a>(
84        &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
85    ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
86
87    /// Write our user task response
88    fn write_resp<T: ServerTaskEncode>(
89        &self, logger: &LogFilter, codec: &impl Codec, task: T,
90    ) -> impl Future<Output = io::Result<()>> + Send;
91
92    /// Write out ping resp or error
93    fn write_resp_internal(
94        &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
95    ) -> impl Future<Output = io::Result<()>> + Send;
96
97    /// Flush the response for the socket writer, if the transport has buffering logic
98    fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
99
100    /// Shutdown the write direction of the connection
101    fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
102}
103
104/// A temporary struct to hold data buffer return by ServerTransport
105///
106/// NOTE: `RpcAction` and `msg` contains slice that reference to ServerTransport's internal buffer,
107/// you should parse and clone them.
108pub struct RpcSvrReq<'a> {
109    pub seq: u64,
110    pub action: RpcAction<'a>,
111    pub msg: &'a [u8],
112    pub blob: Option<Buffer>, // for write, this contains data
113}
114
115impl<'a> fmt::Debug for RpcSvrReq<'a> {
116    #[inline]
117    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118        write!(f, "req(seq={}, action={:?})", self.seq, self.action)
119    }
120}
121
122/// A Struct to hold pre encoded buffer for server response
123#[allow(dead_code)]
124#[derive(Debug)]
125pub struct RpcSvrResp {
126    pub seq: u64,
127
128    pub msg: Option<Vec<u8>>,
129
130    pub blob: Option<Buffer>,
131
132    pub res: Option<Result<(), EncodedErr>>,
133}
134
135impl task::ServerTaskEncode for RpcSvrResp {
136    #[inline]
137    fn encode_resp<'a, 'b, C: Codec>(
138        &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
139    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
140        match self.res.take().unwrap() {
141            Ok(_) => {
142                if let Some(msg) = self.msg.as_ref() {
143                    use std::io::Write;
144                    buf.write_all(&msg).expect("fill msg");
145                    return (self.seq, Ok((msg.len(), self.blob.as_deref())));
146                } else {
147                    return (self.seq, Ok((0, self.blob.as_deref())));
148                }
149            }
150            Err(e) => return (self.seq, Err(e)),
151        }
152    }
153}
154
155/// An ServerFacts for general use
156pub struct ServerDefault<RT: AsyncRuntime> {
157    pub logger: Arc<LogFilter>,
158    config: ServerConfig,
159    rt: RT,
160}
161
162impl<RT: AsyncRuntime> ServerDefault<RT> {
163    pub fn new(config: ServerConfig, rt: RT) -> Arc<Self> {
164        Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt })
165    }
166
167    #[inline]
168    pub fn set_log_level(&self, level: log::Level) {
169        self.logger.set_level(level);
170    }
171}
172
173impl<RT: AsyncRuntime> std::ops::Deref for ServerDefault<RT> {
174    type Target = RT;
175    fn deref(&self) -> &Self::Target {
176        &self.rt
177    }
178}
179
180impl<RT: AsyncRuntime> ServerFacts for ServerDefault<RT> {
181    #[inline]
182    fn new_logger(&self) -> Arc<LogFilter> {
183        self.logger.clone()
184    }
185
186    #[inline]
187    fn get_config(&self) -> &ServerConfig {
188        &self.config
189    }
190}