1use std::io;
2use std::sync::Arc;
3#[cfg(feature = "datagram")]
4use std::sync::atomic::AtomicU64;
5
6use bytes::Bytes;
7#[cfg(feature = "datagram")]
8use tokio_util::sync::CancellationToken;
9
10use ombrac::protocol::{Address, Secret};
11use ombrac_transport::{Connection, Initiator};
12
13use crate::connection::BufferedStream;
14use crate::connection::ClientConnection;
15#[cfg(feature = "datagram")]
16use crate::connection::{UdpDispatcher, UdpSession};
17
18pub struct Client<T, C>
24where
25 T: Initiator<Connection = C>,
26 C: Connection,
27{
28 connection: Arc<ClientConnection<T, C>>,
30 #[cfg(feature = "datagram")]
32 _dispatcher_handle: tokio::task::JoinHandle<()>,
33 #[cfg(feature = "datagram")]
34 session_id_counter: Arc<std::sync::atomic::AtomicU64>,
35 #[cfg(feature = "datagram")]
36 udp_dispatcher: Arc<UdpDispatcher>,
37 #[cfg(feature = "datagram")]
38 shutdown_token: CancellationToken,
39}
40
41impl<T, C> Client<T, C>
42where
43 T: Initiator<Connection = C>,
44 C: Connection,
45{
46 pub async fn new(transport: T, secret: Secret, options: Option<Bytes>) -> io::Result<Self> {
51 let connection = Arc::new(ClientConnection::new(transport, secret, options).await?);
52
53 #[cfg(feature = "datagram")]
54 let session_id_counter = Arc::new(AtomicU64::new(1));
55 #[cfg(feature = "datagram")]
56 let udp_dispatcher = Arc::new(UdpDispatcher::new());
57 #[cfg(feature = "datagram")]
58 let shutdown_token = CancellationToken::new();
59
60 #[cfg(feature = "datagram")]
62 let dispatcher_handle = {
63 let connection_clone = Arc::clone(&connection);
64 let dispatcher_clone = Arc::clone(&udp_dispatcher);
65 let shutdown_clone = shutdown_token.clone();
66 tokio::spawn(async move {
67 UdpDispatcher::run(connection_clone, dispatcher_clone, shutdown_clone).await;
68 })
69 };
70
71 Ok(Self {
72 connection,
73 #[cfg(feature = "datagram")]
74 _dispatcher_handle: dispatcher_handle,
75 #[cfg(feature = "datagram")]
76 session_id_counter,
77 #[cfg(feature = "datagram")]
78 udp_dispatcher,
79 #[cfg(feature = "datagram")]
80 shutdown_token,
81 })
82 }
83
84 #[cfg(feature = "datagram")]
89 pub fn open_associate(&self) -> UdpSession<T, C> {
90 use ombrac_macros::info;
91
92 let session_id = self
93 .session_id_counter
94 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
95 info!(
96 "[Client] New UDP session created with session_id={}",
97 session_id
98 );
99 let receiver = self.udp_dispatcher.register_session(session_id);
100
101 UdpSession::new(
102 session_id,
103 Arc::clone(&self.connection),
104 Arc::clone(&self.udp_dispatcher),
105 receiver,
106 )
107 }
108
109 pub async fn open_bidirectional(
119 &self,
120 dest_addr: Address,
121 ) -> io::Result<BufferedStream<C::Stream>> {
122 self.connection.open_bidirectional(dest_addr).await
123 }
124
125 pub async fn rebind(&self) -> io::Result<()> {
127 self.connection.rebind().await
128 }
129}
130
131impl<T, C> Drop for Client<T, C>
132where
133 T: Initiator<Connection = C>,
134 C: Connection,
135{
136 fn drop(&mut self) {
137 #[cfg(feature = "datagram")]
139 self.shutdown_token.cancel();
140 }
141}