Skip to main content

allsource_core/infrastructure/web/
api_v1.rs

1/// v1.0 API router with authentication and multi-tenancy
2use crate::infrastructure::di::ServiceContainer;
3#[cfg(feature = "multi-tenant")]
4use crate::infrastructure::web::tenant_api::*;
5use crate::{
6    domain::repositories::TenantRepository,
7    infrastructure::{
8        cluster::{
9            ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
10        },
11        replication::{WalReceiver, WalShipper},
12        security::{
13            auth::AuthManager,
14            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
15            rate_limit::RateLimiter,
16        },
17        web::{audit_api::*, auth_api::*, config_api::*},
18    },
19    store::EventStore,
20};
21use axum::{
22    Json, Router,
23    extract::{Path, State},
24    middleware,
25    response::IntoResponse,
26    routing::{delete, get, post, put},
27};
28use std::sync::Arc;
29use tower_http::{
30    cors::{Any, CorsLayer},
31    trace::TraceLayer,
32};
33
/// Node role for leader-follower replication
///
/// Serialized in lowercase ("leader" / "follower") for JSON status payloads.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; runs the WAL shipper so followers can replicate.
    Leader,
    /// Read-only replica; write requests are rejected by middleware.
    Follower,
}
41
42impl NodeRole {
43    /// Detect role from environment variables.
44    ///
45    /// Checks `ALLSOURCE_ROLE` ("leader" or "follower") first,
46    /// then falls back to `ALLSOURCE_READ_ONLY` ("true" → follower).
47    /// Defaults to `Leader` if neither is set.
48    pub fn from_env() -> Self {
49        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
50            match role.to_lowercase().as_str() {
51                "follower" => return NodeRole::Follower,
52                "leader" => return NodeRole::Leader,
53                other => {
54                    tracing::warn!(
55                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
56                        other
57                    );
58                    return NodeRole::Leader;
59                }
60            }
61        }
62        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
63            && (read_only == "true" || read_only == "1")
64        {
65            return NodeRole::Follower;
66        }
67        NodeRole::Leader
68    }
69
70    pub fn is_follower(self) -> bool {
71        self == NodeRole::Follower
72    }
73
74    fn to_u8(self) -> u8 {
75        match self {
76            NodeRole::Leader => 0,
77            NodeRole::Follower => 1,
78        }
79    }
80
81    fn from_u8(v: u8) -> Self {
82        match v {
83            1 => NodeRole::Follower,
84            _ => NodeRole::Leader,
85        }
86    }
87}
88
89impl std::fmt::Display for NodeRole {
90    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91        match self {
92            NodeRole::Leader => write!(f, "leader"),
93            NodeRole::Follower => write!(f, "follower"),
94        }
95    }
96}
97
/// Thread-safe, runtime-mutable node role for failover support.
///
/// Wraps an `AtomicU8` so the read-only middleware and health endpoint
/// always see the current role, even after a sentinel-triggered promotion.
/// Cloning is cheap: clones share the same underlying atomic via `Arc`.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
104
105impl AtomicNodeRole {
106    pub fn new(role: NodeRole) -> Self {
107        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
108    }
109
110    pub fn load(&self) -> NodeRole {
111        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
112    }
113
114    pub fn store(&self, role: NodeRole) {
115        self.0
116            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
117    }
118}
119
/// Unified application state for all handlers
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing all event/query endpoints.
    pub store: Arc<EventStore>,
    /// Authentication manager shared with the auth middleware.
    pub auth_manager: Arc<AuthManager>,
    /// Tenant repository (trait object so the backend is swappable).
    pub tenant_repo: Arc<dyn TenantRepository>,
    /// Service container for paywall domain use cases (Creator, Article, Payment, etc.)
    pub service_container: ServiceContainer,
    /// Node role for leader-follower replication (runtime-mutable for failover)
    pub role: AtomicNodeRole,
    /// WAL shipper for replication status reporting (leader only).
    /// Wrapped in RwLock so a promoted follower can install a shipper at runtime.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver for replication status reporting (follower only)
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Replication port used by the WAL shipper (needed for runtime promotion)
    pub replication_port: u16,
    /// Cluster manager for multi-node consensus and membership (optional)
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Geo-replication manager for cross-region replication (optional)
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
142
143// Enable extracting Arc<EventStore> from AppState
144// This allows handlers that expect State<Arc<EventStore>> to work with AppState
145impl axum::extract::FromRef<AppState> for Arc<EventStore> {
146    fn from_ref(state: &AppState) -> Self {
147        state.store.clone()
148    }
149}
150
/// Build and serve the v1.0 HTTP API.
///
/// Assembles every route group (auth, tenants, events, analytics, snapshots,
/// schemas, replay, pipelines, projections, webhooks, consumers, cluster,
/// geo-replication), applies auth / rate-limit / read-only middleware plus
/// CORS and tracing layers, then binds `addr` and serves until a shutdown
/// signal arrives. Feature-gated groups (`multi-tenant`, `replication`,
/// `embedded-sync`, `prime`) are only present when the cargo feature is on.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    // Shared state for every handler; `role` is atomic so sentinel-driven
    // promotion can flip it at runtime without rebuilding the router.
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Public routes (no auth)
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Auth routes
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant routes (protected — enterprise only)
        ;
    #[cfg(feature = "multi-tenant")]
    let app = app
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler));
    let app = app
        // Audit endpoints (admin only)
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Config endpoints (admin only)
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event and data routes (protected by auth)
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schemas
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // v0.11: Webhook management
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // v0.14: Durable consumer subscriptions
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // v1.8: Cluster membership management API
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // v2.0: Advanced query features
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // v1.9: Geo-replication API
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler));
    // Internal endpoints for sentinel-driven failover (enterprise only)
    #[cfg(feature = "replication")]
    let app = app
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));
    // No-op rebinding so `app` exists under this name whether or not the
    // feature-gated binding above was compiled in.
    let app = app;

    // v0.11: Bidirectional sync protocol (embedded↔server)
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    let app = app
        .with_state(app_state.clone())
        // IMPORTANT: Middleware layers execute from bottom to top in Tower/Axum
        // Read-only middleware runs after auth (applied before rate limit layer)
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    // Prime API — nested router with its own state (feature-gated)
    #[cfg(feature = "prime")]
    let app = {
        let data_dir =
            std::env::var("PRIME_DATA_DIR").unwrap_or_else(|_| "/tmp/prime-data".to_string());
        match crate::prime::Prime::open(&data_dir).await {
            Ok(prime) => {
                let prime_state = Arc::new(super::prime_api::PrimeState { prime });
                tracing::info!("Prime API enabled at /api/v1/prime/*");
                app.nest(
                    "/api/v1/prime",
                    super::prime_api::prime_router().with_state(prime_state),
                )
            }
            Err(e) => {
                // Prime failure is non-fatal: serve the rest of the API.
                tracing::warn!("Prime API disabled: failed to open Prime: {e}");
                app
            }
        }
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Graceful shutdown on SIGTERM (required for serverless platforms)
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
530
/// Write paths that should be rejected when running as a follower.
///
/// Matched by prefix in `is_write_request`, so each entry also covers its
/// sub-paths (e.g. "/api/v1/config" covers "/api/v1/config/{key}").
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
546
547/// Returns true if this request is a write operation that should be blocked on followers.
548fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
549    use axum::http::Method;
550    // Only POST/PUT/DELETE are writes
551    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
552        return false;
553    }
554    WRITE_PATHS
555        .iter()
556        .any(|write_path| path.starts_with(write_path))
557}
558
/// Returns true if the request targets an internal or cluster endpoint (not subject to read-only checks).
fn is_internal_request(path: &str) -> bool {
    // Sentinel/cluster/geo control-plane traffic is exempt from read-only checks.
    const EXEMPT_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    EXEMPT_PREFIXES.iter().any(|prefix| path.starts_with(prefix))
}
565
566/// Middleware that rejects write requests when the node is a follower.
567///
568/// Returns HTTP 409 Conflict with `{"error": "read_only", "message": "..."}`.
569/// Internal endpoints (`/internal/*`) are exempt — they are used by the sentinel
570/// to trigger promotion and repointing during failover.
571async fn read_only_middleware(
572    State(state): State<AppState>,
573    request: axum::extract::Request,
574    next: axum::middleware::Next,
575) -> axum::response::Response {
576    let path = request.uri().path();
577    if state.role.load().is_follower()
578        && is_write_request(request.method(), path)
579        && !is_internal_request(path)
580    {
581        return (
582            axum::http::StatusCode::CONFLICT,
583            axum::Json(serde_json::json!({
584                "error": "read_only",
585                "message": "This node is a read-only follower"
586            })),
587        )
588            .into_response();
589    }
590    next.run(request).await
591}
592
/// Health endpoint with system stream health reporting.
///
/// Reports overall health plus detailed system metadata health when
/// event-sourced system repositories are configured.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // System-stream section: per-domain event counts when the event-sourced
    // system store is available, otherwise an explicit "disabled" marker.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // Repositories configured but no store handle — report zeros.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Replication section: shipper status (leader) takes precedence over
    // receiver status (follower); null when neither is configured.
    let replication = {
        #[cfg(feature = "replication")]
        {
            let shipper_guard = state.wal_shipper.read().await;
            if let Some(ref shipper) = *shipper_guard {
                serde_json::to_value(shipper.status()).unwrap_or_default()
            } else if let Some(ref receiver) = state.wal_receiver {
                serde_json::to_value(receiver.status()).unwrap_or_default()
            } else {
                serde_json::json!(null)
            }
        }
        #[cfg(not(feature = "replication"))]
        serde_json::json!({"edition": "community", "status": "not_available"})
    };

    // Load the role fresh each call — it can change at runtime via promotion.
    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
