Skip to main content

oxide_mesh/
bus_bridge.rs

1//! # Bus Bridge — bus-over-TLS for cross-process kernels (R-21)
2//!
3//! [`BusBridge`] relays `oxide_k::bus::Envelope` messages between two kernel
4//! instances over a TLS-secured TCP connection using the JSON-line framing
5//! already established in [`crate::tcp`].
6//!
7//! ## Protocol
8//!
9//! Each side sends one JSON-serialized [`oxide_k::bus::Envelope`] per line
10//! (`\n` terminated). TLS provides confidentiality and peer authentication.
11//!
12//! ## Usage
13//!
14//! ```text
15//! // Kernel A (server)
16//! BusBridge::serve_tls(addr, bus_a, acceptor).await;
17//!
18//! // Kernel B (client)
19//! let bridge = BusBridge::connect_tls(addr, connector, server_name).await?;
20//! bridge.forward(&envelope).await?;
21//! ```
22//!
23//! Requires the `tls` Cargo feature on `oxide-mesh`.
24
25#[cfg(feature = "tls")]
26use std::net::SocketAddr;
27use std::sync::Arc;
28
29use oxide_k::bus::{Envelope, MessageBus};
30use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
31#[cfg(feature = "tls")]
32use tokio::net::{TcpListener, TcpStream};
33use tokio::sync::Mutex;
34
35use crate::error::{MeshError, Result};
36
37/// A handle to a TLS connection between two kernels' buses.
38///
39/// Obtain via [`BusBridge::connect_tls`]. Call [`BusBridge::forward`] to send
40/// an envelope to the remote kernel.
41#[derive(Clone)]
42pub struct BusBridge {
43    inner: Arc<Mutex<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
44}
45
46impl BusBridge {
47    /// Connect to a remote kernel's bus bridge over TLS.
48    ///
49    /// `server_name` must match the CN/SAN of the server certificate.
50    /// `connector` is built from a [`rustls::ClientConfig`] that trusts the
51    /// server's CA (see [`crate::tcp::tls_client_config`]).
52    #[cfg(feature = "tls")]
53    pub async fn connect_tls(
54        addr: SocketAddr,
55        connector: tokio_rustls::TlsConnector,
56        server_name: tokio_rustls::rustls::pki_types::ServerName<'static>,
57    ) -> Result<Self> {
58        let stream = TcpStream::connect(addr)
59            .await
60            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
61        let tls = connector
62            .connect(server_name, stream)
63            .await
64            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
65        Ok(Self {
66            inner: Arc::new(Mutex::new(
67                Box::new(tls) as Box<dyn tokio::io::AsyncWrite + Send + Unpin>
68            )),
69        })
70    }
71
72    /// Send an [`Envelope`] to the remote kernel.
73    ///
74    /// The envelope is serialized to a single JSON line and written over the
75    /// TLS stream.
76    pub async fn forward(&self, envelope: &Envelope) -> Result<()> {
77        let json =
78            serde_json::to_string(envelope).map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
79        let line = format!("{json}\n");
80        self.inner
81            .lock()
82            .await
83            .write_all(line.as_bytes())
84            .await
85            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
86        Ok(())
87    }
88
89    /// Serve a TLS bus bridge: accept connections and relay each received
90    /// [`Envelope`] into `bus`.
91    ///
92    /// Each inbound JSON line is deserialized as an `Envelope` and published
93    /// on `bus`. Malformed lines are logged and skipped.
94    ///
95    /// This function runs indefinitely; cancel it with
96    /// `tokio::task::JoinHandle::abort()`.
97    #[cfg(feature = "tls")]
98    pub async fn serve_tls(
99        addr: SocketAddr,
100        bus: MessageBus,
101        acceptor: tokio_rustls::TlsAcceptor,
102    ) -> Result<()> {
103        let listener = TcpListener::bind(addr)
104            .await
105            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
106        tracing::info!(%addr, "bus bridge listening (tls)");
107
108        loop {
109            let (stream, peer) = listener
110                .accept()
111                .await
112                .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
113            let acceptor = acceptor.clone();
114            let bus = bus.clone();
115
116            tokio::spawn(async move {
117                match acceptor.accept(stream).await {
118                    Ok(tls) => {
119                        if let Err(e) = relay_inbound(tls, bus).await {
120                            tracing::warn!(?peer, ?e, "bus bridge connection ended");
121                        }
122                    }
123                    Err(e) => tracing::warn!(?peer, ?e, "bus bridge tls handshake failed"),
124                }
125            });
126        }
127    }
128}
129
130/// Read JSON-line envelopes from `stream` and publish each onto `bus`.
131#[cfg(feature = "tls")]
132async fn relay_inbound<S>(stream: S, bus: MessageBus) -> Result<()>
133where
134    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
135{
136    let mut reader = BufReader::new(stream);
137    let mut line = String::new();
138    loop {
139        line.clear();
140        let n = reader
141            .read_line(&mut line)
142            .await
143            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
144        if n == 0 {
145            break; // EOF — remote closed
146        }
147        let trimmed = line.trim();
148        if trimmed.is_empty() {
149            continue;
150        }
151        match serde_json::from_str::<Envelope>(trimmed) {
152            Ok(env) => {
153                if let Err(e) = bus.publish(env).await {
154                    tracing::warn!(?e, "bus bridge: failed to publish relay envelope");
155                }
156            }
157            Err(e) => tracing::warn!(?e, "bus bridge: discarding malformed envelope line"),
158        }
159    }
160    Ok(())
161}
162
163// ---------------------------------------------------------------------------
164// Tests
165// ---------------------------------------------------------------------------
166
167#[cfg(test)]
168mod tests {
169    use super::*;
170    #[cfg(feature = "tls")]
171    use oxide_k::bus::{Command, Message};
172    #[cfg(feature = "tls")]
173    use std::net::Ipv4Addr;
174
175    /// End-to-end: bridge two in-process buses over TLS using rcgen certs.
176    #[cfg(feature = "tls")]
177    #[tokio::test]
178    async fn bus_bridge_tls_relays_envelope() {
179        use crate::tcp::{tls_client_config, tls_server_config};
180        use rcgen::generate_simple_self_signed;
181        use std::sync::Arc;
182        use tokio_rustls::{TlsAcceptor, TlsConnector};
183
184        // Install ring provider (idempotent).
185        let _ = rustls::crypto::ring::default_provider().install_default();
186
187        // Generate self-signed cert.
188        let cert = generate_simple_self_signed(vec!["localhost".into()]).unwrap();
189        let cert_pem = cert.cert.pem();
190        let key_pem = cert.key_pair.serialize_pem();
191
192        let server_cfg = tls_server_config(cert_pem.as_bytes(), key_pem.as_bytes()).unwrap();
193        let acceptor = TlsAcceptor::from(Arc::new(server_cfg));
194
195        let client_cfg = tls_client_config(cert_pem.as_bytes()).unwrap();
196        let connector = TlsConnector::from(Arc::new(client_cfg));
197
198        // Server bus — receives relayed messages.
199        let server_bus = MessageBus::new();
200        let mut sub = server_bus.subscribe().await;
201
202        // Bind server.
203        let std_listener =
204            std::net::TcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap();
205        std_listener.set_nonblocking(true).unwrap();
206        let tls_listener = tokio::net::TcpListener::from_std(std_listener).unwrap();
207        let addr = tls_listener.local_addr().unwrap();
208
209        let bus_srv = server_bus.clone();
210        tokio::spawn(async move {
211            let (stream, peer) = tls_listener.accept().await.unwrap();
212            match acceptor.accept(stream).await {
213                Ok(tls) => {
214                    relay_inbound(tls, bus_srv).await.ok();
215                }
216                Err(e) => tracing::warn!(?peer, ?e, "test tls handshake failed"),
217            }
218        });
219
220        // Client connects and forwards an envelope.
221        let server_name =
222            tokio_rustls::rustls::pki_types::ServerName::try_from("localhost").unwrap();
223        let bridge = BusBridge::connect_tls(addr, connector, server_name)
224            .await
225            .unwrap();
226
227        let env = oxide_k::bus::Envelope::new("remote-kernel", Message::Command(Command::Ping));
228        bridge.forward(&env).await.unwrap();
229
230        // Server bus should receive the relayed Ping.
231        let received =
232            tokio::time::timeout(std::time::Duration::from_millis(500), sub.receiver.recv())
233                .await
234                .expect("timeout")
235                .expect("channel closed");
236
237        assert_eq!(received.source, "remote-kernel");
238        assert!(matches!(received.message, Message::Command(Command::Ping)));
239    }
240}