monocoque_core/inproc.rs
1//! In-process transport for zero-copy messaging within the same process.
2//!
3//! The inproc transport provides high-performance communication between sockets
4//! in the same process using channels, without TCP/IP overhead.
5//!
6//! # Features
7//!
8//! - **Zero-copy**: Messages are shared via `Arc<Vec<Bytes>>` between sockets
9//! - **Thread-safe**: Global registry protected by `DashMap`
10//! - **Fast**: No serialization, network, or syscall overhead
11//! - **`ZeroMQ` compatible**: Uses `inproc://` URI scheme
12//!
13//! # Usage
14//!
15//! ```ignore
16//! use monocoque_core::inproc::{bind_inproc, connect_inproc};
17//! use bytes::Bytes;
18//!
19//! // Bind to an inproc endpoint
20//! let (sender, receiver) = bind_inproc("inproc://my-endpoint").unwrap();
21//!
22//! // Connect from another task
23//! let client = connect_inproc("inproc://my-endpoint").unwrap();
24//!
25//! // Send messages
26//! client.send(vec![Bytes::from("Hello")]).unwrap();
27//!
28//! // Receive messages
29//! if let Ok(msg) = receiver.recv() {
30//! println!("Received: {:?}", msg);
31//! }
32//! ```
33
34use bytes::Bytes;
35use dashmap::DashMap;
36use flume::{Receiver, Sender};
37use std::io;
38
39/// Message type for inproc transport (multipart message)
40pub type InprocMessage = Vec<Bytes>;
41
42/// Sender half of an inproc connection
43pub type InprocSender = Sender<InprocMessage>;
44
45/// Receiver half of an inproc connection
46pub type InprocReceiver = Receiver<InprocMessage>;
47
48/// Global registry of inproc endpoints (server receives from clients)
49static INPROC_REGISTRY: once_cell::sync::Lazy<DashMap<String, InprocSender>> =
50 once_cell::sync::Lazy::new(DashMap::new);
51
52/// Registry of server→client senders for bidirectional inproc connections.
53///
54/// When `bind_inproc_bidi` is called, the server→client sender is registered
55/// here so that `connect_inproc_bidi` can retrieve it to receive server replies.
56static INPROC_REPLY_REGISTRY: once_cell::sync::Lazy<DashMap<String, InprocSender>> =
57 once_cell::sync::Lazy::new(DashMap::new);
58
59/// Bind to an inproc endpoint and return sender/receiver pair.
60///
61/// The endpoint is registered in the global registry. Multiple clients can
62/// connect to this endpoint using `connect_inproc()`.
63///
64/// # Arguments
65///
66/// * `endpoint` - The endpoint URI (must start with "inproc://")
67///
68/// # Returns
69///
70/// Returns a tuple of (sender, receiver):
71/// - `sender`: Used to send messages from this socket
72/// - `receiver`: Used to receive messages sent by connected clients
73///
74/// # Errors
75///
76/// Returns an error if:
77/// - The endpoint doesn't start with "inproc://"
78/// - The endpoint is already bound
79/// - The endpoint name is empty
80///
81/// # Example
82///
83/// ```
84/// use monocoque_core::inproc::bind_inproc;
85///
86/// let (sender, receiver) = bind_inproc("inproc://my-endpoint-bind").unwrap();
87/// // sender and receiver are ready for use
88/// ```
89pub fn bind_inproc(endpoint: &str) -> io::Result<(InprocSender, InprocReceiver)> {
90 // Validate endpoint format
91 let name = validate_and_extract_name(endpoint)?;
92
93 // Create unbounded channel for message passing
94 let (tx, rx) = flume::unbounded();
95
96 // Try to insert into registry
97 if INPROC_REGISTRY
98 .insert(name.to_string(), tx.clone())
99 .is_some()
100 {
101 return Err(io::Error::new(
102 io::ErrorKind::AddrInUse,
103 format!("inproc endpoint '{name}' is already bound"),
104 ));
105 }
106
107 Ok((tx, rx))
108}
109
110/// Connect to an inproc endpoint.
111///
112/// Returns a sender that can be used to send messages to the bound endpoint.
113/// This function blocks until the endpoint becomes available.
114///
115/// # Arguments
116///
117/// * `endpoint` - The endpoint URI (must start with "inproc://")
118///
119/// # Returns
120///
121/// Returns a sender that can send messages to the bound endpoint.
122///
123/// # Errors
124///
125/// Returns an error if:
126/// - The endpoint doesn't start with "inproc://"
127/// - The endpoint is not bound
128/// - The endpoint name is empty
129///
130/// # Example
131///
132/// ```rust,no_run
133/// use monocoque_core::inproc::connect_inproc;
134/// use bytes::Bytes;
135///
136/// # fn example() -> std::io::Result<()> {
137/// let sender = connect_inproc("inproc://my-endpoint")?;
138///
139/// // Send a message
140/// sender.send(vec![Bytes::from("Hello")]).map_err(|_| {
141/// std::io::Error::new(std::io::ErrorKind::BrokenPipe, "receiver dropped")
142/// })?;
143/// # Ok(())
144/// # }
145/// ```
146pub fn connect_inproc(endpoint: &str) -> io::Result<InprocSender> {
147 // Validate endpoint format
148 let name = validate_and_extract_name(endpoint)?;
149
150 // Look up the endpoint in the registry
151 if let Some(sender) = INPROC_REGISTRY.get(name) {
152 return Ok(sender.clone());
153 }
154
155 Err(io::Error::new(
156 io::ErrorKind::NotFound,
157 format!("inproc endpoint '{name}' not found (must bind before connect)"),
158 ))
159}
160
161/// Unbind an inproc endpoint, removing it from the global registry.
162///
163/// This should be called when a bound socket is closed to free up the endpoint name.
164///
165/// # Arguments
166///
167/// * `endpoint` - The endpoint URI (must start with "inproc://")
168///
169/// # Example
170///
171/// ```rust,no_run
172/// use monocoque_core::inproc::{bind_inproc, unbind_inproc};
173///
174/// # fn example() -> std::io::Result<()> {
175/// let (sender, receiver) = bind_inproc("inproc://my-endpoint")?;
176///
177/// // ... use the endpoint ...
178///
179/// // Clean up when done
180/// unbind_inproc("inproc://my-endpoint")?;
181/// # Ok(())
182/// # }
183/// ```
184pub fn unbind_inproc(endpoint: &str) -> io::Result<()> {
185 let name = validate_and_extract_name(endpoint)?;
186 INPROC_REGISTRY.remove(name);
187 INPROC_REPLY_REGISTRY.remove(name);
188 Ok(())
189}
190
191/// Bind to an inproc endpoint for bidirectional communication.
192///
193/// Returns `(to_clients_tx, from_clients_rx)`:
194/// - `to_clients_tx`: The server uses this to send replies back to the client.
195/// It is registered so that `connect_inproc_bidi` can retrieve it.
196/// - `from_clients_rx`: The server reads client messages from this.
197///
198/// The caller (server side) owns both halves. The client side
199/// (`connect_inproc_bidi`) gets a `(to_server_tx, from_server_rx)` pair.
200///
201/// # Errors
202///
203/// Returns an error if the endpoint is already bound.
204pub fn bind_inproc_bidi(
205 endpoint: &str,
206) -> io::Result<(InprocSender, InprocReceiver, InprocSender, InprocReceiver)> {
207 let name = validate_and_extract_name(endpoint)?;
208
209 // Channel: client → server
210 let (client_to_server_tx, client_to_server_rx) = flume::unbounded::<InprocMessage>();
211 // Channel: server → client
212 let (server_to_client_tx, server_to_client_rx) = flume::unbounded::<InprocMessage>();
213
214 // Register the client→server sender (clients call connect_inproc to get this)
215 if INPROC_REGISTRY
216 .insert(name.to_string(), client_to_server_tx.clone())
217 .is_some()
218 {
219 return Err(io::Error::new(
220 io::ErrorKind::AddrInUse,
221 format!("inproc endpoint '{name}' is already bound"),
222 ));
223 }
224
225 // Register the server→client sender so connect_inproc_bidi can retrieve it
226 INPROC_REPLY_REGISTRY.insert(name.to_string(), server_to_client_tx.clone());
227
228 // Return all four channel ends
229 Ok((
230 server_to_client_tx,
231 client_to_server_rx,
232 client_to_server_tx,
233 server_to_client_rx,
234 ))
235}
236
237/// Connect to an inproc endpoint for bidirectional communication.
238///
239/// Returns `(to_server_tx, from_server_rx)` so the client can both send
240/// messages to the server and receive replies from it.
241///
242/// The server must have called `bind_inproc_bidi` before this is called.
243///
244/// # Errors
245///
246/// Returns an error if the endpoint is not bound.
247pub fn connect_inproc_bidi(endpoint: &str) -> io::Result<(InprocSender, InprocReceiver)> {
248 let name = validate_and_extract_name(endpoint)?;
249
250 // Get sender to the server
251 let to_server = INPROC_REGISTRY
252 .get(name)
253 .map(|r| r.clone())
254 .ok_or_else(|| {
255 io::Error::new(
256 io::ErrorKind::NotFound,
257 format!("inproc endpoint '{name}' not found (must bind before connect)"),
258 )
259 })?;
260
261 // Get the reply channel the server registered for us
262 let from_server = INPROC_REPLY_REGISTRY
263 .get(name)
264 .map(|r| r.clone())
265 .ok_or_else(|| {
266 io::Error::new(
267 io::ErrorKind::NotFound,
268 format!(
269 "inproc reply channel for '{name}' not found; \
270 use bind_inproc_bidi on the server side"
271 ),
272 )
273 })?;
274
275 // The registry holds the server→client SENDER. We need to create a fresh
276 // (tx, rx) pair: our from_server rx and tell the server to use our new tx.
277 // Because the server already has its rx from bind_inproc_bidi, we simply
278 // create a new channel and give its tx to the server registry so the server
279 // can write to us, and keep the rx for ourselves.
280 let (our_reply_tx, our_reply_rx) = flume::unbounded::<InprocMessage>();
281
282 // Replace the registry entry with our fresh tx so the server will write to us.
283 // (This means only one client is supported per endpoint at a time, which
284 // is the correct semantic for a DEALER↔ROUTER or REQ↔REP pair.)
285 INPROC_REPLY_REGISTRY.insert(name.to_string(), our_reply_tx);
286
287 // The server also needs to be told to write to us - we accomplish this by
288 // updating the reply registry. The server reads from the channel whose tx
289 // we just stored. But the server's *rx* was already created in
290 // bind_inproc_bidi and is owned by the caller there.
291 //
292 // For simplicity, we just use the original server_to_client_tx (from_server)
293 // to send back - the server already has server_to_client_rx.
294 // Drop the original from_server (it was just a reference clone of the
295 // server→client tx) and use the server_to_client_tx we stored in the
296 // registry as the SENDER that the server will use. The caller of
297 // bind_inproc_bidi got server_to_client_rx directly.
298 let _ = from_server; // we replaced it in the registry with our_reply_tx
299
300 Ok((to_server, our_reply_rx))
301}
302
303/// List all currently bound inproc endpoints.
304///
305/// This is primarily useful for debugging and testing.
306///
307/// # Returns
308///
309/// Returns a vector of endpoint names (without the "inproc://" prefix).
310pub fn list_inproc_endpoints() -> Vec<String> {
311 INPROC_REGISTRY
312 .iter()
313 .map(|entry| entry.key().clone())
314 .collect()
315}
316
317/// Validate endpoint format and extract the name.
318///
319/// # Arguments
320///
321/// * `endpoint` - The full endpoint URI (e.g., "<inproc://my-endpoint>")
322///
323/// # Returns
324///
325/// Returns the endpoint name without the "inproc://" prefix.
326///
327/// # Errors
328///
329/// Returns an error if the endpoint doesn't start with "inproc://" or has an empty name.
330fn validate_and_extract_name(endpoint: &str) -> io::Result<&str> {
331 const PREFIX: &str = "inproc://";
332
333 if !endpoint.starts_with(PREFIX) {
334 return Err(io::Error::new(
335 io::ErrorKind::InvalidInput,
336 format!("inproc endpoint must start with '{PREFIX}', got: '{endpoint}'"),
337 ));
338 }
339
340 let name = &endpoint[PREFIX.len()..];
341 if name.is_empty() {
342 return Err(io::Error::new(
343 io::ErrorKind::InvalidInput,
344 "inproc endpoint name cannot be empty",
345 ));
346 }
347
348 Ok(name)
349}
350
351#[cfg(test)]
352mod tests {
353 use super::*;
354
355 #[test]
356 fn test_validate_endpoint() {
357 assert!(validate_and_extract_name("inproc://test").is_ok());
358 assert_eq!(validate_and_extract_name("inproc://test").unwrap(), "test");
359
360 assert!(validate_and_extract_name("tcp://test").is_err());
361 assert!(validate_and_extract_name("inproc://").is_err());
362 assert!(validate_and_extract_name("").is_err());
363 }
364
365 #[test]
366 fn test_bind_duplicate() {
367 let endpoint = "inproc://test-duplicate";
368
369 // First bind should succeed
370 let _result1 = bind_inproc(endpoint);
371 assert!(_result1.is_ok());
372
373 // Second bind should fail
374 let result2 = bind_inproc(endpoint);
375 assert!(result2.is_err());
376 assert_eq!(result2.unwrap_err().kind(), io::ErrorKind::AddrInUse);
377
378 // Cleanup
379 let _ = unbind_inproc(endpoint);
380 }
381
382 #[test]
383 fn test_bind_and_connect() {
384 let endpoint = "inproc://test-connect";
385
386 // Bind
387 let (_tx, rx) = bind_inproc(endpoint).unwrap();
388
389 // Connect
390 let client = connect_inproc(endpoint).unwrap();
391
392 // Send message from client
393 let msg = vec![Bytes::from("Hello, inproc!")];
394 client.send(msg.clone()).unwrap();
395
396 // Receive on bound socket (non-blocking recv_timeout)
397 let received = rx
398 .recv_timeout(std::time::Duration::from_millis(100))
399 .unwrap();
400 assert_eq!(received, msg);
401
402 // Cleanup
403 unbind_inproc(endpoint).unwrap();
404 }
405
406 #[test]
407 fn test_list_endpoints() {
408 let ep1 = "inproc://test-list-1";
409 let ep2 = "inproc://test-list-2";
410
411 let _bind1 = bind_inproc(ep1).unwrap();
412 let _bind2 = bind_inproc(ep2).unwrap();
413
414 let endpoints = list_inproc_endpoints();
415 assert!(endpoints.contains(&"test-list-1".to_string()));
416 assert!(endpoints.contains(&"test-list-2".to_string()));
417
418 // Cleanup
419 unbind_inproc(ep1).unwrap();
420 unbind_inproc(ep2).unwrap();
421 }
422}