1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 application::services::tenant_service::TenantManager,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Replication role of this node in a leader/follower deployment.
///
/// Serialized in lowercase (it appears in `/health` responses), matching
/// the values accepted by the `ALLSOURCE_ROLE` environment variable.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; ships its WAL to followers.
    Leader,
    /// Read-only replica; write requests are rejected by middleware.
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// Cloneable, lock-free cell holding the node's current [`NodeRole`], so the
/// role can be flipped at runtime (see `promote_handler`) while handlers read
/// it concurrently.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared state handed to every v1 handler; cloning is cheap because the
/// fields are `Arc`-backed or small.
#[derive(Clone)]
pub struct AppState {
    /// Primary event store.
    pub store: Arc<EventStore>,
    /// Authentication and API-key management.
    pub auth_manager: Arc<AuthManager>,
    /// Multi-tenant administration.
    pub tenant_manager: Arc<TenantManager>,
    /// DI container exposing optional system-stream repositories.
    pub service_container: ServiceContainer,
    /// Current role; mutable at runtime via `promote_handler`.
    pub role: AtomicNodeRole,
    /// WAL shipper slot; `None` on followers until promotion installs one.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver; present only on followers.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Port the WAL shipper listens on (used when a follower is promoted).
    pub replication_port: u16,
    /// Cluster coordination, when cluster mode is enabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication, when geo mode is enabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
/// Build the v1 HTTP router and serve it on `addr` until shutdown.
///
/// Wires every public API route plus internal cluster/geo/replication
/// endpoints onto a single axum `Router`, then applies middleware. Layers
/// run outside-in from the bottom of the `.layer(...)` list: Trace -> CORS
/// (fully permissive) -> auth -> rate limit -> read-only guard -> handler.
///
/// # Errors
/// Returns an error if the TCP listener cannot bind `addr` or the server
/// fails while running.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    // Composite handler state; all fields are Arcs so per-request clones are cheap.
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        // RwLock so promote_handler can install a shipper after startup.
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Liveness and Prometheus metrics.
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication: registration, login, API keys, user admin.
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant administration.
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log and dynamic configuration.
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event ingestion and querying.
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entity views derived from events.
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        // Store-wide introspection and analytics.
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots and compaction.
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry.
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Event replay jobs.
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Processing pipelines.
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections and their per-entity state.
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks and delivery history.
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Cluster coordination (exempt from the read-only guard).
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // Query languages and geospatial reads (POST bodies, but read-only).
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication (exempt from the read-only guard).
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal failover controls (exempt from the read-only guard).
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler))
        .with_state(app_state.clone())
        // Innermost layer first: the read-only guard runs closest to handlers,
        // after auth and rate limiting have already passed.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        // Fully permissive CORS; tighten if this API is exposed publicly.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Serve until Ctrl+C / SIGTERM triggers graceful shutdown.
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
471
/// URL prefixes whose POST/PUT/DELETE requests mutate store state and are
/// therefore rejected on follower nodes.
///
/// Matching is by prefix (`starts_with`), so "/api/v1/events" also covers
/// "/api/v1/events/batch", "/api/v1/replay" covers "/api/v1/replay/{id}/cancel",
/// and so on.
/// NOTE(review): auth and tenant mutation routes are not listed, so a
/// follower still accepts e.g. POST /api/v1/auth/register — confirm that
/// is intentional (e.g. auth state replicated separately).
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
487
488fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
490 use axum::http::Method;
491 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
493 return false;
494 }
495 WRITE_PATHS
496 .iter()
497 .any(|write_path| path.starts_with(write_path))
498}
499
/// True for internal coordination endpoints (failover controls, cluster
/// membership, geo-replication) that must bypass the follower write guard.
fn is_internal_request(path: &str) -> bool {
    const INTERNAL_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    INTERNAL_PREFIXES
        .iter()
        .any(|prefix| path.starts_with(prefix))
}
506
507async fn read_only_middleware(
513 State(state): State<AppState>,
514 request: axum::extract::Request,
515 next: axum::middleware::Next,
516) -> axum::response::Response {
517 let path = request.uri().path();
518 if state.role.load().is_follower()
519 && is_write_request(request.method(), path)
520 && !is_internal_request(path)
521 {
522 return (
523 axum::http::StatusCode::CONFLICT,
524 axum::Json(serde_json::json!({
525 "error": "read_only",
526 "message": "This node is a read-only follower"
527 })),
528 )
529 .into_response();
530 }
531 next.run(request).await
532}
533
/// `GET /health` — liveness plus a snapshot of system-stream and replication
/// state and the node's current role.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // Event-sourced deployments report per-stream event counts; otherwise the
    // system-streams section is marked disabled/in-memory.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // Repositories exist but no system store is wired: report zeros.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Shipper status on leaders, receiver status on followers, null when
    // replication is not configured. Scoped so the read lock drops promptly.
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
590
/// `POST /internal/promote` — switch this follower into the leader role.
///
/// Idempotent: an existing leader returns 200 with no side effects.
/// Otherwise the sequence is: flip the role flag, signal the WAL receiver to
/// shut down, build a new WAL shipper wired to the local store, publish it
/// into shared state, and serve it on `replication_port` so downstream
/// followers can attach.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first so the read-only middleware admits writes.
    // NOTE(review): writes are accepted for a brief window before the new
    // shipper below is installed — confirm that gap is acceptable.
    state.role.store(NodeRole::Leader);

    // Stop pulling WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a shipper fed by the local store so this node can serve followers.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish into shared state; scoped so the write lock drops before spawn.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve the replication port in the background; errors are only logged.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
