zlink_core/connection/
mod.rs

1//! Contains connection related API.
2
3mod read_connection;
4pub use read_connection::ReadConnection;
5pub mod chain;
6pub mod socket;
7mod write_connection;
8use crate::{
9    reply::{self, Reply},
10    Call, Result,
11};
12pub use chain::Chain;
13use core::{fmt::Debug, sync::atomic::AtomicUsize};
14pub use write_connection::WriteConnection;
15
16use serde::{Deserialize, Serialize};
17pub use socket::Socket;
18
19/// A connection.
20///
21/// The low-level API to send and receive messages.
22///
23/// Each connection gets a unique identifier when created that can be queried using
24/// [`Connection::id`]. This ID is shared betwen the read and write halves of the connection. It
25/// can be used to associate the read and write halves of the same connection.
26///
27/// # Cancel safety
28///
29/// All async methods of this type are cancel safe unless explicitly stated otherwise in its
30/// documentation.
31#[derive(Debug)]
32pub struct Connection<S: Socket> {
33    read: ReadConnection<S::ReadHalf>,
34    write: WriteConnection<S::WriteHalf>,
35}
36
37impl<S> Connection<S>
38where
39    S: Socket,
40{
41    /// Create a new connection.
42    pub fn new(socket: S) -> Self {
43        let (read, write) = socket.split();
44        let id = NEXT_ID.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
45        Self {
46            read: ReadConnection::new(read, id),
47            write: WriteConnection::new(write, id),
48        }
49    }
50
51    /// The reference to the read half of the connection.
52    pub fn read(&self) -> &ReadConnection<S::ReadHalf> {
53        &self.read
54    }
55
56    /// The mutable reference to the read half of the connection.
57    pub fn read_mut(&mut self) -> &mut ReadConnection<S::ReadHalf> {
58        &mut self.read
59    }
60
61    /// The reference to the write half of the connection.
62    pub fn write(&self) -> &WriteConnection<S::WriteHalf> {
63        &self.write
64    }
65
66    /// The mutable reference to the write half of the connection.
67    pub fn write_mut(&mut self) -> &mut WriteConnection<S::WriteHalf> {
68        &mut self.write
69    }
70
71    /// Split the connection into read and write halves.
72    pub fn split(self) -> (ReadConnection<S::ReadHalf>, WriteConnection<S::WriteHalf>) {
73        (self.read, self.write)
74    }
75
76    /// Join the read and write halves into a connection (the opposite of [`Connection::split`]).
77    pub fn join(read: ReadConnection<S::ReadHalf>, write: WriteConnection<S::WriteHalf>) -> Self {
78        Self { read, write }
79    }
80
81    /// The unique identifier of the connection.
82    pub fn id(&self) -> usize {
83        assert_eq!(self.read.id(), self.write.id());
84        self.read.id()
85    }
86
87    /// Sends a method call.
88    ///
89    /// Convenience wrapper around [`WriteConnection::send_call`].
90    pub async fn send_call<Method>(&mut self, call: &Call<Method>) -> Result<()>
91    where
92        Method: Serialize + Debug,
93    {
94        self.write.send_call(call).await
95    }
96
97    /// Receives a method call reply.
98    ///
99    /// Convenience wrapper around [`ReadConnection::receive_reply`].
100    pub async fn receive_reply<'r, ReplyParams, ReplyError>(
101        &'r mut self,
102    ) -> Result<reply::Result<ReplyParams, ReplyError>>
103    where
104        ReplyParams: Deserialize<'r> + Debug,
105        ReplyError: Deserialize<'r> + Debug,
106    {
107        self.read.receive_reply().await
108    }
109
110    /// Call a method and receive a reply.
111    ///
112    /// This is a convenience method that combines [`Connection::send_call`] and
113    /// [`Connection::receive_reply`].
114    pub async fn call_method<'r, Method, ReplyParams, ReplyError>(
115        &'r mut self,
116        call: &Call<Method>,
117    ) -> Result<reply::Result<ReplyParams, ReplyError>>
118    where
119        Method: Serialize + Debug,
120        ReplyParams: Deserialize<'r> + Debug,
121        ReplyError: Deserialize<'r> + Debug,
122    {
123        self.send_call(call).await?;
124        self.receive_reply().await
125    }
126
127    /// Receive a method call over the socket.
128    ///
129    /// Convenience wrapper around [`ReadConnection::receive_call`].
130    pub async fn receive_call<'m, Method>(&'m mut self) -> Result<Call<Method>>
131    where
132        Method: Deserialize<'m> + Debug,
133    {
134        self.read.receive_call().await
135    }
136
137    /// Send a reply over the socket.
138    ///
139    /// Convenience wrapper around [`WriteConnection::send_reply`].
140    pub async fn send_reply<ReplyParams>(&mut self, reply: &Reply<ReplyParams>) -> Result<()>
141    where
142        ReplyParams: Serialize + Debug,
143    {
144        self.write.send_reply(reply).await
145    }
146
147    /// Send an error reply over the socket.
148    ///
149    /// Convenience wrapper around [`WriteConnection::send_error`].
150    pub async fn send_error<ReplyError>(&mut self, error: &ReplyError) -> Result<()>
151    where
152        ReplyError: Serialize + Debug,
153    {
154        self.write.send_error(error).await
155    }
156
157    /// Enqueue a call to the server.
158    ///
159    /// Convenience wrapper around [`WriteConnection::enqueue_call`].
160    pub fn enqueue_call<Method>(&mut self, method: &Call<Method>) -> Result<()>
161    where
162        Method: Serialize + Debug,
163    {
164        self.write.enqueue_call(method)
165    }
166
167    /// Flush the connection.
168    ///
169    /// Convenience wrapper around [`WriteConnection::flush`].
170    pub async fn flush(&mut self) -> Result<()> {
171        self.write.flush().await
172    }
173
174    /// Start a chain of method calls.
175    ///
176    /// This allows batching multiple calls together and sending them in a single write operation.
177    ///
178    /// # Examples
179    ///
180    /// ## Basic Usage with Sequential Access
181    ///
182    /// ```no_run
183    /// use zlink_core::{Connection, Call, reply};
184    /// use serde::{Serialize, Deserialize};
185    /// use serde_prefix_all::prefix_all;
186    /// use futures_util::{pin_mut, stream::StreamExt};
187    ///
188    /// # async fn example() -> zlink_core::Result<()> {
189    /// # let mut conn: Connection<zlink_core::connection::socket::impl_for_doc::Socket> = todo!();
190    ///
191    /// #[prefix_all("org.example.")]
192    /// #[derive(Debug, Serialize, Deserialize)]
193    /// #[serde(tag = "method", content = "parameters")]
194    /// enum Methods {
195    ///     GetUser { id: u32 },
196    ///     GetProject { id: u32 },
197    /// }
198    ///
199    /// #[derive(Debug, Deserialize)]
200    /// struct User { name: String }
201    ///
202    /// #[derive(Debug, Deserialize)]
203    /// struct Project { title: String }
204    ///
205    /// #[derive(Debug, zlink_core::ReplyError)]
206    /// #[zlink(
207    ///     interface = "org.example",
208    ///     // Not needed in the real code because you'll use `ReplyError` through `zlink` crate.
209    ///     crate = "zlink_core",
210    /// )]
211    /// enum ApiError {
212    ///     UserNotFound { code: i32 },
213    ///     ProjectNotFound { code: i32 },
214    /// }
215    ///
216    /// let get_user = Call::new(Methods::GetUser { id: 1 });
217    /// let get_project = Call::new(Methods::GetProject { id: 2 });
218    ///
219    /// // Chain calls and send them in a batch
220    /// let replies = conn
221    ///     .chain_call::<Methods, User, ApiError>(&get_user)?
222    ///     .append(&get_project)?
223    ///     .send().await?;
224    /// pin_mut!(replies);
225    ///
226    /// // Access replies sequentially - types are now fixed by the chain
227    /// let user_reply = replies.next().await.unwrap()?;
228    /// let project_reply = replies.next().await.unwrap()?;
229    ///
230    /// match user_reply {
231    ///     Ok(user) => println!("User: {}", user.parameters().unwrap().name),
232    ///     Err(error) => println!("User error: {:?}", error),
233    /// }
234    /// # Ok(())
235    /// # }
236    /// ```
237    ///
238    /// ## Arbitrary Number of Calls
239    ///
240    /// ```no_run
241    /// # use zlink_core::{Connection, Call, reply};
242    /// # use serde::{Serialize, Deserialize};
243    /// # use futures_util::{pin_mut, stream::StreamExt};
244    /// # use serde_prefix_all::prefix_all;
245    /// # async fn example() -> zlink_core::Result<()> {
246    /// # let mut conn: Connection<zlink_core::connection::socket::impl_for_doc::Socket> = todo!();
247    /// # #[prefix_all("org.example.")]
248    /// # #[derive(Debug, Serialize, Deserialize)]
249    /// # #[serde(tag = "method", content = "parameters")]
250    /// # enum Methods {
251    /// #     GetUser { id: u32 },
252    /// # }
253    /// # #[derive(Debug, Deserialize)]
254    /// # struct User { name: String }
255    /// # #[derive(Debug, zlink_core::ReplyError)]
256    /// #[zlink(
257    ///     interface = "org.example",
258    ///     // Not needed in the real code because you'll use `ReplyError` through `zlink` crate.
259    ///     crate = "zlink_core",
260    /// )]
261    /// # enum ApiError {
262    /// #     UserNotFound { code: i32 },
263    /// #     ProjectNotFound { code: i32 },
264    /// # }
265    /// # let get_user = Call::new(Methods::GetUser { id: 1 });
266    ///
267    /// // Chain many calls (no upper limit)
268    /// let mut chain = conn.chain_call::<Methods, User, ApiError>(&get_user)?;
269    /// for i in 2..100 {
270    ///     chain = chain.append(&Call::new(Methods::GetUser { id: i }))?;
271    /// }
272    ///
273    /// let replies = chain.send().await?;
274    /// pin_mut!(replies);
275    ///
276    /// // Process all replies sequentially - types are fixed by the chain
277    /// while let Some(user_reply) = replies.next().await {
278    ///     let user_reply = user_reply?;
279    ///     // Handle each reply...
280    ///     match user_reply {
281    ///         Ok(user) => println!("User: {}", user.parameters().unwrap().name),
282    ///         Err(error) => println!("Error: {:?}", error),
283    ///     }
284    /// }
285    /// # Ok(())
286    /// # }
287    /// ```
288    ///
289    /// # Performance Benefits
290    ///
291    /// Instead of multiple write operations, the chain sends all calls in a single
292    /// write operation, reducing context switching and therefore minimizing latency.
293    pub fn chain_call<'c, Method, ReplyParams, ReplyError>(
294        &'c mut self,
295        call: &Call<Method>,
296    ) -> Result<Chain<'c, S, ReplyParams, ReplyError>>
297    where
298        Method: Serialize + Debug,
299        ReplyParams: Deserialize<'c> + Debug,
300        ReplyError: Deserialize<'c> + Debug,
301    {
302        Chain::new(self, call)
303    }
304}
305
306impl<S> From<S> for Connection<S>
307where
308    S: Socket,
309{
310    fn from(socket: S) -> Self {
311        Self::new(socket)
312    }
313}
314
315#[cfg(feature = "io-buffer-1mb")]
316pub(crate) const BUFFER_SIZE: usize = 1024 * 1024;
317#[cfg(all(not(feature = "io-buffer-1mb"), feature = "io-buffer-16kb"))]
318pub(crate) const BUFFER_SIZE: usize = 16 * 1024;
319#[cfg(all(
320    not(feature = "io-buffer-1mb"),
321    not(feature = "io-buffer-16kb"),
322    feature = "io-buffer-4kb"
323))]
324pub(crate) const BUFFER_SIZE: usize = 4 * 1024;
325#[cfg(all(
326    not(feature = "io-buffer-1mb"),
327    not(feature = "io-buffer-16kb"),
328    not(feature = "io-buffer-4kb"),
329))]
330pub(crate) const BUFFER_SIZE: usize = 4 * 1024;
331
332#[cfg(feature = "std")]
333const MAX_BUFFER_SIZE: usize = 100 * 1024 * 1024; // Don't allow buffers over 100MB.
334
335static NEXT_ID: AtomicUsize = AtomicUsize::new(0);