cc_switch/daemon/aggregate/
mod.rs1pub mod routes;
2pub mod state;
3pub mod stream;
4
5use ccs_proxy::CaptureEvent;
6use state::{AggregateState, AliasMap, StoreEntry};
7use std::sync::Arc;
8use stream::TaggedCaptureEvent;
9use tokio::net::TcpListener;
10use tokio::sync::broadcast;
11use tokio::task::JoinHandle;
12
13pub type EventSenderEntry = (String, broadcast::Sender<CaptureEvent>);
14
15pub struct AggregateHandle {
16 pub port: u16,
17 shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,
18 join: Option<JoinHandle<()>>,
19}
20
21impl AggregateHandle {
22 pub async fn shutdown(mut self) {
23 if let Some(tx) = self.shutdown_tx.take() {
24 let _ = tx.send(());
25 }
26 if let Some(join) = self.join.take() {
27 let _ = join.await;
28 }
29 }
30}
31
32impl Drop for AggregateHandle {
33 fn drop(&mut self) {
34 if let Some(tx) = self.shutdown_tx.take() {
35 let _ = tx.send(());
36 }
37 }
38}
39
40pub async fn serve(
41 stores: Vec<StoreEntry>,
42 proxy_events: Vec<EventSenderEntry>,
43 alias_map: Arc<AliasMap>,
44 port: u16,
45) -> anyhow::Result<AggregateHandle> {
46 let listener = TcpListener::bind(("127.0.0.1", port)).await?;
47 let bound_port = listener.local_addr()?.port();
48
49 let (merged_tx, _) = broadcast::channel::<TaggedCaptureEvent>(2048);
50
51 let receivers: Vec<_> = proxy_events
52 .iter()
53 .map(|(upstream, sender)| (upstream.clone(), sender.subscribe()))
54 .collect();
55 if !receivers.is_empty() {
56 let merger_alias_map = alias_map.clone();
57 let merger_tx = merged_tx.clone();
58 tokio::spawn(stream::event_merger(receivers, merger_alias_map, merger_tx));
59 }
60
61 let agg_state = Arc::new(AggregateState {
62 stores,
63 merged_events: merged_tx,
64 alias_map,
65 started_at: chrono::Utc::now(),
66 });
67
68 let app = axum::Router::new()
69 .merge(routes::router())
70 .merge(ui_router())
71 .with_state(agg_state);
72
73 let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
74 let join = tokio::spawn(async move {
75 let server = axum::serve(listener, app);
76 tokio::select! {
77 res = server => {
78 if let Err(err) = res {
79 tracing::warn!(error = %err, "aggregate server exited");
80 }
81 }
82 _ = shutdown_rx => {}
83 }
84 });
85
86 tracing::info!(port = bound_port, "aggregate server started");
87
88 Ok(AggregateHandle {
89 port: bound_port,
90 shutdown_tx: Some(shutdown_tx),
91 join: Some(join),
92 })
93}
94
95use axum::Router;
96use axum::http::{StatusCode, header};
97use axum::response::{IntoResponse, Response};
98use axum::routing::get;
99use rust_embed::RustEmbed;
100
101#[derive(RustEmbed)]
102#[folder = "web-aggregate/"]
103struct AggWebAsset;
104
105fn ui_router() -> Router<Arc<AggregateState>> {
106 Router::new()
107 .route("/", get(|| async { serve_asset("index.html") }))
108 .route("/index.html", get(|| async { serve_asset("index.html") }))
109 .route("/app.js", get(|| async { serve_asset("app.js") }))
110 .route("/style.css", get(|| async { serve_asset("style.css") }))
111}
112
113fn serve_asset(name: &str) -> Response {
114 match AggWebAsset::get(name) {
115 Some(asset) => {
116 let mime = match std::path::Path::new(name)
117 .extension()
118 .and_then(|x| x.to_str())
119 {
120 Some("html") => "text/html; charset=utf-8",
121 Some("js") => "application/javascript; charset=utf-8",
122 Some("css") => "text/css; charset=utf-8",
123 _ => "application/octet-stream",
124 };
125 (
126 StatusCode::OK,
127 [(header::CONTENT_TYPE, mime)],
128 asset.data.into_owned(),
129 )
130 .into_response()
131 }
132 None => (StatusCode::NOT_FOUND, "not found").into_response(),
133 }
134}