655
656async fn repoint_handler(
661 State(state): State<AppState>,
662 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
663) -> impl IntoResponse {
664 let current_role = state.role.load();
665 if current_role != NodeRole::Follower {
666 return (
667 axum::http::StatusCode::CONFLICT,
668 Json(serde_json::json!({
669 "error": "not_follower",
670 "message": "Repoint only applies to follower nodes",
671 })),
672 );
673 }
674
675 let new_leader = match params.get("leader") {
676 Some(l) if !l.is_empty() => l.clone(),
677 _ => {
678 return (
679 axum::http::StatusCode::BAD_REQUEST,
680 Json(serde_json::json!({
681 "error": "missing_leader",
682 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
683 })),
684 );
685 }
686 };
687
688 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
689
690 if let Some(ref receiver) = state.wal_receiver {
691 receiver.repoint(&new_leader);
692 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
693 } else {
694 tracing::warn!("REPOINT: No WAL receiver to repoint");
695 }
696
697 (
698 axum::http::StatusCode::OK,
699 Json(serde_json::json!({
700 "status": "repointed",
701 "new_leader": new_leader,
702 })),
703 )
704}
705
706async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
712 let Some(ref cm) = state.cluster_manager else {
713 return (
714 axum::http::StatusCode::SERVICE_UNAVAILABLE,
715 Json(serde_json::json!({
716 "error": "cluster_not_enabled",
717 "message": "Cluster mode is not enabled on this node"
718 })),
719 );
720 };
721
722 let status = cm.status().await;
723 (
724 axum::http::StatusCode::OK,
725 Json(serde_json::to_value(status).unwrap()),
726 )
727}
728
729async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
731 let Some(ref cm) = state.cluster_manager else {
732 return (
733 axum::http::StatusCode::SERVICE_UNAVAILABLE,
734 Json(serde_json::json!({
735 "error": "cluster_not_enabled",
736 "message": "Cluster mode is not enabled on this node"
737 })),
738 );
739 };
740
741 let members = cm.all_members();
742 (
743 axum::http::StatusCode::OK,
744 Json(serde_json::json!({
745 "members": members,
746 "count": members.len(),
747 })),
748 )
749}
750
751async fn cluster_add_member_handler(
753 State(state): State<AppState>,
754 Json(member): Json<ClusterMember>,
755) -> impl IntoResponse {
756 let Some(ref cm) = state.cluster_manager else {
757 return (
758 axum::http::StatusCode::SERVICE_UNAVAILABLE,
759 Json(serde_json::json!({
760 "error": "cluster_not_enabled",
761 "message": "Cluster mode is not enabled on this node"
762 })),
763 );
764 };
765
766 let node_id = member.node_id;
767 cm.add_member(member).await;
768
769 tracing::info!("Cluster member {} added", node_id);
770 (
771 axum::http::StatusCode::OK,
772 Json(serde_json::json!({
773 "status": "added",
774 "node_id": node_id,
775 })),
776 )
777}
778
779async fn cluster_remove_member_handler(
781 State(state): State<AppState>,
782 Path(node_id): Path<u32>,
783) -> impl IntoResponse {
784 let Some(ref cm) = state.cluster_manager else {
785 return (
786 axum::http::StatusCode::SERVICE_UNAVAILABLE,
787 Json(serde_json::json!({
788 "error": "cluster_not_enabled",
789 "message": "Cluster mode is not enabled on this node"
790 })),
791 );
792 };
793
794 match cm.remove_member(node_id).await {
795 Some(_) => {
796 tracing::info!("Cluster member {} removed", node_id);
797 (
798 axum::http::StatusCode::OK,
799 Json(serde_json::json!({
800 "status": "removed",
801 "node_id": node_id,
802 })),
803 )
804 }
805 None => (
806 axum::http::StatusCode::NOT_FOUND,
807 Json(serde_json::json!({
808 "error": "not_found",
809 "message": format!("Node {} not found in cluster", node_id),
810 })),
811 ),
812 }
813}
814
/// Body of `POST /api/v1/cluster/members/{node_id}/heartbeat`.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    /// WAL offset the member reports having applied.
    wal_offset: u64,
    /// Health flag; an omitted field means the member reports itself healthy.
    #[serde(default = "default_true")]
    healthy: bool,
}
822
/// Serde default for `HeartbeatRequest::healthy`.
fn default_true() -> bool {
    true
}
826
827async fn cluster_heartbeat_handler(
828 State(state): State<AppState>,
829 Path(node_id): Path<u32>,
830 Json(req): Json<HeartbeatRequest>,
831) -> impl IntoResponse {
832 let Some(ref cm) = state.cluster_manager else {
833 return (
834 axum::http::StatusCode::SERVICE_UNAVAILABLE,
835 Json(serde_json::json!({
836 "error": "cluster_not_enabled",
837 "message": "Cluster mode is not enabled on this node"
838 })),
839 );
840 };
841
842 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
843 (
844 axum::http::StatusCode::OK,
845 Json(serde_json::json!({
846 "status": "updated",
847 "node_id": node_id,
848 })),
849 )
850}
851
852async fn cluster_vote_handler(
854 State(state): State<AppState>,
855 Json(request): Json<VoteRequest>,
856) -> impl IntoResponse {
857 let Some(ref cm) = state.cluster_manager else {
858 return (
859 axum::http::StatusCode::SERVICE_UNAVAILABLE,
860 Json(serde_json::json!({
861 "error": "cluster_not_enabled",
862 "message": "Cluster mode is not enabled on this node"
863 })),
864 );
865 };
866
867 let response = cm.handle_vote_request(&request).await;
868 (
869 axum::http::StatusCode::OK,
870 Json(serde_json::to_value(response).unwrap()),
871 )
872}
873
/// `POST /api/v1/cluster/election` — trigger a leader election.
///
/// Selects a candidate, bumps the term, and — when this node is the
/// candidate — assumes leadership immediately. Returns 409 when no healthy
/// candidate is available.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Candidate selection happens before the term is advanced; selection
    // criteria live in ClusterManager::select_leader_candidate.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // If we chose ourselves, take leadership right away; a remote
            // candidate is expected to win via the vote endpoint instead.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
