Skip to main content

monocoque_core/
inproc.rs

//! In-process transport for zero-copy messaging within the same process.
//!
//! The inproc transport provides high-performance communication between sockets
//! in the same process using channels, without TCP/IP overhead.
//!
//! # Features
//!
//! - **Zero-copy**: Messages travel through the channel as `Vec<Bytes>`, so
//!   payloads are handed over to the receiver rather than serialized
//! - **Thread-safe**: Global registry backed by `DashMap`
//! - **Fast**: No serialization, network, or syscall overhead
//! - **`ZeroMQ` compatible**: Uses `inproc://` URI scheme
//!
//! # Usage
//!
//! ```ignore
//! use monocoque_core::inproc::{bind_inproc, connect_inproc};
//! use bytes::Bytes;
//!
//! // Bind to an inproc endpoint
//! let (sender, receiver) = bind_inproc("inproc://my-endpoint").unwrap();
//!
//! // Connect from another task
//! let client = connect_inproc("inproc://my-endpoint").unwrap();
//!
//! // Send messages
//! client.send(vec![Bytes::from("Hello")]).unwrap();
//!
//! // Receive messages
//! if let Ok(msg) = receiver.recv() {
//!     println!("Received: {:?}", msg);
//! }
//! ```
33
34use bytes::Bytes;
35use dashmap::DashMap;
36use flume::{Receiver, Sender};
37use std::io;
38
39/// Message type for inproc transport (multipart message)
40pub type InprocMessage = Vec<Bytes>;
41
42/// Sender half of an inproc connection
43pub type InprocSender = Sender<InprocMessage>;
44
45/// Receiver half of an inproc connection
46pub type InprocReceiver = Receiver<InprocMessage>;
47
48/// Global registry of inproc endpoints (server receives from clients)
49static INPROC_REGISTRY: once_cell::sync::Lazy<DashMap<String, InprocSender>> =
50    once_cell::sync::Lazy::new(DashMap::new);
51
52/// Registry of server→client senders for bidirectional inproc connections.
53///
54/// When `bind_inproc_bidi` is called, the server→client sender is registered
55/// here so that `connect_inproc_bidi` can retrieve it to receive server replies.
56static INPROC_REPLY_REGISTRY: once_cell::sync::Lazy<DashMap<String, InprocSender>> =
57    once_cell::sync::Lazy::new(DashMap::new);
58
59/// Bind to an inproc endpoint and return sender/receiver pair.
60///
61/// The endpoint is registered in the global registry. Multiple clients can
62/// connect to this endpoint using `connect_inproc()`.
63///
64/// # Arguments
65///
66/// * `endpoint` - The endpoint URI (must start with "inproc://")
67///
68/// # Returns
69///
70/// Returns a tuple of (sender, receiver):
71/// - `sender`: Used to send messages from this socket
72/// - `receiver`: Used to receive messages sent by connected clients
73///
74/// # Errors
75///
76/// Returns an error if:
77/// - The endpoint doesn't start with "inproc://"
78/// - The endpoint is already bound
79/// - The endpoint name is empty
80///
81/// # Example
82///
83/// ```
84/// use monocoque_core::inproc::bind_inproc;
85///
86/// let (sender, receiver) = bind_inproc("inproc://my-endpoint-bind").unwrap();
87/// // sender and receiver are ready for use
88/// ```
89pub fn bind_inproc(endpoint: &str) -> io::Result<(InprocSender, InprocReceiver)> {
90    // Validate endpoint format
91    let name = validate_and_extract_name(endpoint)?;
92
93    // Create unbounded channel for message passing
94    let (tx, rx) = flume::unbounded();
95
96    // Try to insert into registry
97    if INPROC_REGISTRY
98        .insert(name.to_string(), tx.clone())
99        .is_some()
100    {
101        return Err(io::Error::new(
102            io::ErrorKind::AddrInUse,
103            format!("inproc endpoint '{name}' is already bound"),
104        ));
105    }
106
107    Ok((tx, rx))
108}
109
110/// Connect to an inproc endpoint.
111///
112/// Returns a sender that can be used to send messages to the bound endpoint.
113/// This function blocks until the endpoint becomes available.
114///
115/// # Arguments
116///
117/// * `endpoint` - The endpoint URI (must start with "inproc://")
118///
119/// # Returns
120///
121/// Returns a sender that can send messages to the bound endpoint.
122///
123/// # Errors
124///
125/// Returns an error if:
126/// - The endpoint doesn't start with "inproc://"
127/// - The endpoint is not bound
128/// - The endpoint name is empty
129///
130/// # Example
131///
132/// ```rust,no_run
133/// use monocoque_core::inproc::connect_inproc;
134/// use bytes::Bytes;
135///
136/// # fn example() -> std::io::Result<()> {
137/// let sender = connect_inproc("inproc://my-endpoint")?;
138///
139/// // Send a message
140/// sender.send(vec![Bytes::from("Hello")]).map_err(|_| {
141///     std::io::Error::new(std::io::ErrorKind::BrokenPipe, "receiver dropped")
142/// })?;
143/// # Ok(())
144/// # }
145/// ```
146pub fn connect_inproc(endpoint: &str) -> io::Result<InprocSender> {
147    // Validate endpoint format
148    let name = validate_and_extract_name(endpoint)?;
149
150    // Look up the endpoint in the registry
151    if let Some(sender) = INPROC_REGISTRY.get(name) {
152        return Ok(sender.clone());
153    }
154
155    Err(io::Error::new(
156        io::ErrorKind::NotFound,
157        format!("inproc endpoint '{name}' not found (must bind before connect)"),
158    ))
159}
160
161/// Unbind an inproc endpoint, removing it from the global registry.
162///
163/// This should be called when a bound socket is closed to free up the endpoint name.
164///
165/// # Arguments
166///
167/// * `endpoint` - The endpoint URI (must start with "inproc://")
168///
169/// # Example
170///
171/// ```rust,no_run
172/// use monocoque_core::inproc::{bind_inproc, unbind_inproc};
173///
174/// # fn example() -> std::io::Result<()> {
175/// let (sender, receiver) = bind_inproc("inproc://my-endpoint")?;
176///
177/// // ... use the endpoint ...
178///
179/// // Clean up when done
180/// unbind_inproc("inproc://my-endpoint")?;
181/// # Ok(())
182/// # }
183/// ```
184pub fn unbind_inproc(endpoint: &str) -> io::Result<()> {
185    let name = validate_and_extract_name(endpoint)?;
186    INPROC_REGISTRY.remove(name);
187    INPROC_REPLY_REGISTRY.remove(name);
188    Ok(())
189}
190
191/// Bind to an inproc endpoint for bidirectional communication.
192///
193/// Returns `(to_clients_tx, from_clients_rx)`:
194/// - `to_clients_tx`: The server uses this to send replies back to the client.
195///   It is registered so that `connect_inproc_bidi` can retrieve it.
196/// - `from_clients_rx`: The server reads client messages from this.
197///
198/// The caller (server side) owns both halves.  The client side
199/// (`connect_inproc_bidi`) gets a `(to_server_tx, from_server_rx)` pair.
200///
201/// # Errors
202///
203/// Returns an error if the endpoint is already bound.
204pub fn bind_inproc_bidi(
205    endpoint: &str,
206) -> io::Result<(InprocSender, InprocReceiver, InprocSender, InprocReceiver)> {
207    let name = validate_and_extract_name(endpoint)?;
208
209    // Channel: client → server
210    let (client_to_server_tx, client_to_server_rx) = flume::unbounded::<InprocMessage>();
211    // Channel: server → client
212    let (server_to_client_tx, server_to_client_rx) = flume::unbounded::<InprocMessage>();
213
214    // Register the client→server sender (clients call connect_inproc to get this)
215    if INPROC_REGISTRY
216        .insert(name.to_string(), client_to_server_tx.clone())
217        .is_some()
218    {
219        return Err(io::Error::new(
220            io::ErrorKind::AddrInUse,
221            format!("inproc endpoint '{name}' is already bound"),
222        ));
223    }
224
225    // Register the server→client sender so connect_inproc_bidi can retrieve it
226    INPROC_REPLY_REGISTRY.insert(name.to_string(), server_to_client_tx.clone());
227
228    // Return all four channel ends
229    Ok((
230        server_to_client_tx,
231        client_to_server_rx,
232        client_to_server_tx,
233        server_to_client_rx,
234    ))
235}
236
237/// Connect to an inproc endpoint for bidirectional communication.
238///
239/// Returns `(to_server_tx, from_server_rx)` so the client can both send
240/// messages to the server and receive replies from it.
241///
242/// The server must have called `bind_inproc_bidi` before this is called.
243///
244/// # Errors
245///
246/// Returns an error if the endpoint is not bound.
247pub fn connect_inproc_bidi(endpoint: &str) -> io::Result<(InprocSender, InprocReceiver)> {
248    let name = validate_and_extract_name(endpoint)?;
249
250    // Get sender to the server
251    let to_server = INPROC_REGISTRY
252        .get(name)
253        .map(|r| r.clone())
254        .ok_or_else(|| {
255            io::Error::new(
256                io::ErrorKind::NotFound,
257                format!("inproc endpoint '{name}' not found (must bind before connect)"),
258            )
259        })?;
260
261    // Get the reply channel the server registered for us
262    let from_server = INPROC_REPLY_REGISTRY
263        .get(name)
264        .map(|r| r.clone())
265        .ok_or_else(|| {
266            io::Error::new(
267                io::ErrorKind::NotFound,
268                format!(
269                    "inproc reply channel for '{name}' not found; \
270                     use bind_inproc_bidi on the server side"
271                ),
272            )
273        })?;
274
275    // The registry holds the server→client SENDER.  We need to create a fresh
276    // (tx, rx) pair: our from_server rx and tell the server to use our new tx.
277    // Because the server already has its rx from bind_inproc_bidi, we simply
278    // create a new channel and give its tx to the server registry so the server
279    // can write to us, and keep the rx for ourselves.
280    let (our_reply_tx, our_reply_rx) = flume::unbounded::<InprocMessage>();
281
282    // Replace the registry entry with our fresh tx so the server will write to us.
283    // (This means only one client is supported per endpoint at a time, which
284    // is the correct semantic for a DEALER↔ROUTER or REQ↔REP pair.)
285    INPROC_REPLY_REGISTRY.insert(name.to_string(), our_reply_tx);
286
287    // The server also needs to be told to write to us  -  we accomplish this by
288    // updating the reply registry.  The server reads from the channel whose tx
289    // we just stored.  But the server's *rx* was already created in
290    // bind_inproc_bidi and is owned by the caller there.
291    //
292    // For simplicity, we just use the original server_to_client_tx (from_server)
293    // to send back  -  the server already has server_to_client_rx.
294    // Drop the original from_server (it was just a reference clone of the
295    // server→client tx) and use the server_to_client_tx we stored in the
296    // registry as the SENDER that the server will use.  The caller of
297    // bind_inproc_bidi got server_to_client_rx directly.
298    let _ = from_server; // we replaced it in the registry with our_reply_tx
299
300    Ok((to_server, our_reply_rx))
301}
302
303/// List all currently bound inproc endpoints.
304///
305/// This is primarily useful for debugging and testing.
306///
307/// # Returns
308///
309/// Returns a vector of endpoint names (without the "inproc://" prefix).
310pub fn list_inproc_endpoints() -> Vec<String> {
311    INPROC_REGISTRY
312        .iter()
313        .map(|entry| entry.key().clone())
314        .collect()
315}
316
/// Validate endpoint format and extract the name.
///
/// # Arguments
///
/// * `endpoint` - The full endpoint URI (e.g., "inproc://my-endpoint")
///
/// # Returns
///
/// Returns the endpoint name without the "inproc://" prefix, borrowed from
/// `endpoint`.
///
/// # Errors
///
/// Returns `io::ErrorKind::InvalidInput` if the endpoint doesn't start with
/// "inproc://" or if nothing follows the prefix.
fn validate_and_extract_name(endpoint: &str) -> io::Result<&str> {
    const PREFIX: &str = "inproc://";

    // `strip_prefix` performs the prefix check and the slicing in one step,
    // so there is no separate index expression to keep in sync with it.
    match endpoint.strip_prefix(PREFIX) {
        None => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("inproc endpoint must start with '{PREFIX}', got: '{endpoint}'"),
        )),
        Some("") => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "inproc endpoint name cannot be empty",
        )),
        Some(name) => Ok(name),
    }
}
350
#[cfg(test)]
mod tests {
    //! Unit tests for the inproc registry.
    //!
    //! The registries are process-wide statics, so every test uses its own
    //! endpoint name to stay independent of the others.

