protosocket_rpc/client/
configuration.rs1use std::net::SocketAddr;
2
3use protosocket::Connection;
4use tokio::sync::mpsc;
5
6use crate::{
7 client::reactor::completion_reactor::{DoNothingMessageHandler, RpcCompletionReactor},
8 Message,
9};
10
11use super::{reactor::completion_reactor::RpcCompletionConnectionBindings, RpcClient};
12
/// Tunable settings consumed by [`connect`] when it builds an rpc client.
///
/// Obtain a value with [`Configuration::default`] and adjust it through the
/// setter methods; the fields themselves are private.
#[derive(Debug, Clone)]
pub struct Configuration {
    /// Handed to the connection as its maximum buffer length.
    max_buffer_length: usize,
    /// Capacity of the outbound message channel; the same value is also handed
    /// to the connection.
    max_queued_outbound_messages: usize,
}
19
20impl Default for Configuration {
21 fn default() -> Self {
22 Self {
23 max_buffer_length: 4 * (2 << 20),
24 max_queued_outbound_messages: 256,
25 }
26 }
27}
28
29impl Configuration {
30 pub fn max_buffer_length(&mut self, max_buffer_length: usize) {
34 self.max_buffer_length = max_buffer_length;
35 }
36
37 pub fn max_queued_outbound_messages(&mut self, max_queued_outbound_messages: usize) {
41 self.max_queued_outbound_messages = max_queued_outbound_messages;
42 }
43}
44
45pub async fn connect<Serializer, Deserializer>(
47 address: SocketAddr,
48 configuration: &Configuration,
49) -> Result<
50 (
51 RpcClient<Serializer::Message, Deserializer::Message>,
52 protosocket::Connection<RpcCompletionConnectionBindings<Serializer, Deserializer>>,
53 ),
54 crate::Error,
55>
56where
57 Deserializer: protosocket::Deserializer + Default + 'static,
58 Serializer: protosocket::Serializer + Default + 'static,
59 Deserializer::Message: Message,
60 Serializer::Message: Message,
61{
62 log::trace!("new client {address}, {configuration:?}");
63
64 let stream = tokio::net::TcpStream::connect(address).await?;
65 stream.set_nodelay(true)?;
66
67 let message_reactor: RpcCompletionReactor<
68 Deserializer::Message,
69 DoNothingMessageHandler<Deserializer::Message>,
70 > = RpcCompletionReactor::new(Default::default());
71 let (outbound, outbound_messages) = mpsc::channel(configuration.max_queued_outbound_messages);
72 let rpc_client = RpcClient::new(outbound, &message_reactor);
73
74 let connection = Connection::<RpcCompletionConnectionBindings<Serializer, Deserializer>>::new(
76 stream,
77 address,
78 Deserializer::default(),
79 Serializer::default(),
80 configuration.max_buffer_length,
81 configuration.max_queued_outbound_messages,
82 outbound_messages,
83 message_reactor,
84 );
85
86 Ok((rpc_client, connection))
87}