1use std::{net::SocketAddr, time};
4
5use tokio::time::timeout;
6
7use ecksport_core::{
8 peer::PeerData,
9 state_mach::ClientMeta,
10 topic,
11 traits::{AsyncRecvFrame, AsyncSendFrame, AuthConfig},
12};
13
14use crate::{
15 connection::{self, ConnectOptions},
16 constants::*,
17 errors::Error,
18 worker::{self, ConnectionHandle},
19};
20
21pub struct ClientBuilder {
22 agent: String,
23 connect_timeout: time::Duration,
24 protocol: topic::Topic,
25}
26
27impl ClientBuilder {
28 pub fn new(proto: topic::Topic) -> Self {
30 Self {
31 agent: ecksport_core::get_default_agent(),
32 connect_timeout: time::Duration::from_millis(DEFAULT_CONN_TIMEOUT_MS),
33 protocol: proto,
34 }
35 }
36
37 pub fn set_agent_name(&mut self, name: &str) {
39 self.agent = ecksport_core::get_named_agent(name);
40 }
41
42 pub fn set_exact_agent_str(&mut self, agent_str: String) {
44 self.agent = agent_str;
45 }
46
47 pub fn set_connect_timeout(&mut self, tout: time::Duration) {
49 self.connect_timeout = tout;
50 }
51
52 async fn wrap_raw_conn<
54 A: AuthConfig,
55 T: AsyncRecvFrame + AsyncSendFrame + Sync + Send + Unpin + 'static,
56 >(
57 self,
58 auth: A,
59 transport: T,
60 peer_data: PeerData,
61 ) -> Result<ConnectionHandle, Error> {
62 let opts = ConnectOptions {
63 client_meta: ClientMeta::new(self.agent),
64 timeout: self.connect_timeout,
65 };
66
67 let conn =
68 connection::perform_handshake_async(transport, self.protocol, opts, auth, peer_data)
69 .await?;
70
71 Ok(worker::spawn_connection_worker(conn).await)
72 }
73
74 pub async fn connect_tcp_authed<A: AuthConfig>(
76 self,
77 socket_addr: SocketAddr,
78 auth: A,
79 ) -> Result<ConnectionHandle, Error> {
80 let socket_connect_fut = tokio::net::TcpStream::connect(socket_addr);
82 let sock = match timeout(self.connect_timeout, socket_connect_fut).await {
83 Ok(res) => res?,
84 Err(_) => return Err(Error::ConnectionTimeout),
85 };
86
87 let peer = PeerData::new_loc(ecksport_core::peer::Location::Ip(socket_addr));
88 let framer = ecksport_core::stream_framing::StreamFramer::new(sock);
89
90 self.wrap_raw_conn(auth, framer, peer).await
91 }
92
93 pub async fn connect_tcp(self, socket_addr: SocketAddr) -> Result<ConnectionHandle, Error> {
95 self.connect_tcp_authed(socket_addr, ()).await
96 }
97}