use std::net::SocketAddr;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use cellos_core::CloudEventV1;
#[cfg(target_os = "linux")]
use super::SniProxyConfig;
use super::{L7DecisionEmitter, ProxyStats};
/// `L7DecisionEmitter` implementation that forwards each event to a primary
/// event sink and, when one is configured, to a second "jsonl" sink.
///
/// Delivery is not awaited by the caller: `emit` spawns tasks on
/// `runtime_handle` and returns.
pub struct EventSinkEmitter {
/// Tokio runtime handle on which the per-event emit tasks are spawned.
pub runtime_handle: tokio::runtime::Handle,
/// Primary sink; receives every emitted event.
pub sink: Arc<dyn cellos_core::ports::EventSink>,
/// Optional secondary ("jsonl") sink; when `Some`, it receives its own copy
/// of every emitted event.
pub jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
}
impl EventSinkEmitter {
    /// Builds an emitter bound to the tokio runtime the caller is currently
    /// executing in.
    ///
    /// `sink` becomes the primary destination and `jsonl_sink` the optional
    /// secondary one; both are stored as given.
    ///
    /// # Panics
    ///
    /// Panics if called from a thread that has no tokio runtime context,
    /// because there is then no handle to capture.
    pub fn capture_current(
        sink: Arc<dyn cellos_core::ports::EventSink>,
        jsonl_sink: Option<Arc<dyn cellos_core::ports::EventSink>>,
    ) -> Self {
        Self {
            // Captured once here so later `emit` calls can spawn from any thread.
            runtime_handle: tokio::runtime::Handle::try_current().expect(
                "EventSinkEmitter::capture_current called outside a tokio runtime context",
            ),
            sink,
            jsonl_sink,
        }
    }
}
impl L7DecisionEmitter for EventSinkEmitter {
fn emit(&self, event: CloudEventV1) {
let sink = self.sink.clone();
let jsonl = self.jsonl_sink.clone();
let event_for_jsonl = event.clone();
self.runtime_handle.spawn(async move {
if let Err(e) = sink.emit(&event).await {
tracing::warn!(
target: "cellos.supervisor.sni_proxy",
error = %e,
"primary sink emit failed for l7_egress_decision event"
);
}
});
if let Some(j) = jsonl {
self.runtime_handle.spawn(async move {
if let Err(e) = j.emit(&event_for_jsonl).await {
tracing::warn!(
target: "cellos.supervisor.sni_proxy",
error = %e,
"jsonl sink emit failed for l7_egress_decision event"
);
}
});
}
}
}
/// Handle to a spawned SNI proxy: its shutdown flag, its listener address and
/// (on Linux) the thread it runs on.
pub struct SniProxyHandle {
/// Shutdown flag shared with the proxy. `spawn_sni_proxy_in_netns` passes a
/// clone of this `Arc` to the proxy loop; presumably the loop exits once the
/// flag is set (the loop itself is defined elsewhere; confirm there).
pub shutdown: Arc<AtomicBool>,
/// Listener address. When the handle comes from `spawn_sni_proxy_in_netns`
/// this is the address reported by the bound listener itself.
pub listen_addr: SocketAddr,
/// Join handle of the proxy thread, which yields `ProxyStats` when it ends.
/// `None` when no thread is attached or after `join` has taken it.
#[cfg(target_os = "linux")]
pub thread: Option<std::thread::JoinHandle<ProxyStats>>,
}
impl SniProxyHandle {
    /// Blocks until the proxy thread finishes and returns the stats it
    /// produced.
    ///
    /// Returns `None` when no thread is attached (never set, or already taken
    /// by an earlier call) and when the thread panicked; the panic case is
    /// logged at `warn` level.
    #[cfg(target_os = "linux")]
    pub fn join(&mut self) -> Option<ProxyStats> {
        // `take` leaves `None` behind, so a second call returns early here.
        let worker = self.thread.take()?;
        worker.join().map_or_else(
            |_panic| {
                tracing::warn!(
                    target: "cellos.supervisor.sni_proxy",
                    "SNI proxy thread panicked on join"
                );
                None
            },
            Some,
        )
    }

