Skip to main content

cc_switch/daemon/aggregate/
mod.rs

1pub mod routes;
2pub mod state;
3pub mod stream;
4
5use ccs_proxy::CaptureEvent;
6use state::{AggregateState, AliasMap, StoreEntry};
7use std::sync::Arc;
8use stream::TaggedCaptureEvent;
9use tokio::net::TcpListener;
10use tokio::sync::broadcast;
11use tokio::task::JoinHandle;
12
/// A `String` label paired with a broadcast sender of [`CaptureEvent`]s.
///
/// `serve` binds the label to a variable named `upstream` and subscribes to the sender,
/// so the label presumably names the upstream proxy the events come from.
pub type EventSenderEntry = (String, broadcast::Sender<CaptureEvent>);
14
15pub struct AggregateHandle {
16    pub port: u16,
17    shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
18    join: Option<JoinHandle<()>>,
19}
20
21impl AggregateHandle {
22    pub async fn shutdown(mut self) {
23        if let Some(tx) = self.shutdown_tx.take() {
24            let _ = tx.send(());
25        }
26        if let Some(join) = self.join.take() {
27            let _ = join.await;
28        }
29    }
30}
31
32impl Drop for AggregateHandle {
33    fn drop(&mut self) {
34        if let Some(tx) = self.shutdown_tx.take() {
35            let _ = tx.send(());
36        }
37    }
38}
39
40pub async fn serve(
41    stores: Vec<StoreEntry>,
42    proxy_events: Vec<EventSenderEntry>,
43    alias_map: Arc<AliasMap>,
44    port: u16,
45) -> anyhow::Result<AggregateHandle> {
46    let listener = TcpListener::bind(("127.0.0.1", port)).await?;
47    let bound_port = listener.local_addr()?.port();
48
49    let (merged_tx, _) = broadcast::channel::<TaggedCaptureEvent>(2048);
50
51    let receivers: Vec<_> = proxy_events
52        .iter()
53        .map(|(upstream, sender)| (upstream.clone(), sender.subscribe()))
54        .collect();
55    if !receivers.is_empty() {
56        let merger_alias_map = alias_map.clone();
57        let merger_tx = merged_tx.clone();
58        tokio::spawn(stream::event_merger(receivers, merger_alias_map, merger_tx));
59    }
60
61    let agg_state = Arc::new(AggregateState {
62        stores,
63        merged_events: merged_tx,
64        alias_map,
65        started_at: chrono::Utc::now(),
66    });
67
68    let app = axum::Router::new()
69        .merge(routes::router())
70        .merge(ui_router())
71        .with_state(agg_state);
72
73    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
74    let join = tokio::spawn(async move {
75        let server = axum::serve(listener, app);
76        tokio::select! {
77            res = server => {
78                if let Err(err) = res {
79                    tracing::warn!(error = %err, "aggregate server exited");
80                }
81            }
82            _ = shutdown_rx => {}
83        }
84    });
85
86    tracing::info!(port = bound_port, "aggregate server started");
87
88    Ok(AggregateHandle {
89        port: bound_port,
90        shutdown_tx: Some(shutdown_tx),
91        join: Some(join),
92    })
93}
94
95use axum::Router;
96use axum::http::{StatusCode, header};
97use axum::response::{IntoResponse, Response};
98use axum::routing::get;
99use rust_embed::RustEmbed;
100
/// Static files for the aggregate web UI, embedded through the `RustEmbed` derive
/// from the `web-aggregate/` folder and looked up by name in `serve_asset`.
#[derive(RustEmbed)]
#[folder = "web-aggregate/"]
struct AggWebAsset;
104
105fn ui_router() -> Router<Arc<AggregateState>> {
106    Router::new()
107        .route("/", get(|| async { serve_asset("index.html") }))
108        .route("/index.html", get(|| async { serve_asset("index.html") }))
109        .route("/app.js", get(|| async { serve_asset("app.js") }))
110        .route("/style.css", get(|| async { serve_asset("style.css") }))
111}
112
113fn serve_asset(name: &str) -> Response {
114    match AggWebAsset::get(name) {
115        Some(asset) => {
116            let mime = match std::path::Path::new(name)
117                .extension()
118                .and_then(|x| x.to_str())
119            {
120                Some("html") => "text/html; charset=utf-8",
121                Some("js") => "application/javascript; charset=utf-8",
122                Some("css") => "text/css; charset=utf-8",
123                _ => "application/octet-stream",
124            };
125            (
126                StatusCode::OK,
127                [(header::CONTENT_TYPE, mime)],
128                asset.data.into_owned(),
129            )
130                .into_response()
131        }
132        None => (StatusCode::NOT_FOUND, "not found").into_response(),
133    }
134}