use crate::infrastructure::di::ServiceContainer;
#[cfg(feature = "multi-tenant")]
use crate::infrastructure::web::tenant_api::*;
use crate::{
    domain::repositories::TenantRepository,
    infrastructure::{
        cluster::{
            ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
        },
        replication::{WalReceiver, WalShipper},
        security::{
            auth::AuthManager,
            middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
            rate_limit::RateLimiter,
        },
        web::{audit_api::*, auth_api::*, config_api::*},
    },
    store::EventStore,
};
use axum::{
    Json, Router,
    extract::{Path, State},
    middleware,
    response::IntoResponse,
    routing::{delete, get, post, put},
};
use std::sync::Arc;
use tower_http::{
    cors::{Any, CorsLayer},
    trace::TraceLayer,
};

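/// Role of this node in the replication topology: leaders accept writes,
/// followers are read-only.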
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    Leader,
    Follower,
}

impl NodeRole {
    /// Determines the initial role from `ALLSOURCE_ROLE` (leader/follower),
    /// falling back to `ALLSOURCE_READ_ONLY=true|1` for follower, and
    /// defaulting to leader.
    pub fn from_env() -> Self {
        if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
            match role.to_lowercase().as_str() {
                "follower" => return NodeRole::Follower,
                "leader" => return NodeRole::Leader,
                other => {
                    tracing::warn!(
                        "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
                        other
                    );
                    return NodeRole::Leader;
                }
            }
        }
        // Fallback: ALLSOURCE_READ_ONLY=true/1 also marks this node as a follower.
        if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
            && (read_only == "true" || read_only == "1")
        {
            return NodeRole::Follower;
        }
        NodeRole::Leader
    }

    pub fn is_follower(self) -> bool {
        self == NodeRole::Follower
    }

    fn to_u8(self) -> u8 {
        match self {
            NodeRole::Leader => 0,
            NodeRole::Follower => 1,
        }
    }

    fn from_u8(v: u8) -> Self {
        match v {
            1 => NodeRole::Follower,
            _ => NodeRole::Leader,
        }
    }
}

impl std::fmt::Display for NodeRole {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            NodeRole::Leader => write!(f, "leader"),
            NodeRole::Follower => write!(f, "follower"),
        }
    }
}

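/// Cheaply cloneable, thread-safe holder for the current [`NodeRole`], so the
/// role can change at runtime (for example on promotion).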
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);

impl AtomicNodeRole {
    pub fn new(role: NodeRole) -> Self {
        Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
    }

    pub fn load(&self) -> NodeRole {
        NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
    }

    pub fn store(&self, role: NodeRole) {
        self.0
            .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
    }
}

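/// Shared state handed to every handler and middleware layer.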
#[derive(Clone)]
pub struct AppState {
    pub store: Arc<EventStore>,
    pub auth_manager: Arc<AuthManager>,
    pub tenant_repo: Arc<dyn TenantRepository>,
    pub service_container: ServiceContainer,
    /// Current role of this node (may change at runtime on promotion).
    pub role: AtomicNodeRole,
    /// WAL shipper, populated when this node is (or becomes) a leader.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver, present when this node started as a follower.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Port the WAL shipper listens on for follower connections.
    pub replication_port: u16,
    /// Cluster membership and election manager, when cluster mode is enabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication manager, when geo-replication is enabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}

impl axum::extract::FromRef<AppState> for Arc<EventStore> {
    fn from_ref(state: &AppState) -> Self {
        state.store.clone()
    }
}

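/// Builds the v1 router and serves it on `addr` until a shutdown signal arrives.
///
/// Requests pass through tracing, CORS, authentication, rate limiting, and the
/// read-only follower guard before reaching a handler.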
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Health & metrics
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & API keys
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler));

    // Tenant management (only when the multi-tenant feature is enabled)
    #[cfg(feature = "multi-tenant")]
    let app = app
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler));

    let app = app
        // Audit log
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Runtime configuration
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo data seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event ingestion & queries
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entities
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        // Store statistics & metadata
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Event replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Consumer groups
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // Cluster membership & elections
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // GraphQL
        .route("/api/v1/graphql", post(super::api::graphql_query))
        // Geospatial queries
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        // Exactly-once delivery
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        // Schema evolution
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler));

    // Failover control endpoints (replication feature only)
    #[cfg(feature = "replication")]
    let app = app
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));
    let app = app;

    // Embedded sync endpoints (embedded-sync feature only)
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    let app = app
        .with_state(app_state.clone())
        // Layers added later wrap the ones added earlier, so requests flow
        // through tracing -> CORS -> auth -> rate limiting -> read-only guard.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    // Optional Prime API, mounted only when the `prime` feature is enabled
    // and the Prime store can be opened.
    #[cfg(feature = "prime")]
    let app = {
        let data_dir =
            std::env::var("PRIME_DATA_DIR").unwrap_or_else(|_| "/tmp/prime-data".to_string());
        match crate::prime::Prime::open(&data_dir).await {
            Ok(prime) => {
                let prime_state = Arc::new(super::prime_api::PrimeState { prime });
                tracing::info!("Prime API enabled at /api/v1/prime/*");
                app.nest(
                    "/api/v1/prime",
                    super::prime_api::prime_router().with_state(prime_state),
                )
            }
            Err(e) => {
                tracing::warn!("Prime API disabled: failed to open Prime: {e}");
                app
            }
        }
    };

    let listener = tokio::net::TcpListener::bind(addr).await?;

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}

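/// Path prefixes whose POST/PUT/DELETE requests are rejected on follower nodes.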
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];

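/// Returns true when the method is POST, PUT, or DELETE and the path matches
/// one of `WRITE_PATHS`.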
fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
    use axum::http::Method;
    if method != Method::POST && method != Method::PUT && method != Method::DELETE {
        return false;
    }
    WRITE_PATHS
        .iter()
        .any(|write_path| path.starts_with(write_path))
}

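/// Paths exempt from the read-only guard: internal replication, cluster, and
/// geo control endpoints.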
fn is_internal_request(path: &str) -> bool {
    path.starts_with("/internal/")
        || path.starts_with("/api/v1/cluster/")
        || path.starts_with("/api/v1/geo/")
}

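/// Rejects external write requests with 409 Conflict while this node is a
/// follower; internal and cluster/geo control paths are allowed through.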
async fn read_only_middleware(
    State(state): State<AppState>,
    request: axum::extract::Request,
    next: axum::middleware::Next,
) -> axum::response::Response {
    let path = request.uri().path();
    if state.role.load().is_follower()
        && is_write_request(request.method(), path)
        && !is_internal_request(path)
    {
        return (
            axum::http::StatusCode::CONFLICT,
            axum::Json(serde_json::json!({
                "error": "read_only",
                "message": "This node is a read-only follower"
            })),
        )
            .into_response();
    }
    next.run(request).await
}

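/// GET /health: liveness report with system stream counts, replication status,
/// and the node's current role.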
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    let replication = {
        #[cfg(feature = "replication")]
        {
            let shipper_guard = state.wal_shipper.read().await;
            if let Some(ref shipper) = *shipper_guard {
                serde_json::to_value(shipper.status()).unwrap_or_default()
            } else if let Some(ref receiver) = state.wal_receiver {
                serde_json::to_value(receiver.status()).unwrap_or_default()
            } else {
                serde_json::json!(null)
            }
        }
        #[cfg(not(feature = "replication"))]
        serde_json::json!({"edition": "community", "status": "not_available"})
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}

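/// POST /internal/promote: promote a follower to leader by flipping the role,
/// shutting down the WAL receiver, and starting a WAL shipper for followers.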
#[cfg(feature = "replication")]
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role so the read-only middleware starts accepting writes.
    state.role.store(NodeRole::Leader);

    // Stop pulling WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Start a WAL shipper so followers can replicate from this node.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}

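/// POST /internal/repoint?leader=<host:port>: point this follower's WAL
/// receiver at a new leader.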
#[cfg(feature = "replication")]
async fn repoint_handler(
    State(state): State<AppState>,
    axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role != NodeRole::Follower {
        return (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "not_follower",
                "message": "Repoint only applies to follower nodes",
            })),
        );
    }

    let new_leader = match params.get("leader") {
        Some(l) if !l.is_empty() => l.clone(),
        _ => {
            return (
                axum::http::StatusCode::BAD_REQUEST,
                Json(serde_json::json!({
                    "error": "missing_leader",
                    "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
                })),
            );
        }
    };

    tracing::info!("REPOINT: Switching replication target to {}", new_leader);

    if let Some(ref receiver) = state.wal_receiver {
        receiver.repoint(&new_leader);
        tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
    } else {
        tracing::warn!("REPOINT: No WAL receiver to repoint");
    }

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "repointed",
            "new_leader": new_leader,
        })),
    )
}

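/// GET /api/v1/cluster/status: returns the cluster manager's status, or 503
/// when cluster mode is not enabled.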
async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    let status = cm.status().await;
    (
        axum::http::StatusCode::OK,
        Json(serde_json::to_value(status).unwrap()),
    )
}

async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    let members = cm.all_members();
    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "members": members,
            "count": members.len(),
        })),
    )
}

async fn cluster_add_member_handler(
    State(state): State<AppState>,
    Json(member): Json<ClusterMember>,
) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    let node_id = member.node_id;
    cm.add_member(member).await;

    tracing::info!("Cluster member {} added", node_id);
    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "added",
            "node_id": node_id,
        })),
    )
}

async fn cluster_remove_member_handler(
    State(state): State<AppState>,
    Path(node_id): Path<u32>,
) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    match cm.remove_member(node_id).await {
        Some(_) => {
            tracing::info!("Cluster member {} removed", node_id);
            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "removed",
                    "node_id": node_id,
                })),
            )
        }
        None => (
            axum::http::StatusCode::NOT_FOUND,
            Json(serde_json::json!({
                "error": "not_found",
                "message": format!("Node {} not found in cluster", node_id),
            })),
        ),
    }
}

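/// Body of POST /api/v1/cluster/members/{node_id}/heartbeat.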
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    wal_offset: u64,
    #[serde(default = "default_true")]
    healthy: bool,
}

fn default_true() -> bool {
    true
}

async fn cluster_heartbeat_handler(
    State(state): State<AppState>,
    Path(node_id): Path<u32>,
    Json(req): Json<HeartbeatRequest>,
) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "updated",
            "node_id": node_id,
        })),
    )
}

async fn cluster_vote_handler(
    State(state): State<AppState>,
    Json(request): Json<VoteRequest>,
) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    let response = cm.handle_vote_request(&request).await;
    (
        axum::http::StatusCode::OK,
        Json(serde_json::to_value(response).unwrap()),
    )
}

async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Pick a candidate, start a new term, and assume leadership if this node won.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}

async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    let registry = cm.registry();
    let distribution = registry.partition_distribution();
    let total_partitions: usize = distribution.values().map(std::vec::Vec::len).sum();

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "total_partitions": total_partitions,
            "node_count": registry.node_count(),
            "healthy_node_count": registry.healthy_node_count(),
            "distribution": distribution,
        })),
    )
}

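/// GET /api/v1/geo/status: reports geo-replication status, or 503 when
/// geo-replication is not enabled.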
async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref geo) = state.geo_replication else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "geo_replication_not_enabled",
                "message": "Geo-replication is not enabled on this node"
            })),
        );
    };

    let status = geo.status();
    (
        axum::http::StatusCode::OK,
        Json(serde_json::to_value(status).unwrap()),
    )
}

async fn geo_sync_handler(
    State(state): State<AppState>,
    Json(request): Json<GeoSyncRequest>,
) -> impl IntoResponse {
    let Some(ref geo) = state.geo_replication else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "geo_replication_not_enabled",
                "message": "Geo-replication is not enabled on this node"
            })),
        );
    };

    tracing::info!(
        "Geo-sync received from region '{}': {} events",
        request.source_region,
        request.events.len(),
    );

    let response = geo.receive_sync(&request);
    (
        axum::http::StatusCode::OK,
        Json(serde_json::to_value(response).unwrap()),
    )
}

async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref geo) = state.geo_replication else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "geo_replication_not_enabled",
                "message": "Geo-replication is not enabled on this node"
            })),
        );
    };

    let status = geo.status();
    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "region_id": status.region_id,
            "peers": status.peers,
        })),
    )
}

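/// POST /api/v1/geo/failover: selects a healthy peer region as the failover
/// target; DNS/routing changes must be applied externally.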
async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref geo) = state.geo_replication else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "geo_replication_not_enabled",
                "message": "Geo-replication is not enabled on this node"
            })),
        );
    };

    match geo.select_failover_region() {
        Some(failover_region) => {
            tracing::info!(
                "Geo-failover: selected region '{}' as failover target",
                failover_region,
            );
            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "failover_target_selected",
                    "failover_region": failover_region,
                    "message": "Region selected for failover. DNS/routing update required externally.",
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_healthy_peers",
                "message": "No healthy peer regions available for failover",
            })),
        ),
    }
}

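/// Completes when Ctrl+C or, on Unix, SIGTERM is received, triggering graceful shutdown.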
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {
            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
        }
        () = terminate => {
            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
        }
    }
}