allsource_core/infrastructure/web/
api_v1.rs1use crate::application::services::tenant_service::TenantManager;
2use crate::infrastructure::security::auth::AuthManager;
4use crate::infrastructure::security::middleware::{
5 auth_middleware, rate_limit_middleware, AuthState, RateLimitState,
6};
7use crate::infrastructure::security::rate_limit::RateLimiter;
8use crate::infrastructure::web::auth_api::*;
9use crate::infrastructure::web::tenant_api::*;
10use crate::store::EventStore;
11use axum::{
12 middleware,
13 routing::{delete, get, post, put},
14 Router,
15};
16use std::sync::Arc;
17use tower_http::cors::{Any, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20#[derive(Clone)]
22pub struct AppState {
23 pub store: Arc<EventStore>,
24 pub auth_manager: Arc<AuthManager>,
25 pub tenant_manager: Arc<TenantManager>,
26}
27
28impl axum::extract::FromRef<AppState> for Arc<EventStore> {
31 fn from_ref(state: &AppState) -> Self {
32 state.store.clone()
33 }
34}
35
36pub async fn serve_v1(
37 store: Arc<EventStore>,
38 auth_manager: Arc<AuthManager>,
39 tenant_manager: Arc<TenantManager>,
40 rate_limiter: Arc<RateLimiter>,
41 addr: &str,
42) -> anyhow::Result<()> {
43 let app_state = AppState {
44 store,
45 auth_manager: auth_manager.clone(),
46 tenant_manager,
47 };
48
49 let auth_state = AuthState {
50 auth_manager: auth_manager.clone(),
51 };
52
53 let rate_limit_state = RateLimitState { rate_limiter };
54
55 let app = Router::new()
56 .route("/health", get(super::api::health))
58 .route("/metrics", get(super::api::prometheus_metrics))
59 .route("/api/v1/auth/register", post(register_handler))
61 .route("/api/v1/auth/login", post(login_handler))
62 .route("/api/v1/auth/me", get(me_handler))
63 .route("/api/v1/auth/api-keys", post(create_api_key_handler))
64 .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
65 .route("/api/v1/auth/api-keys/:id", delete(revoke_api_key_handler))
66 .route("/api/v1/auth/users", get(list_users_handler))
67 .route("/api/v1/auth/users/:id", delete(delete_user_handler))
68 .route("/api/v1/tenants", post(create_tenant_handler))
70 .route("/api/v1/tenants", get(list_tenants_handler))
71 .route("/api/v1/tenants/:id", get(get_tenant_handler))
72 .route("/api/v1/tenants/:id/stats", get(get_tenant_stats_handler))
73 .route("/api/v1/tenants/:id/quotas", put(update_quotas_handler))
74 .route(
75 "/api/v1/tenants/:id/deactivate",
76 post(deactivate_tenant_handler),
77 )
78 .route(
79 "/api/v1/tenants/:id/activate",
80 post(activate_tenant_handler),
81 )
82 .route("/api/v1/tenants/:id", delete(delete_tenant_handler))
83 .route("/api/v1/events", post(super::api::ingest_event))
85 .route("/api/v1/events/query", get(super::api::query_events))
86 .route("/api/v1/events/stream", get(super::api::events_websocket))
87 .route(
88 "/api/v1/entities/:entity_id/state",
89 get(super::api::get_entity_state),
90 )
91 .route(
92 "/api/v1/entities/:entity_id/snapshot",
93 get(super::api::get_entity_snapshot),
94 )
95 .route("/api/v1/stats", get(super::api::get_stats))
96 .route(
98 "/api/v1/analytics/frequency",
99 get(super::api::analytics_frequency),
100 )
101 .route(
102 "/api/v1/analytics/summary",
103 get(super::api::analytics_summary),
104 )
105 .route(
106 "/api/v1/analytics/correlation",
107 get(super::api::analytics_correlation),
108 )
109 .route("/api/v1/snapshots", post(super::api::create_snapshot))
111 .route("/api/v1/snapshots", get(super::api::list_snapshots))
112 .route(
113 "/api/v1/snapshots/:entity_id/latest",
114 get(super::api::get_latest_snapshot),
115 )
116 .route(
118 "/api/v1/compaction/trigger",
119 post(super::api::trigger_compaction),
120 )
121 .route(
122 "/api/v1/compaction/stats",
123 get(super::api::compaction_stats),
124 )
125 .route("/api/v1/schemas", post(super::api::register_schema))
127 .route("/api/v1/schemas", get(super::api::list_subjects))
128 .route("/api/v1/schemas/:subject", get(super::api::get_schema))
129 .route(
130 "/api/v1/schemas/:subject/versions",
131 get(super::api::list_schema_versions),
132 )
133 .route(
134 "/api/v1/schemas/validate",
135 post(super::api::validate_event_schema),
136 )
137 .route(
138 "/api/v1/schemas/:subject/compatibility",
139 put(super::api::set_compatibility_mode),
140 )
141 .route("/api/v1/replay", post(super::api::start_replay))
143 .route("/api/v1/replay", get(super::api::list_replays))
144 .route(
145 "/api/v1/replay/:replay_id",
146 get(super::api::get_replay_progress),
147 )
148 .route(
149 "/api/v1/replay/:replay_id/cancel",
150 post(super::api::cancel_replay),
151 )
152 .route(
153 "/api/v1/replay/:replay_id",
154 delete(super::api::delete_replay),
155 )
156 .route("/api/v1/pipelines", post(super::api::register_pipeline))
158 .route("/api/v1/pipelines", get(super::api::list_pipelines))
159 .route(
160 "/api/v1/pipelines/stats",
161 get(super::api::all_pipeline_stats),
162 )
163 .route(
164 "/api/v1/pipelines/:pipeline_id",
165 get(super::api::get_pipeline),
166 )
167 .route(
168 "/api/v1/pipelines/:pipeline_id",
169 delete(super::api::remove_pipeline),
170 )
171 .route(
172 "/api/v1/pipelines/:pipeline_id/stats",
173 get(super::api::get_pipeline_stats),
174 )
175 .route(
176 "/api/v1/pipelines/:pipeline_id/reset",
177 put(super::api::reset_pipeline),
178 )
179 .route("/api/v1/projections", get(super::api::list_projections))
181 .route("/api/v1/projections/:name", get(super::api::get_projection))
182 .route(
183 "/api/v1/projections/:name/:entity_id/state",
184 get(super::api::get_projection_state),
185 )
186 .route(
187 "/api/v1/projections/:name/:entity_id/state",
188 put(super::api::save_projection_state),
189 )
190 .route(
191 "/api/v1/projections/:name/bulk",
192 post(super::api::bulk_get_projection_states),
193 )
194 .with_state(app_state)
195 .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
196 .layer(middleware::from_fn_with_state(
197 rate_limit_state,
198 rate_limit_middleware,
199 ))
200 .layer(
201 CorsLayer::new()
202 .allow_origin(Any)
203 .allow_methods(Any)
204 .allow_headers(Any),
205 )
206 .layer(TraceLayer::new_for_http());
207
208 let listener = tokio::net::TcpListener::bind(addr).await?;
209
210 axum::serve(listener, app)
212 .with_graceful_shutdown(shutdown_signal())
213 .await?;
214
215 tracing::info!("🛑 AllSource Core shutdown complete");
216 Ok(())
217}
218
219async fn shutdown_signal() {
221 let ctrl_c = async {
222 tokio::signal::ctrl_c()
223 .await
224 .expect("failed to install Ctrl+C handler");
225 };
226
227 #[cfg(unix)]
228 let terminate = async {
229 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
230 .expect("failed to install SIGTERM handler")
231 .recv()
232 .await;
233 };
234
235 #[cfg(not(unix))]
236 let terminate = std::future::pending::<()>();
237
238 tokio::select! {
239 _ = ctrl_c => {
240 tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
241 }
242 _ = terminate => {
243 tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
244 }
245 }
246}