allsource_core/infrastructure/web/
api_v1.rs

1use crate::application::services::tenant_service::TenantManager;
2/// v1.0 API router with authentication and multi-tenancy
3use crate::infrastructure::security::auth::AuthManager;
4use crate::infrastructure::security::middleware::{
5    auth_middleware, rate_limit_middleware, AuthState, RateLimitState,
6};
7use crate::infrastructure::security::rate_limit::RateLimiter;
8use crate::infrastructure::web::auth_api::*;
9use crate::infrastructure::web::tenant_api::*;
10use crate::store::EventStore;
11use axum::{
12    middleware,
13    routing::{delete, get, post, put},
14    Router,
15};
16use std::sync::Arc;
17use tower_http::cors::{Any, CorsLayer};
18use tower_http::trace::TraceLayer;
19
20/// Unified application state for all handlers
21#[derive(Clone)]
22pub struct AppState {
23    pub store: Arc<EventStore>,
24    pub auth_manager: Arc<AuthManager>,
25    pub tenant_manager: Arc<TenantManager>,
26}
27
28// Enable extracting Arc<EventStore> from AppState
29// This allows handlers that expect State<Arc<EventStore>> to work with AppState
30impl axum::extract::FromRef<AppState> for Arc<EventStore> {
31    fn from_ref(state: &AppState) -> Self {
32        state.store.clone()
33    }
34}
35
36pub async fn serve_v1(
37    store: Arc<EventStore>,
38    auth_manager: Arc<AuthManager>,
39    tenant_manager: Arc<TenantManager>,
40    rate_limiter: Arc<RateLimiter>,
41    addr: &str,
42) -> anyhow::Result<()> {
43    let app_state = AppState {
44        store,
45        auth_manager: auth_manager.clone(),
46        tenant_manager,
47    };
48
49    let auth_state = AuthState {
50        auth_manager: auth_manager.clone(),
51    };
52
53    let rate_limit_state = RateLimitState { rate_limiter };
54
55    let app = Router::new()
56        // Public routes (no auth)
57        .route("/health", get(super::api::health))
58        .route("/metrics", get(super::api::prometheus_metrics))
59        // Auth routes
60        .route("/api/v1/auth/register", post(register_handler))
61        .route("/api/v1/auth/login", post(login_handler))
62        .route("/api/v1/auth/me", get(me_handler))
63        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
64        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
65        .route("/api/v1/auth/api-keys/:id", delete(revoke_api_key_handler))
66        .route("/api/v1/auth/users", get(list_users_handler))
67        .route("/api/v1/auth/users/:id", delete(delete_user_handler))
68        // Tenant routes (protected)
69        .route("/api/v1/tenants", post(create_tenant_handler))
70        .route("/api/v1/tenants", get(list_tenants_handler))
71        .route("/api/v1/tenants/:id", get(get_tenant_handler))
72        .route("/api/v1/tenants/:id/stats", get(get_tenant_stats_handler))
73        .route("/api/v1/tenants/:id/quotas", put(update_quotas_handler))
74        .route(
75            "/api/v1/tenants/:id/deactivate",
76            post(deactivate_tenant_handler),
77        )
78        .route(
79            "/api/v1/tenants/:id/activate",
80            post(activate_tenant_handler),
81        )
82        .route("/api/v1/tenants/:id", delete(delete_tenant_handler))
83        // Event and data routes (protected by auth)
84        .route("/api/v1/events", post(super::api::ingest_event))
85        .route("/api/v1/events/query", get(super::api::query_events))
86        .route("/api/v1/events/stream", get(super::api::events_websocket))
87        .route(
88            "/api/v1/entities/:entity_id/state",
89            get(super::api::get_entity_state),
90        )
91        .route(
92            "/api/v1/entities/:entity_id/snapshot",
93            get(super::api::get_entity_snapshot),
94        )
95        .route("/api/v1/stats", get(super::api::get_stats))
96        // Analytics
97        .route(
98            "/api/v1/analytics/frequency",
99            get(super::api::analytics_frequency),
100        )
101        .route(
102            "/api/v1/analytics/summary",
103            get(super::api::analytics_summary),
104        )
105        .route(
106            "/api/v1/analytics/correlation",
107            get(super::api::analytics_correlation),
108        )
109        // Snapshots
110        .route("/api/v1/snapshots", post(super::api::create_snapshot))
111        .route("/api/v1/snapshots", get(super::api::list_snapshots))
112        .route(
113            "/api/v1/snapshots/:entity_id/latest",
114            get(super::api::get_latest_snapshot),
115        )
116        // Compaction
117        .route(
118            "/api/v1/compaction/trigger",
119            post(super::api::trigger_compaction),
120        )
121        .route(
122            "/api/v1/compaction/stats",
123            get(super::api::compaction_stats),
124        )
125        // Schemas
126        .route("/api/v1/schemas", post(super::api::register_schema))
127        .route("/api/v1/schemas", get(super::api::list_subjects))
128        .route("/api/v1/schemas/:subject", get(super::api::get_schema))
129        .route(
130            "/api/v1/schemas/:subject/versions",
131            get(super::api::list_schema_versions),
132        )
133        .route(
134            "/api/v1/schemas/validate",
135            post(super::api::validate_event_schema),
136        )
137        .route(
138            "/api/v1/schemas/:subject/compatibility",
139            put(super::api::set_compatibility_mode),
140        )
141        // Replay
142        .route("/api/v1/replay", post(super::api::start_replay))
143        .route("/api/v1/replay", get(super::api::list_replays))
144        .route(
145            "/api/v1/replay/:replay_id",
146            get(super::api::get_replay_progress),
147        )
148        .route(
149            "/api/v1/replay/:replay_id/cancel",
150            post(super::api::cancel_replay),
151        )
152        .route(
153            "/api/v1/replay/:replay_id",
154            delete(super::api::delete_replay),
155        )
156        // Pipelines
157        .route("/api/v1/pipelines", post(super::api::register_pipeline))
158        .route("/api/v1/pipelines", get(super::api::list_pipelines))
159        .route(
160            "/api/v1/pipelines/stats",
161            get(super::api::all_pipeline_stats),
162        )
163        .route(
164            "/api/v1/pipelines/:pipeline_id",
165            get(super::api::get_pipeline),
166        )
167        .route(
168            "/api/v1/pipelines/:pipeline_id",
169            delete(super::api::remove_pipeline),
170        )
171        .route(
172            "/api/v1/pipelines/:pipeline_id/stats",
173            get(super::api::get_pipeline_stats),
174        )
175        .route(
176            "/api/v1/pipelines/:pipeline_id/reset",
177            put(super::api::reset_pipeline),
178        )
179        // v0.7: Projection State API for Query Service integration
180        .route("/api/v1/projections", get(super::api::list_projections))
181        .route("/api/v1/projections/:name", get(super::api::get_projection))
182        .route(
183            "/api/v1/projections/:name/:entity_id/state",
184            get(super::api::get_projection_state),
185        )
186        .route(
187            "/api/v1/projections/:name/:entity_id/state",
188            put(super::api::save_projection_state),
189        )
190        .route(
191            "/api/v1/projections/:name/bulk",
192            post(super::api::bulk_get_projection_states),
193        )
194        .with_state(app_state)
195        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
196        .layer(middleware::from_fn_with_state(
197            rate_limit_state,
198            rate_limit_middleware,
199        ))
200        .layer(
201            CorsLayer::new()
202                .allow_origin(Any)
203                .allow_methods(Any)
204                .allow_headers(Any),
205        )
206        .layer(TraceLayer::new_for_http());
207
208    let listener = tokio::net::TcpListener::bind(addr).await?;
209
210    // Graceful shutdown on SIGTERM (required for serverless platforms)
211    axum::serve(listener, app)
212        .with_graceful_shutdown(shutdown_signal())
213        .await?;
214
215    tracing::info!("🛑 AllSource Core shutdown complete");
216    Ok(())
217}
218
219/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
220async fn shutdown_signal() {
221    let ctrl_c = async {
222        tokio::signal::ctrl_c()
223            .await
224            .expect("failed to install Ctrl+C handler");
225    };
226
227    #[cfg(unix)]
228    let terminate = async {
229        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
230            .expect("failed to install SIGTERM handler")
231            .recv()
232            .await;
233    };
234
235    #[cfg(not(unix))]
236    let terminate = std::future::pending::<()>();
237
238    tokio::select! {
239        _ = ctrl_c => {
240            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
241        }
242        _ = terminate => {
243            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
244        }
245    }
246}