ecksport_net/
builder.rs

1//! Builder types for setting up connections and listeners.
2
3use std::{net::SocketAddr, time};
4
5use tokio::time::timeout;
6
7use ecksport_core::{
8    peer::PeerData,
9    state_mach::ClientMeta,
10    topic,
11    traits::{AsyncRecvFrame, AsyncSendFrame, AuthConfig},
12};
13
14use crate::{
15    connection::{self, ConnectOptions},
16    constants::*,
17    errors::Error,
18    worker::{self, ConnectionHandle},
19};
20
21pub struct ClientBuilder {
22    agent: String,
23    connect_timeout: time::Duration,
24    protocol: topic::Topic,
25}
26
27impl ClientBuilder {
28    /// Creates a builder connect to a server using some protocol.
29    pub fn new(proto: topic::Topic) -> Self {
30        Self {
31            agent: ecksport_core::get_default_agent(),
32            connect_timeout: time::Duration::from_millis(DEFAULT_CONN_TIMEOUT_MS),
33            protocol: proto,
34        }
35    }
36
37    /// Sets the agent name.
38    pub fn set_agent_name(&mut self, name: &str) {
39        self.agent = ecksport_core::get_named_agent(name);
40    }
41
42    /// Sets the full agent string without supplementary info.
43    pub fn set_exact_agent_str(&mut self, agent_str: String) {
44        self.agent = agent_str;
45    }
46
47    /// Sets the connection timeout.
48    pub fn set_connect_timeout(&mut self, tout: time::Duration) {
49        self.connect_timeout = tout;
50    }
51
52    /// Wraps a freshly opened raw connection and performs the handshake.
53    async fn wrap_raw_conn<
54        A: AuthConfig,
55        T: AsyncRecvFrame + AsyncSendFrame + Sync + Send + Unpin + 'static,
56    >(
57        self,
58        auth: A,
59        transport: T,
60        peer_data: PeerData,
61    ) -> Result<ConnectionHandle, Error> {
62        let opts = ConnectOptions {
63            client_meta: ClientMeta::new(self.agent),
64            timeout: self.connect_timeout,
65        };
66
67        let conn =
68            connection::perform_handshake_async(transport, self.protocol, opts, auth, peer_data)
69                .await?;
70
71        Ok(worker::spawn_connection_worker(conn).await)
72    }
73
74    /// Connects to a server using a Tokio TCP stream and a provided auth config.
75    pub async fn connect_tcp_authed<A: AuthConfig>(
76        self,
77        socket_addr: SocketAddr,
78        auth: A,
79    ) -> Result<ConnectionHandle, Error> {
80        // Do the initial connection.
81        let socket_connect_fut = tokio::net::TcpStream::connect(socket_addr);
82        let sock = match timeout(self.connect_timeout, socket_connect_fut).await {
83            Ok(res) => res?,
84            Err(_) => return Err(Error::ConnectionTimeout),
85        };
86
87        let peer = PeerData::new_loc(ecksport_core::peer::Location::Ip(socket_addr));
88        let framer = ecksport_core::stream_framing::StreamFramer::new(sock);
89
90        self.wrap_raw_conn(auth, framer, peer).await
91    }
92
93    /// Connects to a server using a Tokio TCP stream without an auth config.
94    pub async fn connect_tcp(self, socket_addr: SocketAddr) -> Result<ConnectionHandle, Error> {
95        self.connect_tcp_authed(socket_addr, ()).await
96    }
97}