654
/// POST /internal/promote — Promote this follower to leader.
///
/// Called by the sentinel process during automated failover.
/// Switches the node role to leader, stops WAL receiving, and starts
/// a WAL shipper on the replication port so other followers can connect.
///
/// Idempotent: returns `already_leader` without side effects if the node
/// is already leading.
#[cfg(feature = "replication")]
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // 1. Switch role — the read-only middleware will immediately start accepting writes
    state.role.store(NodeRole::Leader);

    // 2. Signal the WAL receiver to stop (it will stop reconnecting)
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // 3. Start a new WAL shipper so remaining followers can connect
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Install into AppState so health endpoint reports shipper status
    {
        // Scoped so the write guard is dropped before spawning the server.
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Spawn the shipper server
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
720
/// POST /internal/repoint?leader=host:port — Switch replication target.
///
/// Called by the sentinel process to tell a follower to disconnect from
/// the old leader and reconnect to a newly promoted leader.
#[cfg(feature = "replication")]
async fn repoint_handler(
    State(state): State<AppState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    // Repointing only makes sense for followers — leaders ship, not receive.
    if state.role.load() != NodeRole::Follower {
        return (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "not_follower",
                "message": "Repoint only applies to follower nodes",
            })),
        );
    }

    // The new leader address arrives as a non-empty `leader` query parameter.
    let Some(new_leader) = params.get("leader").filter(|l| !l.is_empty()).cloned() else {
        return (
            axum::http::StatusCode::BAD_REQUEST,
            Json(serde_json::json!({
                "error": "missing_leader",
                "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
            })),
        );
    };

    tracing::info!("REPOINT: Switching replication target to {}", new_leader);

    match state.wal_receiver {
        Some(ref receiver) => {
            receiver.repoint(&new_leader);
            tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
        }
        None => tracing::warn!("REPOINT: No WAL receiver to repoint"),
    }

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "repointed",
            "new_leader": new_leader,
        })),
    )
}
771
772// =============================================================================
773// Cluster Management Handlers (v1.8)
774// =============================================================================
775
776/// GET /api/v1/cluster/status — Get cluster status including term, leader, and members
777async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
778    let Some(ref cm) = state.cluster_manager else {
779        return (
780            axum::http::StatusCode::SERVICE_UNAVAILABLE,
781            Json(serde_json::json!({
782                "error": "cluster_not_enabled",
783                "message": "Cluster mode is not enabled on this node"
784            })),
785        );
786    };
787
788    let status = cm.status().await;
789    (
790        axum::http::StatusCode::OK,
791        Json(serde_json::to_value(status).unwrap()),
792    )
793}
794
795/// GET /api/v1/cluster/members — List all cluster members
796async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
797    let Some(ref cm) = state.cluster_manager else {
798        return (
799            axum::http::StatusCode::SERVICE_UNAVAILABLE,
800            Json(serde_json::json!({
801                "error": "cluster_not_enabled",
802                "message": "Cluster mode is not enabled on this node"
803            })),
804        );
805    };
806
807    let members = cm.all_members();
808    (
809        axum::http::StatusCode::OK,
810        Json(serde_json::json!({
811            "members": members,
812            "count": members.len(),
813        })),
814    )
815}
816
817/// POST /api/v1/cluster/members — Add a member to the cluster
818async fn cluster_add_member_handler(
819    State(state): State<AppState>,
820    Json(member): Json<ClusterMember>,
821) -> impl IntoResponse {
822    let Some(ref cm) = state.cluster_manager else {
823        return (
824            axum::http::StatusCode::SERVICE_UNAVAILABLE,
825            Json(serde_json::json!({
826                "error": "cluster_not_enabled",
827                "message": "Cluster mode is not enabled on this node"
828            })),
829        );
830    };
831
832    let node_id = member.node_id;
833    cm.add_member(member).await;
834
835    tracing::info!("Cluster member {} added", node_id);
836    (
837        axum::http::StatusCode::OK,
838        Json(serde_json::json!({
839            "status": "added",
840            "node_id": node_id,
841        })),
842    )
843}
844
845/// DELETE /api/v1/cluster/members/{node_id} — Remove a member from the cluster
846async fn cluster_remove_member_handler(
847    State(state): State<AppState>,
848    Path(node_id): Path<u32>,
849) -> impl IntoResponse {
850    let Some(ref cm) = state.cluster_manager else {
851        return (
852            axum::http::StatusCode::SERVICE_UNAVAILABLE,
853            Json(serde_json::json!({
854                "error": "cluster_not_enabled",
855                "message": "Cluster mode is not enabled on this node"
856            })),
857        );
858    };
859
860    match cm.remove_member(node_id).await {
861        Some(_) => {
862            tracing::info!("Cluster member {} removed", node_id);
863            (
864                axum::http::StatusCode::OK,
865                Json(serde_json::json!({
866                    "status": "removed",
867                    "node_id": node_id,
868                })),
869            )
870        }
871        None => (
872            axum::http::StatusCode::NOT_FOUND,
873            Json(serde_json::json!({
874                "error": "not_found",
875                "message": format!("Node {} not found in cluster", node_id),
876            })),
877        ),
878    }
879}
880
/// POST /api/v1/cluster/members/{node_id}/heartbeat — Update member heartbeat
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // Highest WAL offset the reporting node has applied.
    wal_offset: u64,
    // Health flag; omitted in the JSON body means "healthy" (see default_true).
    #[serde(default = "default_true")]
    healthy: bool,
}

