use protosocket::Connection;
use protosocket::Encoder;
use protosocket::SocketListener;
use protosocket::SocketResult;
use std::future::Future;
use std::io::Error;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use crate::server::Spawn;
use super::rpc_submitter::RpcSubmitter;
use super::server_traits::SocketService;
/// A server that accepts streams from a listener and hands each one to a
/// spawner as a fully wired `Connection`.
///
/// The server is itself a `Future`: polling it drives the accept loop.
pub struct SocketRpcServer<TSocketService, TSpawnConnection>
where
TSocketService: SocketService,
{
/// Produces the per-stream connection service and the codec for each
/// accepted stream.
socket_server: TSocketService,
/// Receives every newly built connection via `spawn`.
spawner: TSpawnConnection,
/// Source of incoming streams; polled with `poll_accept`.
listener: TSocketService::SocketListener,
/// Forwarded unchanged to `Connection::new` for each accepted stream.
// NOTE(review): presumably an upper bound on a connection's buffer size — confirm against `Connection`.
max_buffer_length: usize,
/// Forwarded unchanged to `Connection::new` for each accepted stream.
// NOTE(review): presumably the step by which a connection grows its buffer — confirm against `Connection`.
buffer_allocation_increment: usize,
/// Forwarded unchanged to `Connection::new` for each accepted stream.
// NOTE(review): presumably caps the outbound message queue per connection — confirm against `Connection`.
max_queued_outbound_messages: usize,
}
impl<TSocketService>
    SocketRpcServer<
        TSocketService,
        super::TokioSpawn<
            Connection<
                <TSocketService::SocketListener as SocketListener>::Stream,
                TSocketService::Codec,
                RpcSubmitter<TSocketService::ConnectionService>,
            >,
        >,
    >
