Skip to main content

allsource_core/infrastructure/web/
api_v1.rs

1use crate::application::services::tenant_service::TenantManager;
2/// v1.0 API router with authentication and multi-tenancy
3use crate::infrastructure::di::ServiceContainer;
4use crate::infrastructure::security::auth::AuthManager;
5use crate::infrastructure::security::middleware::{
6    auth_middleware, rate_limit_middleware, AuthState, RateLimitState,
7};
8use crate::infrastructure::security::rate_limit::RateLimiter;
9use crate::infrastructure::web::auth_api::*;
10use crate::infrastructure::web::tenant_api::*;
11use crate::store::EventStore;
12use axum::{
13    middleware,
14    routing::{delete, get, post, put},
15    Router,
16};
17use std::sync::Arc;
18use tower_http::cors::{Any, CorsLayer};
19use tower_http::trace::TraceLayer;
20
21/// Unified application state for all handlers
22#[derive(Clone)]
23pub struct AppState {
24    pub store: Arc<EventStore>,
25    pub auth_manager: Arc<AuthManager>,
26    pub tenant_manager: Arc<TenantManager>,
27    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
28    pub service_container: ServiceContainer,
29}
30
31// Enable extracting Arc<EventStore> from AppState
32// This allows handlers that expect State<Arc<EventStore>> to work with AppState
33impl axum::extract::FromRef<AppState> for Arc<EventStore> {
34    fn from_ref(state: &AppState) -> Self {
35        state.store.clone()
36    }
37}
38
39pub async fn serve_v1(
40    store: Arc<EventStore>,
41    auth_manager: Arc<AuthManager>,
42    tenant_manager: Arc<TenantManager>,
43    rate_limiter: Arc<RateLimiter>,
44    service_container: ServiceContainer,
45    addr: &str,
46) -> anyhow::Result<()> {
47    let app_state = AppState {
48        store,
49        auth_manager: auth_manager.clone(),
50        tenant_manager,
51        service_container,
52    };
53
54    let auth_state = AuthState {
55        auth_manager: auth_manager.clone(),
56    };
57
58    let rate_limit_state = RateLimitState { rate_limiter };
59
60    let app = Router::new()
61        // Public routes (no auth)
62        .route("/health", get(super::api::health))
63        .route("/metrics", get(super::api::prometheus_metrics))
64        // Auth routes
65        .route("/api/v1/auth/register", post(register_handler))
66        .route("/api/v1/auth/login", post(login_handler))
67        .route("/api/v1/auth/me", get(me_handler))
68        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
69        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
70        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
71        .route("/api/v1/auth/users", get(list_users_handler))
72        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
73        // Tenant routes (protected)
74        .route("/api/v1/tenants", post(create_tenant_handler))
75        .route("/api/v1/tenants", get(list_tenants_handler))
76        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
77        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
78        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
79        .route(
80            "/api/v1/tenants/{id}/deactivate",
81            post(deactivate_tenant_handler),
82        )
83        .route(
84            "/api/v1/tenants/{id}/activate",
85            post(activate_tenant_handler),
86        )
87        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
88        // Event and data routes (protected by auth)
89        .route("/api/v1/events", post(super::api::ingest_event))
90        .route("/api/v1/events/query", get(super::api::query_events))
91        .route("/api/v1/events/stream", get(super::api::events_websocket))
92        .route(
93            "/api/v1/entities/{entity_id}/state",
94            get(super::api::get_entity_state),
95        )
96        .route(
97            "/api/v1/entities/{entity_id}/snapshot",
98            get(super::api::get_entity_snapshot),
99        )
100        .route("/api/v1/stats", get(super::api::get_stats))
101        // Analytics
102        .route(
103            "/api/v1/analytics/frequency",
104            get(super::api::analytics_frequency),
105        )
106        .route(
107            "/api/v1/analytics/summary",
108            get(super::api::analytics_summary),
109        )
110        .route(
111            "/api/v1/analytics/correlation",
112            get(super::api::analytics_correlation),
113        )
114        // Snapshots
115        .route("/api/v1/snapshots", post(super::api::create_snapshot))
116        .route("/api/v1/snapshots", get(super::api::list_snapshots))
117        .route(
118            "/api/v1/snapshots/{entity_id}/latest",
119            get(super::api::get_latest_snapshot),
120        )
121        // Compaction
122        .route(
123            "/api/v1/compaction/trigger",
124            post(super::api::trigger_compaction),
125        )
126        .route(
127            "/api/v1/compaction/stats",
128            get(super::api::compaction_stats),
129        )
130        // Schemas
131        .route("/api/v1/schemas", post(super::api::register_schema))
132        .route("/api/v1/schemas", get(super::api::list_subjects))
133        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
134        .route(
135            "/api/v1/schemas/{subject}/versions",
136            get(super::api::list_schema_versions),
137        )
138        .route(
139            "/api/v1/schemas/validate",
140            post(super::api::validate_event_schema),
141        )
142        .route(
143            "/api/v1/schemas/{subject}/compatibility",
144            put(super::api::set_compatibility_mode),
145        )
146        // Replay
147        .route("/api/v1/replay", post(super::api::start_replay))
148        .route("/api/v1/replay", get(super::api::list_replays))
149        .route(
150            "/api/v1/replay/{replay_id}",
151            get(super::api::get_replay_progress),
152        )
153        .route(
154            "/api/v1/replay/{replay_id}/cancel",
155            post(super::api::cancel_replay),
156        )
157        .route(
158            "/api/v1/replay/{replay_id}",
159            delete(super::api::delete_replay),
160        )
161        // Pipelines
162        .route("/api/v1/pipelines", post(super::api::register_pipeline))
163        .route("/api/v1/pipelines", get(super::api::list_pipelines))
164        .route(
165            "/api/v1/pipelines/stats",
166            get(super::api::all_pipeline_stats),
167        )
168        .route(
169            "/api/v1/pipelines/{pipeline_id}",
170            get(super::api::get_pipeline),
171        )
172        .route(
173            "/api/v1/pipelines/{pipeline_id}",
174            delete(super::api::remove_pipeline),
175        )
176        .route(
177            "/api/v1/pipelines/{pipeline_id}/stats",
178            get(super::api::get_pipeline_stats),
179        )
180        .route(
181            "/api/v1/pipelines/{pipeline_id}/reset",
182            put(super::api::reset_pipeline),
183        )
184        // v0.7: Projection State API for Query Service integration
185        .route("/api/v1/projections", get(super::api::list_projections))
186        .route(
187            "/api/v1/projections/{name}",
188            get(super::api::get_projection),
189        )
190        .route(
191            "/api/v1/projections/{name}/{entity_id}/state",
192            get(super::api::get_projection_state),
193        )
194        .route(
195            "/api/v1/projections/{name}/{entity_id}/state",
196            post(super::api::save_projection_state),
197        )
198        .route(
199            "/api/v1/projections/{name}/{entity_id}/state",
200            put(super::api::save_projection_state),
201        )
202        .route(
203            "/api/v1/projections/{name}/bulk",
204            post(super::api::bulk_get_projection_states),
205        )
206        .route(
207            "/api/v1/projections/{name}/bulk/save",
208            post(super::api::bulk_save_projection_states),
209        )
210        .with_state(app_state)
211        // IMPORTANT: Middleware layers execute from bottom to top in Tower/Axum
212        // Rate limit MUST come before auth so auth runs first and populates AuthContext
213        .layer(middleware::from_fn_with_state(
214            rate_limit_state,
215            rate_limit_middleware,
216        ))
217        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
218        .layer(
219            CorsLayer::new()
220                .allow_origin(Any)
221                .allow_methods(Any)
222                .allow_headers(Any),
223        )
224        .layer(TraceLayer::new_for_http());
225
226    let listener = tokio::net::TcpListener::bind(addr).await?;
227
228    // Graceful shutdown on SIGTERM (required for serverless platforms)
229    axum::serve(listener, app)
230        .with_graceful_shutdown(shutdown_signal())
231        .await?;
232
233    tracing::info!("🛑 AllSource Core shutdown complete");
234    Ok(())
235}
236
237/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
238async fn shutdown_signal() {
239    let ctrl_c = async {
240        tokio::signal::ctrl_c()
241            .await
242            .expect("failed to install Ctrl+C handler");
243    };
244
245    #[cfg(unix)]
246    let terminate = async {
247        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
248            .expect("failed to install SIGTERM handler")
249            .recv()
250            .await;
251    };
252
253    #[cfg(not(unix))]
254    let terminate = std::future::pending::<()>();
255
256    tokio::select! {
257        _ = ctrl_c => {
258            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
259        }
260        _ = terminate => {
261            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
262        }
263    }
264}