Skip to main content

allsource_core/infrastructure/web/
api_v1.rs

1/// v1.0 API router with authentication and multi-tenancy
2use crate::infrastructure::di::ServiceContainer;
3use crate::{
4    domain::repositories::TenantRepository,
5    infrastructure::{
6        cluster::{
7            ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8        },
9        replication::{WalReceiver, WalShipper},
10        security::{
11            auth::AuthManager,
12            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13            rate_limit::RateLimiter,
14        },
15        web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16    },
17    store::EventStore,
18};
19use axum::{
20    Json, Router,
21    extract::{Path, State},
22    middleware,
23    response::IntoResponse,
24    routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28    cors::{Any, CorsLayer},
29    trace::TraceLayer,
30};
31
/// Node role for leader-follower replication.
///
/// Serialized in lowercase ("leader"/"follower") for API responses,
/// matching the `Display` impl below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; ships WAL entries to followers.
    Leader,
    /// Read-only replica; write requests are rejected with HTTP 409
    /// by `read_only_middleware`.
    Follower,
}
39
40impl NodeRole {
41    /// Detect role from environment variables.
42    ///
43    /// Checks `ALLSOURCE_ROLE` ("leader" or "follower") first,
44    /// then falls back to `ALLSOURCE_READ_ONLY` ("true" β†’ follower).
45    /// Defaults to `Leader` if neither is set.
46    pub fn from_env() -> Self {
47        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48            match role.to_lowercase().as_str() {
49                "follower" => return NodeRole::Follower,
50                "leader" => return NodeRole::Leader,
51                other => {
52                    tracing::warn!(
53                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54                        other
55                    );
56                    return NodeRole::Leader;
57                }
58            }
59        }
60        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61            && (read_only == "true" || read_only == "1")
62        {
63            return NodeRole::Follower;
64        }
65        NodeRole::Leader
66    }
67
68    pub fn is_follower(self) -> bool {
69        self == NodeRole::Follower
70    }
71
72    fn to_u8(self) -> u8 {
73        match self {
74            NodeRole::Leader => 0,
75            NodeRole::Follower => 1,
76        }
77    }
78
79    fn from_u8(v: u8) -> Self {
80        match v {
81            1 => NodeRole::Follower,
82            _ => NodeRole::Leader,
83        }
84    }
85}
86
87impl std::fmt::Display for NodeRole {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            NodeRole::Leader => write!(f, "leader"),
91            NodeRole::Follower => write!(f, "follower"),
92        }
93    }
94}
95
/// Thread-safe, runtime-mutable node role for failover support.
///
/// Wraps an `AtomicU8` so the read-only middleware and health endpoint
/// always see the current role, even after a sentinel-triggered promotion.
/// Cloning is cheap: all clones share the same atomic via `Arc`.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104    pub fn new(role: NodeRole) -> Self {
105        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106    }
107
108    pub fn load(&self) -> NodeRole {
109        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110    }
111
112    pub fn store(&self, role: NodeRole) {
113        self.0
114            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115    }
116}
117
/// Unified application state for all handlers
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing the data-plane endpoints
    pub store: Arc<EventStore>,
    /// Authentication manager shared with the auth middleware
    pub auth_manager: Arc<AuthManager>,
    /// Tenant repository for the multi-tenancy endpoints
    pub tenant_repo: Arc<dyn TenantRepository>,
    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
    pub service_container: ServiceContainer,
    /// Node role for leader-follower replication (runtime-mutable for failover)
    pub role: AtomicNodeRole,
    /// WAL shipper for replication status reporting (leader only).
    /// Wrapped in RwLock so a promoted follower can install a shipper at runtime.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver for replication status reporting (follower only)
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Replication port used by the WAL shipper (needed for runtime promotion)
    pub replication_port: u16,
    /// Cluster manager for multi-node consensus and membership (optional)
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Geo-replication manager for cross-region replication (optional)
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141// Enable extracting Arc<EventStore> from AppState
142// This allows handlers that expect State<Arc<EventStore>> to work with AppState
143impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144    fn from_ref(state: &AppState) -> Self {
145        state.store.clone()
146    }
147}
148
/// Build and serve the v1 HTTP API on `addr` until shutdown.
///
/// Wires all v1 routes onto a single `Router`, applies the middleware
/// stack (tracing, CORS, auth, rate limiting, read-only enforcement for
/// followers) and serves with graceful shutdown.
///
/// The optional replication/cluster arguments are stored in [`AppState`]
/// so handlers (health, promote, repoint, cluster API) can inspect and
/// mutate them at runtime.
///
/// # Errors
/// Fails if `addr` cannot be bound or if the server loop errors out.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    // Shared state handed to every handler; `role` becomes runtime-mutable
    // here (AtomicNodeRole) so the sentinel can promote this node later.
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Public routes (no auth)
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Auth routes
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant routes (protected)
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit endpoints (admin only)
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Config endpoints (admin only)
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event and data routes (protected by auth)
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schemas
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // v0.11: Webhook management
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // v0.14: Durable consumer subscriptions
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // v1.8: Cluster membership management API
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // v2.0: Advanced query features
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // v1.9: Geo-replication API
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal endpoints for sentinel-driven failover (not exposed publicly)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));

    // v0.11: Bidirectional sync protocol (embedded <-> server)
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    let app = app
        .with_state(app_state.clone())
        // IMPORTANT: Middleware layers execute from bottom to top in Tower/Axum
        // Read-only middleware runs after auth (applied before rate limit layer)
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Graceful shutdown on SIGTERM (required for serverless platforms)
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("πŸ›‘ AllSource Core shutdown complete");
    Ok(())
}
500
/// Write paths that should be rejected when running as a follower.
///
/// Matched by prefix in [`is_write_request`], so `/api/v1/events` also
/// covers `/api/v1/events/batch`, etc.
// NOTE(review): auth, tenant and consumer mutation routes are not listed,
// so e.g. POST /api/v1/tenants is still accepted on followers β€” confirm
// this is intentional (these may go through a separately replicated
// system store).
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
516
517/// Returns true if this request is a write operation that should be blocked on followers.
518fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
519    use axum::http::Method;
520    // Only POST/PUT/DELETE are writes
521    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
522        return false;
523    }
524    WRITE_PATHS
525        .iter()
526        .any(|write_path| path.starts_with(write_path))
527}
528
/// Returns true if the request targets an internal or cluster endpoint (not subject to read-only checks).
fn is_internal_request(path: &str) -> bool {
    const EXEMPT_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    EXEMPT_PREFIXES.iter().any(|prefix| path.starts_with(prefix))
}
535
536/// Middleware that rejects write requests when the node is a follower.
537///
538/// Returns HTTP 409 Conflict with `{"error": "read_only", "message": "..."}`.
539/// Internal endpoints (`/internal/*`) are exempt β€” they are used by the sentinel
540/// to trigger promotion and repointing during failover.
541async fn read_only_middleware(
542    State(state): State<AppState>,
543    request: axum::extract::Request,
544    next: axum::middleware::Next,
545) -> axum::response::Response {
546    let path = request.uri().path();
547    if state.role.load().is_follower()
548        && is_write_request(request.method(), path)
549        && !is_internal_request(path)
550    {
551        return (
552            axum::http::StatusCode::CONFLICT,
553            axum::Json(serde_json::json!({
554                "error": "read_only",
555                "message": "This node is a read-only follower"
556            })),
557        )
558            .into_response();
559    }
560    next.run(request).await
561}
562
/// Health endpoint with system stream health reporting.
///
/// Reports overall health plus detailed system metadata health when
/// event-sourced system repositories are configured. Also embeds the
/// current node role and replication status (shipper on leaders,
/// receiver on followers, `null` when standalone).
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // System-stream section: event counts when the event-sourced system
    // store is available, otherwise a "disabled" marker.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // Repositories reported present but no store exposed;
                // report zeros rather than failing the health check.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Replication status from whichever side of the WAL link this node
    // currently runs; the shipper slot takes precedence (post-promotion
    // a node may have both a shipper and a shut-down receiver).
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
619
/// POST /internal/promote β€” Promote this follower to leader.
///
/// Called by the sentinel process during automated failover.
/// Switches the node role to leader, stops WAL receiving, and starts
/// a WAL shipper on the replication port so other followers can connect.
///
/// Idempotent: returns `already_leader` without side effects when the
/// node is already the leader.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // 1. Switch role β€” the read-only middleware will immediately start accepting writes
    // NOTE(review): writes are accepted from this point, but the new WAL
    // shipper only starts in step 3 β€” confirm events written in that window
    // are still picked up by followers once they reconnect.
    state.role.store(NodeRole::Leader);

    // 2. Signal the WAL receiver to stop (it will stop reconnecting)
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // 3. Start a new WAL shipper so remaining followers can connect
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Install into AppState so health endpoint reports shipper status
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Spawn the shipper server
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
684
685/// POST /internal/repoint?leader=host:port β€” Switch replication target.
686///
687/// Called by the sentinel process to tell a follower to disconnect from
688/// the old leader and reconnect to a newly promoted leader.
689async fn repoint_handler(
690    State(state): State<AppState>,
691    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
692) -> impl IntoResponse {
693    let current_role = state.role.load();
694    if current_role != NodeRole::Follower {
695        return (
696            axum::http::StatusCode::CONFLICT,
697            Json(serde_json::json!({
698                "error": "not_follower",
699                "message": "Repoint only applies to follower nodes",
700            })),
701        );
702    }
703
704    let new_leader = match params.get("leader") {
705        Some(l) if !l.is_empty() => l.clone(),
706        _ => {
707            return (
708                axum::http::StatusCode::BAD_REQUEST,
709                Json(serde_json::json!({
710                    "error": "missing_leader",
711                    "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
712                })),
713            );
714        }
715    };
716
717    tracing::info!("REPOINT: Switching replication target to {}", new_leader);
718
719    if let Some(ref receiver) = state.wal_receiver {
720        receiver.repoint(&new_leader);
721        tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
722    } else {
723        tracing::warn!("REPOINT: No WAL receiver to repoint");
724    }
725
726    (
727        axum::http::StatusCode::OK,
728        Json(serde_json::json!({
729            "status": "repointed",
730            "new_leader": new_leader,
731        })),
732    )
733}
734
735// =============================================================================
736// Cluster Management Handlers (v1.8)
737// =============================================================================
738
739/// GET /api/v1/cluster/status β€” Get cluster status including term, leader, and members
740async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
741    let Some(ref cm) = state.cluster_manager else {
742        return (
743            axum::http::StatusCode::SERVICE_UNAVAILABLE,
744            Json(serde_json::json!({
745                "error": "cluster_not_enabled",
746                "message": "Cluster mode is not enabled on this node"
747            })),
748        );
749    };
750
751    let status = cm.status().await;
752    (
753        axum::http::StatusCode::OK,
754        Json(serde_json::to_value(status).unwrap()),
755    )
756}
757
758/// GET /api/v1/cluster/members β€” List all cluster members
759async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
760    let Some(ref cm) = state.cluster_manager else {
761        return (
762            axum::http::StatusCode::SERVICE_UNAVAILABLE,
763            Json(serde_json::json!({
764                "error": "cluster_not_enabled",
765                "message": "Cluster mode is not enabled on this node"
766            })),
767        );
768    };
769
770    let members = cm.all_members();
771    (
772        axum::http::StatusCode::OK,
773        Json(serde_json::json!({
774            "members": members,
775            "count": members.len(),
776        })),
777    )
778}
779
780/// POST /api/v1/cluster/members β€” Add a member to the cluster
781async fn cluster_add_member_handler(
782    State(state): State<AppState>,
783    Json(member): Json<ClusterMember>,
784) -> impl IntoResponse {
785    let Some(ref cm) = state.cluster_manager else {
786        return (
787            axum::http::StatusCode::SERVICE_UNAVAILABLE,
788            Json(serde_json::json!({
789                "error": "cluster_not_enabled",
790                "message": "Cluster mode is not enabled on this node"
791            })),
792        );
793    };
794
795    let node_id = member.node_id;
796    cm.add_member(member).await;
797
798    tracing::info!("Cluster member {} added", node_id);
799    (
800        axum::http::StatusCode::OK,
801        Json(serde_json::json!({
802            "status": "added",
803            "node_id": node_id,
804        })),
805    )
806}
807
808/// DELETE /api/v1/cluster/members/{node_id} β€” Remove a member from the cluster
809async fn cluster_remove_member_handler(
810    State(state): State<AppState>,
811    Path(node_id): Path<u32>,
812) -> impl IntoResponse {
813    let Some(ref cm) = state.cluster_manager else {
814        return (
815            axum::http::StatusCode::SERVICE_UNAVAILABLE,
816            Json(serde_json::json!({
817                "error": "cluster_not_enabled",
818                "message": "Cluster mode is not enabled on this node"
819            })),
820        );
821    };
822
823    match cm.remove_member(node_id).await {
824        Some(_) => {
825            tracing::info!("Cluster member {} removed", node_id);
826            (
827                axum::http::StatusCode::OK,
828                Json(serde_json::json!({
829                    "status": "removed",
830                    "node_id": node_id,
831                })),
832            )
833        }
834        None => (
835            axum::http::StatusCode::NOT_FOUND,
836            Json(serde_json::json!({
837                "error": "not_found",
838                "message": format!("Node {} not found in cluster", node_id),
839            })),
840        ),
841    }
842}
843
// Handled by cluster_heartbeat_handler:
// POST /api/v1/cluster/members/{node_id}/heartbeat β€” Update member heartbeat
/// Heartbeat payload reported by a cluster member.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // Highest WAL offset the member has applied.
    wal_offset: u64,
    // Whether the member considers itself healthy; defaults to `true`
    // when the field is omitted from the request body.
    #[serde(default = "default_true")]
    healthy: bool,
}

