Skip to main content

razor_stream/server/
mod.rs

1//! This module contains traits defined for the server-side
2//!
3
4use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
/// General configuration shared by every server-side connection.
///
/// All fields are public; start from [`Default::default`] and override only what you need.
#[derive(Debug, Clone)]
pub struct ServerConfig {
    /// Timeout applied to socket reads.
    pub read_timeout: Duration,
    /// Timeout applied to socket writes.
    pub write_timeout: Duration,
    /// How long a socket may stay idle before it is closed.
    pub idle_timeout: Duration,
    /// Upper bound on how long to wait for all connections to close.
    pub server_close_wait: Duration,
    /// Stream buffer size in bytes. A non-zero value overrides the transport's
    /// default (`DEFAULT_BUF_SIZE`).
    pub stream_buf_size: usize,
}
38
39impl Default for ServerConfig {
40    fn default() -> Self {
41        Self {
42            read_timeout: Duration::from_secs(5),
43            write_timeout: Duration::from_secs(5),
44            idle_timeout: Duration::from_secs(120),
45            server_close_wait: Duration::from_secs(90),
46            stream_buf_size: 0,
47        }
48    }
49}
50
51/// A central hub defined by the user for the server-side, to define the customizable plugin.
52///
53/// # NOTE
54pub trait ServerFacts: Sync + Send + 'static + Sized {
55    /// You should keep ServerConfig inside, get_config() will return the reference.
56    fn get_config(&self) -> &ServerConfig;
57
58    /// Construct a [captains_log::filter::Filter](https://docs.rs/captains-log/latest/captains_log/filter/trait.Filter.html) to oganize log of a client
59    fn new_logger(&self) -> Arc<LogFilter>;
60}
61
62/// This trait is for server-side transport layer protocol.
63///
64/// The implementation can be found on:
65///
66/// - [razor-rpc-tcp](https://docs.rs/razor-rpc-tcp): For TCP and Unix socket
67pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68    type RT: AsyncRuntime + Clone;
69
70    type Listener: AsyncListener;
71
72    fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
73
74    /// The implementation is expected to store the conn_count until dropped
75    fn new_conn(
76        stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
77    ) -> Self;
78
79    /// Read a request from the socket
80    fn read_req<'a>(
81        &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
82    ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
83
84    /// Write our user task response
85    fn write_resp<T: ServerTaskEncode>(
86        &self, logger: &LogFilter, codec: &impl Codec, task: T,
87    ) -> impl Future<Output = io::Result<()>> + Send;
88
89    /// Write out ping resp or error
90    fn write_resp_internal(
91        &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
92    ) -> impl Future<Output = io::Result<()>> + Send;
93
94    /// Flush the response for the socket writer, if the transport has buffering logic
95    fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
96
97    /// Shutdown the write direction of the connection
98    fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
99}
100
101/// A temporary struct to hold data buffer return by ServerTransport
102///
103/// NOTE: `RpcAction` and `msg` contains slice that reference to ServerTransport's internal buffer,
104/// you should parse and clone them.
105pub struct RpcSvrReq<'a> {
106    pub seq: u64,
107    pub action: RpcAction<'a>,
108    pub msg: &'a [u8],
109    pub blob: Option<Buffer>, // for write, this contains data
110}
111
112impl<'a> fmt::Debug for RpcSvrReq<'a> {
113    #[inline]
114    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115        write!(f, "req(seq={}, action={:?})", self.seq, self.action)
116    }
117}
118
119/// A Struct to hold pre encoded buffer for server response
120#[allow(dead_code)]
121#[derive(Debug)]
122pub struct RpcSvrResp {
123    pub seq: u64,
124
125    pub msg: Option<Vec<u8>>,
126
127    pub blob: Option<Buffer>,
128
129    pub res: Option<Result<(), EncodedErr>>,
130}
131
132impl task::ServerTaskEncode for RpcSvrResp {
133    #[inline]
134    fn encode_resp<'a, 'b, C: Codec>(
135        &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
136    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
137        match self.res.take().unwrap() {
138            Ok(_) => {
139                if let Some(msg) = self.msg.as_ref() {
140                    use std::io::Write;
141                    buf.write_all(msg).expect("fill msg");
142                    return (self.seq, Ok((msg.len(), self.blob.as_deref())));
143                } else {
144                    return (self.seq, Ok((0, self.blob.as_deref())));
145                }
146            }
147            Err(e) => return (self.seq, Err(e)),
148        }
149    }
150}
151
152/// An ServerFacts for general use
153pub struct ServerDefault {
154    pub logger: Arc<LogFilter>,
155    config: ServerConfig,
156}
157
158impl ServerDefault {
159    pub fn new(config: ServerConfig) -> Arc<Self> {
160        Arc::new(Self { logger: Arc::new(LogFilter::new()), config })
161    }
162
163    #[inline]
164    pub fn set_log_level(&self, level: log::Level) {
165        self.logger.set_level(level);
166    }
167}
168
169impl ServerFacts for ServerDefault {
170    #[inline]
171    fn new_logger(&self) -> Arc<LogFilter> {
172        self.logger.clone()
173    }
174
175    #[inline]
176    fn get_config(&self) -> &ServerConfig {
177        &self.config
178    }
179}