// allsource_core/infrastructure/web/api_v1.rs
//! v1.0 API router with authentication and multi-tenancy
2use crate::infrastructure::di::ServiceContainer;
3use crate::{
4    application::services::tenant_service::TenantManager,
5    infrastructure::{
6        replication::{WalReceiver, WalShipper},
7        security::{
8            auth::AuthManager,
9            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
10            rate_limit::RateLimiter,
11        },
12        web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
13    },
14    store::EventStore,
15};
16use axum::{
17    Json, Router,
18    extract::State,
19    middleware,
20    response::IntoResponse,
21    routing::{delete, get, post, put},
22};
23use std::sync::Arc;
24use tower_http::{
25    cors::{Any, CorsLayer},
26    trace::TraceLayer,
27};
28
29/// Node role for leader-follower replication
30#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
31#[serde(rename_all = "lowercase")]
32pub enum NodeRole {
33    Leader,
34    Follower,
35}
36
37impl NodeRole {
38    /// Detect role from environment variables.
39    ///
40    /// Checks `ALLSOURCE_ROLE` ("leader" or "follower") first,
41    /// then falls back to `ALLSOURCE_READ_ONLY` ("true" → follower).
42    /// Defaults to `Leader` if neither is set.
43    pub fn from_env() -> Self {
44        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
45            match role.to_lowercase().as_str() {
46                "follower" => return NodeRole::Follower,
47                "leader" => return NodeRole::Leader,
48                other => {
49                    tracing::warn!(
50                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
51                        other
52                    );
53                    return NodeRole::Leader;
54                }
55            }
56        }
57        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
58            && (read_only == "true" || read_only == "1")
59        {
60            return NodeRole::Follower;
61        }
62        NodeRole::Leader
63    }
64
65    pub fn is_follower(self) -> bool {
66        self == NodeRole::Follower
67    }
68
69    fn to_u8(self) -> u8 {
70        match self {
71            NodeRole::Leader => 0,
72            NodeRole::Follower => 1,
73        }
74    }
75
76    fn from_u8(v: u8) -> Self {
77        match v {
78            1 => NodeRole::Follower,
79            _ => NodeRole::Leader,
80        }
81    }
82}
83
84impl std::fmt::Display for NodeRole {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        match self {
87            NodeRole::Leader => write!(f, "leader"),
88            NodeRole::Follower => write!(f, "follower"),
89        }
90    }
91}
92
93/// Thread-safe, runtime-mutable node role for failover support.
94///
95/// Wraps an `AtomicU8` so the read-only middleware and health endpoint
96/// always see the current role, even after a sentinel-triggered promotion.
97#[derive(Clone)]
98pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
99
100impl AtomicNodeRole {
101    pub fn new(role: NodeRole) -> Self {
102        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
103    }
104
105    pub fn load(&self) -> NodeRole {
106        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
107    }
108
109    pub fn store(&self, role: NodeRole) {
110        self.0
111            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
112    }
113}
114
/// Unified application state for all handlers
#[derive(Clone)]
pub struct AppState {
    /// Core event store backing the event/data/analytics routes.
    pub store: Arc<EventStore>,
    /// Authentication manager; a clone is also handed to the auth middleware.
    pub auth_manager: Arc<AuthManager>,
    /// Multi-tenancy manager backing the tenant routes.
    pub tenant_manager: Arc<TenantManager>,
    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
    pub service_container: ServiceContainer,
    /// Node role for leader-follower replication (runtime-mutable for failover)
    pub role: AtomicNodeRole,
    /// WAL shipper for replication status reporting (leader only).
    /// Wrapped in RwLock so a promoted follower can install a shipper at runtime.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver for replication status reporting (follower only)
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Replication port used by the WAL shipper (needed for runtime promotion)
    pub replication_port: u16,
}
133
134// Enable extracting Arc<EventStore> from AppState
135// This allows handlers that expect State<Arc<EventStore>> to work with AppState
136impl axum::extract::FromRef<AppState> for Arc<EventStore> {
137    fn from_ref(state: &AppState) -> Self {
138        state.store.clone()
139    }
140}
141
/// Build and serve the v1 HTTP API until a shutdown signal arrives.
///
/// Wires every route group (auth, tenants, audit, config, events, analytics,
/// snapshots, compaction, schemas, replay, pipelines, projections, and the
/// sentinel-facing internal failover endpoints) into one router, applies the
/// middleware stack, binds to `addr`, and serves with graceful shutdown.
///
/// # Errors
/// Returns an error if binding the TCP listener fails or the server exits
/// abnormally.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
) -> anyhow::Result<()> {
    // Shared state handed to every handler via `State<AppState>`.
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
    };

    // Separate per-middleware states for the auth and rate-limit layers.
    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Public routes (no auth)
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Auth routes
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant routes (protected)
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit endpoints (admin only)
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Config endpoints (admin only)
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Event and data routes (protected by auth)
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schemas
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Internal endpoints for sentinel-driven failover (not exposed publicly)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler))
        .with_state(app_state.clone())
        // IMPORTANT: Middleware layers execute from bottom to top in Tower/Axum.
        // The last `.layer(...)` call becomes the OUTERMOST layer, so request
        // order here is: Trace → CORS → auth → rate limit → read-only → handler.
        // Read-only middleware therefore runs after auth has already run.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        // Fully permissive CORS: any origin, method, and header are allowed.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Graceful shutdown on SIGTERM (required for serverless platforms)
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
370
/// Write paths that should be rejected when running as a follower.
///
/// Matched by prefix in `is_write_request`, so e.g. "/api/v1/replay" also
/// covers "/api/v1/replay/{id}/cancel". Only POST/PUT/DELETE requests are
/// checked, so GET endpoints that share these prefixes are unaffected.
///
/// NOTE(review): "/api/v1/projections/" also prefix-matches the POST route
/// "/api/v1/projections/{name}/bulk", whose handler name
/// (`bulk_get_projection_states`) suggests it is a read — followers will
/// reject it with 409. Confirm that is intended.
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
];
384
385/// Returns true if this request is a write operation that should be blocked on followers.
386fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
387    use axum::http::Method;
388    // Only POST/PUT/DELETE are writes
389    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
390        return false;
391    }
392    WRITE_PATHS
393        .iter()
394        .any(|write_path| path.starts_with(write_path))
395}
396
/// Returns true if the request targets an internal endpoint (not subject to read-only checks).
fn is_internal_request(path: &str) -> bool {
    path.strip_prefix("/internal/").is_some()
}
401
402/// Middleware that rejects write requests when the node is a follower.
403///
404/// Returns HTTP 409 Conflict with `{"error": "read_only", "message": "..."}`.
405/// Internal endpoints (`/internal/*`) are exempt — they are used by the sentinel
406/// to trigger promotion and repointing during failover.
407async fn read_only_middleware(
408    State(state): State<AppState>,
409    request: axum::extract::Request,
410    next: axum::middleware::Next,
411) -> axum::response::Response {
412    let path = request.uri().path();
413    if state.role.load().is_follower()
414        && is_write_request(request.method(), path)
415        && !is_internal_request(path)
416    {
417        return (
418            axum::http::StatusCode::CONFLICT,
419            axum::Json(serde_json::json!({
420                "error": "read_only",
421                "message": "This node is a read-only follower"
422            })),
423        )
424            .into_response();
425    }
426    next.run(request).await
427}
428
429/// Health endpoint with system stream health reporting.
430///
431/// Reports overall health plus detailed system metadata health when
432/// event-sourced system repositories are configured.
433async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
434    let has_system_repos = state.service_container.has_system_repositories();
435
436    let system_streams = if has_system_repos {
437        let (tenant_count, config_count, total_events) =
438            if let Some(store) = state.service_container.system_store() {
439                use crate::domain::value_objects::system_stream::SystemDomain;
440                (
441                    store.count_stream(SystemDomain::Tenant),
442                    store.count_stream(SystemDomain::Config),
443                    store.total_events(),
444                )
445            } else {
446                (0, 0, 0)
447            };
448
449        serde_json::json!({
450            "status": "healthy",
451            "mode": "event-sourced",
452            "total_events": total_events,
453            "tenant_events": tenant_count,
454            "config_events": config_count,
455        })
456    } else {
457        serde_json::json!({
458            "status": "disabled",
459            "mode": "in-memory",
460        })
461    };
462
463    let replication = {
464        let shipper_guard = state.wal_shipper.read().await;
465        if let Some(ref shipper) = *shipper_guard {
466            serde_json::to_value(shipper.status()).unwrap_or_default()
467        } else if let Some(ref receiver) = state.wal_receiver {
468            serde_json::to_value(receiver.status()).unwrap_or_default()
469        } else {
470            serde_json::json!(null)
471        }
472    };
473
474    let current_role = state.role.load();
475
476    Json(serde_json::json!({
477        "status": "healthy",
478        "service": "allsource-core",
479        "version": env!("CARGO_PKG_VERSION"),
480        "role": current_role,
481        "system_streams": system_streams,
482        "replication": replication,
483    }))
484}
485
/// POST /internal/promote — Promote this follower to leader.
///
/// Called by the sentinel process during automated failover.
/// Switches the node role to leader, stops WAL receiving, and starts
/// a WAL shipper on the replication port so other followers can connect.
///
/// Returns 200 with `"status": "already_leader"` when already leader, or
/// `"status": "promoted"` after a successful role switch.
///
/// NOTE(review): the role check and the role switch below are not atomic;
/// two concurrent promote calls could both pass the check and each start a
/// shipper on the same port. Presumably the sentinel is the sole caller and
/// serializes requests — confirm.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    // Idempotent no-op when this node already leads.
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // 1. Switch role — the read-only middleware will immediately start accepting writes
    state.role.store(NodeRole::Leader);

    // 2. Signal the WAL receiver to stop (it will stop reconnecting)
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // 3. Start a new WAL shipper so remaining followers can connect
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    // Route this store's WAL entries into the shipper's channel.
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Install into AppState so health endpoint reports shipper status
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Spawn the shipper server on a background task; errors are logged,
    // not propagated to the HTTP response (promotion has already happened).
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
550
551/// POST /internal/repoint?leader=host:port — Switch replication target.
552///
553/// Called by the sentinel process to tell a follower to disconnect from
554/// the old leader and reconnect to a newly promoted leader.
555async fn repoint_handler(
556    State(state): State<AppState>,
557    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
558) -> impl IntoResponse {
559    let current_role = state.role.load();
560    if current_role != NodeRole::Follower {
561        return (
562            axum::http::StatusCode::CONFLICT,
563            Json(serde_json::json!({
564                "error": "not_follower",
565                "message": "Repoint only applies to follower nodes",
566            })),
567        );
568    }
569
570    let new_leader = match params.get("leader") {
571        Some(l) if !l.is_empty() => l.clone(),
572        _ => {
573            return (
574                axum::http::StatusCode::BAD_REQUEST,
575                Json(serde_json::json!({
576                    "error": "missing_leader",
577                    "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
578                })),
579            );
580        }
581    };
582
583    tracing::info!("REPOINT: Switching replication target to {}", new_leader);
584
585    if let Some(ref receiver) = state.wal_receiver {
586        receiver.repoint(&new_leader);
587        tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
588    } else {
589        tracing::warn!("REPOINT: No WAL receiver to repoint");
590    }
591
592    (
593        axum::http::StatusCode::OK,
594        Json(serde_json::json!({
595            "status": "repointed",
596            "new_leader": new_leader,
597        })),
598    )
599}
600
601/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
602async fn shutdown_signal() {
603    let ctrl_c = async {
604        tokio::signal::ctrl_c()
605            .await
606            .expect("failed to install Ctrl+C handler");
607    };
608
609    #[cfg(unix)]
610    let terminate = async {
611        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
612            .expect("failed to install SIGTERM handler")
613            .recv()
614            .await;
615    };
616
617    #[cfg(not(unix))]
618    let terminate = std::future::pending::<()>();
619
620    tokio::select! {
621        _ = ctrl_c => {
622            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
623        }
624        _ = terminate => {
625            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
626        }
627    }
628}