allsource_core/infrastructure/web/
api_v1.rs1use crate::application::services::tenant_service::TenantManager;
2use crate::infrastructure::di::ServiceContainer;
4use crate::infrastructure::security::auth::AuthManager;
5use crate::infrastructure::security::middleware::{
6 auth_middleware, rate_limit_middleware, AuthState, RateLimitState,
7};
8use crate::infrastructure::security::rate_limit::RateLimiter;
9use crate::infrastructure::web::auth_api::*;
10use crate::infrastructure::web::tenant_api::*;
11use crate::store::EventStore;
12use axum::{
13 middleware,
14 routing::{delete, get, post, put},
15 Router,
16};
17use std::sync::Arc;
18use tower_http::cors::{Any, CorsLayer};
19use tower_http::trace::TraceLayer;
20
21#[derive(Clone)]
23pub struct AppState {
24 pub store: Arc<EventStore>,
25 pub auth_manager: Arc<AuthManager>,
26 pub tenant_manager: Arc<TenantManager>,
27 pub service_container: ServiceContainer,
29}
30
31impl axum::extract::FromRef<AppState> for Arc<EventStore> {
34 fn from_ref(state: &AppState) -> Self {
35 state.store.clone()
36 }
37}
38
39pub async fn serve_v1(
40 store: Arc<EventStore>,
41 auth_manager: Arc<AuthManager>,
42 tenant_manager: Arc<TenantManager>,
43 rate_limiter: Arc<RateLimiter>,
44 service_container: ServiceContainer,
45 addr: &str,
46) -> anyhow::Result<()> {
47 let app_state = AppState {
48 store,
49 auth_manager: auth_manager.clone(),
50 tenant_manager,
51 service_container,
52 };
53
54 let auth_state = AuthState {
55 auth_manager: auth_manager.clone(),
56 };
57
58 let rate_limit_state = RateLimitState { rate_limiter };
59
60 let app = Router::new()
61 .route("/health", get(super::api::health))
63 .route("/metrics", get(super::api::prometheus_metrics))
64 .route("/api/v1/auth/register", post(register_handler))
66 .route("/api/v1/auth/login", post(login_handler))
67 .route("/api/v1/auth/me", get(me_handler))
68 .route("/api/v1/auth/api-keys", post(create_api_key_handler))
69 .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
70 .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
71 .route("/api/v1/auth/users", get(list_users_handler))
72 .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
73 .route("/api/v1/tenants", post(create_tenant_handler))
75 .route("/api/v1/tenants", get(list_tenants_handler))
76 .route("/api/v1/tenants/{id}", get(get_tenant_handler))
77 .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
78 .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
79 .route(
80 "/api/v1/tenants/{id}/deactivate",
81 post(deactivate_tenant_handler),
82 )
83 .route(
84 "/api/v1/tenants/{id}/activate",
85 post(activate_tenant_handler),
86 )
87 .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
88 .route("/api/v1/events", post(super::api::ingest_event))
90 .route("/api/v1/events/query", get(super::api::query_events))
91 .route("/api/v1/events/stream", get(super::api::events_websocket))
92 .route(
93 "/api/v1/entities/{entity_id}/state",
94 get(super::api::get_entity_state),
95 )
96 .route(
97 "/api/v1/entities/{entity_id}/snapshot",
98 get(super::api::get_entity_snapshot),
99 )
100 .route("/api/v1/stats", get(super::api::get_stats))
101 .route(
103 "/api/v1/analytics/frequency",
104 get(super::api::analytics_frequency),
105 )
106 .route(
107 "/api/v1/analytics/summary",
108 get(super::api::analytics_summary),
109 )
110 .route(
111 "/api/v1/analytics/correlation",
112 get(super::api::analytics_correlation),
113 )
114 .route("/api/v1/snapshots", post(super::api::create_snapshot))
116 .route("/api/v1/snapshots", get(super::api::list_snapshots))
117 .route(
118 "/api/v1/snapshots/{entity_id}/latest",
119 get(super::api::get_latest_snapshot),
120 )
121 .route(
123 "/api/v1/compaction/trigger",
124 post(super::api::trigger_compaction),
125 )
126 .route(
127 "/api/v1/compaction/stats",
128 get(super::api::compaction_stats),
129 )
130 .route("/api/v1/schemas", post(super::api::register_schema))
132 .route("/api/v1/schemas", get(super::api::list_subjects))
133 .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
134 .route(
135 "/api/v1/schemas/{subject}/versions",
136 get(super::api::list_schema_versions),
137 )
138 .route(
139 "/api/v1/schemas/validate",
140 post(super::api::validate_event_schema),
141 )
142 .route(
143 "/api/v1/schemas/{subject}/compatibility",
144 put(super::api::set_compatibility_mode),
145 )
146 .route("/api/v1/replay", post(super::api::start_replay))
148 .route("/api/v1/replay", get(super::api::list_replays))
149 .route(
150 "/api/v1/replay/{replay_id}",
151 get(super::api::get_replay_progress),
152 )
153 .route(
154 "/api/v1/replay/{replay_id}/cancel",
155 post(super::api::cancel_replay),
156 )
157 .route(
158 "/api/v1/replay/{replay_id}",
159 delete(super::api::delete_replay),
160 )
161 .route("/api/v1/pipelines", post(super::api::register_pipeline))
163 .route("/api/v1/pipelines", get(super::api::list_pipelines))
164 .route(
165 "/api/v1/pipelines/stats",
166 get(super::api::all_pipeline_stats),
167 )
168 .route(
169 "/api/v1/pipelines/{pipeline_id}",
170 get(super::api::get_pipeline),
171 )
172 .route(
173 "/api/v1/pipelines/{pipeline_id}",
174 delete(super::api::remove_pipeline),
175 )
176 .route(
177 "/api/v1/pipelines/{pipeline_id}/stats",
178 get(super::api::get_pipeline_stats),
179 )
180 .route(
181 "/api/v1/pipelines/{pipeline_id}/reset",
182 put(super::api::reset_pipeline),
183 )
184 .route("/api/v1/projections", get(super::api::list_projections))
186 .route(
187 "/api/v1/projections/{name}",
188 get(super::api::get_projection),
189 )
190 .route(
191 "/api/v1/projections/{name}/{entity_id}/state",
192 get(super::api::get_projection_state),
193 )
194 .route(
195 "/api/v1/projections/{name}/{entity_id}/state",
196 post(super::api::save_projection_state),
197 )
198 .route(
199 "/api/v1/projections/{name}/{entity_id}/state",
200 put(super::api::save_projection_state),
201 )
202 .route(
203 "/api/v1/projections/{name}/bulk",
204 post(super::api::bulk_get_projection_states),
205 )
206 .route(
207 "/api/v1/projections/{name}/bulk/save",
208 post(super::api::bulk_save_projection_states),
209 )
210 .with_state(app_state)
211 .layer(middleware::from_fn_with_state(
214 rate_limit_state,
215 rate_limit_middleware,
216 ))
217 .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
218 .layer(
219 CorsLayer::new()
220 .allow_origin(Any)
221 .allow_methods(Any)
222 .allow_headers(Any),
223 )
224 .layer(TraceLayer::new_for_http());
225
226 let listener = tokio::net::TcpListener::bind(addr).await?;
227
228 axum::serve(listener, app)
230 .with_graceful_shutdown(shutdown_signal())
231 .await?;
232
233 tracing::info!("🛑 AllSource Core shutdown complete");
234 Ok(())
235}
236
237async fn shutdown_signal() {
239 let ctrl_c = async {
240 tokio::signal::ctrl_c()
241 .await
242 .expect("failed to install Ctrl+C handler");
243 };
244
245 #[cfg(unix)]
246 let terminate = async {
247 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
248 .expect("failed to install SIGTERM handler")
249 .recv()
250 .await;
251 };
252
253 #[cfg(not(unix))]
254 let terminate = std::future::pending::<()>();
255
256 tokio::select! {
257 _ = ctrl_c => {
258 tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
259 }
260 _ = terminate => {
261 tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
262 }
263 }
264}