use tokio::net::ToSocketAddrs;
/// Private shim module that pulls in every public item of `tarpc::server`.
///
/// It is named `core`, so inside this file the bare path `core` refers to this
/// module; the re-export below spells the path as `self::core` to make that
/// explicit.
mod core {
    pub use ::tarpc::server::*;
}

// Surface everything gathered by the shim module as part of this module's
// public API.
pub use self::core::*;
/// Settings for serving a component over TCP.
///
/// Values are built with [`TcpConfig::new`] and the `with_*` builder methods,
/// and are read back by the `tokio_tcp_listen!` macro through the getter
/// methods.
#[derive(Clone, Debug)]
#[non_exhaustive]
pub struct TcpConfig<A>
where
    A: ToSocketAddrs,
{
    /// Address handed to the TCP listener when binding.
    pub(crate) listen_address: A,
    /// Value passed to the listener's `max_frame_length` setting.
    pub(crate) max_frame_len: usize,
    /// Value used for the `pending_response_buffer` field of the server
    /// `Config` created for each accepted transport.
    pub(crate) pending_response_buffer: usize,
    /// Limit passed to `max_channels_per_key`; the listen macro keys channels
    /// by the peer's IP address.
    pub(crate) max_channels_per_key: u32,
    /// Limit passed to `buffer_unordered` on the stream of channel futures.
    pub(crate) buffer_unordered: usize,
}
impl<A: ToSocketAddrs> TcpConfig<A> {
    /// Fallback used for `buffer_unordered`, both as the initial value and
    /// when a caller passes `0` to [`Self::with_buffer_unordered`].
    const DEFAULT_BUFFER_UNORDERED: usize = 10;

    /// Creates a configuration that listens on `listen_address`.
    ///
    /// Initial values:
    /// - `max_frame_len`: `usize::MAX`
    /// - `pending_response_buffer`: taken from `Config::default()`
    /// - `max_channels_per_key`: `u32::default()` (i.e. `0`)
    /// - `buffer_unordered`: `10`
    pub fn new(listen_address: A) -> Self {
        let server_config = Config::default();
        Self {
            listen_address,
            max_frame_len: usize::MAX,
            pending_response_buffer: server_config.pending_response_buffer,
            // NOTE(review): this default is 0, and unlike the other setters
            // `with_max_channels_per_key` does not remap 0. Confirm that the
            // `max_channels_per_key` limiter treats 0 the way callers expect.
            max_channels_per_key: Default::default(),
            buffer_unordered: Self::DEFAULT_BUFFER_UNORDERED,
        }
    }

    /// Returns the address the listener will bind to.
    pub fn listen_address(&self) -> &A {
        &self.listen_address
    }

    /// Sets the maximum frame length and returns the updated configuration.
    ///
    /// Passing `0` selects `usize::MAX`.
    pub fn with_max_frame_len(mut self, max_frame_len: usize) -> Self {
        // The argument is unsigned, so "not positive" can only mean zero.
        self.max_frame_len = if max_frame_len == 0 {
            usize::MAX
        } else {
            max_frame_len
        };
        self
    }

    /// Returns the configured maximum frame length.
    pub fn max_frame_len(&self) -> usize {
        self.max_frame_len
    }

    /// Sets the pending-response buffer size and returns the updated
    /// configuration.
    ///
    /// Passing `0` selects `usize::MAX`.
    pub fn with_pending_response_buffer(mut self, pending_response_buffer: usize) -> Self {
        // NOTE(review): confirm that whatever consumes this value downstream
        // accepts `usize::MAX` as a buffer size.
        self.pending_response_buffer = if pending_response_buffer == 0 {
            usize::MAX
        } else {
            pending_response_buffer
        };
        self
    }

    /// Returns the configured pending-response buffer size.
    pub fn pending_response_buffer(&self) -> usize {
        self.pending_response_buffer
    }

    /// Sets the per-key channel limit and returns the updated configuration.
    ///
    /// The value is stored as given; `0` is not remapped.
    pub fn with_max_channels_per_key(mut self, max_channels_per_key: u32) -> Self {
        self.max_channels_per_key = max_channels_per_key;
        self
    }

    /// Returns the configured per-key channel limit.
    pub fn max_channels_per_key(&self) -> u32 {
        self.max_channels_per_key
    }

    /// Sets the `buffer_unordered` limit and returns the updated
    /// configuration.
    ///
    /// Passing `0` selects the default of `10`.
    pub fn with_buffer_unordered(mut self, buffer_unordered: usize) -> Self {
        self.buffer_unordered = if buffer_unordered == 0 {
            Self::DEFAULT_BUFFER_UNORDERED
        } else {
            buffer_unordered
        };
        self
    }

    /// Returns the configured `buffer_unordered` limit.
    pub fn buffer_unordered(&self) -> usize {
        self.buffer_unordered
    }
}
/// Binds a TCP listener for `$component` and serves incoming connections
/// until the listener stream ends.
///
/// Arguments:
/// - `$component`: expression providing `logimesh_serve()` and
///   `__logimesh_codec()`.
/// - `$tcp_config`: expression providing the `TcpConfig` getter methods
///   (`listen_address`, `max_frame_len`, `pending_response_buffer`,
///   `max_channels_per_key`, `buffer_unordered`).
///
/// Both argument expressions are pasted into the expansion several times, so
/// an expression with side effects is evaluated repeatedly; pass a plain
/// variable or place expression.
///
/// The expansion contains `.await`, so the macro must be invoked inside an
/// async context. Request futures are handed to `::tokio::spawn`.
///
/// # Panics
///
/// - If `::logimesh::transport::tcp::listen` returns an error (the result is
///   unwrapped).
/// - If `peer_addr()` on an accepted transport returns an error (the result
///   is unwrapped when computing the per-key limit key).
#[macro_export]
macro_rules! tokio_tcp_listen {
($component:expr, $tcp_config:expr) => {{
// Traits are imported anonymously so their methods resolve at the call site
// without adding names to the caller's scope.
use ::logimesh::futures::prelude::*;
use ::logimesh::server::incoming::Incoming as _;
use ::logimesh::server::Channel as _;
let serve = $component.logimesh_serve();
// Bind the listener; a bind failure panics via `unwrap()`.
let mut listener = ::logimesh::transport::tcp::listen($tcp_config.listen_address(), $component.__logimesh_codec().to_fn()).await.unwrap();
::logimesh::tracing::info!("[LOGIMESH] Listening on {}", listener.local_addr());
listener.config_mut().max_frame_length($tcp_config.max_frame_len());
listener
// Silently drop accept results that are errors.
.filter_map(|r| future::ready(r.ok()))
// Wrap every accepted transport in a server channel.
.map(|transport| {
::logimesh::server::BaseChannel::new(
::logimesh::server::Config {
pending_response_buffer: $tcp_config.pending_response_buffer(),
},
transport,
)
})
// Limit channels per peer IP; `peer_addr()` failing here panics.
.max_channels_per_key($tcp_config.max_channels_per_key(), |t| t.transport().peer_addr().unwrap().ip())
// For each channel, spawn every future it yields onto the tokio runtime.
.map(|channel| {
channel.execute(serve.clone()).for_each(|fut| async {
::tokio::spawn(fut);
})
})
// Drive up to `buffer_unordered` channel futures at once.
.buffer_unordered($tcp_config.buffer_unordered())
// Drain the stream; the per-channel outputs are discarded.
.for_each(|_| async {})
.await;
}};
}