Skip to main content

allsource_core/infrastructure/web/
api_v1.rs

1/// v1.0 API router with authentication and multi-tenancy
2use crate::infrastructure::di::ServiceContainer;
3use crate::{
4    domain::repositories::TenantRepository,
5    infrastructure::{
6        cluster::{
7            ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8        },
9        replication::{WalReceiver, WalShipper},
10        security::{
11            auth::AuthManager,
12            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13            rate_limit::RateLimiter,
14        },
15        web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16    },
17    store::EventStore,
18};
19use axum::{
20    Json, Router,
21    extract::{Path, State},
22    middleware,
23    response::IntoResponse,
24    routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28    cors::{Any, CorsLayer},
29    trace::TraceLayer,
30};
31
/// Node role for leader-follower replication
///
/// Serialized in lowercase ("leader" / "follower") so JSON responses
/// (e.g. the health endpoint) match the `Display` output below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    // Accepts writes; ships WAL to followers.
    Leader,
    // Read-only replica; write requests are rejected by the read-only middleware.
    Follower,
}
39
40impl NodeRole {
41    /// Detect role from environment variables.
42    ///
43    /// Checks `ALLSOURCE_ROLE` ("leader" or "follower") first,
44    /// then falls back to `ALLSOURCE_READ_ONLY` ("true" β†’ follower).
45    /// Defaults to `Leader` if neither is set.
46    pub fn from_env() -> Self {
47        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48            match role.to_lowercase().as_str() {
49                "follower" => return NodeRole::Follower,
50                "leader" => return NodeRole::Leader,
51                other => {
52                    tracing::warn!(
53                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54                        other
55                    );
56                    return NodeRole::Leader;
57                }
58            }
59        }
60        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61            && (read_only == "true" || read_only == "1")
62        {
63            return NodeRole::Follower;
64        }
65        NodeRole::Leader
66    }
67
68    pub fn is_follower(self) -> bool {
69        self == NodeRole::Follower
70    }
71
72    fn to_u8(self) -> u8 {
73        match self {
74            NodeRole::Leader => 0,
75            NodeRole::Follower => 1,
76        }
77    }
78
79    fn from_u8(v: u8) -> Self {
80        match v {
81            1 => NodeRole::Follower,
82            _ => NodeRole::Leader,
83        }
84    }
85}
86
87impl std::fmt::Display for NodeRole {
88    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89        match self {
90            NodeRole::Leader => write!(f, "leader"),
91            NodeRole::Follower => write!(f, "follower"),
92        }
93    }
94}
95
96/// Thread-safe, runtime-mutable node role for failover support.
97///
98/// Wraps an `AtomicU8` so the read-only middleware and health endpoint
99/// always see the current role, even after a sentinel-triggered promotion.
100#[derive(Clone)]
101pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104    pub fn new(role: NodeRole) -> Self {
105        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106    }
107
108    pub fn load(&self) -> NodeRole {
109        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110    }
111
112    pub fn store(&self, role: NodeRole) {
113        self.0
114            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115    }
116}
117
/// Unified application state for all handlers
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing the event/data routes.
    pub store: Arc<EventStore>,
    /// Authentication manager; the same instance backs the auth middleware.
    pub auth_manager: Arc<AuthManager>,
    /// Tenant persistence used by the tenant management endpoints.
    pub tenant_repo: Arc<dyn TenantRepository>,
    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
    pub service_container: ServiceContainer,
    /// Node role for leader-follower replication (runtime-mutable for failover)
    pub role: AtomicNodeRole,
    /// WAL shipper for replication status reporting (leader only).
    /// Wrapped in RwLock so a promoted follower can install a shipper at runtime.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver for replication status reporting (follower only)
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Replication port used by the WAL shipper (needed for runtime promotion)
    pub replication_port: u16,
    /// Cluster manager for multi-node consensus and membership (optional)
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Geo-replication manager for cross-region replication (optional)
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141// Enable extracting Arc<EventStore> from AppState
142// This allows handlers that expect State<Arc<EventStore>> to work with AppState
143impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144    fn from_ref(state: &AppState) -> Self {
145        state.store.clone()
146    }
147}
148
/// Build the v1.0 API router and serve it on `addr` until shutdown.
///
/// Assembles the full route table (auth, tenants, audit, config, events,
/// analytics, snapshots, schemas, replay, pipelines, projections, webhooks,
/// consumers, cluster, geo, internal failover), applies the middleware
/// stack, and blocks until `shutdown_signal()` fires.
///
/// # Errors
/// Returns an error if the TCP listener cannot bind `addr` or if the
/// server loop terminates abnormally.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    // Shared state handed to every handler through axum's State extractor.
    // The shipper is wrapped in a RwLock so promote_handler can install one
    // at runtime; the role is atomic for the same reason.
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    // Separate, smaller states for the auth and rate-limit middleware layers.
    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Public routes (no auth)
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Auth routes
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant routes (protected)
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit endpoints (admin only)
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Config endpoints (admin only)
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event and data routes (protected by auth)
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schemas
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // v0.11: Webhook management
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // v0.14: Durable consumer subscriptions
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // v1.8: Cluster membership management API
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // v2.0: Advanced query features
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // v1.9: Geo-replication API
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal endpoints for sentinel-driven failover (not exposed publicly)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));

    // v0.11: Bidirectional sync protocol (embedded↔server)
    // Compiled in only when the "embedded-sync" feature is enabled.
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    let app = app
        .with_state(app_state.clone())
        // IMPORTANT: Middleware layers execute from bottom to top in Tower/Axum
        // Read-only middleware runs after auth (applied before rate limit layer)
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        // NOTE(review): CORS is wide open (any origin/method/header) — confirm
        // this is intentional for the deployment environment.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Graceful shutdown on SIGTERM (required for serverless platforms)
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
501
/// Write paths that should be rejected when running as a follower.
///
/// Matched by *prefix* in `is_write_request`, so an entry such as
/// "/api/v1/events" also covers "/api/v1/events/batch" and other
/// sub-paths. Only POST/PUT/DELETE requests are checked against this
/// list; reads are always allowed on followers.
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    // Trailing slash: matches only sub-paths such as
    // /api/v1/projections/{name}/..., not the collection root itself.
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
517
518/// Returns true if this request is a write operation that should be blocked on followers.
519fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
520    use axum::http::Method;
521    // Only POST/PUT/DELETE are writes
522    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
523        return false;
524    }
525    WRITE_PATHS
526        .iter()
527        .any(|write_path| path.starts_with(write_path))
528}
529
/// Returns true if the request targets an internal or cluster endpoint (not subject to read-only checks).
fn is_internal_request(path: &str) -> bool {
    // Sentinel/cluster/geo control-plane prefixes bypass the follower
    // read-only guard so failover orchestration keeps working.
    const EXEMPT_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    EXEMPT_PREFIXES.iter().any(|prefix| path.starts_with(prefix))
}
536
537/// Middleware that rejects write requests when the node is a follower.
538///
539/// Returns HTTP 409 Conflict with `{"error": "read_only", "message": "..."}`.
540/// Internal endpoints (`/internal/*`) are exempt β€” they are used by the sentinel
541/// to trigger promotion and repointing during failover.
542async fn read_only_middleware(
543    State(state): State<AppState>,
544    request: axum::extract::Request,
545    next: axum::middleware::Next,
546) -> axum::response::Response {
547    let path = request.uri().path();
548    if state.role.load().is_follower()
549        && is_write_request(request.method(), path)
550        && !is_internal_request(path)
551    {
552        return (
553            axum::http::StatusCode::CONFLICT,
554            axum::Json(serde_json::json!({
555                "error": "read_only",
556                "message": "This node is a read-only follower"
557            })),
558        )
559            .into_response();
560    }
561    next.run(request).await
562}
563
564/// Health endpoint with system stream health reporting.
565///
566/// Reports overall health plus detailed system metadata health when
567/// event-sourced system repositories are configured.
568async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
569    let has_system_repos = state.service_container.has_system_repositories();
570
571    let system_streams = if has_system_repos {
572        let (tenant_count, config_count, total_events) =
573            if let Some(store) = state.service_container.system_store() {
574                use crate::domain::value_objects::system_stream::SystemDomain;
575                (
576                    store.count_stream(SystemDomain::Tenant),
577                    store.count_stream(SystemDomain::Config),
578                    store.total_events(),
579                )
580            } else {
581                (0, 0, 0)
582            };
583
584        serde_json::json!({
585            "status": "healthy",
586            "mode": "event-sourced",
587            "total_events": total_events,
588            "tenant_events": tenant_count,
589            "config_events": config_count,
590        })
591    } else {
592        serde_json::json!({
593            "status": "disabled",
594            "mode": "in-memory",
595        })
596    };
597
598    let replication = {
599        let shipper_guard = state.wal_shipper.read().await;
600        if let Some(ref shipper) = *shipper_guard {
601            serde_json::to_value(shipper.status()).unwrap_or_default()
602        } else if let Some(ref receiver) = state.wal_receiver {
603            serde_json::to_value(receiver.status()).unwrap_or_default()
604        } else {
605            serde_json::json!(null)
606        }
607    };
608
609    let current_role = state.role.load();
610
611    Json(serde_json::json!({
612        "status": "healthy",
613        "service": "allsource-core",
614        "version": env!("CARGO_PKG_VERSION"),
615        "role": current_role,
616        "system_streams": system_streams,
617        "replication": replication,
618    }))
619}
620
/// POST /internal/promote — Promote this follower to leader.
///
/// Called by the sentinel process during automated failover.
/// Switches the node role to leader, stops WAL receiving, and starts
/// a WAL shipper on the replication port so other followers can connect.
///
/// Idempotent: invoking it on a node that is already leader returns
/// 200 with "already_leader" and changes nothing.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        // Early exit keeps sentinel retries harmless.
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // 1. Switch role — the read-only middleware will immediately start accepting writes
    state.role.store(NodeRole::Leader);

    // 2. Signal the WAL receiver to stop (it will stop reconnecting)
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // 3. Start a new WAL shipper so remaining followers can connect
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    // Route the store's WAL stream into the new shipper's channel.
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Install into AppState so health endpoint reports shipper status
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Spawn the shipper server as a detached background task; errors are
    // logged rather than propagated since the HTTP response has to return.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
