Skip to main content

allsource_core/infrastructure/web/
api_v1.rs

1/// v1.0 API router with authentication and multi-tenancy
2use crate::infrastructure::di::ServiceContainer;
3use crate::{
4    application::services::tenant_service::TenantManager,
5    infrastructure::{
6        cluster::{
7            ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8        },
9        replication::{WalReceiver, WalShipper},
10        security::{
11            auth::AuthManager,
12            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13            rate_limit::RateLimiter,
14        },
15        web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16    },
17    store::EventStore,
18};
19use axum::{
20    Json, Router,
21    extract::{Path, State},
22    middleware,
23    response::IntoResponse,
24    routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28    cors::{Any, CorsLayer},
29    trace::TraceLayer,
30};
31
32/// Node role for leader-follower replication
33#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
34#[serde(rename_all = "lowercase")]
35pub enum NodeRole {
36    Leader,
37    Follower,
38}
39
40impl NodeRole {
41    /// Detect role from environment variables.
42    ///
43    /// Checks `ALLSOURCE_ROLE` ("leader" or "follower") first,
44    /// then falls back to `ALLSOURCE_READ_ONLY` ("true" → follower).
45    /// Defaults to `Leader` if neither is set.
46    pub fn from_env() -> Self {
47        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48            match role.to_lowercase().as_str() {
49                "follower" => return NodeRole::Follower,
50                "leader" => return NodeRole::Leader,
51                other => {
52                    tracing::warn!(
53                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54                        other
55                    );
56                    return NodeRole::Leader;
57                }
58            }
59        }
60        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61            && (read_only == "true" || read_only == "1")
62        {
63            return NodeRole::Follower;
64        }
65        NodeRole::Leader
66    }
67
68    pub fn is_follower(self) -> bool {
69        self == NodeRole::Follower
70    }
71
72    fn to_u8(self) -> u8 {
73        match self {
74            NodeRole::Leader => 0,
75            NodeRole::Follower => 1,
76        }
77    }
78
79    fn from_u8(v: u8) -> Self {
80        match v {
81            1 => NodeRole::Follower,
82            _ => NodeRole::Leader,
83        }
84    }
85}
86
87impl std::fmt::Display for NodeRole {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            NodeRole::Leader => write!(f, "leader"),
91            NodeRole::Follower => write!(f, "follower"),
92        }
93    }
94}
95
96/// Thread-safe, runtime-mutable node role for failover support.
97///
98/// Wraps an `AtomicU8` so the read-only middleware and health endpoint
99/// always see the current role, even after a sentinel-triggered promotion.
100#[derive(Clone)]
101pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104    pub fn new(role: NodeRole) -> Self {
105        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106    }
107
108    pub fn load(&self) -> NodeRole {
109        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110    }
111
112    pub fn store(&self, role: NodeRole) {
113        self.0
114            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115    }
116}
117
/// Unified application state for all handlers
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing event ingest/query handlers
    pub store: Arc<EventStore>,
    /// Authentication manager shared with the auth middleware and auth handlers
    pub auth_manager: Arc<AuthManager>,
    /// Multi-tenancy service backing the /api/v1/tenants routes
    pub tenant_manager: Arc<TenantManager>,
    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
    pub service_container: ServiceContainer,
    /// Node role for leader-follower replication (runtime-mutable for failover)
    pub role: AtomicNodeRole,
    /// WAL shipper for replication status reporting (leader only).
    /// Wrapped in RwLock so a promoted follower can install a shipper at runtime.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver for replication status reporting (follower only)
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Replication port used by the WAL shipper (needed for runtime promotion)
    pub replication_port: u16,
    /// Cluster manager for multi-node consensus and membership (optional)
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Geo-replication manager for cross-region replication (optional)
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141// Enable extracting Arc<EventStore> from AppState
142// This allows handlers that expect State<Arc<EventStore>> to work with AppState
143impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144    fn from_ref(state: &AppState) -> Self {
145        state.store.clone()
146    }
147}
148
/// Build the v1.0 API router and serve it on `addr` until shutdown.
///
/// Wires up:
/// - public endpoints (health, Prometheus metrics),
/// - authenticated API routes (auth, tenants, events, analytics, snapshots,
///   compaction, schemas, replay, pipelines, projections, webhooks, cluster,
///   geo-replication),
/// - internal failover endpoints driven by the sentinel (`/internal/*`),
/// - middleware layers (read-only guard, rate limiting, auth, CORS, tracing).
///
/// `role` seeds the runtime-mutable node role; `wal_shipper`/`wal_receiver`
/// feed replication status and may be swapped at runtime during promotion.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    // Shared state cloned into every handler; cheap to clone (Arcs + small fields).
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Public routes (no auth)
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Auth routes
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant routes (protected)
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit endpoints (admin only)
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Config endpoints (admin only)
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Event and data routes (protected by auth)
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schemas
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // v0.11: Webhook management
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // v1.8: Cluster membership management API
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // v2.0: Advanced query features
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // v1.9: Geo-replication API
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal endpoints for sentinel-driven failover (not exposed publicly)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler))
        .with_state(app_state.clone())
        // IMPORTANT: layers added later wrap (and therefore run before) layers
        // added earlier. Effective request flow, outermost first:
        //   Trace -> CORS -> Auth -> RateLimit -> ReadOnly -> handler
        // so the read-only guard sees only authenticated, rate-limited traffic.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Graceful shutdown on SIGTERM (required for serverless platforms)
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
460
/// Write paths that should be rejected when running as a follower.
///
/// Matched by prefix in `is_write_request`, so "/api/v1/events" also covers
/// "/api/v1/events/batch"; the explicit batch entry is kept for clarity.
///
/// NOTE(review): tenant and auth write routes are absent from this list, so
/// POST/PUT/DELETE under /api/v1/tenants and /api/v1/auth are accepted on
/// followers — confirm that is intentional (e.g. replicated via system
/// streams) rather than an omission. Cluster/geo paths are exempted
/// separately by `is_internal_request`.
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
];
475
476/// Returns true if this request is a write operation that should be blocked on followers.
477fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
478    use axum::http::Method;
479    // Only POST/PUT/DELETE are writes
480    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
481        return false;
482    }
483    WRITE_PATHS
484        .iter()
485        .any(|write_path| path.starts_with(write_path))
486}
487
/// Returns true if the request targets an internal or cluster endpoint (not subject to read-only checks).
fn is_internal_request(path: &str) -> bool {
    // Sentinel/cluster/geo traffic must keep flowing even on followers.
    const EXEMPT_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    EXEMPT_PREFIXES.iter().any(|prefix| path.starts_with(prefix))
}
494
495/// Middleware that rejects write requests when the node is a follower.
496///
497/// Returns HTTP 409 Conflict with `{"error": "read_only", "message": "..."}`.
498/// Internal endpoints (`/internal/*`) are exempt — they are used by the sentinel
499/// to trigger promotion and repointing during failover.
500async fn read_only_middleware(
501    State(state): State<AppState>,
502    request: axum::extract::Request,
503    next: axum::middleware::Next,
504) -> axum::response::Response {
505    let path = request.uri().path();
506    if state.role.load().is_follower()
507        && is_write_request(request.method(), path)
508        && !is_internal_request(path)
509    {
510        return (
511            axum::http::StatusCode::CONFLICT,
512            axum::Json(serde_json::json!({
513                "error": "read_only",
514                "message": "This node is a read-only follower"
515            })),
516        )
517            .into_response();
518    }
519    next.run(request).await
520}
521
522/// Health endpoint with system stream health reporting.
523///
524/// Reports overall health plus detailed system metadata health when
525/// event-sourced system repositories are configured.
526async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
527    let has_system_repos = state.service_container.has_system_repositories();
528
529    let system_streams = if has_system_repos {
530        let (tenant_count, config_count, total_events) =
531            if let Some(store) = state.service_container.system_store() {
532                use crate::domain::value_objects::system_stream::SystemDomain;
533                (
534                    store.count_stream(SystemDomain::Tenant),
535                    store.count_stream(SystemDomain::Config),
536                    store.total_events(),
537                )
538            } else {
539                (0, 0, 0)
540            };
541
542        serde_json::json!({
543            "status": "healthy",
544            "mode": "event-sourced",
545            "total_events": total_events,
546            "tenant_events": tenant_count,
547            "config_events": config_count,
548        })
549    } else {
550        serde_json::json!({
551            "status": "disabled",
552            "mode": "in-memory",
553        })
554    };
555
556    let replication = {
557        let shipper_guard = state.wal_shipper.read().await;
558        if let Some(ref shipper) = *shipper_guard {
559            serde_json::to_value(shipper.status()).unwrap_or_default()
560        } else if let Some(ref receiver) = state.wal_receiver {
561            serde_json::to_value(receiver.status()).unwrap_or_default()
562        } else {
563            serde_json::json!(null)
564        }
565    };
566
567    let current_role = state.role.load();
568
569    Json(serde_json::json!({
570        "status": "healthy",
571        "service": "allsource-core",
572        "version": env!("CARGO_PKG_VERSION"),
573        "role": current_role,
574        "system_streams": system_streams,
575        "replication": replication,
576    }))
577}
578
/// POST /internal/promote — Promote this follower to leader.
///
/// Called by the sentinel process during automated failover.
/// Switches the node role to leader, stops WAL receiving, and starts
/// a WAL shipper on the replication port so other followers can connect.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    // Idempotent: promoting a node that is already leader is a no-op success.
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // 1. Switch role — the read-only middleware will immediately start accepting writes
    state.role.store(NodeRole::Leader);

    // 2. Signal the WAL receiver to stop (it will stop reconnecting)
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // 3. Start a new WAL shipper so remaining followers can connect
    //    NOTE(review): ordering is deliberate — the store's replication channel
    //    (tx) is wired before the shipper gets its store/metrics handles and
    //    before serve() starts; confirm WalShipper buffers events that arrive
    //    in that window.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Install into AppState so health endpoint reports shipper status
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Spawn the shipper server; errors are logged rather than propagated since
    // the HTTP response below settles independently of the shipper task.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
