razor_stream/server/
mod.rs

1//! This module contains traits defined for the server-side
2//!
3
4use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use io_buffer::Buffer;
8use orb::prelude::*;
9use std::time::Duration;
10use std::{fmt, future::Future, io, sync::Arc};
11
12pub mod task;
13use task::*;
14
15mod server;
16pub use server::RpcServer;
17
18pub mod graceful;
19
20pub mod dispatch;
21use dispatch::Dispatch;
22
/// General config for the server side.
///
/// Every field is public; start from [`ServerConfig::default`] and override what you need.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Socket read timeout.
    pub read_timeout: Duration,
    /// Socket write timeout.
    pub write_timeout: Duration,
    /// Idle time after which a socket is to be closed.
    pub idle_timeout: Duration,
    /// Timeout for waiting until all connections are closed.
    pub server_close_wait: Duration,
    /// Stream buffer size in bytes. When non-zero, it overrides the transport's default
    /// `DEFAULT_BUF_SIZE`.
    pub stream_buf_size: usize,
}
37
38impl Default for ServerConfig {
39    fn default() -> Self {
40        Self {
41            read_timeout: Duration::from_secs(5),
42            write_timeout: Duration::from_secs(5),
43            idle_timeout: Duration::from_secs(120),
44            server_close_wait: Duration::from_secs(90),
45            stream_buf_size: 0,
46        }
47    }
48}
49
50/// A central hub defined by the user for the server-side, to define the customizable plugin.
51///
52/// # NOTE
53///
54/// If you choose implement this trait rather than use [ServerDefault]
55/// We recommend your implementation to Deref<Target=orb::AsyncRuntime>
56/// then the blanket trait in `orb` crate will automatically impl AsyncRuntime on your ClientFacts type.
57/// Refer to the code of [ServerDefault] for example.
58pub trait ServerFacts: orb::AsyncRuntime + Sync + Send + 'static + Sized {
59    /// You should keep ServerConfig inside, get_config() will return the reference.
60    fn get_config(&self) -> &ServerConfig;
61
62    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
63    fn new_logger(&self) -> Arc<LogFilter>;
64}
65
66/// This trait is for server-side transport layer protocol.
67///
68/// The implementation can be found on:
69///
70/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
71pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
72    type Listener: AsyncListener;
73
74    fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
75
76    /// The implementation is expected to store the conn_count until dropped
77    fn new_conn(
78        stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
79    ) -> Self;
80
81    /// Read a request from the socket
82    fn read_req<'a>(
83        &'a self, logger: &LogFilter, close_ch: &crossfire::MAsyncRx<()>,
84    ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
85
86    /// Write our user task response
87    fn write_resp<T: ServerTaskEncode>(
88        &self, logger: &LogFilter, codec: &impl Codec, task: T,
89    ) -> impl Future<Output = io::Result<()>> + Send;
90
91    /// Write out ping resp or error
92    fn write_resp_internal(
93        &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
94    ) -> impl Future<Output = io::Result<()>> + Send;
95
96    /// Flush the response for the socket writer, if the transport has buffering logic
97    fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
98
99    /// Shutdown the write direction of the connection
100    fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
101}
102
103/// A temporary struct to hold data buffer return by ServerTransport
104///
105/// NOTE: `RpcAction` and `msg` contains slice that reference to ServerTransport's internal buffer,
106/// you should parse and clone them.
107pub struct RpcSvrReq<'a> {
108    pub seq: u64,
109    pub action: RpcAction<'a>,
110    pub msg: &'a [u8],
111    pub blob: Option<Buffer>, // for write, this contains data
112}
113
114impl<'a> fmt::Debug for RpcSvrReq<'a> {
115    #[inline]
116    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117        write!(f, "req(seq={}, action={:?})", self.seq, self.action)
118    }
119}
120
121/// A Struct to hold pre encoded buffer for server response
122#[allow(dead_code)]
123#[derive(Debug)]
124pub struct RpcSvrResp {
125    pub seq: u64,
126
127    pub msg: Option<Vec<u8>>,
128
129    pub blob: Option<Buffer>,
130
131    pub res: Option<Result<(), EncodedErr>>,
132}
133
134impl task::ServerTaskEncode for RpcSvrResp {
135    #[inline]
136    fn encode_resp<'a, 'b, C: Codec>(
137        &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
138    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
139        match self.res.take().unwrap() {
140            Ok(_) => {
141                if let Some(msg) = self.msg.as_ref() {
142                    use std::io::Write;
143                    buf.write_all(&msg).expect("fill msg");
144                    return (self.seq, Ok((msg.len(), self.blob.as_deref())));
145                } else {
146                    return (self.seq, Ok((0, self.blob.as_deref())));
147                }
148            }
149            Err(e) => return (self.seq, Err(e)),
150        }
151    }
152}
153
154/// An ServerFacts for general use
155pub struct ServerDefault<RT: AsyncRuntime> {
156    pub logger: Arc<LogFilter>,
157    config: ServerConfig,
158    rt: RT,
159}
160
161impl<RT: AsyncRuntime> ServerDefault<RT> {
162    pub fn new(config: ServerConfig, rt: RT) -> Arc<Self> {
163        Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt })
164    }
165
166    #[inline]
167    pub fn set_log_level(&self, level: log::Level) {
168        self.logger.set_level(level);
169    }
170}
171
172impl<RT: AsyncRuntime> std::ops::Deref for ServerDefault<RT> {
173    type Target = RT;
174    fn deref(&self) -> &Self::Target {
175        &self.rt
176    }
177}
178
179impl<RT: AsyncRuntime> ServerFacts for ServerDefault<RT> {
180    #[inline]
181    fn new_logger(&self) -> Arc<LogFilter> {
182        self.logger.clone()
183    }
184
185    #[inline]
186    fn get_config(&self) -> &ServerConfig {
187        &self.config
188    }
189}