Skip to main content

brainwires_proxy/
proxy.rs

1//! ProxyService — the assembled proxy with its run loop.
2
3use crate::config::ProxyConfig;
4use crate::convert::ConversionRegistry;
5use crate::error::{ProxyError, ProxyResult};
6use crate::inspector::{EventBroadcaster, EventStore};
7use crate::middleware::MiddlewareStack;
8use crate::transport::{InboundConnection, TransportConnector};
9use std::sync::Arc;
10use tokio::sync::{mpsc, watch};
11
12const CONNECTION_CHANNEL_CAPACITY: usize = 256;
13
14/// A boxed listener factory that produces a future accepting inbound connections.
15pub(crate) type ListenerFactory = Box<
16    dyn Fn(
17            mpsc::Sender<InboundConnection>,
18            watch::Receiver<bool>,
19        ) -> futures::future::BoxFuture<'static, ProxyResult<()>>
20        + Send
21        + Sync,
22>;
23
24/// The assembled proxy service. Call [`run()`](ProxyService::run) to start.
25pub struct ProxyService {
26    pub(crate) config: ProxyConfig,
27    pub(crate) middleware: MiddlewareStack,
28    pub(crate) connector: Box<dyn TransportConnector>,
29    pub(crate) listener_factory: ListenerFactory,
30    pub(crate) conversions: ConversionRegistry,
31    pub(crate) event_store: Arc<EventStore>,
32    pub(crate) event_broadcaster: Arc<EventBroadcaster>,
33}
34
35impl ProxyService {
36    /// Run the proxy until shutdown.
37    pub async fn run(self) -> ProxyResult<()> {
38        let (shutdown_tx, shutdown_rx) = watch::channel(false);
39        let (conn_tx, mut conn_rx) =
40            mpsc::channel::<InboundConnection>(CONNECTION_CHANNEL_CAPACITY);
41
42        // Spawn the inspector API if configured
43        #[cfg(feature = "inspector-api")]
44        if let Some(api_addr) = self.config.inspector.api_addr {
45            let store = self.event_store.clone();
46            let broadcaster = self.event_broadcaster.clone();
47            let api_shutdown = shutdown_rx.clone();
48            tokio::spawn(async move {
49                if let Err(e) = crate::inspector::api::run_inspector_api(
50                    api_addr,
51                    store,
52                    broadcaster,
53                    api_shutdown,
54                )
55                .await
56                {
57                    tracing::error!(error = %e, "Inspector API failed");
58                }
59            });
60        }
61
62        // Spawn the listener
63        let listener_shutdown = shutdown_rx.clone();
64        let listener_fut = (self.listener_factory)(conn_tx, listener_shutdown);
65        let listener_handle = tokio::spawn(async move {
66            if let Err(e) = listener_fut.await {
67                tracing::error!(error = %e, "Listener failed");
68            }
69        });
70
71        // Process connections
72        let connector = Arc::new(self.connector);
73        let middleware = Arc::new(self.middleware);
74        let timeout = self.config.timeout;
75
76        tracing::info!("Proxy service started");
77
78        while let Some((request, resp_tx)) = conn_rx.recv().await {
79            let connector = connector.clone();
80            let middleware = middleware.clone();
81
82            tokio::spawn(async move {
83                let result = tokio::time::timeout(timeout, async {
84                    // Run request through middleware
85                    let (request, depth) = match middleware.process_request(request).await {
86                        Ok(Ok((req, depth))) => (req, depth),
87                        Ok(Err(response)) => {
88                            // Middleware short-circuited
89                            let _ = resp_tx.send(response);
90                            return Ok(());
91                        }
92                        Err(e) => return Err(e),
93                    };
94
95                    // Forward to upstream
96                    let response = connector.forward(request).await?;
97
98                    // Run response through middleware (reverse)
99                    let response = middleware.process_response(response, depth).await?;
100
101                    let _ = resp_tx.send(response);
102                    Ok::<(), ProxyError>(())
103                })
104                .await;
105
106                match result {
107                    Ok(Ok(())) => {}
108                    Ok(Err(e)) => {
109                        tracing::error!(error = %e, "Request processing failed");
110                    }
111                    Err(_) => {
112                        tracing::warn!("Request timed out");
113                    }
114                }
115            });
116        }
117
118        // Shutdown
119        let _ = shutdown_tx.send(true);
120        listener_handle.await.ok();
121
122        tracing::info!("Proxy service stopped");
123        Ok(())
124    }
125
126    /// Access the conversion registry.
127    pub fn conversions(&self) -> &ConversionRegistry {
128        &self.conversions
129    }
130
131    /// Access the event store for querying captured traffic.
132    pub fn event_store(&self) -> &Arc<EventStore> {
133        &self.event_store
134    }
135
136    /// Access the broadcaster for subscribing to live events.
137    pub fn event_broadcaster(&self) -> &Arc<EventBroadcaster> {
138        &self.event_broadcaster
139    }
140}