use crate::proto::RpcAction;
use crate::{Codec, error::*};
use captains_log::filter::LogFilter;
use crossfire::{MAsyncRx, mpmc};
use io_buffer::Buffer;
use orb::prelude::*;
use std::time::Duration;
use std::{fmt, future::Future, io, sync::Arc};
pub mod task;
use task::*;
mod server;
pub use server::RpcServer;
pub mod graceful;
pub mod dispatch;
use dispatch::Dispatch;
/// Tunable settings for the RPC server side.
///
/// Field semantics beyond their types and default values are not visible in
/// this file; the notes below are hedged where they rely on naming alone.
#[derive(Clone)]
pub struct ServerConfig {
    /// Read timeout; defaults to 5 seconds.
    /// NOTE(review): presumably applied by the transport when reading a request — confirm.
    pub read_timeout: Duration,
    /// Write timeout; defaults to 5 seconds.
    /// NOTE(review): presumably applied by the transport when writing a response — confirm.
    pub write_timeout: Duration,
    /// Idle timeout; defaults to 120 seconds.
    /// NOTE(review): presumably the maximum time a connection may stay idle — confirm.
    pub idle_timeout: Duration,
    /// Close-wait duration; defaults to 90 seconds.
    /// NOTE(review): presumably how long shutdown waits for connections to drain — confirm.
    pub server_close_wait: Duration,
    /// Stream buffer size; defaults to 0.
    /// NOTE(review): 0 presumably lets the transport choose its own size — confirm.
    pub stream_buf_size: usize,
}

