1#[cfg(feature = "tls")]
26use std::net::SocketAddr;
27use std::sync::Arc;
28
29use oxide_k::bus::{Envelope, MessageBus};
30use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
31#[cfg(feature = "tls")]
32use tokio::net::{TcpListener, TcpStream};
33use tokio::sync::Mutex;
34
35use crate::error::{MeshError, Result};
36
37#[derive(Clone)]
42pub struct BusBridge {
43 inner: Arc<Mutex<Box<dyn tokio::io::AsyncWrite + Send + Unpin>>>,
44}
45
46impl BusBridge {
47 #[cfg(feature = "tls")]
53 pub async fn connect_tls(
54 addr: SocketAddr,
55 connector: tokio_rustls::TlsConnector,
56 server_name: tokio_rustls::rustls::pki_types::ServerName<'static>,
57 ) -> Result<Self> {
58 let stream = TcpStream::connect(addr)
59 .await
60 .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
61 let tls = connector
62 .connect(server_name, stream)
63 .await
64 .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
65 Ok(Self {
66 inner: Arc::new(Mutex::new(
67 Box::new(tls) as Box<dyn tokio::io::AsyncWrite + Send + Unpin>
68 )),
69 })
70 }
71
72 pub async fn forward(&self, envelope: &Envelope) -> Result<()> {
77 let json =
78 serde_json::to_string(envelope).map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
79 let line = format!("{json}\n");
80 self.inner
81 .lock()
82 .await
83 .write_all(line.as_bytes())
84 .await
85 .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
86 Ok(())
87 }
88
89 #[cfg(feature = "tls")]
98 pub async fn serve_tls(
99 addr: SocketAddr,
100 bus: MessageBus,
101 acceptor: tokio_rustls::TlsAcceptor,
102 ) -> Result<()> {
103 let listener = TcpListener::bind(addr)
104 .await
105 .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
106 tracing::info!(%addr, "bus bridge listening (tls)");
107
108 loop {
109 let (stream, peer) = listener
110 .accept()
111 .await
112 .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
113 let acceptor = acceptor.clone();
114 let bus = bus.clone();
115
116 tokio::spawn(async move {
117 match acceptor.accept(stream).await {
118 Ok(tls) => {
119 if let Err(e) = relay_inbound(tls, bus).await {
120 tracing::warn!(?peer, ?e, "bus bridge connection ended");
121 }
122 }
123 Err(e) => tracing::warn!(?peer, ?e, "bus bridge tls handshake failed"),
124 }
125 });
126 }
127 }
128}
129
/// Reads newline-delimited JSON envelopes from `stream` and publishes each one
/// on `bus` until the peer closes the connection.
///
/// Blank lines are skipped. A line that does not parse as an [`Envelope`] and
/// a publish that fails are both logged at `warn` level and otherwise ignored,
/// so one bad line does not end the connection.
///
/// # Errors
///
/// Returns [`MeshError::Other`] if reading from the stream fails.
// NOTE(review): `read_line` places no cap on line length, so a peer that never
// sends a newline can grow the buffer without bound — confirm whether a size
// limit is wanted here.
// NOTE(review): the body only reads from `stream`; the `AsyncWrite` bound looks
// unused in this function — confirm before relaxing it.
#[cfg(feature = "tls")]
async fn relay_inbound<S>(stream: S, bus: MessageBus) -> Result<()>
where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    let mut incoming = BufReader::new(stream);
    // One buffer reused for every line.
    let mut buf = String::new();

    loop {
        buf.clear();
        let bytes_read = incoming
            .read_line(&mut buf)
            .await
            .map_err(|e| MeshError::Other(anyhow::anyhow!(e)))?;
        // Zero bytes means end of stream: the peer closed the connection.
        if bytes_read == 0 {
            return Ok(());
        }

        let payload = buf.trim();
        if payload.is_empty() {
            continue;
        }

        let envelope = match serde_json::from_str::<Envelope>(payload) {
            Ok(envelope) => envelope,
            Err(e) => {
                tracing::warn!(?e, "bus bridge: discarding malformed envelope line");
                continue;
            }
        };

        if let Err(e) = bus.publish(envelope).await {
            tracing::warn!(?e, "bus bridge: failed to publish relay envelope");
        }
    }
}
162
#[cfg(test)]
mod tests {
    use super::*;
    #[cfg(feature = "tls")]
    use oxide_k::bus::{Command, Message};
    #[cfg(feature = "tls")]
    use std::net::Ipv4Addr;

    /// End-to-end check: an envelope forwarded through a TLS `BusBridge`
    /// client is relayed by `relay_inbound` onto the server-side bus.
    #[cfg(feature = "tls")]
    #[tokio::test]
    async fn bus_bridge_tls_relays_envelope() {
        use crate::tcp::{tls_client_config, tls_server_config};
        use rcgen::generate_simple_self_signed;
        use std::sync::Arc;
        use tokio_rustls::{TlsAcceptor, TlsConnector};

        // The result is deliberately ignored; presumably installation fails
        // harmlessly when a provider has already been installed.
        let _ = rustls::crypto::ring::default_provider().install_default();

        // Self-signed certificate for "localhost"; both the server and the
        // client configuration are built from the same certificate PEM.
        let generated = generate_simple_self_signed(vec!["localhost".into()]).unwrap();
        let cert_pem = generated.cert.pem();
        let key_pem = generated.key_pair.serialize_pem();

        let acceptor = TlsAcceptor::from(Arc::new(
            tls_server_config(cert_pem.as_bytes(), key_pem.as_bytes()).unwrap(),
        ));
        let connector =
            TlsConnector::from(Arc::new(tls_client_config(cert_pem.as_bytes()).unwrap()));

        // Subscribe before anything is sent so the relayed envelope is observed.
        let server_bus = MessageBus::new();
        let mut subscription = server_bus.subscribe().await;

        // Bind to port 0 so the OS picks a free port, then hand the socket to tokio.
        let raw_listener =
            std::net::TcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, 0))).unwrap();
        raw_listener.set_nonblocking(true).unwrap();
        let listener = tokio::net::TcpListener::from_std(raw_listener).unwrap();
        let addr = listener.local_addr().unwrap();

        // Minimal single-connection server: accept once, handshake, relay.
        let relay_bus = server_bus.clone();
        tokio::spawn(async move {
            let (stream, peer) = listener.accept().await.unwrap();
            let tls = match acceptor.accept(stream).await {
                Ok(tls) => tls,
                Err(e) => {
                    tracing::warn!(?peer, ?e, "test tls handshake failed");
                    return;
                }
            };
            let _ = relay_inbound(tls, relay_bus).await;
        });

        let server_name =
            tokio_rustls::rustls::pki_types::ServerName::try_from("localhost").unwrap();
        let bridge = BusBridge::connect_tls(addr, connector, server_name)
            .await
            .unwrap();

        let outgoing = Envelope::new("remote-kernel", Message::Command(Command::Ping));
        bridge.forward(&outgoing).await.unwrap();

        let wait = std::time::Duration::from_millis(500);
        let received = tokio::time::timeout(wait, subscription.receiver.recv())
            .await
            .expect("timeout")
            .expect("channel closed");

        assert_eq!(received.source, "remote-kernel");
        assert!(matches!(received.message, Message::Command(Command::Ping)));
    }
}