1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 application::services::tenant_service::TenantManager,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Replication role of this node.
///
/// A `Leader` accepts writes and ships WAL to followers; a `Follower` is
/// read-only (enforced by `read_only_middleware`) and replicates from the
/// leader. Serialized in lowercase (e.g. `"leader"` in `/health` responses).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    Leader,
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// A `NodeRole` that can be read and updated atomically and shared across
/// tasks, so the role can be flipped at runtime (see `promote_handler`)
/// without any locking.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared application state injected into every v1 handler.
#[derive(Clone)]
pub struct AppState {
    /// Primary event store.
    pub store: Arc<EventStore>,
    /// Authentication / API-key management.
    pub auth_manager: Arc<AuthManager>,
    /// Tenant lifecycle and quota management.
    pub tenant_manager: Arc<TenantManager>,
    /// DI container; also exposes the system event store used by `/health`.
    pub service_container: ServiceContainer,
    /// Current replication role; mutated at runtime by `/internal/promote`.
    pub role: AtomicNodeRole,
    /// Leader-side WAL shipper. Behind a lock because promotion installs it
    /// lazily (see `promote_handler`).
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// Follower-side WAL receiver, present when this node started as follower.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// TCP port the WAL shipper serves replication on.
    pub replication_port: u16,
    /// Cluster coordination, when cluster mode is enabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication, when geo mode is enabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
/// Build the v1 HTTP API router and serve it on `addr` until shutdown.
///
/// Registers every REST endpoint, attaches shared state, and stacks the
/// middleware layers. Layers wrap the router, and the layer added last is
/// outermost, so a request flows:
/// trace → CORS → auth → rate-limit → read-only guard → handler.
/// Blocks until a shutdown signal (Ctrl+C / SIGTERM) is received.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Health & metrics
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & user management
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant administration
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Dynamic configuration
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Event ingestion & queries
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots & compaction
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Event replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Processing pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        // POST and PUT intentionally share the same save handler.
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Cluster coordination (exempt from the read-only guard)
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // Query languages & misc read APIs
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication (exempt from the read-only guard)
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal failover controls
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler))
        .with_state(app_state.clone())
        // Innermost layer: reject external writes on a follower.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        // Outermost layer: HTTP tracing.
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
460
/// Path prefixes whose mutating requests are treated as writes and rejected
/// on a read-only follower (see `is_write_request`).
///
/// Matching is `starts_with`, so "/api/v1/events" also covers
/// "/api/v1/events/batch". The trailing slash on "/api/v1/projections/"
/// restricts matches to sub-resources of a named projection.
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
];
475
476fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
478 use axum::http::Method;
479 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
481 return false;
482 }
483 WRITE_PATHS
484 .iter()
485 .any(|write_path| path.starts_with(write_path))
486}
487
/// True for endpoints that must stay reachable on a follower even for
/// mutating methods: internal failover controls, cluster coordination,
/// and geo-replication.
fn is_internal_request(path: &str) -> bool {
    const INTERNAL_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    INTERNAL_PREFIXES
        .iter()
        .any(|prefix| path.starts_with(prefix))
}
494
495async fn read_only_middleware(
501 State(state): State<AppState>,
502 request: axum::extract::Request,
503 next: axum::middleware::Next,
504) -> axum::response::Response {
505 let path = request.uri().path();
506 if state.role.load().is_follower()
507 && is_write_request(request.method(), path)
508 && !is_internal_request(path)
509 {
510 return (
511 axum::http::StatusCode::CONFLICT,
512 axum::Json(serde_json::json!({
513 "error": "read_only",
514 "message": "This node is a read-only follower"
515 })),
516 )
517 .into_response();
518 }
519 next.run(request).await
520}
521
522async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
527 let has_system_repos = state.service_container.has_system_repositories();
528
529 let system_streams = if has_system_repos {
530 let (tenant_count, config_count, total_events) =
531 if let Some(store) = state.service_container.system_store() {
532 use crate::domain::value_objects::system_stream::SystemDomain;
533 (
534 store.count_stream(SystemDomain::Tenant),
535 store.count_stream(SystemDomain::Config),
536 store.total_events(),
537 )
538 } else {
539 (0, 0, 0)
540 };
541
542 serde_json::json!({
543 "status": "healthy",
544 "mode": "event-sourced",
545 "total_events": total_events,
546 "tenant_events": tenant_count,
547 "config_events": config_count,
548 })
549 } else {
550 serde_json::json!({
551 "status": "disabled",
552 "mode": "in-memory",
553 })
554 };
555
556 let replication = {
557 let shipper_guard = state.wal_shipper.read().await;
558 if let Some(ref shipper) = *shipper_guard {
559 serde_json::to_value(shipper.status()).unwrap_or_default()
560 } else if let Some(ref receiver) = state.wal_receiver {
561 serde_json::to_value(receiver.status()).unwrap_or_default()
562 } else {
563 serde_json::json!(null)
564 }
565 };
566
567 let current_role = state.role.load();
568
569 Json(serde_json::json!({
570 "status": "healthy",
571 "service": "allsource-core",
572 "version": env!("CARGO_PKG_VERSION"),
573 "role": current_role,
574 "system_streams": system_streams,
575 "replication": replication,
576 }))
577}
578
/// POST /internal/promote — convert this follower into the leader.
///
/// Idempotent: returns "already_leader" when the node already leads.
/// Otherwise flips the atomic role flag, signals the WAL receiver to stop,
/// and starts a WAL shipper so downstream followers can replicate from here.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first so the read-only middleware starts admitting writes.
    state.role.store(NodeRole::Leader);

    // Stop pulling WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a shipper wired to this node's store and metrics.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish the shipper in shared state so /health reports leader-side
    // replication status. Scoped block: release the write lock promptly.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve replication in the background; errors are logged, not fatal.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