impl Default for ServerConfig {
    /// Builds the stock configuration: 5 s read/write timeouts, 120 s idle
    /// timeout, 90 s close wait and a stream buffer size of 0.
    fn default() -> Self {
        // Reads and writes share the same default timeout.
        let io_timeout = Duration::from_secs(5);
        let idle_timeout = Duration::from_secs(120);
        let server_close_wait = Duration::from_secs(90);
        Self {
            read_timeout: io_timeout,
            write_timeout: io_timeout,
            idle_timeout,
            server_close_wait,
            stream_buf_size: 0,
        }
    }
}
/// Shared, thread-safe context the server consults for its configuration and
/// logger.
///
/// Implementors must be `Send + Sync + 'static + Sized`.
pub trait ServerFacts: Sync + Send + 'static + Sized {
/// Returns a borrowed reference to the server configuration.
fn get_config(&self) -> &ServerConfig;
/// Returns a reference-counted handle to a [`LogFilter`].
///
/// NOTE(review): whether each call yields a fresh filter or a shared one is
/// up to the implementor — the trait itself does not say.
fn new_logger(&self) -> Arc<LogFilter>;
}
/// Abstraction over the server-side connection transport: binding a listener,
/// wrapping an accepted connection, and reading requests / writing responses.
///
/// Every asynchronous method returns an `impl Future + Send`, so the returned
/// futures can be moved across threads.
pub trait ServerTransport: Send + Sync + Sized + 'static + fmt::Debug {
/// The async runtime this transport is tied to.
type RT: AsyncRuntime;
/// The listener type produced by [`ServerTransport::bind`]; its `Conn`
/// associated type is what [`ServerTransport::new_conn`] consumes.
type Listener: AsyncListener;
/// Binds a listener on `addr`, resolving to the listener or an I/O error.
///
/// NOTE(review): the accepted address syntax depends on the implementor — confirm.
fn bind(addr: &str) -> impl Future<Output = io::Result<Self::Listener>> + Send;
/// Wraps an accepted connection `stream` into a transport instance.
///
/// `config` supplies the server settings. `conn_count` is an `Arc<()>`;
/// presumably its strong count tracks live connections — TODO confirm
/// against the implementor.
fn new_conn(
stream: <Self::Listener as AsyncListener>::Conn, config: &ServerConfig, conn_count: Arc<()>,
) -> Self;
/// Reads the next request from the connection.
///
/// The returned [`RpcSvrReq`] borrows from `self` for `'a`. Failures are
/// reported as [`RpcIntErr`].
/// NOTE(review): `close_ch` looks like a shutdown-notification channel that
/// lets a pending read be interrupted — confirm.
fn read_req<'a>(
&'a self, logger: &LogFilter, close_ch: &MAsyncRx<mpmc::Null>,
) -> impl Future<Output = Result<RpcSvrReq<'a>, RpcIntErr>> + Send;
/// Writes the response for `task`, which supplies its own encoding through
/// [`ServerTaskEncode`] with the help of `codec`.
fn write_resp<T: ServerTaskEncode>(
&self, logger: &LogFilter, codec: &impl Codec, task: T,
) -> impl Future<Output = io::Result<()>> + Send;
/// Writes a response for request `seq` that carries only an optional
/// internal error (`err`), with no task payload.
fn write_resp_internal(
&self, logger: &LogFilter, seq: u64, err: Option<RpcIntErr>,
) -> impl Future<Output = io::Result<()>> + Send;
/// Flushes pending response data.
/// NOTE(review): presumably drains a write buffer to the peer — confirm.
fn flush_resp(&self, logger: &LogFilter) -> impl Future<Output = io::Result<()>> + Send;
/// Closes the connection. The future resolves to `()`, so no error is
/// reported to the caller.
fn close_conn(&self, logger: &LogFilter) -> impl Future<Output = ()> + Send;
}
/// A request as seen by the server, borrowing its action and message bytes
/// for the lifetime `'a`.
pub struct RpcSvrReq<'a> {
/// Sequence number of the request.
pub seq: u64,
/// The requested action, borrowed for `'a`.
pub action: RpcAction<'a>,
/// Borrowed message bytes.
/// NOTE(review): presumably still codec-encoded at this point — confirm.
pub msg: &'a [u8],
/// Optional owned buffer accompanying the message.
pub blob: Option<Buffer>, }
/// Compact debug rendering: only the sequence number and the action are
/// printed; `msg` and `blob` are left out.
impl<'a> fmt::Debug for RpcSvrReq<'a> {
    #[inline]
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Pick out just the two fields that are rendered.
        let Self { seq, action, .. } = self;
        f.write_fmt(format_args!("req(seq={}, action={:?})", seq, action))
    }
}
/// An owned server response: sequence number, optional message bytes,
/// optional blob, and the result to report.
#[allow(dead_code)]
#[derive(Debug)]
pub struct RpcSvrResp {
/// Sequence number echoed back with the response.
pub seq: u64,
/// Optional message bytes; written to the output buffer as-is when the
/// response is encoded with an `Ok` result.
pub msg: Option<Vec<u8>>,
/// Optional blob returned alongside the message on success.
pub blob: Option<Buffer>,
/// Outcome to report. It is taken (left as `None`) when the response is
/// encoded, and encoding panics if it is already `None`.
pub res: Option<Result<(), EncodedErr>>,
}
impl task::ServerTaskEncode for RpcSvrResp {
    /// Encodes this response into `buf`.
    ///
    /// Returns the sequence number paired with either:
    /// - `Ok((n, blob))`, where `n` is the number of message bytes appended
    ///   to `buf` (0 when there is no message) and `blob` is the optional
    ///   blob viewed as a byte slice, or
    /// - `Err(e)` carrying the stored error, in which case `buf` is untouched.
    ///
    /// The message is already in byte form, so `_codec` is not used.
    ///
    /// # Panics
    ///
    /// Panics if `res` is `None`. The result is taken out on the first call,
    /// so calling this a second time panics.
    #[inline]
    fn encode_resp<'a, 'b, C: Codec>(
        &'a mut self, _codec: &'b C, buf: &'b mut Vec<u8>,
    ) -> (u64, Result<(usize, Option<&'a [u8]>), EncodedErr>) {
        use std::io::Write;

        let seq = self.seq;
        // Error outcome: report it right away without touching `buf`.
        if let Err(e) = self.res.take().unwrap() {
            return (seq, Err(e));
        }
        // Success: copy the message (if any) verbatim and record its length.
        let written = match self.msg.as_ref() {
            Some(msg) => {
                buf.write_all(msg).expect("fill msg");
                msg.len()
            }
            None => 0,
        };
        (seq, Ok((written, self.blob.as_deref())))
    }
}
/// A ready-made [`ServerFacts`] implementation holding a logger handle and a
/// [`ServerConfig`].
pub struct ServerDefault {
/// Logger handle; `new_logger` hands out clones of this `Arc`.
pub logger: Arc<LogFilter>,
/// Server configuration returned by `get_config`.
config: ServerConfig,
}
impl ServerDefault {
    /// Creates a `ServerDefault` from `config` with a newly constructed
    /// [`LogFilter`], returned behind an `Arc`.
    pub fn new(config: ServerConfig) -> Arc<Self> {
        let logger = Arc::new(LogFilter::new());
        let facts = Self { logger, config };
        Arc::new(facts)
    }

    /// Forwards `level` to the underlying logger's `set_level`.
    #[inline]
    pub fn set_log_level(&self, level: log::Level) {
        let filter = &self.logger;
        filter.set_level(level);
    }
}
impl ServerFacts for ServerDefault {
    /// Hands out another handle to the logger held by this instance; no new
    /// filter is created.
    #[inline]
    fn new_logger(&self) -> Arc<LogFilter> {
        Arc::clone(&self.logger)
    }

    /// Borrows the configuration this instance was built with.
    #[inline]
    fn get_config(&self) -> &ServerConfig {
        let Self { config, .. } = self;
        config
    }
}