razor_stream/server/
mod.rs1use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use io_buffer::Buffer;
8use orb::prelude::*;
9use std::time::Duration;
10use std::{fmt, future::Future, io, sync::Arc};
11
12pub mod task;
13use task::*;
14
15mod server;
16pub use server::RpcServer;
17
18pub mod graceful;
19
20pub mod dispatch;
21use dispatch::Dispatch;
22
23#[derive(Clone)]
25pub struct ServerConfig {
26 pub read_timeout: Duration,
28 pub write_timeout: Duration,
30 pub idle_timeout: Duration,
32 pub server_close_wait: Duration,
34 pub stream_buf_size: usize,
36}
37
38impl Default for ServerConfig {
39 fn default() -> Self {
40 Self {
41 read_timeout: Duration::from_secs(5),
42 write_timeout: Duration::from_secs(5),
43 idle_timeout: Duration::from_secs(120),
44 server_close_wait: Duration::from_secs(90),
45 stream_buf_size: 0,
46 }
47 }
48}
49
50pub trait ServerFacts: orb::AsyncRuntime + Sync + Send + 'static + Sized {
59 fn get_config(&self) -> &ServerConfig;
61
62 fn new_logger(&self) -> Arc<LogFilter>;
64}
65
66pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
72 type Listener: AsyncListener;
73
74 fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
75
76 fn new_conn(
78 stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
79 ) -> Self;
80
81 fn read_req<'a>(
83 &'a self, logger: &LogFilter, close_ch: &crossfire::MAsyncRx<()>,
84 ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
85
86 fn write_resp<T: ServerTaskEncode>(
88 &self, logger: &LogFilter, codec: &impl Codec, task: T,
89 ) -> impl Future<Output = io::Result<()>> + Send;
90
91 fn write_resp_internal(
93 &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
94 ) -> impl Future<Output = io::Result<()>> + Send;
95
96 fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
98
99 fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
101}
102
103pub struct RpcSvrReq<'a> {
108 pub seq: u64,
109 pub action: RpcAction<'a>,
110 pub msg: &'a [u8],
111 pub blob: Option<Buffer>, }
113
114impl<'a> fmt::Debug for RpcSvrReq<'a> {
115 #[inline]
116 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
117 write!(f, "req(seq={}, action={:?})", self.seq, self.action)
118 }
119}
120
121#[allow(dead_code)]
123#[derive(Debug)]
124pub struct RpcSvrResp {
125 pub seq: u64,
126
127 pub msg: Option<Vec<u8>>,
128
129 pub blob: Option<Buffer>,
130
131 pub res: Option<Result<(), EncodedErr>>,
132}
133
134impl task::ServerTaskEncode for RpcSvrResp {
135 #[inline]
136 fn encode_resp<'a, 'b, C: Codec>(
137 &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
138 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
139 match self.res.take().unwrap() {
140 Ok(_) => {
141 if let Some(msg) = self.msg.as_ref() {
142 use std::io::Write;
143 buf.write_all(&msg).expect("fill msg");
144 return (self.seq, Ok((msg.len(), self.blob.as_deref())));
145 } else {
146 return (self.seq, Ok((0, self.blob.as_deref())));
147 }
148 }
149 Err(e) => return (self.seq, Err(e)),
150 }
151 }
152}
153
154pub struct ServerDefault<RT: AsyncRuntime> {
156 pub logger: Arc<LogFilter>,
157 config: ServerConfig,
158 rt: RT,
159}
160
161impl<RT: AsyncRuntime> ServerDefault<RT> {
162 pub fn new(config: ServerConfig, rt: RT) -> Arc<Self> {
163 Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt })
164 }
165
166 #[inline]
167 pub fn set_log_level(&self, level: log::Level) {
168 self.logger.set_level(level);
169 }
170}
171
172impl<RT: AsyncRuntime> std::ops::Deref for ServerDefault<RT> {
173 type Target = RT;
174 fn deref(&self) -> &Self::Target {
175 &self.rt
176 }
177}
178
179impl<RT: AsyncRuntime> ServerFacts for ServerDefault<RT> {
180 #[inline]
181 fn new_logger(&self) -> Arc<LogFilter> {
182 self.logger.clone()
183 }
184
185 #[inline]
186 fn get_config(&self) -> &ServerConfig {
187 &self.config
188 }
189}