razor_stream/server/
mod.rs1use crate::proto::RpcAction;
5use crate::{Codec, error::*};
6use captains_log::filter::LogFilter;
7use crossfire::{MAsyncRx, mpmc};
8use io_buffer::Buffer;
9use orb::prelude::*;
10use std::time::Duration;
11use std::{fmt, future::Future, io, sync::Arc};
12
13pub mod task;
14use task::*;
15
16mod server;
17pub use server::RpcServer;
18
19pub mod graceful;
20
21pub mod dispatch;
22use dispatch::Dispatch;
23
/// Tunable settings for a server.
///
/// A reference to this struct is passed to [`ServerTransport::new_conn`] and
/// is exposed by [`ServerFacts::get_config`]. How each value is enforced is
/// decided by the transport implementation, which is not part of this file,
/// so the per-field notes below are hedged.
#[derive(Clone, Debug)]
pub struct ServerConfig {
    /// Timeout for reads.
    /// NOTE(review): presumably applied per request read — confirm against the transport.
    pub read_timeout: Duration,
    /// Timeout for writes.
    /// NOTE(review): presumably applied per response write — confirm against the transport.
    pub write_timeout: Duration,
    /// Timeout for idle connections.
    /// NOTE(review): presumably the time a connection may stay silent before it is closed — confirm.
    pub idle_timeout: Duration,
    /// Wait time used when the server closes.
    /// NOTE(review): presumably the upper bound for a graceful shutdown — confirm.
    pub server_close_wait: Duration,
    /// Size of the stream buffer.
    /// NOTE(review): the default is `0`, which presumably means "let the transport decide" — confirm.
    pub stream_buf_size: usize,
}
38
39impl Default for ServerConfig {
40 fn default() -> Self {
41 Self {
42 read_timeout: Duration::from_secs(5),
43 write_timeout: Duration::from_secs(5),
44 idle_timeout: Duration::from_secs(120),
45 server_close_wait: Duration::from_secs(90),
46 stream_buf_size: 0,
47 }
48 }
49}
50
51pub trait ServerFacts: Sync + Send + 'static + Sized {
55 fn get_config(&self) -> &ServerConfig;
57
58 fn new_logger(&self) -> Arc<LogFilter>;
60}
61
62pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
68 type RT: AsyncRuntime + Clone;
69
70 type Listener: AsyncListener;
71
72 fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
73
74 fn new_conn(
76 stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
77 ) -> Self;
78
79 fn read_req<'a>(
81 &'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
82 ) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
83
84 fn write_resp<T: ServerTaskEncode>(
86 &self, logger: &LogFilter, codec: &impl Codec, task: T,
87 ) -> impl Future<Output = io::Result<()>> + Send;
88
89 fn write_resp_internal(
91 &self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
92 ) -> impl Future<Output = io::Result<()>> + Send;
93
94 fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
96
97 fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
99}
100
101pub struct RpcSvrReq<'a> {
106 pub seq: u64,
107 pub action: RpcAction<'a>,
108 pub msg: &'a [u8],
109 pub blob: Option<Buffer>, }
111
112impl<'a> fmt::Debug for RpcSvrReq<'a> {
113 #[inline]
114 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
115 write!(f, "req(seq={}, action={:?})", self.seq, self.action)
116 }
117}
118
119#[allow(dead_code)]
121#[derive(Debug)]
122pub struct RpcSvrResp {
123 pub seq: u64,
124
125 pub msg: Option<Vec<u8>>,
126
127 pub blob: Option<Buffer>,
128
129 pub res: Option<Result<(), EncodedErr>>,
130}
131
132impl task::ServerTaskEncode for RpcSvrResp {
133 #[inline]
134 fn encode_resp<'a, 'b, C: Codec>(
135 &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
136 ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
137 match self.res.take().unwrap() {
138 Ok(_) => {
139 if let Some(msg) = self.msg.as_ref() {
140 use std::io::Write;
141 buf.write_all(msg).expect("fill msg");
142 return (self.seq, Ok((msg.len(), self.blob.as_deref())));
143 } else {
144 return (self.seq, Ok((0, self.blob.as_deref())));
145 }
146 }
147 Err(e) => return (self.seq, Err(e)),
148 }
149 }
150}
151
152pub struct ServerDefault {
154 pub logger: Arc<LogFilter>,
155 config: ServerConfig,
156}
157
158impl ServerDefault {
159 pub fn new(config: ServerConfig) -> Arc<Self> {
160 Arc::new(Self { logger: Arc::new(LogFilter::new()), config })
161 }
162
163 #[inline]
164 pub fn set_log_level(&self, level: log::Level) {
165 self.logger.set_level(level);
166 }
167}
168
169impl ServerFacts for ServerDefault {
170 #[inline]
171 fn new_logger(&self) -> Arc<LogFilter> {
172 self.logger.clone()
173 }
174
175 #[inline]
176 fn get_config(&self) -> &ServerConfig {
177 &self.config
178 }
179}