    /// Non-Linux variant: the handle carries no thread on these targets, so
    /// there is nothing to join and the result is always `None`.
    #[cfg(not(target_os = "linux"))]
    pub fn join(&mut self) -> Option<ProxyStats> {
        None
    }
}
/// Opens a TCP connection to `listen_addr` and closes it straight away, as a
/// best-effort wake-up for the proxy listening there.
///
/// The connect attempt is bounded by a 500 ms timeout. A failed connection is
/// only logged at `debug` level; nothing is returned to the caller either way.
/// This function does not touch any shutdown flag itself.
pub fn signal_sni_proxy_shutdown(listen_addr: SocketAddr) {
    // The connection carries no data; a successful stream is dropped at once.
    let wake_up =
        std::net::TcpStream::connect_timeout(&listen_addr, Duration::from_millis(500)).map(drop);

    if let Err(e) = wake_up {
        tracing::debug!(
            target: "cellos.supervisor.sni_proxy",
            error = %e,
            addr = %listen_addr,
            "wake-up TCP connection failed; falling back to natural exit"
        );
    }
}
/// Starts the SNI proxy on a dedicated OS thread that first joins the network
/// namespace of process `child_pid`.
///
/// Sequence:
/// 1. Open `/proc/{child_pid}/ns/net` on the calling thread.
/// 2. Spawn a thread named `cellos-sni-proxy-{child_pid}` which calls
///    `setns(CLONE_NEWNET)` on that descriptor, builds a current-thread tokio
///    runtime, binds a TCP listener at `cfg.bind_addr`, and reports the
///    listener's local address back over a channel.
/// 3. The thread then runs `super::run_one_shot` with `cfg`, the listener,
///    `emitter` and a clone of `shutdown`, and ends with the stats it returns.
///
/// The caller waits up to 2 seconds for the report from step 2.
///
/// On every failure path inside the thread (including an error returned by
/// `run_one_shot`) the thread ends with `ProxyStats::default()`.
///
/// # Errors
///
/// Returns an `std::io::Error` when the netns file cannot be opened, the
/// thread cannot be spawned, the thread reports a `setns` / runtime-build /
/// bind / `local_addr` failure, or no report arrives within 2 seconds.
#[cfg(target_os = "linux")]
pub fn spawn_sni_proxy_in_netns(
child_pid: u32,
cfg: SniProxyConfig,
emitter: Arc<dyn L7DecisionEmitter>,
shutdown: Arc<AtomicBool>,
) -> std::io::Result<SniProxyHandle> {
use std::fs::File;
use std::os::unix::io::AsRawFd;
// Opened on the calling thread so a missing or inaccessible namespace path
// is reported before any thread is spawned.
let netns_path = format!("/proc/{child_pid}/ns/net");
let netns_file = File::open(&netns_path)
.map_err(|e| std::io::Error::new(e.kind(), format!("open netns at {netns_path}: {e}")))?;
let bind_addr = cfg.bind_addr;
let shutdown_for_thread = shutdown.clone();
// One-shot handshake: the thread sends either the bound address or the
// error that stopped it from getting that far.
let (ready_tx, ready_rx) = std::sync::mpsc::channel::<std::io::Result<SocketAddr>>();
let thread = std::thread::Builder::new()
.name(format!("cellos-sni-proxy-{child_pid}"))
.spawn(move || {
// SAFETY: `netns_file` was moved into this closure and is still alive, so
// its raw descriptor is open for the duration of the call; `setns` receives
// only that descriptor and an integer flag. Per setns(2) the call
// reassociates the calling thread, which is why it runs on this dedicated
// thread and before the runtime and listener are created.
let setns_rc = unsafe { libc::setns(netns_file.as_raw_fd(), libc::CLONE_NEWNET) };
if setns_rc != 0 {
// Read errno immediately, before any other call can overwrite it.
let err = std::io::Error::last_os_error();
let _ = ready_tx.send(Err(std::io::Error::new(
err.kind(),
format!("setns(CLONE_NEWNET) for pid={child_pid}: {err}"),
)));
return ProxyStats::default();
}
// Current-thread runtime: everything the proxy does is driven by
// `block_on` on this thread.
let runtime = match tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
{
Ok(rt) => rt,
Err(e) => {
let _ = ready_tx.send(Err(e));
return ProxyStats::default();
}
};
runtime.block_on(async move {
let listener = match tokio::net::TcpListener::bind(bind_addr).await {
Ok(l) => l,
Err(e) => {
let _ = ready_tx.send(Err(std::io::Error::new(
e.kind(),
format!("bind sni-proxy listener at {bind_addr} in cell netns: {e}"),
)));
return ProxyStats::default();
}
};
// Report the address the listener actually has rather than the
// requested `bind_addr`.
let actual_listen = match listener.local_addr() {
Ok(a) => a,
Err(e) => {
let _ = ready_tx.send(Err(e));
return ProxyStats::default();
}
};
// A failed send means the receiver is gone (the caller stopped
// waiting), so do not start serving for a caller that has no handle.
if ready_tx.send(Ok(actual_listen)).is_err() {
return ProxyStats::default();
}
match super::run_one_shot(&cfg, listener, emitter, shutdown_for_thread).await {
Ok(stats) => stats,
Err(e) => {
tracing::warn!(
target: "cellos.supervisor.sni_proxy",
error = %e,
"sni-proxy run_one_shot returned with I/O error"
);
ProxyStats::default()
}
}
})
})?;
// Outer `?`: nothing arrived within 2 s, or the sender was dropped without
// sending; both are reported as "ready timeout". Inner `?`: the thread
// reported a startup error.
let bound_addr = ready_rx
.recv_timeout(Duration::from_secs(2))
.map_err(|e| std::io::Error::other(format!("sni-proxy ready timeout: {e}")))??;
Ok(SniProxyHandle {
shutdown,
listen_addr: bound_addr,
thread: Some(thread),
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    /// A wake-up aimed at an address expected to have no listener must be
    /// absorbed by the function rather than panic.
    #[test]
    fn signal_shutdown_does_not_panic_on_unbound_addr() {
        let target = "127.0.0.1:1".parse::<SocketAddr>().unwrap();
        signal_sni_proxy_shutdown(target);
    }

    /// A handle with no thread attached joins to `None`, and its shutdown
    /// flag can still be set and read back.
    #[test]
    fn handle_join_returns_none_when_no_thread() {
        let flag = Arc::new(AtomicBool::new(false));
        let mut handle = SniProxyHandle {
            shutdown: flag,
            listen_addr: "127.0.0.1:0".parse().unwrap(),
            #[cfg(target_os = "linux")]
            thread: None,
        };

        assert!(handle.join().is_none());

        handle.shutdown.store(true, Ordering::SeqCst);
        assert!(handle.shutdown.load(Ordering::SeqCst));
    }
}