brainwires_proxy/
proxy.rs1use crate::config::ProxyConfig;
4use crate::convert::ConversionRegistry;
5use crate::error::{ProxyError, ProxyResult};
6use crate::inspector::{EventBroadcaster, EventStore};
7use crate::middleware::MiddlewareStack;
8use crate::transport::{InboundConnection, TransportConnector};
9use std::sync::Arc;
10use tokio::sync::{mpsc, watch};
11
12const CONNECTION_CHANNEL_CAPACITY: usize = 256;
13
14pub(crate) type ListenerFactory = Box<
16 dyn Fn(
17 mpsc::Sender<InboundConnection>,
18 watch::Receiver<bool>,
19 ) -> futures::future::BoxFuture<'static, ProxyResult<()>>
20 + Send
21 + Sync,
22>;
23
24pub struct ProxyService {
26 pub(crate) config: ProxyConfig,
27 pub(crate) middleware: MiddlewareStack,
28 pub(crate) connector: Box<dyn TransportConnector>,
29 pub(crate) listener_factory: ListenerFactory,
30 pub(crate) conversions: ConversionRegistry,
31 pub(crate) event_store: Arc<EventStore>,
32 pub(crate) event_broadcaster: Arc<EventBroadcaster>,
33}
34
35impl ProxyService {
36 pub async fn run(self) -> ProxyResult<()> {
38 let (shutdown_tx, shutdown_rx) = watch::channel(false);
39 let (conn_tx, mut conn_rx) =
40 mpsc::channel::<InboundConnection>(CONNECTION_CHANNEL_CAPACITY);
41
42 #[cfg(feature = "inspector-api")]
44 if let Some(api_addr) = self.config.inspector.api_addr {
45 let store = self.event_store.clone();
46 let broadcaster = self.event_broadcaster.clone();
47 let api_shutdown = shutdown_rx.clone();
48 tokio::spawn(async move {
49 if let Err(e) = crate::inspector::api::run_inspector_api(
50 api_addr,
51 store,
52 broadcaster,
53 api_shutdown,
54 )
55 .await
56 {
57 tracing::error!(error = %e, "Inspector API failed");
58 }
59 });
60 }
61
62 let listener_shutdown = shutdown_rx.clone();
64 let listener_fut = (self.listener_factory)(conn_tx, listener_shutdown);
65 let listener_handle = tokio::spawn(async move {
66 if let Err(e) = listener_fut.await {
67 tracing::error!(error = %e, "Listener failed");
68 }
69 });
70
71 let connector = Arc::new(self.connector);
73 let middleware = Arc::new(self.middleware);
74 let timeout = self.config.timeout;
75
76 tracing::info!("Proxy service started");
77
78 while let Some((request, resp_tx)) = conn_rx.recv().await {
79 let connector = connector.clone();
80 let middleware = middleware.clone();
81
82 tokio::spawn(async move {
83 let result = tokio::time::timeout(timeout, async {
84 let (request, depth) = match middleware.process_request(request).await {
86 Ok(Ok((req, depth))) => (req, depth),
87 Ok(Err(response)) => {
88 let _ = resp_tx.send(response);
90 return Ok(());
91 }
92 Err(e) => return Err(e),
93 };
94
95 let response = connector.forward(request).await?;
97
98 let response = middleware.process_response(response, depth).await?;
100
101 let _ = resp_tx.send(response);
102 Ok::<(), ProxyError>(())
103 })
104 .await;
105
106 match result {
107 Ok(Ok(())) => {}
108 Ok(Err(e)) => {
109 tracing::error!(error = %e, "Request processing failed");
110 }
111 Err(_) => {
112 tracing::warn!("Request timed out");
113 }
114 }
115 });
116 }
117
118 let _ = shutdown_tx.send(true);
120 listener_handle.await.ok();
121
122 tracing::info!("Proxy service stopped");
123 Ok(())
124 }
125
126 pub fn conversions(&self) -> &ConversionRegistry {
128 &self.conversions
129 }
130
131 pub fn event_store(&self) -> &Arc<EventStore> {
133 &self.event_store
134 }
135
136 pub fn event_broadcaster(&self) -> &Arc<EventBroadcaster> {
138 &self.event_broadcaster
139 }
140}