685
686/// POST /internal/repoint?leader=host:port β€” Switch replication target.
687///
688/// Called by the sentinel process to tell a follower to disconnect from
689/// the old leader and reconnect to a newly promoted leader.
690async fn repoint_handler(
691    State(state): State<AppState>,
692    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
693) -> impl IntoResponse {
694    let current_role = state.role.load();
695    if current_role != NodeRole::Follower {
696        return (
697            axum::http::StatusCode::CONFLICT,
698            Json(serde_json::json!({
699                "error": "not_follower",
700                "message": "Repoint only applies to follower nodes",
701            })),
702        );
703    }
704
705    let new_leader = match params.get("leader") {
706        Some(l) if !l.is_empty() => l.clone(),
707        _ => {
708            return (
709                axum::http::StatusCode::BAD_REQUEST,
710                Json(serde_json::json!({
711                    "error": "missing_leader",
712                    "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
713                })),
714            );
715        }
716    };
717
718    tracing::info!("REPOINT: Switching replication target to {}", new_leader);
719
720    if let Some(ref receiver) = state.wal_receiver {
721        receiver.repoint(&new_leader);
722        tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
723    } else {
724        tracing::warn!("REPOINT: No WAL receiver to repoint");
725    }
726
727    (
728        axum::http::StatusCode::OK,
729        Json(serde_json::json!({
730            "status": "repointed",
731            "new_leader": new_leader,
732        })),
733    )
734}
735
736// =============================================================================
737// Cluster Management Handlers (v1.8)
738// =============================================================================
739
740/// GET /api/v1/cluster/status β€” Get cluster status including term, leader, and members
741async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
742    let Some(ref cm) = state.cluster_manager else {
743        return (
744            axum::http::StatusCode::SERVICE_UNAVAILABLE,
745            Json(serde_json::json!({
746                "error": "cluster_not_enabled",
747                "message": "Cluster mode is not enabled on this node"
748            })),
749        );
750    };
751
752    let status = cm.status().await;
753    (
754        axum::http::StatusCode::OK,
755        Json(serde_json::to_value(status).unwrap()),
756    )
757}
758
759/// GET /api/v1/cluster/members β€” List all cluster members
760async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
761    let Some(ref cm) = state.cluster_manager else {
762        return (
763            axum::http::StatusCode::SERVICE_UNAVAILABLE,
764            Json(serde_json::json!({
765                "error": "cluster_not_enabled",
766                "message": "Cluster mode is not enabled on this node"
767            })),
768        );
769    };
770
771    let members = cm.all_members();
772    (
773        axum::http::StatusCode::OK,
774        Json(serde_json::json!({
775            "members": members,
776            "count": members.len(),
777        })),
778    )
779}
780
781/// POST /api/v1/cluster/members β€” Add a member to the cluster
782async fn cluster_add_member_handler(
783    State(state): State<AppState>,
784    Json(member): Json<ClusterMember>,
785) -> impl IntoResponse {
786    let Some(ref cm) = state.cluster_manager else {
787        return (
788            axum::http::StatusCode::SERVICE_UNAVAILABLE,
789            Json(serde_json::json!({
790                "error": "cluster_not_enabled",
791                "message": "Cluster mode is not enabled on this node"
792            })),
793        );
794    };
795
796    let node_id = member.node_id;
797    cm.add_member(member).await;
798
799    tracing::info!("Cluster member {} added", node_id);
800    (
801        axum::http::StatusCode::OK,
802        Json(serde_json::json!({
803            "status": "added",
804            "node_id": node_id,
805        })),
806    )
807}
808
809/// DELETE /api/v1/cluster/members/{node_id} β€” Remove a member from the cluster
810async fn cluster_remove_member_handler(
811    State(state): State<AppState>,
812    Path(node_id): Path<u32>,
813) -> impl IntoResponse {
814    let Some(ref cm) = state.cluster_manager else {
815        return (
816            axum::http::StatusCode::SERVICE_UNAVAILABLE,
817            Json(serde_json::json!({
818                "error": "cluster_not_enabled",
819                "message": "Cluster mode is not enabled on this node"
820            })),
821        );
822    };
823
824    match cm.remove_member(node_id).await {
825        Some(_) => {
826            tracing::info!("Cluster member {} removed", node_id);
827            (
828                axum::http::StatusCode::OK,
829                Json(serde_json::json!({
830                    "status": "removed",
831                    "node_id": node_id,
832                })),
833            )
834        }
835        None => (
836            axum::http::StatusCode::NOT_FOUND,
837            Json(serde_json::json!({
838                "error": "not_found",
839                "message": format!("Node {} not found in cluster", node_id),
840            })),
841        ),
842    }
843}
844
/// Request body for `POST /api/v1/cluster/members/{node_id}/heartbeat`.
///
/// NOTE(review): the original doc comment described the route itself but was
/// attached to this struct; reworded here to document the payload.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    /// Member's current WAL offset (replication progress marker).
    wal_offset: u64,
    /// Defaults to `true` when the field is absent from the JSON body.
    #[serde(default = "default_true")]
    healthy: bool,
}
852
/// Serde default helper: a heartbeat with no `healthy` field means healthy.
fn default_true() -> bool {
    true
}
856
857async fn cluster_heartbeat_handler(
858    State(state): State<AppState>,
859    Path(node_id): Path<u32>,
860    Json(req): Json<HeartbeatRequest>,
861) -> impl IntoResponse {
862    let Some(ref cm) = state.cluster_manager else {
863        return (
864            axum::http::StatusCode::SERVICE_UNAVAILABLE,
865            Json(serde_json::json!({
866                "error": "cluster_not_enabled",
867                "message": "Cluster mode is not enabled on this node"
868            })),
869        );
870    };
871
872    cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
873    (
874        axum::http::StatusCode::OK,
875        Json(serde_json::json!({
876            "status": "updated",
877            "node_id": node_id,
878        })),
879    )
880}
881
882/// POST /api/v1/cluster/vote β€” Handle a vote request (simplified Raft RequestVote)
883async fn cluster_vote_handler(
884    State(state): State<AppState>,
885    Json(request): Json<VoteRequest>,
886) -> impl IntoResponse {
887    let Some(ref cm) = state.cluster_manager else {
888        return (
889            axum::http::StatusCode::SERVICE_UNAVAILABLE,
890            Json(serde_json::json!({
891                "error": "cluster_not_enabled",
892                "message": "Cluster mode is not enabled on this node"
893            })),
894        );
895    };
896
897    let response = cm.handle_vote_request(&request).await;
898    (
899        axum::http::StatusCode::OK,
900        Json(serde_json::to_value(response).unwrap()),
901    )
902}
903
904/// POST /api/v1/cluster/election β€” Trigger a leader election (manual failover)
905async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
906    let Some(ref cm) = state.cluster_manager else {
907        return (
908            axum::http::StatusCode::SERVICE_UNAVAILABLE,
909            Json(serde_json::json!({
910                "error": "cluster_not_enabled",
911                "message": "Cluster mode is not enabled on this node"
912            })),
913        );
914    };
915
916    // Select best candidate deterministically
917    let candidate = cm.select_leader_candidate();
918
919    match candidate {
920        Some(candidate_id) => {
921            let new_term = cm.start_election().await;
922            tracing::info!(
923                "Cluster election started: term={}, candidate={}",
924                new_term,
925                candidate_id,
926            );
927
928            // If this node is the candidate, become leader immediately
929            // (In a full Raft, we'd collect votes from a majority first)
930            if candidate_id == cm.self_id() {
931                cm.become_leader(new_term).await;
932                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
933            }
934
935            (
936                axum::http::StatusCode::OK,
937                Json(serde_json::json!({
938                    "status": "election_started",
939                    "term": new_term,
940                    "candidate_id": candidate_id,
941                    "self_is_leader": candidate_id == cm.self_id(),
942                })),
943            )
944        }
945        None => (
946            axum::http::StatusCode::CONFLICT,
947            Json(serde_json::json!({
948                "error": "no_candidates",
949                "message": "No healthy members available for leader election",
950            })),
951        ),
952    }
953}
954
955/// GET /api/v1/cluster/partitions β€” Get partition distribution across nodes
956async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
957    let Some(ref cm) = state.cluster_manager else {
958        return (
959            axum::http::StatusCode::SERVICE_UNAVAILABLE,
960            Json(serde_json::json!({
961                "error": "cluster_not_enabled",
962                "message": "Cluster mode is not enabled on this node"
963            })),
964        );
965    };
966
967    let registry = cm.registry();
968    let distribution = registry.partition_distribution();
969    let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
970
971    (
972        axum::http::StatusCode::OK,
973        Json(serde_json::json!({
974            "total_partitions": total_partitions,
975            "node_count": registry.node_count(),
976            "healthy_node_count": registry.healthy_node_count(),
977            "distribution": distribution,
978        })),
979    )
980}
981
982// =============================================================================
983// Geo-Replication Handlers (v1.9)
984// =============================================================================
985
986/// GET /api/v1/geo/status β€” Get geo-replication status
987async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
988    let Some(ref geo) = state.geo_replication else {
989        return (
990            axum::http::StatusCode::SERVICE_UNAVAILABLE,
991            Json(serde_json::json!({
992                "error": "geo_replication_not_enabled",
993                "message": "Geo-replication is not enabled on this node"
994            })),
995        );
996    };
997
998    let status = geo.status();
999    (
1000        axum::http::StatusCode::OK,
1001        Json(serde_json::to_value(status).unwrap()),
1002    )
1003}
1004
1005/// POST /api/v1/geo/sync β€” Receive replicated events from a peer region
1006async fn geo_sync_handler(
1007    State(state): State<AppState>,
1008    Json(request): Json<GeoSyncRequest>,
1009) -> impl IntoResponse {
1010    let Some(ref geo) = state.geo_replication else {
1011        return (
1012            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1013            Json(serde_json::json!({
1014                "error": "geo_replication_not_enabled",
1015                "message": "Geo-replication is not enabled on this node"
1016            })),
1017        );
1018    };
1019
1020    tracing::info!(
1021        "Geo-sync received from region '{}': {} events",
1022        request.source_region,
1023        request.events.len(),
1024    );
1025
1026    let response = geo.receive_sync(&request);
1027    (
1028        axum::http::StatusCode::OK,
1029        Json(serde_json::to_value(response).unwrap()),
1030    )
1031}
1032
1033/// GET /api/v1/geo/peers β€” List peer regions and their health
1034async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1035    let Some(ref geo) = state.geo_replication else {
1036        return (
1037            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1038            Json(serde_json::json!({
1039                "error": "geo_replication_not_enabled",
1040                "message": "Geo-replication is not enabled on this node"
1041            })),
1042        );
1043    };
1044
1045    let status = geo.status();
1046    (
1047        axum::http::StatusCode::OK,
1048        Json(serde_json::json!({
1049            "region_id": status.region_id,
1050            "peers": status.peers,
1051        })),
1052    )
1053}
1054
1055/// POST /api/v1/geo/failover β€” Trigger regional failover
1056async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1057    let Some(ref geo) = state.geo_replication else {
1058        return (
1059            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1060            Json(serde_json::json!({
1061                "error": "geo_replication_not_enabled",
1062                "message": "Geo-replication is not enabled on this node"
1063            })),
1064        );
1065    };
1066
1067    match geo.select_failover_region() {
1068        Some(failover_region) => {
1069            tracing::info!(
1070                "Geo-failover: selected region '{}' as failover target",
1071                failover_region,
1072            );
1073            (
1074                axum::http::StatusCode::OK,
1075                Json(serde_json::json!({
1076                    "status": "failover_target_selected",
1077                    "failover_region": failover_region,
1078                    "message": "Region selected for failover. DNS/routing update required externally.",
1079                })),
1080            )
1081        }
1082        None => (
1083            axum::http::StatusCode::CONFLICT,
1084            Json(serde_json::json!({
1085                "error": "no_healthy_peers",
1086                "message": "No healthy peer regions available for failover",
1087            })),
1088        ),
1089    }
1090}
1091
1092/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
1093async fn shutdown_signal() {
1094    let ctrl_c = async {
1095        tokio::signal::ctrl_c()
1096            .await
1097            .expect("failed to install Ctrl+C handler");
1098    };
1099
1100    #[cfg(unix)]
1101    let terminate = async {
1102        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1103            .expect("failed to install SIGTERM handler")
1104            .recv()
1105            .await;
1106    };
1107
1108    #[cfg(not(unix))]
1109    let terminate = std::future::pending::<()>();
1110
1111    tokio::select! {
1112        _ = ctrl_c => {
1113            tracing::info!("πŸ“€ Received Ctrl+C, initiating graceful shutdown...");
1114        }
1115        _ = terminate => {
1116            tracing::info!("πŸ“€ Received SIGTERM, initiating graceful shutdown...");
1117        }
1118    }
1119}