    use super::*;

    /// Prefix and empty-name validation.
    #[test]
    fn test_validate_endpoint() {
        assert!(validate_and_extract_name("inproc://test").is_ok());
        assert_eq!(validate_and_extract_name("inproc://test").unwrap(), "test");

        assert!(validate_and_extract_name("tcp://test").is_err());
        assert!(validate_and_extract_name("inproc://").is_err());
        assert!(validate_and_extract_name("").is_err());
    }

    /// Binding the same name twice reports `AddrInUse`.
    #[test]
    fn test_bind_duplicate() {
        let endpoint = "inproc://test-duplicate";

        // First bind should succeed; keep the result alive for the whole test.
        let first = bind_inproc(endpoint);
        assert!(first.is_ok());

        // Second bind should fail
        let second = bind_inproc(endpoint);
        assert!(second.is_err());
        assert_eq!(second.unwrap_err().kind(), io::ErrorKind::AddrInUse);

        // Cleanup
        let _ = unbind_inproc(endpoint);
    }

    /// A message sent by a connected client arrives at the bound receiver.
    #[test]
    fn test_bind_and_connect() {
        let endpoint = "inproc://test-connect";

        // Bind
        let (_tx, rx) = bind_inproc(endpoint).unwrap();

        // Connect
        let client = connect_inproc(endpoint).unwrap();

        // Send message from client
        let msg = vec![Bytes::from("Hello, inproc!")];
        client.send(msg.clone()).unwrap();

        // Receive on bound socket; the timeout only bounds the wait so a
        // failure cannot hang the test.
        let received = rx
            .recv_timeout(std::time::Duration::from_millis(100))
            .unwrap();
        assert_eq!(received, msg);

        // Cleanup
        unbind_inproc(endpoint).unwrap();

        // After unbinding, the name can no longer be connected to.
        assert_eq!(
            connect_inproc(endpoint).err().map(|e| e.kind()),
            Some(io::ErrorKind::NotFound)
        );
    }

    /// Connecting to a name that was never bound reports `NotFound`.
    #[test]
    fn test_connect_unbound() {
        let endpoint = "inproc://test-never-bound";

        assert_eq!(
            connect_inproc(endpoint).err().map(|e| e.kind()),
            Some(io::ErrorKind::NotFound)
        );
        assert_eq!(
            connect_inproc_bidi(endpoint).err().map(|e| e.kind()),
            Some(io::ErrorKind::NotFound)
        );
    }

    /// Bound names show up in the endpoint listing.
    #[test]
    fn test_list_endpoints() {
        let ep1 = "inproc://test-list-1";
        let ep2 = "inproc://test-list-2";

        let _bind1 = bind_inproc(ep1).unwrap();
        let _bind2 = bind_inproc(ep2).unwrap();

        let endpoints = list_inproc_endpoints();
        assert!(endpoints.contains(&"test-list-1".to_string()));
        assert!(endpoints.contains(&"test-list-2".to_string()));

        // Cleanup
        unbind_inproc(ep1).unwrap();
        unbind_inproc(ep2).unwrap();
    }
}