643
644/// POST /internal/repoint?leader=host:port — Switch replication target.
645///
646/// Called by the sentinel process to tell a follower to disconnect from
647/// the old leader and reconnect to a newly promoted leader.
648async fn repoint_handler(
649    State(state): State<AppState>,
650    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
651) -> impl IntoResponse {
652    let current_role = state.role.load();
653    if current_role != NodeRole::Follower {
654        return (
655            axum::http::StatusCode::CONFLICT,
656            Json(serde_json::json!({
657                "error": "not_follower",
658                "message": "Repoint only applies to follower nodes",
659            })),
660        );
661    }
662
663    let new_leader = match params.get("leader") {
664        Some(l) if !l.is_empty() => l.clone(),
665        _ => {
666            return (
667                axum::http::StatusCode::BAD_REQUEST,
668                Json(serde_json::json!({
669                    "error": "missing_leader",
670                    "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
671                })),
672            );
673        }
674    };
675
676    tracing::info!("REPOINT: Switching replication target to {}", new_leader);
677
678    if let Some(ref receiver) = state.wal_receiver {
679        receiver.repoint(&new_leader);
680        tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
681    } else {
682        tracing::warn!("REPOINT: No WAL receiver to repoint");
683    }
684
685    (
686        axum::http::StatusCode::OK,
687        Json(serde_json::json!({
688            "status": "repointed",
689            "new_leader": new_leader,
690        })),
691    )
692}
693
694// =============================================================================
695// Cluster Management Handlers (v1.8)
696// =============================================================================
697
698/// GET /api/v1/cluster/status — Get cluster status including term, leader, and members
699async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
700    let Some(ref cm) = state.cluster_manager else {
701        return (
702            axum::http::StatusCode::SERVICE_UNAVAILABLE,
703            Json(serde_json::json!({
704                "error": "cluster_not_enabled",
705                "message": "Cluster mode is not enabled on this node"
706            })),
707        );
708    };
709
710    let status = cm.status().await;
711    (
712        axum::http::StatusCode::OK,
713        Json(serde_json::to_value(status).unwrap()),
714    )
715}
716
717/// GET /api/v1/cluster/members — List all cluster members
718async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
719    let Some(ref cm) = state.cluster_manager else {
720        return (
721            axum::http::StatusCode::SERVICE_UNAVAILABLE,
722            Json(serde_json::json!({
723                "error": "cluster_not_enabled",
724                "message": "Cluster mode is not enabled on this node"
725            })),
726        );
727    };
728
729    let members = cm.all_members();
730    (
731        axum::http::StatusCode::OK,
732        Json(serde_json::json!({
733            "members": members,
734            "count": members.len(),
735        })),
736    )
737}
738
739/// POST /api/v1/cluster/members — Add a member to the cluster
740async fn cluster_add_member_handler(
741    State(state): State<AppState>,
742    Json(member): Json<ClusterMember>,
743) -> impl IntoResponse {
744    let Some(ref cm) = state.cluster_manager else {
745        return (
746            axum::http::StatusCode::SERVICE_UNAVAILABLE,
747            Json(serde_json::json!({
748                "error": "cluster_not_enabled",
749                "message": "Cluster mode is not enabled on this node"
750            })),
751        );
752    };
753
754    let node_id = member.node_id;
755    cm.add_member(member).await;
756
757    tracing::info!("Cluster member {} added", node_id);
758    (
759        axum::http::StatusCode::OK,
760        Json(serde_json::json!({
761            "status": "added",
762            "node_id": node_id,
763        })),
764    )
765}
766
767/// DELETE /api/v1/cluster/members/{node_id} — Remove a member from the cluster
768async fn cluster_remove_member_handler(
769    State(state): State<AppState>,
770    Path(node_id): Path<u32>,
771) -> impl IntoResponse {
772    let Some(ref cm) = state.cluster_manager else {
773        return (
774            axum::http::StatusCode::SERVICE_UNAVAILABLE,
775            Json(serde_json::json!({
776                "error": "cluster_not_enabled",
777                "message": "Cluster mode is not enabled on this node"
778            })),
779        );
780    };
781
782    match cm.remove_member(node_id).await {
783        Some(_) => {
784            tracing::info!("Cluster member {} removed", node_id);
785            (
786                axum::http::StatusCode::OK,
787                Json(serde_json::json!({
788                    "status": "removed",
789                    "node_id": node_id,
790                })),
791            )
792        }
793        None => (
794            axum::http::StatusCode::NOT_FOUND,
795            Json(serde_json::json!({
796                "error": "not_found",
797                "message": format!("Node {} not found in cluster", node_id),
798            })),
799        ),
800    }
801}
802
/// POST /api/v1/cluster/members/{node_id}/heartbeat — Update member heartbeat
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // WAL offset reported by the member — presumably its applied/replicated
    // position; confirm semantics against ClusterManager::update_member_heartbeat
    wal_offset: u64,
    // Health flag; omitting the field in the request body means healthy
    #[serde(default = "default_true")]
    healthy: bool,
}

