Skip to main content

razor_stream/server/
mod.rs

//! This module contains the traits and supporting types for the server side.
3
4use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
/// General configuration for the server side.
///
/// See the `Default` implementation for the values used when nothing is overridden.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Socket read timeout.
    pub read_timeout: Duration,
    /// Socket write timeout.
    pub write_timeout: Duration,
    /// How long a socket may stay idle before it is closed.
    pub idle_timeout: Duration,
    /// Timeout used when waiting for all connections to close.
    pub server_close_wait: Duration,
    /// Stream buffer size in bytes. When non-zero, it overrides the transport's
    /// default `DEFAULT_BUF_SIZE`.
    pub stream_buf_size: usize,
}
38
39impl Default for ServerConfig {
40    fn default() -> Self {
41        Self {
42            read_timeout: Duration::from_secs(5),
43            write_timeout: Duration::from_secs(5),
44            idle_timeout: Duration::from_secs(120),
45            server_close_wait: Duration::from_secs(90),
46            stream_buf_size: 0,
47        }
48    }
49}
50
51/// A central hub defined by the user for the server-side, to define the customizable plugin.
52///
53/// # NOTE
54pub trait ServerFacts: Sync + Send + 'static + Sized {
55    /// You should keep ServerConfig inside, get_config() will return the reference.
56    fn get_config(&self) -> &ServerConfig;
57
58    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
59    fn new_logger(&self) -> Arc<LogFilter>;
60}
61
62/// This trait is for server-side transport layer protocol.
63///
64/// The implementation can be found on:
65///
66/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
67pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68    type Listener: AsyncListener;
69
70    fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
71
72    /// The implementation is expected to store the conn_count until dropped
73    fn new_conn(
74        stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
75    ) -> Self;
76
77    /// Read a request from the socket
78    fn read_req<'a>(
79        &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
80    ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
81
82    /// Write our user task response
83    fn write_resp<T: ServerTaskEncode>(
84        &self, logger: &LogFilter, codec: &impl Codec, task: T,
85    ) -> impl Future<Output = io::Result<()>> + Send;
86
87    /// Write out ping resp or error
88    fn write_resp_internal(
89        &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
90    ) -> impl Future<Output = io::Result<()>> + Send;
91
92    /// Flush the response for the socket writer, if the transport has buffering logic
93    fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
94
95    /// Shutdown the write direction of the connection
96    fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
97}
98
99/// A temporary struct to hold data buffer return by ServerTransport
100///
101/// NOTE: `RpcAction` and `msg` contains slice that reference to ServerTransport's internal buffer,
102/// you should parse and clone them.
103pub struct RpcSvrReq<'a> {
104    pub seq: u64,
105    pub action: RpcAction<'a>,
106    pub msg: &'a [u8],
107    pub blob: Option<Buffer>, // for write, this contains data
108}
109
110impl<'a> fmt::Debug for RpcSvrReq<'a> {
111    #[inline]
112    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
113        write!(f, "req(seq={}, action={:?})", self.seq, self.action)
114    }
115}
116
117/// A Struct to hold pre encoded buffer for server response
118#[allow(dead_code)]
119#[derive(Debug)]
120pub struct RpcSvrResp {
121    pub seq: u64,
122
123    pub msg: Option<Vec<u8>>,
124
125    pub blob: Option<Buffer>,
126
127    pub res: Option<Result<(), EncodedErr>>,
128}
129
130impl task::ServerTaskEncode for RpcSvrResp {
131    #[inline]
132    fn encode_resp<'a, 'b, C: Codec>(
133        &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
134    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
135        match self.res.take().unwrap() {
136            Ok(_) => {
137                if let Some(msg) = self.msg.as_ref() {
138                    use std::io::Write;
139                    buf.write_all(msg).expect("fill msg");
140                    return (self.seq, Ok((msg.len(), self.blob.as_deref())));
141                } else {
142                    return (self.seq, Ok((0, self.blob.as_deref())));
143                }
144            }
145            Err(e) => return (self.seq, Err(e)),
146        }
147    }
148}
149
150/// An ServerFacts for general use
151pub struct ServerDefault {
152    pub logger: Arc<LogFilter>,
153    config: ServerConfig,
154}
155
156impl ServerDefault {
157    pub fn new(config: ServerConfig) -> Arc<Self> {
158        Arc::new(Self { logger: Arc::new(LogFilter::new()), config })
159    }
160
161    #[inline]
162    pub fn set_log_level(&self, level: log::Level) {
163        self.logger.set_level(level);
164    }
165}
166
167impl ServerFacts for ServerDefault {
168    #[inline]
169    fn new_logger(&self) -> Arc<LogFilter> {
170        self.logger.clone()
171    }
172
173    #[inline]
174    fn get_config(&self) -> &ServerConfig {
175        &self.config
176    }
177}