/// Serde default for `HeartbeatRequest::healthy` — serde requires a function
/// path, not a literal, for field defaults.
fn default_true() -> bool {
    true
}
892
893async fn cluster_heartbeat_handler(
894    State(state): State<AppState>,
895    Path(node_id): Path<u32>,
896    Json(req): Json<HeartbeatRequest>,
897) -> impl IntoResponse {
898    let Some(ref cm) = state.cluster_manager else {
899        return (
900            axum::http::StatusCode::SERVICE_UNAVAILABLE,
901            Json(serde_json::json!({
902                "error": "cluster_not_enabled",
903                "message": "Cluster mode is not enabled on this node"
904            })),
905        );
906    };
907
908    cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
909    (
910        axum::http::StatusCode::OK,
911        Json(serde_json::json!({
912            "status": "updated",
913            "node_id": node_id,
914        })),
915    )
916}
917
918/// POST /api/v1/cluster/vote — Handle a vote request (simplified Raft RequestVote)
919async fn cluster_vote_handler(
920    State(state): State<AppState>,
921    Json(request): Json<VoteRequest>,
922) -> impl IntoResponse {
923    let Some(ref cm) = state.cluster_manager else {
924        return (
925            axum::http::StatusCode::SERVICE_UNAVAILABLE,
926            Json(serde_json::json!({
927                "error": "cluster_not_enabled",
928                "message": "Cluster mode is not enabled on this node"
929            })),
930        );
931    };
932
933    let response = cm.handle_vote_request(&request).await;
934    (
935        axum::http::StatusCode::OK,
936        Json(serde_json::to_value(response).unwrap()),
937    )
938}
939
940/// POST /api/v1/cluster/election — Trigger a leader election (manual failover)
941async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
942    let Some(ref cm) = state.cluster_manager else {
943        return (
944            axum::http::StatusCode::SERVICE_UNAVAILABLE,
945            Json(serde_json::json!({
946                "error": "cluster_not_enabled",
947                "message": "Cluster mode is not enabled on this node"
948            })),
949        );
950    };
951
952    // Select best candidate deterministically
953    let candidate = cm.select_leader_candidate();
954
955    match candidate {
956        Some(candidate_id) => {
957            let new_term = cm.start_election().await;
958            tracing::info!(
959                "Cluster election started: term={}, candidate={}",
960                new_term,
961                candidate_id,
962            );
963
964            // If this node is the candidate, become leader immediately
965            // (In a full Raft, we'd collect votes from a majority first)
966            if candidate_id == cm.self_id() {
967                cm.become_leader(new_term).await;
968                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
969            }
970
971            (
972                axum::http::StatusCode::OK,
973                Json(serde_json::json!({
974                    "status": "election_started",
975                    "term": new_term,
976                    "candidate_id": candidate_id,
977                    "self_is_leader": candidate_id == cm.self_id(),
978                })),
979            )
980        }
981        None => (
982            axum::http::StatusCode::CONFLICT,
983            Json(serde_json::json!({
984                "error": "no_candidates",
985                "message": "No healthy members available for leader election",
986            })),
987        ),
988    }
989}
990
991/// GET /api/v1/cluster/partitions — Get partition distribution across nodes
992async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
993    let Some(ref cm) = state.cluster_manager else {
994        return (
995            axum::http::StatusCode::SERVICE_UNAVAILABLE,
996            Json(serde_json::json!({
997                "error": "cluster_not_enabled",
998                "message": "Cluster mode is not enabled on this node"
999            })),
1000        );
1001    };
1002
1003    let registry = cm.registry();
1004    let distribution = registry.partition_distribution();
1005    let total_partitions: usize = distribution.values().map(std::vec::Vec::len).sum();
1006
1007    (
1008        axum::http::StatusCode::OK,
1009        Json(serde_json::json!({
1010            "total_partitions": total_partitions,
1011            "node_count": registry.node_count(),
1012            "healthy_node_count": registry.healthy_node_count(),
1013            "distribution": distribution,
1014        })),
1015    )
1016}
1017
1018// =============================================================================
1019// Geo-Replication Handlers (v1.9)
1020// =============================================================================
1021
1022/// GET /api/v1/geo/status — Get geo-replication status
1023async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
1024    let Some(ref geo) = state.geo_replication else {
1025        return (
1026            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1027            Json(serde_json::json!({
1028                "error": "geo_replication_not_enabled",
1029                "message": "Geo-replication is not enabled on this node"
1030            })),
1031        );
1032    };
1033
1034    let status = geo.status();
1035    (
1036        axum::http::StatusCode::OK,
1037        Json(serde_json::to_value(status).unwrap()),
1038    )
1039}
1040
1041/// POST /api/v1/geo/sync — Receive replicated events from a peer region
1042async fn geo_sync_handler(
1043    State(state): State<AppState>,
1044    Json(request): Json<GeoSyncRequest>,
1045) -> impl IntoResponse {
1046    let Some(ref geo) = state.geo_replication else {
1047        return (
1048            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1049            Json(serde_json::json!({
1050                "error": "geo_replication_not_enabled",
1051                "message": "Geo-replication is not enabled on this node"
1052            })),
1053        );
1054    };
1055
1056    tracing::info!(
1057        "Geo-sync received from region '{}': {} events",
1058        request.source_region,
1059        request.events.len(),
1060    );
1061
1062    let response = geo.receive_sync(&request);
1063    (
1064        axum::http::StatusCode::OK,
1065        Json(serde_json::to_value(response).unwrap()),
1066    )
1067}
1068
1069/// GET /api/v1/geo/peers — List peer regions and their health
1070async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1071    let Some(ref geo) = state.geo_replication else {
1072        return (
1073            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1074            Json(serde_json::json!({
1075                "error": "geo_replication_not_enabled",
1076                "message": "Geo-replication is not enabled on this node"
1077            })),
1078        );
1079    };
1080
1081    let status = geo.status();
1082    (
1083        axum::http::StatusCode::OK,
1084        Json(serde_json::json!({
1085            "region_id": status.region_id,
1086            "peers": status.peers,
1087        })),
1088    )
1089}
1090
1091/// POST /api/v1/geo/failover — Trigger regional failover
1092async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1093    let Some(ref geo) = state.geo_replication else {
1094        return (
1095            axum::http::StatusCode::SERVICE_UNAVAILABLE,
1096            Json(serde_json::json!({
1097                "error": "geo_replication_not_enabled",
1098                "message": "Geo-replication is not enabled on this node"
1099            })),
1100        );
1101    };
1102
1103    match geo.select_failover_region() {
1104        Some(failover_region) => {
1105            tracing::info!(
1106                "Geo-failover: selected region '{}' as failover target",
1107                failover_region,
1108            );
1109            (
1110                axum::http::StatusCode::OK,
1111                Json(serde_json::json!({
1112                    "status": "failover_target_selected",
1113                    "failover_region": failover_region,
1114                    "message": "Region selected for failover. DNS/routing update required externally.",
1115                })),
1116            )
1117        }
1118        None => (
1119            axum::http::StatusCode::CONFLICT,
1120            Json(serde_json::json!({
1121                "error": "no_healthy_peers",
1122                "message": "No healthy peer regions available for failover",
1123            })),
1124        ),
1125    }
1126}
1127
1128/// Listen for shutdown signals (SIGTERM for serverless, SIGINT for local dev)
1129async fn shutdown_signal() {
1130    let ctrl_c = async {
1131        tokio::signal::ctrl_c()
1132            .await
1133            .expect("failed to install Ctrl+C handler");
1134    };
1135
1136    #[cfg(unix)]
1137    let terminate = async {
1138        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1139            .expect("failed to install SIGTERM handler")
1140            .recv()
1141            .await;
1142    };
1143
1144    #[cfg(not(unix))]
1145    let terminate = std::future::pending::<()>();
1146
1147    tokio::select! {
1148        () = ctrl_c => {
1149            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
1150        }
1151        () = terminate => {
1152            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
1153        }
1154    }
1155}