zlink_core/connection/mod.rs
1//! Contains connection related API.
2
3mod read_connection;
4pub use read_connection::ReadConnection;
5pub mod chain;
6pub mod socket;
7mod write_connection;
8use crate::{
9 reply::{self, Reply},
10 Call, Result,
11};
12pub use chain::Chain;
13use core::{fmt::Debug, sync::atomic::AtomicUsize};
14pub use write_connection::WriteConnection;
15
16use serde::{Deserialize, Serialize};
17pub use socket::Socket;
18
19/// A connection.
20///
21/// The low-level API to send and receive messages.
22///
23/// Each connection gets a unique identifier when created that can be queried using
24/// [`Connection::id`]. This ID is shared betwen the read and write halves of the connection. It
25/// can be used to associate the read and write halves of the same connection.
26///
27/// # Cancel safety
28///
29/// All async methods of this type are cancel safe unless explicitly stated otherwise in its
30/// documentation.
31#[derive(Debug)]
32pub struct Connection<S: Socket> {
33 read: ReadConnection<S::ReadHalf>,
34 write: WriteConnection<S::WriteHalf>,
35}
36
37impl<S> Connection<S>
38where
39 S: Socket,
40{
41 /// Create a new connection.
42 pub fn new(socket: S) -> Self {
43 let (read, write) = socket.split();
44 let id = NEXT_ID.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
45 Self {
46 read: ReadConnection::new(read, id),
47 write: WriteConnection::new(write, id),
48 }
49 }
50
51 /// The reference to the read half of the connection.
52 pub fn read(&self) -> &ReadConnection<S::ReadHalf> {
53 &self.read
54 }
55
56 /// The mutable reference to the read half of the connection.
57 pub fn read_mut(&mut self) -> &mut ReadConnection<S::ReadHalf> {
58 &mut self.read
59 }
60
61 /// The reference to the write half of the connection.
62 pub fn write(&self) -> &WriteConnection<S::WriteHalf> {
63 &self.write
64 }
65
66 /// The mutable reference to the write half of the connection.
67 pub fn write_mut(&mut self) -> &mut WriteConnection<S::WriteHalf> {
68 &mut self.write
69 }
70
71 /// Split the connection into read and write halves.
72 pub fn split(self) -> (ReadConnection<S::ReadHalf>, WriteConnection<S::WriteHalf>) {
73 (self.read, self.write)
74 }
75
76 /// Join the read and write halves into a connection (the opposite of [`Connection::split`]).
77 pub fn join(read: ReadConnection<S::ReadHalf>, write: WriteConnection<S::WriteHalf>) -> Self {
78 Self { read, write }
79 }
80
81 /// The unique identifier of the connection.
82 pub fn id(&self) -> usize {
83 assert_eq!(self.read.id(), self.write.id());
84 self.read.id()
85 }
86
87 /// Sends a method call.
88 ///
89 /// Convenience wrapper around [`WriteConnection::send_call`].
90 pub async fn send_call<Method>(&mut self, call: &Call<Method>) -> Result<()>
91 where
92 Method: Serialize + Debug,
93 {
94 self.write.send_call(call).await
95 }
96
97 /// Receives a method call reply.
98 ///
99 /// Convenience wrapper around [`ReadConnection::receive_reply`].
100 pub async fn receive_reply<'r, ReplyParams, ReplyError>(
101 &'r mut self,
102 ) -> Result<reply::Result<ReplyParams, ReplyError>>
103 where
104 ReplyParams: Deserialize<'r> + Debug,
105 ReplyError: Deserialize<'r> + Debug,
106 {
107 self.read.receive_reply().await
108 }
109
110 /// Call a method and receive a reply.
111 ///
112 /// This is a convenience method that combines [`Connection::send_call`] and
113 /// [`Connection::receive_reply`].
114 pub async fn call_method<'r, Method, ReplyParams, ReplyError>(
115 &'r mut self,
116 call: &Call<Method>,
117 ) -> Result<reply::Result<ReplyParams, ReplyError>>
118 where
119 Method: Serialize + Debug,
120 ReplyParams: Deserialize<'r> + Debug,
121 ReplyError: Deserialize<'r> + Debug,
122 {
123 self.send_call(call).await?;
124 self.receive_reply().await
125 }
126
127 /// Receive a method call over the socket.
128 ///
129 /// Convenience wrapper around [`ReadConnection::receive_call`].
130 pub async fn receive_call<'m, Method>(&'m mut self) -> Result<Call<Method>>
131 where
132 Method: Deserialize<'m> + Debug,
133 {
134 self.read.receive_call().await
135 }
136
137 /// Send a reply over the socket.
138 ///
139 /// Convenience wrapper around [`WriteConnection::send_reply`].
140 pub async fn send_reply<ReplyParams>(&mut self, reply: &Reply<ReplyParams>) -> Result<()>
141 where
142 ReplyParams: Serialize + Debug,
143 {
144 self.write.send_reply(reply).await
145 }
146
147 /// Send an error reply over the socket.
148 ///
149 /// Convenience wrapper around [`WriteConnection::send_error`].
150 pub async fn send_error<ReplyError>(&mut self, error: &ReplyError) -> Result<()>
151 where
152 ReplyError: Serialize + Debug,
153 {
154 self.write.send_error(error).await
155 }
156
157 /// Enqueue a call to the server.
158 ///
159 /// Convenience wrapper around [`WriteConnection::enqueue_call`].
160 pub fn enqueue_call<Method>(&mut self, method: &Call<Method>) -> Result<()>
161 where
162 Method: Serialize + Debug,
163 {
164 self.write.enqueue_call(method)
165 }
166
167 /// Flush the connection.
168 ///
169 /// Convenience wrapper around [`WriteConnection::flush`].
170 pub async fn flush(&mut self) -> Result<()> {
171 self.write.flush().await
172 }
173
174 /// Start a chain of method calls.
175 ///
176 /// This allows batching multiple calls together and sending them in a single write operation.
177 ///
178 /// # Examples
179 ///
180 /// ## Basic Usage with Sequential Access
181 ///
182 /// ```no_run
183 /// use zlink_core::{Connection, Call, reply};
184 /// use serde::{Serialize, Deserialize};
185 /// use serde_prefix_all::prefix_all;
186 /// use futures_util::{pin_mut, stream::StreamExt};
187 ///
188 /// # async fn example() -> zlink_core::Result<()> {
189 /// # let mut conn: Connection<zlink_core::connection::socket::impl_for_doc::Socket> = todo!();
190 ///
191 /// #[prefix_all("org.example.")]
192 /// #[derive(Debug, Serialize, Deserialize)]
193 /// #[serde(tag = "method", content = "parameters")]
194 /// enum Methods {
195 /// GetUser { id: u32 },
196 /// GetProject { id: u32 },
197 /// }
198 ///
199 /// #[derive(Debug, Deserialize)]
200 /// struct User { name: String }
201 ///
202 /// #[derive(Debug, Deserialize)]
203 /// struct Project { title: String }
204 ///
205 /// #[derive(Debug, zlink_core::ReplyError)]
206 /// #[zlink(
207 /// interface = "org.example",
208 /// // Not needed in the real code because you'll use `ReplyError` through `zlink` crate.
209 /// crate = "zlink_core",
210 /// )]
211 /// enum ApiError {
212 /// UserNotFound { code: i32 },
213 /// ProjectNotFound { code: i32 },
214 /// }
215 ///
216 /// let get_user = Call::new(Methods::GetUser { id: 1 });
217 /// let get_project = Call::new(Methods::GetProject { id: 2 });
218 ///
219 /// // Chain calls and send them in a batch
220 /// let replies = conn
221 /// .chain_call::<Methods, User, ApiError>(&get_user)?
222 /// .append(&get_project)?
223 /// .send().await?;
224 /// pin_mut!(replies);
225 ///
226 /// // Access replies sequentially - types are now fixed by the chain
227 /// let user_reply = replies.next().await.unwrap()?;
228 /// let project_reply = replies.next().await.unwrap()?;
229 ///
230 /// match user_reply {
231 /// Ok(user) => println!("User: {}", user.parameters().unwrap().name),
232 /// Err(error) => println!("User error: {:?}", error),
233 /// }
234 /// # Ok(())
235 /// # }
236 /// ```
237 ///
238 /// ## Arbitrary Number of Calls
239 ///
240 /// ```no_run
241 /// # use zlink_core::{Connection, Call, reply};
242 /// # use serde::{Serialize, Deserialize};
243 /// # use futures_util::{pin_mut, stream::StreamExt};
244 /// # use serde_prefix_all::prefix_all;
245 /// # async fn example() -> zlink_core::Result<()> {
246 /// # let mut conn: Connection<zlink_core::connection::socket::impl_for_doc::Socket> = todo!();
247 /// # #[prefix_all("org.example.")]
248 /// # #[derive(Debug, Serialize, Deserialize)]
249 /// # #[serde(tag = "method", content = "parameters")]
250 /// # enum Methods {
251 /// # GetUser { id: u32 },
252 /// # }
253 /// # #[derive(Debug, Deserialize)]
254 /// # struct User { name: String }
255 /// # #[derive(Debug, zlink_core::ReplyError)]
256 /// #[zlink(
257 /// interface = "org.example",
258 /// // Not needed in the real code because you'll use `ReplyError` through `zlink` crate.
259 /// crate = "zlink_core",
260 /// )]
261 /// # enum ApiError {
262 /// # UserNotFound { code: i32 },
263 /// # ProjectNotFound { code: i32 },
264 /// # }
265 /// # let get_user = Call::new(Methods::GetUser { id: 1 });
266 ///
267 /// // Chain many calls (no upper limit)
268 /// let mut chain = conn.chain_call::<Methods, User, ApiError>(&get_user)?;
269 /// for i in 2..100 {
270 /// chain = chain.append(&Call::new(Methods::GetUser { id: i }))?;
271 /// }
272 ///
273 /// let replies = chain.send().await?;
274 /// pin_mut!(replies);
275 ///
276 /// // Process all replies sequentially - types are fixed by the chain
277 /// while let Some(user_reply) = replies.next().await {
278 /// let user_reply = user_reply?;
279 /// // Handle each reply...
280 /// match user_reply {
281 /// Ok(user) => println!("User: {}", user.parameters().unwrap().name),
282 /// Err(error) => println!("Error: {:?}", error),
283 /// }
284 /// }
285 /// # Ok(())
286 /// # }
287 /// ```
288 ///
289 /// # Performance Benefits
290 ///
291 /// Instead of multiple write operations, the chain sends all calls in a single
292 /// write operation, reducing context switching and therefore minimizing latency.
293 pub fn chain_call<'c, Method, ReplyParams, ReplyError>(
294 &'c mut self,
295 call: &Call<Method>,
296 ) -> Result<Chain<'c, S, ReplyParams, ReplyError>>
297 where
298 Method: Serialize + Debug,
299 ReplyParams: Deserialize<'c> + Debug,
300 ReplyError: Deserialize<'c> + Debug,
301 {
302 Chain::new(self, call)
303 }
304}
305
306impl<S> From<S> for Connection<S>
307where
308 S: Socket,
309{
310 fn from(socket: S) -> Self {
311 Self::new(socket)
312 }
313}
314
315#[cfg(feature = "io-buffer-1mb")]
316pub(crate) const BUFFER_SIZE: usize = 1024 * 1024;
317#[cfg(all(not(feature = "io-buffer-1mb"), feature = "io-buffer-16kb"))]
318pub(crate) const BUFFER_SIZE: usize = 16 * 1024;
319#[cfg(all(
320 not(feature = "io-buffer-1mb"),
321 not(feature = "io-buffer-16kb"),
322 feature = "io-buffer-4kb"
323))]
324pub(crate) const BUFFER_SIZE: usize = 4 * 1024;
325#[cfg(all(
326 not(feature = "io-buffer-1mb"),
327 not(feature = "io-buffer-16kb"),
328 not(feature = "io-buffer-4kb"),
329))]
330pub(crate) const BUFFER_SIZE: usize = 4 * 1024;
331
332#[cfg(feature = "std")]
333const MAX_BUFFER_SIZE: usize = 100 * 1024 * 1024; // Don't allow buffers over 100MB.
334
335static NEXT_ID: AtomicUsize = AtomicUsize::new(0);