razor_stream/server/
mod.rs1use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
/// Timeouts and buffer sizing used by the server side of the RPC stream.
///
/// NOTE(review): the per-field descriptions are inferred from the field names only; the
/// code that consumes these values is not visible in this module — confirm against the
/// transport implementation.
#[derive(Clone)]
pub struct ServerConfig {
    /// Presumably the time limit applied to a socket read.
    pub read_timeout: Duration,
    /// Presumably the time limit applied to a socket write.
    pub write_timeout: Duration,
    /// Presumably how long a connection may stay idle before it is dropped.
    pub idle_timeout: Duration,
    /// Presumably how long the server waits for connections to finish when closing.
    pub server_close_wait: Duration,
    /// Size of the stream buffer.
    ///
    /// NOTE(review): `Default` sets this to 0, which presumably means "let the transport
    /// pick its own size" — confirm.
    pub stream_buf_size: usize,
}
38
39impl Default for ServerConfig {
40 fn default() -> Self {
41 Self {
42 read_timeout: Duration::from_secs(5),
43 write_timeout: Duration::from_secs(5),
44 idle_timeout: Duration::from_secs(120),
45 server_close_wait: Duration::from_secs(90),
46 stream_buf_size: 0,
47 }
48 }
49}
50
51pub trait ServerFacts: orb::AsyncRuntime + Sync + Send + 'static + Sized {
60 fn get_config(&self) -> &ServerConfig;
62
63 fn new_logger(&self) -> Arc<LogFilter>;
65}
66
67pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
73 type Listener: AsyncListener;
74
75 fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
76
77 fn new_conn(
79 stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
80 ) -> Self;
81
82 fn read_req<'a>(
84 &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
85 ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
86
87 fn write_resp<T: ServerTaskEncode>(
89 &self, logger: &LogFilter, codec: &impl Codec, task: T,
90 ) -> impl Future<Output = io::Result<()>> + Send;
91
92 fn write_resp_internal(
94 &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
95 ) -> impl Future<Output = io::Result<()>> + Send;
96
97 fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
99
100 fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
102}
103
104pub struct RpcSvrReq<'a> {
109 pub seq: u64,
110 pub action: RpcAction<'a>,
111 pub msg: &'a [u8],
112 pub blob: Option<Buffer>, }
114
115impl<'a> fmt::Debug for RpcSvrReq<'a> {
116 #[inline]
117 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
118 write!(f, "req(seq={}, action={:?})", self.seq, self.action)
119 }
120}
121
122#[allow(dead_code)]
124#[derive(Debug)]
125pub struct RpcSvrResp {
126 pub seq: u64,
127
128 pub msg: Option<Vec<u8>>,
129
130 pub blob: Option<Buffer>,
131
132 pub res: Option<Result<(), EncodedErr>>,
133}
134
135impl task::ServerTaskEncode for RpcSvrResp {
136 #[inline]
137 fn encode_resp<'a, 'b, C: Codec>(
138 &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
139 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
140 match self.res.take().unwrap() {
141 Ok(_) => {
142 if let Some(msg) = self.msg.as_ref() {
143 use std::io::Write;
144 buf.write_all(&msg).expect("fill msg");
145 return (self.seq, Ok((msg.len(), self.blob.as_deref())));
146 } else {
147 return (self.seq, Ok((0, self.blob.as_deref())));
148 }
149 }
150 Err(e) => return (self.seq, Err(e)),
151 }
152 }
153}
154
155pub struct ServerDefault<RT: AsyncRuntime> {
157 pub logger: Arc<LogFilter>,
158 config: ServerConfig,
159 rt: RT,
160}
161
162impl<RT: AsyncRuntime> ServerDefault<RT> {
163 pub fn new(config: ServerConfig, rt: RT) -> Arc<Self> {
164 Arc::new(Self { logger: Arc::new(LogFilter::new()), config, rt })
165 }
166
167 #[inline]
168 pub fn set_log_level(&self, level: log::Level) {
169 self.logger.set_level(level);
170 }
171}
172
173impl<RT: AsyncRuntime> std::ops::Deref for ServerDefault<RT> {
174 type Target = RT;
175 fn deref(&self) -> &Self::Target {
176 &self.rt
177 }
178}
179
180impl<RT: AsyncRuntime> ServerFacts for ServerDefault<RT> {
181 #[inline]
182 fn new_logger(&self) -> Arc<LogFilter> {
183 self.logger.clone()
184 }
185
186 #[inline]
187 fn get_config(&self) -> &ServerConfig {
188 &self.config
189 }
190}