protosocket_rpc/client/
configuration.rs

1use std::net::SocketAddr;
2
3use protosocket::Connection;
4use tokio::sync::mpsc;
5
6use crate::{
7    client::reactor::completion_reactor::{DoNothingMessageHandler, RpcCompletionReactor},
8    Message,
9};
10
11use super::{reactor::completion_reactor::RpcCompletionConnectionBindings, RpcClient};
12
13/// Configuration for a `protosocket` rpc client.
14#[derive(Debug, Clone)]
15pub struct Configuration {
16    max_buffer_length: usize,
17    max_queued_outbound_messages: usize,
18}
19
20impl Default for Configuration {
21    fn default() -> Self {
22        Self {
23            max_buffer_length: 4 * (2 << 20),
24            max_queued_outbound_messages: 256,
25        }
26    }
27}
28
29impl Configuration {
30    /// Max buffer length limits the max message size. Try to use a buffer length that is at least 4 times the largest message you want to support.
31    ///
32    /// Default: 4MiB
33    pub fn max_buffer_length(&mut self, max_buffer_length: usize) {
34        self.max_buffer_length = max_buffer_length;
35    }
36
37    /// Max messages that will be queued up waiting for send on the client channel.
38    ///
39    /// Default: 256
40    pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
41        self.max_queued_outbound_messages = max_queued_outbound_messages;
42    }
43}
44
45/// Connect a new protosocket rpc client to a server
46pub async fn connect<Serializer, Deserializer>(
47    address: SocketAddr,
48    configuration: &Configuration,
49) -> Result<
50    (
51        RpcClient<Serializer::Message, Deserializer::Message>,
52        protosocket::Connection<RpcCompletionConnectionBindings<Serializer, Deserializer>>,
53    ),
54    crate::Error,
55>
56where
57    Deserializer: protosocket::Deserializer + Default + 'static,
58    Serializer: protosocket::Serializer + Default + 'static,
59    Deserializer::Message: Message,
60    Serializer::Message: Message,
61{
62    log::trace!("new client {address}, {configuration:?}");
63
64    let stream = tokio::net::TcpStream::connect(address).await?;
65    stream.set_nodelay(true)?;
66
67    let message_reactor: RpcCompletionReactor<
68        Deserializer::Message,
69        DoNothingMessageHandler<Deserializer::Message>,
70    > = RpcCompletionReactor::new(Default::default());
71    let (outbound, outbound_messages) = mpsc::channel(configuration.max_queued_outbound_messages);
72    let rpc_client = RpcClient::new(outbound, &message_reactor);
73
74    // Tie outbound_messages to message_reactor via a protosocket::Connection
75    let connection = Connection::<RpcCompletionConnectionBindings<Serializer, Deserializer>>::new(
76        stream,
77        address,
78        Deserializer::default(),
79        Serializer::default(),
80        configuration.max_buffer_length,
81        configuration.max_queued_outbound_messages,
82        outbound_messages,
83        message_reactor,
84    );
85
86    Ok((rpc_client, connection))
87}