643
644async fn repoint_handler(
649 State(state): State<AppState>,
650 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
651) -> impl IntoResponse {
652 let current_role = state.role.load();
653 if current_role != NodeRole::Follower {
654 return (
655 axum::http::StatusCode::CONFLICT,
656 Json(serde_json::json!({
657 "error": "not_follower",
658 "message": "Repoint only applies to follower nodes",
659 })),
660 );
661 }
662
663 let new_leader = match params.get("leader") {
664 Some(l) if !l.is_empty() => l.clone(),
665 _ => {
666 return (
667 axum::http::StatusCode::BAD_REQUEST,
668 Json(serde_json::json!({
669 "error": "missing_leader",
670 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
671 })),
672 );
673 }
674 };
675
676 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
677
678 if let Some(ref receiver) = state.wal_receiver {
679 receiver.repoint(&new_leader);
680 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
681 } else {
682 tracing::warn!("REPOINT: No WAL receiver to repoint");
683 }
684
685 (
686 axum::http::StatusCode::OK,
687 Json(serde_json::json!({
688 "status": "repointed",
689 "new_leader": new_leader,
690 })),
691 )
692}
693
694async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
700 let Some(ref cm) = state.cluster_manager else {
701 return (
702 axum::http::StatusCode::SERVICE_UNAVAILABLE,
703 Json(serde_json::json!({
704 "error": "cluster_not_enabled",
705 "message": "Cluster mode is not enabled on this node"
706 })),
707 );
708 };
709
710 let status = cm.status().await;
711 (
712 axum::http::StatusCode::OK,
713 Json(serde_json::to_value(status).unwrap()),
714 )
715}
716
717async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
719 let Some(ref cm) = state.cluster_manager else {
720 return (
721 axum::http::StatusCode::SERVICE_UNAVAILABLE,
722 Json(serde_json::json!({
723 "error": "cluster_not_enabled",
724 "message": "Cluster mode is not enabled on this node"
725 })),
726 );
727 };
728
729 let members = cm.all_members();
730 (
731 axum::http::StatusCode::OK,
732 Json(serde_json::json!({
733 "members": members,
734 "count": members.len(),
735 })),
736 )
737}
738
739async fn cluster_add_member_handler(
741 State(state): State<AppState>,
742 Json(member): Json<ClusterMember>,
743) -> impl IntoResponse {
744 let Some(ref cm) = state.cluster_manager else {
745 return (
746 axum::http::StatusCode::SERVICE_UNAVAILABLE,
747 Json(serde_json::json!({
748 "error": "cluster_not_enabled",
749 "message": "Cluster mode is not enabled on this node"
750 })),
751 );
752 };
753
754 let node_id = member.node_id;
755 cm.add_member(member).await;
756
757 tracing::info!("Cluster member {} added", node_id);
758 (
759 axum::http::StatusCode::OK,
760 Json(serde_json::json!({
761 "status": "added",
762 "node_id": node_id,
763 })),
764 )
765}
766
767async fn cluster_remove_member_handler(
769 State(state): State<AppState>,
770 Path(node_id): Path<u32>,
771) -> impl IntoResponse {
772 let Some(ref cm) = state.cluster_manager else {
773 return (
774 axum::http::StatusCode::SERVICE_UNAVAILABLE,
775 Json(serde_json::json!({
776 "error": "cluster_not_enabled",
777 "message": "Cluster mode is not enabled on this node"
778 })),
779 );
780 };
781
782 match cm.remove_member(node_id).await {
783 Some(_) => {
784 tracing::info!("Cluster member {} removed", node_id);
785 (
786 axum::http::StatusCode::OK,
787 Json(serde_json::json!({
788 "status": "removed",
789 "node_id": node_id,
790 })),
791 )
792 }
793 None => (
794 axum::http::StatusCode::NOT_FOUND,
795 Json(serde_json::json!({
796 "error": "not_found",
797 "message": format!("Node {} not found in cluster", node_id),
798 })),
799 ),
800 }
801}
802
/// Body of POST /api/v1/cluster/members/{node_id}/heartbeat.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // WAL offset reported by the member — presumably its replication
    // progress; confirm against ClusterManager::update_member_heartbeat.
    wal_offset: u64,
    // Omitted field deserializes as true (see `default_true`).
    #[serde(default = "default_true")]
    healthy: bool,
}
810
/// Serde default for `HeartbeatRequest::healthy`: an absent field means the
/// member considers itself healthy.
fn default_true() -> bool {
    true
}
814
815async fn cluster_heartbeat_handler(
816 State(state): State<AppState>,
817 Path(node_id): Path<u32>,
818 Json(req): Json<HeartbeatRequest>,
819) -> impl IntoResponse {
820 let Some(ref cm) = state.cluster_manager else {
821 return (
822 axum::http::StatusCode::SERVICE_UNAVAILABLE,
823 Json(serde_json::json!({
824 "error": "cluster_not_enabled",
825 "message": "Cluster mode is not enabled on this node"
826 })),
827 );
828 };
829
830 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
831 (
832 axum::http::StatusCode::OK,
833 Json(serde_json::json!({
834 "status": "updated",
835 "node_id": node_id,
836 })),
837 )
838}
839
840async fn cluster_vote_handler(
842 State(state): State<AppState>,
843 Json(request): Json<VoteRequest>,
844) -> impl IntoResponse {
845 let Some(ref cm) = state.cluster_manager else {
846 return (
847 axum::http::StatusCode::SERVICE_UNAVAILABLE,
848 Json(serde_json::json!({
849 "error": "cluster_not_enabled",
850 "message": "Cluster mode is not enabled on this node"
851 })),
852 );
853 };
854
855 let response = cm.handle_vote_request(&request).await;
856 (
857 axum::http::StatusCode::OK,
858 Json(serde_json::to_value(response).unwrap()),
859 )
860}
861
/// POST /api/v1/cluster/election — trigger a leader election round.
///
/// NOTE(review): the candidate is picked locally via
/// `select_leader_candidate` and, when it is this node, leadership is
/// assumed immediately without any visible vote collection here —
/// presumably quorum/term checks live inside `ClusterManager`; confirm
/// before relying on this for split-brain safety.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Pick the best candidate among healthy members (may be this node).
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            // Bump the term and start the election round.
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // If we are the chosen candidate, take leadership for the new term.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
912
913async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
915 let Some(ref cm) = state.cluster_manager else {
916 return (
917 axum::http::StatusCode::SERVICE_UNAVAILABLE,
918 Json(serde_json::json!({
919 "error": "cluster_not_enabled",
920 "message": "Cluster mode is not enabled on this node"
921 })),
922 );
923 };
924
925 let registry = cm.registry();
926 let distribution = registry.partition_distribution();
927 let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
928
929 (
930 axum::http::StatusCode::OK,
931 Json(serde_json::json!({
932 "total_partitions": total_partitions,
933 "node_count": registry.node_count(),
934 "healthy_node_count": registry.healthy_node_count(),
935 "distribution": distribution,
936 })),
937 )
938}
939
940async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
946 let Some(ref geo) = state.geo_replication else {
947 return (
948 axum::http::StatusCode::SERVICE_UNAVAILABLE,
949 Json(serde_json::json!({
950 "error": "geo_replication_not_enabled",
951 "message": "Geo-replication is not enabled on this node"
952 })),
953 );
954 };
955
956 let status = geo.status();
957 (
958 axum::http::StatusCode::OK,
959 Json(serde_json::to_value(status).unwrap()),
960 )
961}
962
963async fn geo_sync_handler(
965 State(state): State<AppState>,
966 Json(request): Json<GeoSyncRequest>,
967) -> impl IntoResponse {
968 let Some(ref geo) = state.geo_replication else {
969 return (
970 axum::http::StatusCode::SERVICE_UNAVAILABLE,
971 Json(serde_json::json!({
972 "error": "geo_replication_not_enabled",
973 "message": "Geo-replication is not enabled on this node"
974 })),
975 );
976 };
977
978 tracing::info!(
979 "Geo-sync received from region '{}': {} events",
980 request.source_region,
981 request.events.len(),
982 );
983
984 let response = geo.receive_sync(&request);
985 (
986 axum::http::StatusCode::OK,
987 Json(serde_json::to_value(response).unwrap()),
988 )
989}
990
991async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
993 let Some(ref geo) = state.geo_replication else {
994 return (
995 axum::http::StatusCode::SERVICE_UNAVAILABLE,
996 Json(serde_json::json!({
997 "error": "geo_replication_not_enabled",
998 "message": "Geo-replication is not enabled on this node"
999 })),
1000 );
1001 };
1002
1003 let status = geo.status();
1004 (
1005 axum::http::StatusCode::OK,
1006 Json(serde_json::json!({
1007 "region_id": status.region_id,
1008 "peers": status.peers,
1009 })),
1010 )
1011}
1012
1013async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1015 let Some(ref geo) = state.geo_replication else {
1016 return (
1017 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1018 Json(serde_json::json!({
1019 "error": "geo_replication_not_enabled",
1020 "message": "Geo-replication is not enabled on this node"
1021 })),
1022 );
1023 };
1024
1025 match geo.select_failover_region() {
1026 Some(failover_region) => {
1027 tracing::info!(
1028 "Geo-failover: selected region '{}' as failover target",
1029 failover_region,
1030 );
1031 (
1032 axum::http::StatusCode::OK,
1033 Json(serde_json::json!({
1034 "status": "failover_target_selected",
1035 "failover_region": failover_region,
1036 "message": "Region selected for failover. DNS/routing update required externally.",
1037 })),
1038 )
1039 }
1040 None => (
1041 axum::http::StatusCode::CONFLICT,
1042 Json(serde_json::json!({
1043 "error": "no_healthy_peers",
1044 "message": "No healthy peer regions available for failover",
1045 })),
1046 ),
1047 }
1048}
1049
/// Resolves when a shutdown signal arrives, driving axum's graceful shutdown.
///
/// Completes on Ctrl+C on all platforms; additionally on SIGTERM on Unix
/// (the usual stop signal from container orchestrators). On non-Unix targets
/// the SIGTERM branch is a future that never resolves.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // Stand-in that keeps the select! shape identical on non-Unix targets.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
        }
    }
}