where
    TSocketService: SocketService,
    TSocketService::Codec: Send,
    <TSocketService::Codec as Encoder>::Serialized: Send,
    <TSocketService::SocketListener as SocketListener>::Stream: Send,
    TSocketService::ConnectionService: Send,
{
    /// Builds a server whose connections are handed to a default-constructed
    /// `TokioSpawn`.
    ///
    /// This is a convenience wrapper: every argument is forwarded untouched to
    /// `new_with_spawner`, with `TokioSpawn::default()` supplied as the spawner.
    ///
    /// # Errors
    ///
    /// Returns whatever `new_with_spawner` returns.
    pub fn new(
        listener: TSocketService::SocketListener,
        socket_server: TSocketService,
        max_buffer_length: usize,
        buffer_allocation_increment: usize,
        max_queued_outbound_messages: usize,
    ) -> crate::Result<Self> {
        let default_spawner = super::TokioSpawn::default();
        Self::new_with_spawner(
            listener,
            socket_server,
            max_buffer_length,
            buffer_allocation_increment,
            max_queued_outbound_messages,
            default_spawner,
        )
    }
}
impl<TSocketService, TSpawnConnection> SocketRpcServer<TSocketService, TSpawnConnection>
where
    TSocketService: SocketService,
    TSpawnConnection: Spawn<
        Connection<
            <TSocketService::SocketListener as SocketListener>::Stream,
            TSocketService::Codec,
            RpcSubmitter<TSocketService::ConnectionService>,
        >,
    >,
{
    /// Builds a server that hands every accepted connection to `spawner`.
    ///
    /// The three `usize` settings are stored as-is and passed to
    /// `Connection::new` each time `poll` accepts a stream.
    ///
    /// # Errors
    ///
    /// The current implementation always returns `Ok`; the `Result` return
    /// type is part of the public interface.
    // NOTE(review): six parameters is below clippy's default threshold for this
    // lint; the allow presumably guards against a stricter project config — confirm.
    #[allow(clippy::too_many_arguments)]
    pub fn new_with_spawner(
        listener: TSocketService::SocketListener,
        socket_server: TSocketService,
        max_buffer_length: usize,
        buffer_allocation_increment: usize,
        max_queued_outbound_messages: usize,
        spawner: TSpawnConnection,
    ) -> crate::Result<Self> {
        Ok(Self {
            socket_server,
            spawner,
            listener,
            max_buffer_length,
            buffer_allocation_increment,
            max_queued_outbound_messages,
        })
    }

    /// Replaces the `max_buffer_length` passed to connections.
    ///
    /// The value is read when `poll` builds a connection, so it applies to
    /// streams accepted after this call.
    pub fn set_max_buffer_length(&mut self, max_buffer_length: usize) {
        self.max_buffer_length = max_buffer_length;
    }

    /// Replaces the `buffer_allocation_increment` passed to connections.
    ///
    /// The value is read when `poll` builds a connection, so it applies to
    /// streams accepted after this call.
    pub fn set_buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
        self.buffer_allocation_increment = buffer_allocation_increment;
    }

    /// Replaces the `max_queued_outbound_messages` passed to connections.
    ///
    /// The value is read when `poll` builds a connection, so it applies to
    /// streams accepted after this call.
    pub fn set_max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
        self.max_queued_outbound_messages = max_queued_outbound_messages;
    }
}
/// Marks the server as `Unpin` for every `SocketService` and any spawner type,
/// independent of whether the individual fields are `Unpin`.
///
/// The `Future` implementation in this file reaches the server's fields
/// directly through `Pin<&mut Self>`.
// NOTE(review): this assumes no field relies on being pinned in place — confirm
// for listener and spawner implementations.
impl<TSocketService, TSpawnConnection> Unpin for SocketRpcServer<TSocketService, TSpawnConnection> where
TSocketService: SocketService
{
}
impl<TSocketService, TSpawnConnection> Future for SocketRpcServer<TSocketService, TSpawnConnection>
where
    TSocketService: SocketService,
    TSpawnConnection: Spawn<
        Connection<
            <TSocketService::SocketListener as SocketListener>::Stream,
            TSocketService::Codec,
            RpcSubmitter<TSocketService::ConnectionService>,
        >,
    >,
{
    /// Resolves to `Ok(())` once the listener reports a disconnect.
    type Output = Result<(), Error>;

    /// Drives the accept loop.
    ///
    /// Keeps accepting streams for as long as the listener has one ready. Each
    /// accepted stream is wrapped in a `Connection` and handed to the spawner.
    /// Returns `Poll::Pending` when the listener has nothing ready, and
    /// `Poll::Ready(Ok(()))` when it reports `SocketResult::Disconnect`.
    fn poll(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll<Self::Output> {
        // The server is `Unpin`, so a plain mutable reference is available.
        let server = self.get_mut();

        loop {
            let accepted = match server.listener.poll_accept(context) {
                Poll::Pending => return Poll::Pending,
                Poll::Ready(SocketResult::Disconnect) => return Poll::Ready(Ok(())),
                Poll::Ready(SocketResult::Stream(stream)) => stream,
            };

            // Per-connection wiring: the service sees the stream first, then the
            // outbound channel is split between the submitter and the connection.
            let per_stream_service = server.socket_server.new_stream_service(&accepted);
            let (outbound_sender, outbound_receiver) = spillway::channel();
            let rpc_submitter = RpcSubmitter::new(per_stream_service, outbound_sender);

            let new_connection = Connection::<
                <TSocketService::SocketListener as SocketListener>::Stream,
                TSocketService::Codec,
                RpcSubmitter<TSocketService::ConnectionService>,
            >::new(
                accepted,
                server.socket_server.codec(),
                server.max_buffer_length,
                server.buffer_allocation_increment,
                server.max_queued_outbound_messages,
                outbound_receiver,
                rpc_submitter,
            );

            server.spawner.spawn(new_connection);
            // Loop again: the listener may already have another stream ready.
        }
    }
}