924
925async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
927 let Some(ref cm) = state.cluster_manager else {
928 return (
929 axum::http::StatusCode::SERVICE_UNAVAILABLE,
930 Json(serde_json::json!({
931 "error": "cluster_not_enabled",
932 "message": "Cluster mode is not enabled on this node"
933 })),
934 );
935 };
936
937 let registry = cm.registry();
938 let distribution = registry.partition_distribution();
939 let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
940
941 (
942 axum::http::StatusCode::OK,
943 Json(serde_json::json!({
944 "total_partitions": total_partitions,
945 "node_count": registry.node_count(),
946 "healthy_node_count": registry.healthy_node_count(),
947 "distribution": distribution,
948 })),
949 )
950}
951
952async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
958 let Some(ref geo) = state.geo_replication else {
959 return (
960 axum::http::StatusCode::SERVICE_UNAVAILABLE,
961 Json(serde_json::json!({
962 "error": "geo_replication_not_enabled",
963 "message": "Geo-replication is not enabled on this node"
964 })),
965 );
966 };
967
968 let status = geo.status();
969 (
970 axum::http::StatusCode::OK,
971 Json(serde_json::to_value(status).unwrap()),
972 )
973}
974
975async fn geo_sync_handler(
977 State(state): State<AppState>,
978 Json(request): Json<GeoSyncRequest>,
979) -> impl IntoResponse {
980 let Some(ref geo) = state.geo_replication else {
981 return (
982 axum::http::StatusCode::SERVICE_UNAVAILABLE,
983 Json(serde_json::json!({
984 "error": "geo_replication_not_enabled",
985 "message": "Geo-replication is not enabled on this node"
986 })),
987 );
988 };
989
990 tracing::info!(
991 "Geo-sync received from region '{}': {} events",
992 request.source_region,
993 request.events.len(),
994 );
995
996 let response = geo.receive_sync(&request);
997 (
998 axum::http::StatusCode::OK,
999 Json(serde_json::to_value(response).unwrap()),
1000 )
1001}
1002
1003async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1005 let Some(ref geo) = state.geo_replication else {
1006 return (
1007 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1008 Json(serde_json::json!({
1009 "error": "geo_replication_not_enabled",
1010 "message": "Geo-replication is not enabled on this node"
1011 })),
1012 );
1013 };
1014
1015 let status = geo.status();
1016 (
1017 axum::http::StatusCode::OK,
1018 Json(serde_json::json!({
1019 "region_id": status.region_id,
1020 "peers": status.peers,
1021 })),
1022 )
1023}
1024
1025async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1027 let Some(ref geo) = state.geo_replication else {
1028 return (
1029 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1030 Json(serde_json::json!({
1031 "error": "geo_replication_not_enabled",
1032 "message": "Geo-replication is not enabled on this node"
1033 })),
1034 );
1035 };
1036
1037 match geo.select_failover_region() {
1038 Some(failover_region) => {
1039 tracing::info!(
1040 "Geo-failover: selected region '{}' as failover target",
1041 failover_region,
1042 );
1043 (
1044 axum::http::StatusCode::OK,
1045 Json(serde_json::json!({
1046 "status": "failover_target_selected",
1047 "failover_region": failover_region,
1048 "message": "Region selected for failover. DNS/routing update required externally.",
1049 })),
1050 )
1051 }
1052 None => (
1053 axum::http::StatusCode::CONFLICT,
1054 Json(serde_json::json!({
1055 "error": "no_healthy_peers",
1056 "message": "No healthy peer regions available for failover",
1057 })),
1058 ),
1059 }
1060}
1061
/// Resolve when the process receives Ctrl+C or (on Unix) SIGTERM; used to
/// drive axum's graceful shutdown.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // Non-Unix targets have no SIGTERM; substitute a never-resolving future
    // so the select! below only fires on Ctrl+C.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
        }
    }
}