/// Serde default helper: `healthy` defaults to `true` when absent.
fn default_true() -> bool {
    true
}
855
856async fn cluster_heartbeat_handler(
857    State(state): State<AppState>,
858    Path(node_id): Path<u32>,
859    Json(req): Json<HeartbeatRequest>,
860) -> impl IntoResponse {
861    let Some(ref cm) = state.cluster_manager else {
862        return (
863            axum::http::StatusCode::SERVICE_UNAVAILABLE,
864            Json(serde_json::json!({
865                "error": "cluster_not_enabled",
866                "message": "Cluster mode is not enabled on this node"
867            })),
868        );
869    };
870
871    cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
872    (
873        axum::http::StatusCode::OK,
874        Json(serde_json::json!({
875            "status": "updated",
876            "node_id": node_id,
877        })),
878    )
879}
880
881/// POST /api/v1/cluster/vote β€” Handle a vote request (simplified Raft RequestVote)
882async fn cluster_vote_handler(
883    State(state): State<AppState>,
884    Json(request): Json<VoteRequest>,
885) -> impl IntoResponse {
886    let Some(ref cm) = state.cluster_manager else {
887        return (
888            axum::http::StatusCode::SERVICE_UNAVAILABLE,
889            Json(serde_json::json!({
890                "error": "cluster_not_enabled",
891                "message": "Cluster mode is not enabled on this node"
892            })),
893        );
894    };
895
896    let response = cm.handle_vote_request(&request).await;
897    (
898        axum::http::StatusCode::OK,
899        Json(serde_json::to_value(response).unwrap()),
900    )
901}
902
903/// POST /api/v1/cluster/election β€” Trigger a leader election (manual failover)
904async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
905    let Some(ref cm) = state.cluster_manager else {
906        return (
907            axum::http::StatusCode::SERVICE_UNAVAILABLE,
908            Json(serde_json::json!({
909                "error": "cluster_not_enabled",
910                "message": "Cluster mode is not enabled on this node"
911            })),
912        );
913    };
914
915    // Select best candidate deterministically
916    let candidate = cm.select_leader_candidate();
917
918    match candidate {
919        Some(candidate_id) => {
920            let new_term = cm.start_election().await;
921            tracing::info!(
922                "Cluster election started: term={}, candidate={}",
923                new_term,
924                candidate_id,
925            );
926
927            // If this node is the candidate, become leader immediately
928            // (In a full Raft, we'd collect votes from a majority first)
929            if candidate_id == cm.self_id() {
930                cm.become_leader(new_term).await;
931                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
932            }
933
934            (
935                axum::http::StatusCode::OK,
936                Json(serde_json::json!({
937                    "status": "election_started",
938                    "term": new_term,
939                    "candidate_id": candidate_id,
940                    "self_is_leader": candidate_id == cm.self_id(),
941                })),
942            )
943        }
944        None => (
945            axum::http::StatusCode::CONFLICT,
946            Json(serde_json::json!({
947                "error": "no_candidates",
948                "message": "No healthy members available for leader election",
949            })),
950        ),
951    }
952}
953
954/// GET /api/v1/cluster/partitions β€” Get partition distribution across nodes
955async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
956    let Some(ref cm) = state.cluster_manager else {
957        return (
958            axum::http::StatusCode::SERVICE_UNAVAILABLE,
959            Json(serde_json::json!({
960                "error": "cluster_not_enabled",
961                "message": "Cluster mode is not enabled on this node"
962            })),
963        );
964    };
965
966    let registry = cm.registry();
967    let distribution = registry.partition_distribution();
968    let total_partitions: usize = distribution.values().map(std::vec::Vec::len).sum();
969
970    (
971        axum::http::StatusCode::OK,
972        Json(serde_json::json!({
973            "total_partitions": total_partitions,
974            "node_count": registry.node_count(),
975            "healthy_node_count": registry.healthy_node_count(),
976            "distribution": distribution,
977        })),
978    )
979}
980
981// =============================================================================
982// Geo-Replication Handlers (v1.9)
983// =============================================================================
984
985/// GET /api/v1/geo/status β€” Get geo-replication status
986async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
987    let Some(ref geo) = state.geo_replication else {
988        return (
989            axum::http::StatusCode::SERVICE_UNAVAILABLE,
990            Json(serde_json::json!({
991                "error": "geo_replication_not_enabled",
992                "message": "Geo-replication is not enabled on this node"
993            })),
994        );
995    };
996
997    let status = geo.status();
998    (
999        axum::http::StatusCode::OK,
1000        Json(serde_json::to_value(status).unwrap()),
1001    )
1002}
1003
1004/// POST /api/v1/geo/sync β€” Receive replicated events from a peer region
1005async fn geo_sync_handler(
1006    State(state): State<AppState>,
1007    Json(request): Json<GeoSyncRequest>,
1008) -> impl IntoResponse {
1009    let Some(ref geo) = state.geo_replication else {
1010        return (
1011            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1012            Json(serde_json::json!({
1013                "error": "geo_replication_not_enabled",
1014                "message": "Geo-replication is not enabled on this node"
1015            })),
1016        );
1017    };
1018
1019    tracing::info!(
1020        "Geo-sync received from region '{}': {} events",
1021        request.source_region,
1022        request.events.len(),
1023    );
1024
1025    let response = geo.receive_sync(&request);
1026    (
1027        axum::http::StatusCode::OK,
1028        Json(serde_json::to_value(response).unwrap()),
1029    )
1030}
1031
1032/// GET /api/v1/geo/peers β€” List peer regions and their health
1033async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1034    let Some(ref geo) = state.geo_replication else {
1035        return (
1036            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1037            Json(serde_json::json!({
1038                "error": "geo_replication_not_enabled",
1039                "message": "Geo-replication is not enabled on this node"
1040            })),
1041        );
1042    };
1043
1044    let status = geo.status();
1045    (
1046        axum::http::StatusCode::OK,
1047        Json(serde_json::json!({
1048            "region_id": status.region_id,
1049            "peers": status.peers,
1050        })),
1051    )
1052}
1053
1054/// POST /api/v1/geo/failover β€” Trigger regional failover
1055async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1056    let Some(ref geo) = state.geo_replication else {
1057        return (
1058            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1059            Json(serde_json::json!({
1060                "error": "geo_replication_not_enabled",
1061                "message": "Geo-replication is not enabled on this node"
1062            })),
1063        );
1064    };
1065
1066    match geo.select_failover_region() {
1067        Some(failover_region) => {
1068            tracing::info!(
1069                "Geo-failover: selected region '{}' as failover target",
1070                failover_region,
1071            );
1072            (
1073                axum::http::StatusCode::OK,
1074                Json(serde_json::json!({
1075                    "status": "failover_target_selected",
1076                    "failover_region": failover_region,
1077                    "message": "Region selected for failover. DNS/routing update required externally.",
1078                })),
1079            )
1080        }
1081        None => (
1082            axum::http::StatusCode::CONFLICT,
1083            Json(serde_json::json!({
1084                "error": "no_healthy_peers",
1085                "message": "No healthy peer regions available for failover",
1086            })),
1087        ),
1088    }
1089}
1090
1091/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
1092async fn shutdown_signal() {
1093    let ctrl_c = async {
1094        tokio::signal::ctrl_c()
1095            .await
1096            .expect("failed to install Ctrl+C handler");
1097    };
1098
1099    #[cfg(unix)]
1100    let terminate = async {
1101        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1102            .expect("failed to install SIGTERM handler")
1103            .recv()
1104            .await;
1105    };
1106
1107    #[cfg(not(unix))]
1108    let terminate = std::future::pending::<()>();
1109
1110    tokio::select! {
1111        () = ctrl_c => {
1112            tracing::info!("πŸ“€ Received Ctrl+C, initiating graceful shutdown...");
1113        }
1114        () = terminate => {
1115            tracing::info!("πŸ“€ Received SIGTERM, initiating graceful shutdown...");
1116        }
1117    }
1118}