Skip to main content

protosocket_rpc/client/
configuration.rs

1use protosocket::Connection;
2use socket2::TcpKeepalive;
3use std::net::SocketAddr;
4use tokio::net::TcpStream;
5
6use crate::{
7    client::{
8        reactor::completion_reactor::{DoNothingMessageHandler, RpcCompletionReactor},
9        StreamConnector,
10    },
11    Message,
12};
13
14use super::RpcClient;
15
16/// Configuration for a `protosocket` rpc client.
17#[derive(Debug, Clone)]
18pub struct Configuration<TStreamConnector> {
19    max_buffer_length: usize,
20    buffer_allocation_increment: usize,
21    max_queued_outbound_messages: usize,
22    tcp_keepalive_duration: Option<std::time::Duration>,
23    stream_connector: TStreamConnector,
24}
25
26impl<TStreamConnector> Configuration<TStreamConnector>
27where
28    TStreamConnector: StreamConnector,
29{
30    /// Create a new protosocket rpc client that uses a stream connector
31    pub fn new(stream_connector: TStreamConnector) -> Self {
32        log::trace!("new client configuration");
33        Self {
34            max_buffer_length: 4 * (1 << 20), // 4 MiB
35            buffer_allocation_increment: 1 << 20,
36            max_queued_outbound_messages: 256,
37            tcp_keepalive_duration: None,
38            stream_connector,
39        }
40    }
41
42    /// Max buffer length limits the max message size. Try to use a buffer length that is at least 4 times the largest message you want to support.
43    ///
44    /// Default: 4MiB
45    pub fn max_buffer_length(&mut self, max_buffer_length: usize) {
46        self.max_buffer_length = max_buffer_length;
47    }
48
49    /// Max messages that will be queued up waiting for send on the client channel.
50    ///
51    /// Default: 256
52    pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
53        self.max_queued_outbound_messages = max_queued_outbound_messages;
54    }
55
56    /// Amount of buffer to allocate at one time when buffer needs extension.
57    ///
58    /// Default: 1MiB
59    pub fn buffer_allocation_increment(&mut self, buffer_allocation_increment: usize) {
60        self.buffer_allocation_increment = buffer_allocation_increment;
61    }
62
63    /// The duration to set for tcp_keepalive on the underlying socket.
64    ///
65    /// Default: None
66    pub fn tcp_keepalive_duration(&mut self, tcp_keepalive_duration: Option<std::time::Duration>) {
67        self.tcp_keepalive_duration = tcp_keepalive_duration;
68    }
69}
70
71/// Connect a new protosocket rpc client to a server
72pub async fn connect<Codec, TStreamConnector>(
73    address: SocketAddr,
74    configuration: &Configuration<TStreamConnector>,
75) -> Result<
76    (
77        RpcClient<
78            <Codec as protosocket::Encoder>::Message,
79            <Codec as protosocket::Decoder>::Message,
80        >,
81        protosocket::Connection<
82            TStreamConnector::Stream,
83            Codec,
84            RpcCompletionReactor<
85                <Codec as protosocket::Decoder>::Message,
86                <Codec as protosocket::Encoder>::Message,
87                DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
88            >,
89        >,
90    ),
91    crate::Error,
92>
93where
94    Codec: protosocket::Codec + Default + 'static,
95    <Codec as protosocket::Decoder>::Message: Message,
96    <Codec as protosocket::Encoder>::Message: Message,
97    TStreamConnector: StreamConnector,
98{
99    log::trace!("new client {address}, {configuration:?}");
100
101    let stream = TcpStream::connect(&address).await?;
102
103    // For setting socket configuration options available to socket2
104    let socket = socket2::SockRef::from(&stream);
105
106    let mut tcp_keepalive = TcpKeepalive::new();
107    if let Some(duration) = configuration.tcp_keepalive_duration {
108        tcp_keepalive = tcp_keepalive.with_time(duration);
109    }
110    socket.set_nonblocking(true)?;
111    socket.set_tcp_keepalive(&tcp_keepalive)?;
112    socket.set_tcp_nodelay(true)?;
113    socket.set_reuse_address(true)?;
114
115    let message_reactor: RpcCompletionReactor<
116        <Codec as protosocket::Decoder>::Message,
117        <Codec as protosocket::Encoder>::Message,
118        DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
119    > = RpcCompletionReactor::new(Default::default());
120    let (outbound, outbound_messages) = spillway::channel();
121    let rpc_client = RpcClient::new(outbound, &message_reactor);
122    let stream = configuration
123        .stream_connector
124        .connect_stream(stream)
125        .await?;
126
127    // Tie outbound_messages to message_reactor via a protosocket::Connection
128    let connection = Connection::<
129        TStreamConnector::Stream,
130        Codec,
131        RpcCompletionReactor<
132            <Codec as protosocket::Decoder>::Message,
133            <Codec as protosocket::Encoder>::Message,
134            DoNothingMessageHandler<<Codec as protosocket::Decoder>::Message>,
135        >,
136    >::new(
137        stream,
138        Codec::default(),
139        configuration.max_buffer_length,
140        configuration.buffer_allocation_increment,
141        configuration.max_queued_outbound_messages,
142        outbound_messages,
143        message_reactor,
144    );
145
146    Ok((rpc_client, connection))
147}