/// Serde default for `HeartbeatRequest::healthy`.
fn default_true() -> bool {
    true
}
814
815async fn cluster_heartbeat_handler(
816    State(state): State<AppState>,
817    Path(node_id): Path<u32>,
818    Json(req): Json<HeartbeatRequest>,
819) -> impl IntoResponse {
820    let Some(ref cm) = state.cluster_manager else {
821        return (
822            axum::http::StatusCode::SERVICE_UNAVAILABLE,
823            Json(serde_json::json!({
824                "error": "cluster_not_enabled",
825                "message": "Cluster mode is not enabled on this node"
826            })),
827        );
828    };
829
830    cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
831    (
832        axum::http::StatusCode::OK,
833        Json(serde_json::json!({
834            "status": "updated",
835            "node_id": node_id,
836        })),
837    )
838}
839
840/// POST /api/v1/cluster/vote — Handle a vote request (simplified Raft RequestVote)
841async fn cluster_vote_handler(
842    State(state): State<AppState>,
843    Json(request): Json<VoteRequest>,
844) -> impl IntoResponse {
845    let Some(ref cm) = state.cluster_manager else {
846        return (
847            axum::http::StatusCode::SERVICE_UNAVAILABLE,
848            Json(serde_json::json!({
849                "error": "cluster_not_enabled",
850                "message": "Cluster mode is not enabled on this node"
851            })),
852        );
853    };
854
855    let response = cm.handle_vote_request(&request).await;
856    (
857        axum::http::StatusCode::OK,
858        Json(serde_json::to_value(response).unwrap()),
859    )
860}
861
/// POST /api/v1/cluster/election — Trigger a leader election (manual failover)
///
/// Flow: pick a candidate via `select_leader_candidate()`, bump the term with
/// `start_election()`, and — as a deliberate simplification of full Raft —
/// promote this node immediately if it is itself the chosen candidate,
/// without collecting a vote quorum first (see inline note).
///
/// Returns 503 when cluster mode is disabled, 409 when no healthy member is
/// available as a candidate, 200 with the new term otherwise.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Select best candidate deterministically
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            // Starting an election advances the term; do this before any
            // promotion so the leader claim carries the fresh term.
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // If this node is the candidate, become leader immediately
            // (In a full Raft, we'd collect votes from a majority first)
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        // No healthy members means nothing can be elected; surface as 409.
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
912
913/// GET /api/v1/cluster/partitions — Get partition distribution across nodes
914async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
915    let Some(ref cm) = state.cluster_manager else {
916        return (
917            axum::http::StatusCode::SERVICE_UNAVAILABLE,
918            Json(serde_json::json!({
919                "error": "cluster_not_enabled",
920                "message": "Cluster mode is not enabled on this node"
921            })),
922        );
923    };
924
925    let registry = cm.registry();
926    let distribution = registry.partition_distribution();
927    let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
928
929    (
930        axum::http::StatusCode::OK,
931        Json(serde_json::json!({
932            "total_partitions": total_partitions,
933            "node_count": registry.node_count(),
934            "healthy_node_count": registry.healthy_node_count(),
935            "distribution": distribution,
936        })),
937    )
938}
939
940// =============================================================================
941// Geo-Replication Handlers (v1.9)
942// =============================================================================
943
944/// GET /api/v1/geo/status — Get geo-replication status
945async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
946    let Some(ref geo) = state.geo_replication else {
947        return (
948            axum::http::StatusCode::SERVICE_UNAVAILABLE,
949            Json(serde_json::json!({
950                "error": "geo_replication_not_enabled",
951                "message": "Geo-replication is not enabled on this node"
952            })),
953        );
954    };
955
956    let status = geo.status();
957    (
958        axum::http::StatusCode::OK,
959        Json(serde_json::to_value(status).unwrap()),
960    )
961}
962
963/// POST /api/v1/geo/sync — Receive replicated events from a peer region
964async fn geo_sync_handler(
965    State(state): State<AppState>,
966    Json(request): Json<GeoSyncRequest>,
967) -> impl IntoResponse {
968    let Some(ref geo) = state.geo_replication else {
969        return (
970            axum::http::StatusCode::SERVICE_UNAVAILABLE,
971            Json(serde_json::json!({
972                "error": "geo_replication_not_enabled",
973                "message": "Geo-replication is not enabled on this node"
974            })),
975        );
976    };
977
978    tracing::info!(
979        "Geo-sync received from region '{}': {} events",
980        request.source_region,
981        request.events.len(),
982    );
983
984    let response = geo.receive_sync(&request);
985    (
986        axum::http::StatusCode::OK,
987        Json(serde_json::to_value(response).unwrap()),
988    )
989}
990
991/// GET /api/v1/geo/peers — List peer regions and their health
992async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
993    let Some(ref geo) = state.geo_replication else {
994        return (
995            axum::http::StatusCode::SERVICE_UNAVAILABLE,
996            Json(serde_json::json!({
997                "error": "geo_replication_not_enabled",
998                "message": "Geo-replication is not enabled on this node"
999            })),
1000        );
1001    };
1002
1003    let status = geo.status();
1004    (
1005        axum::http::StatusCode::OK,
1006        Json(serde_json::json!({
1007            "region_id": status.region_id,
1008            "peers": status.peers,
1009        })),
1010    )
1011}
1012
1013/// POST /api/v1/geo/failover — Trigger regional failover
1014async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1015    let Some(ref geo) = state.geo_replication else {
1016        return (
1017            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1018            Json(serde_json::json!({
1019                "error": "geo_replication_not_enabled",
1020                "message": "Geo-replication is not enabled on this node"
1021            })),
1022        );
1023    };
1024
1025    match geo.select_failover_region() {
1026        Some(failover_region) => {
1027            tracing::info!(
1028                "Geo-failover: selected region '{}' as failover target",
1029                failover_region,
1030            );
1031            (
1032                axum::http::StatusCode::OK,
1033                Json(serde_json::json!({
1034                    "status": "failover_target_selected",
1035                    "failover_region": failover_region,
1036                    "message": "Region selected for failover. DNS/routing update required externally.",
1037                })),
1038            )
1039        }
1040        None => (
1041            axum::http::StatusCode::CONFLICT,
1042            Json(serde_json::json!({
1043                "error": "no_healthy_peers",
1044                "message": "No healthy peer regions available for failover",
1045            })),
1046        ),
1047    }
1048}
1049
/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
///
/// Resolves when either Ctrl+C (SIGINT) or — on Unix only — SIGTERM arrives,
/// letting the caller (e.g. axum's graceful-shutdown hook) drain in-flight
/// work. On non-Unix platforms the SIGTERM arm never fires.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            // Installing the handler fails only on broken runtimes; treat as fatal.
            .expect("failed to install Ctrl+C handler");
    };

    // SIGTERM stream exists only on Unix; cfg keeps the select! arms aligned.
    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // On non-Unix, substitute a future that never completes so select!
    // below type-checks and only Ctrl+C can trigger shutdown.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
        }
    }
}