// protosocket_rpc/client/configuration.rs

use std::net::SocketAddr;

use protosocket::Connection;
use tokio::sync::mpsc;

use crate::{
    client::reactor::completion_reactor::{DoNothingMessageHandler, RpcCompletionReactor},
    Message,
};

use super::{reactor::completion_reactor::RpcCompletionConnectionBindings, RpcClient};

/// Default buffer length in bytes: 8 MiB.
///
/// This is exactly the value the previous expression `4 * (2 << 20)` evaluated to
/// (`2 << 20` is 2 MiB, not 1 MiB), so the runtime default is unchanged.
const DEFAULT_MAX_BUFFER_LENGTH: usize = 8 * 1024 * 1024;

/// Default number of messages that may be queued for send on the client channel.
const DEFAULT_MAX_QUEUED_OUTBOUND_MESSAGES: usize = 256;

/// Configuration for a `protosocket` rpc client.
///
/// Start from [`Configuration::default`] and adjust individual limits with the setters.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Buffer length handed to the connection; limits the max message size.
    max_buffer_length: usize,
    /// Capacity of the outbound message channel created in `connect`. Never 0.
    max_queued_outbound_messages: usize,
}

impl Default for Configuration {
    /// Builds a configuration with an 8 MiB buffer and room for 256 queued outbound messages.
    fn default() -> Self {
        Self {
            max_buffer_length: DEFAULT_MAX_BUFFER_LENGTH,
            max_queued_outbound_messages: DEFAULT_MAX_QUEUED_OUTBOUND_MESSAGES,
        }
    }
}

impl Configuration {
    /// Max buffer length limits the max message size. Try to use a buffer length that is at
    /// least 4 times the largest message you want to support.
    ///
    /// Default: 8MiB
    pub fn max_buffer_length(&mut self, max_buffer_length: usize) {
        self.max_buffer_length = max_buffer_length;
    }

    /// Max messages that will be queued up waiting for send on the client channel.
    ///
    /// A value of 0 is raised to 1: `connect` uses this number as the capacity of a bounded
    /// tokio channel, and tokio panics when asked for a zero-capacity channel.
    ///
    /// Default: 256
    pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
        self.max_queued_outbound_messages = max_queued_outbound_messages.max(1);
    }
}

/// Connect a new protosocket rpc client to a server
pub async fn connect<Serializer, Deserializer>(
    address: SocketAddr,
    configuration: &Configuration,
) -> Result<
    (
        RpcClient<Serializer::Message, Deserializer::Message>,
        protosocket::Connection<RpcCompletionConnectionBindings<Serializer, Deserializer>>,
    ),
    crate::Error,
>
where
    Deserializer: protosocket::Deserializer + Default + 'static,
    Serializer: protosocket::Serializer + Default + 'static,
    Deserializer::Message: Message,
    Serializer::Message: Message,
{
    log::trace!("new client {address}, {configuration:?}");

    let stream = tokio::net::TcpStream::connect(address).await?;
    stream.set_nodelay(true)?;

    let message_reactor: RpcCompletionReactor<
        Deserializer::Message,
        DoNothingMessageHandler<Deserializer::Message>,
    > = RpcCompletionReactor::new(Default::default());
    let (outbound, outbound_messages) = mpsc::channel(configuration.max_queued_outbound_messages);
    let rpc_client = RpcClient::new(outbound, &message_reactor);

    // Tie outbound_messages to message_reactor via a protosocket::Connection
    let connection = Connection::<RpcCompletionConnectionBindings<Serializer, Deserializer>>::new(
        stream,
        address,
        Deserializer::default(),
        Serializer::default(),
        configuration.max_buffer_length,
        configuration.max_queued_outbound_messages,
        outbound_messages,
        message_reactor,
    );

    Ok((rpc_client, connection))
}