protosocket_rpc/client/
configuration.rs1use protosocket::Connection;
2use socket2::TcpKeepalive;
3use std::net::SocketAddr;
4use tokio::net::TcpStream;
5
6use crate::{
7 client::{
8 reactor::completion_reactor::{DoNothingMessageHandler, RpcCompletionReactor},
9 StreamConnector,
10 },
11 Message,
12};
13
14use super::RpcClient;
15
/// Settings consumed by [`connect`] when it builds an rpc client connection.
///
/// The numeric limits are forwarded unchanged to `Connection::new`; the
/// connector is given the freshly opened TCP stream before the connection is
/// assembled.
#[derive(Clone, Debug)]
pub struct Configuration<TStreamConnector> {
    /// Forwarded to `Connection::new` as the maximum buffer length.
    /// `Configuration::new` initializes it to `4 * (1 << 20)`
    /// (presumably bytes, confirm against `protosocket::Connection`).
    max_buffer_length: usize,
    /// Forwarded to `Connection::new` as the buffer allocation increment.
    /// `Configuration::new` initializes it to `1 << 20`.
    buffer_allocation_increment: usize,
    /// Forwarded to `Connection::new` as the outbound queue limit.
    /// `Configuration::new` initializes it to `256`.
    max_queued_outbound_messages: usize,
    /// When `Some`, used as the keepalive time applied to the TCP socket in
    /// `connect`. `Configuration::new` initializes it to `None`.
    tcp_keepalive_duration: Option<std::time::Duration>,
    /// Connector whose `connect_stream` receives the connected TCP stream.
    stream_connector: TStreamConnector,
}
25
26impl<TStreamConnector> Configuration<TStreamConnector>
27where
28 TStreamConnector: StreamConnector,
29{
30 pub fn new(stream_connector: TStreamConnector) -> Self {
32 log::trace!("new client configuration");
33 Self {
34 max_buffer_length: 4 * (1 << 20), buffer_allocation_increment: 1 << 20,
36 max_queued_outbound_messages: 256,
37 tcp_keepalive_duration: None,
38 stream_connector,
39 }
40 }
41
42 pub fn max_buffer_length(&mut self, max_buffer_length: usize) {
46 self.max_buffer_length = max_buffer_length;
47 }
48
49 pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
53 self.max_queued_outbound_messages = max_queued_outbound_messages;
54 }
55
56 pub fn buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
60 self.buffer_allocation_increment = buffer_allocation_increment;
61 }
62
63 pub fn tcp_keepalive_duration(&mut self, tcp_keepalive_duration: Option<std::time::Duration>) {
67 self.tcp_keepalive_duration = tcp_keepalive_duration;
68 }
69}
70
71pub async fn connect<Codec, TStreamConnector>(
73 address: SocketAddr,
74 configuration: &Configuration<TStreamConnector>,
75) -> Result<
76 (
77 RpcClient<
78 <Codec as protosocket::Encoder>::Message,
79 <Codec as protosocket::Decoder>::Message,
80 >,
81 protosocket::Connection<
82 TStreamConnector::Stream,
83 Codec,
84 RpcCompletionReactor<
85 <Codec as protosocket::Decoder>::Message,
86 <Codec as protosocket::Encoder>::Message,
87 DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
88 >,
89 >,
90 ),
91 crate::Error,
92>
93where
94 Codec: protosocket::Codec + Default + 'static,
95 <Codec as protosocket::Decoder>::Message: Message,
96 <Codec as protosocket::Encoder>::Message: Message,
97 TStreamConnector: StreamConnector,
98{
99 log::trace!("new client {address}, {configuration:?}");
100
101 let stream = TcpStream::connect(&address).await?;
102
103 let socket = socket2::SockRef::from(&stream);
105
106 let mut tcp_keepalive = TcpKeepalive::new();
107 if let Some(duration) = configuration.tcp_keepalive_duration {
108 tcp_keepalive = tcp_keepalive.with_time(duration);
109 }
110 socket.set_nonblocking(true)?;
111 socket.set_tcp_keepalive(&tcp_keepalive)?;
112 socket.set_tcp_nodelay(true)?;
113 socket.set_reuse_address(true)?;
114
115 let message_reactor: RpcCompletionReactor<
116 <Codec as protosocket::Decoder>::Message,
117 <Codec as protosocket::Encoder>::Message,
118 DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
119 > = RpcCompletionReactor::new(Default::default());
120 let (outbound, outbound_messages) = spillway::channel();
121 let rpc_client = RpcClient::new(outbound, &message_reactor);
122 let stream = configuration
123 .stream_connector
124 .connect_stream(stream)
125 .await?;
126
127 let connection = Connection::<
129 TStreamConnector::Stream,
130 Codec,
131 RpcCompletionReactor<
132 <Codec as protosocket::Decoder>::Message,
133 <Codec as protosocket::Encoder>::Message,
134 DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
135 >,
136 >::new(
137 stream,
138 Codec::default(),
139 configuration.max_buffer_length,
140 configuration.buffer_allocation_increment,
141 configuration.max_queued_outbound_messages,
142 outbound_messages,
143 message_reactor,
144 );
145
146 Ok((rpc_client, connection))
147}