razor_stream/server/
mod.rs1use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
/// Tunable parameters for the server side of an RPC connection.
///
/// Every field is public so callers can build a config with struct-update
/// syntax on top of [`Default`], whose values are listed on each field.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Read timeout. Defaults to 5 seconds.
    ///
    /// NOTE(review): presumably bounds a single read on the connection —
    /// confirm against the transport implementation.
    pub read_timeout: Duration,
    /// Write timeout. Defaults to 5 seconds.
    ///
    /// NOTE(review): presumably bounds a single write on the connection —
    /// confirm against the transport implementation.
    pub write_timeout: Duration,
    /// Idle timeout. Defaults to 120 seconds.
    ///
    /// NOTE(review): presumably how long a connection may stay without
    /// traffic before it is dropped — confirm against the server loop.
    pub idle_timeout: Duration,
    /// Close-wait duration. Defaults to 90 seconds.
    ///
    /// NOTE(review): presumably the grace period granted while the server is
    /// shutting down — confirm against the graceful-shutdown code.
    pub server_close_wait: Duration,
    /// Stream buffer size. Defaults to 0.
    ///
    /// NOTE(review): the unit (likely bytes) and the meaning of 0 are decided
    /// by the transport and are not visible here — confirm.
    pub stream_buf_size: usize,
}

impl Default for ServerConfig {
    /// Builds the stock configuration: 5 s read/write timeouts, 120 s idle
    /// timeout, 90 s close wait and a stream buffer size of 0.
    fn default() -> Self {
        Self {
            read_timeout: Duration::from_secs(5),
            write_timeout: Duration::from_secs(5),
            idle_timeout: Duration::from_secs(120),
            server_close_wait: Duration::from_secs(90),
            stream_buf_size: 0,
        }
    }
}
50
51pub trait ServerFacts: Sync + Send + 'static + Sized {
55 fn get_config(&self) -> &ServerConfig;
57
58 fn new_logger(&self) -> Arc<LogFilter>;
60}
61
62pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68 type Listener: AsyncListener;
69
70 fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
71
72 fn new_conn(
74 stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
75 ) -> Self;
76
77 fn read_req<'a>(
79 &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
80 ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
81
82 fn write_resp<T: ServerTaskEncode>(
84 &self, logger: &LogFilter, codec: &impl Codec, task: T,
85 ) -> impl Future<Output = io::Result<()>> + Send;
86
87 fn write_resp_internal(
89 &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
90 ) -> impl Future<Output = io::Result<()>> + Send;
91
92 fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
94
95 fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
97}
98
99pub struct RpcSvrReq<'a> {
104 pub seq: u64,
105 pub action: RpcAction<'a>,
106 pub msg: &'a [u8],
107 pub blob: Option<Buffer>, }
109
110impl<'a> fmt::Debug for RpcSvrReq<'a> {
111 #[inline]
112 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
113 write!(f, "req(seq={}, action={:?})", self.seq, self.action)
114 }
115}
116
117#[allow(dead_code)]
119#[derive(Debug)]
120pub struct RpcSvrResp {
121 pub seq: u64,
122
123 pub msg: Option<Vec<u8>>,
124
125 pub blob: Option<Buffer>,
126
127 pub res: Option<Result<(), EncodedErr>>,
128}
129
130impl task::ServerTaskEncode for RpcSvrResp {
131 #[inline]
132 fn encode_resp<'a, 'b, C: Codec>(
133 &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
134 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
135 match self.res.take().unwrap() {
136 Ok(_) => {
137 if let Some(msg) = self.msg.as_ref() {
138 use std::io::Write;
139 buf.write_all(msg).expect("fill msg");
140 return (self.seq, Ok((msg.len(), self.blob.as_deref())));
141 } else {
142 return (self.seq, Ok((0, self.blob.as_deref())));
143 }
144 }
145 Err(e) => return (self.seq, Err(e)),
146 }
147 }
148}
149
150pub struct ServerDefault {
152 pub logger: Arc<LogFilter>,
153 config: ServerConfig,
154}
155
156impl ServerDefault {
157 pub fn new(config: ServerConfig) -> Arc<Self> {
158 Arc::new(Self { logger: Arc::new(LogFilter::new()), config })
159 }
160
161 #[inline]
162 pub fn set_log_level(&self, level: log::Level) {
163 self.logger.set_level(level);
164 }
165}
166
167impl ServerFacts for ServerDefault {
168 #[inline]
169 fn new_logger(&self) -> Arc<LogFilter> {
170 self.logger.clone()
171 }
172
173 #[inline]
174 fn get_config(&self) -> &ServerConfig {
175 &self.config
176 }
177}