1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 domain::repositories::TenantRepository,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Replication role of this node.
///
/// Serialized as lowercase ("leader"/"follower") in JSON responses such as
/// the `/health` endpoint's `role` field.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    // Accepts writes; ships WAL to followers (see promote_handler).
    Leader,
    // Read-only replica; write requests are rejected by read_only_middleware.
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// Cheaply-cloneable, thread-safe cell holding the current [`NodeRole`],
/// allowing the role to be flipped at runtime (e.g. by `/internal/promote`)
/// while request handlers keep reading it concurrently.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared state handed to every v1 handler and middleware layer.
#[derive(Clone)]
pub struct AppState {
    // Primary event store.
    pub store: Arc<EventStore>,
    // Authentication / API-key management.
    pub auth_manager: Arc<AuthManager>,
    // Tenant persistence behind a trait object.
    pub tenant_repo: Arc<dyn TenantRepository>,
    // DI container exposing system repositories and the system event store.
    pub service_container: ServiceContainer,
    // Mutable leader/follower role; flipped at runtime by /internal/promote.
    pub role: AtomicNodeRole,
    // WAL shipper slot; installed on promotion (see promote_handler). RwLock
    // because handlers read it while promotion writes it.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    // WAL receiver, present when this node ingests a leader's WAL stream.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    // Port the WAL shipper listens on after promotion.
    pub replication_port: u16,
    // Cluster coordination; None when cluster mode is disabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    // Cross-region replication; None when geo mode is disabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
/// Builds the v1 HTTP API router and serves it until graceful shutdown.
///
/// Registers every REST endpoint, attaches the middleware stack
/// (read-only guard, rate limiting, auth, CORS, tracing), binds `addr`,
/// and returns after a Ctrl+C/SIGTERM-driven graceful shutdown completes.
///
/// Errors propagate from the TCP bind and from `axum::serve`.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Health & metrics
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & user management
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant management
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Dynamic configuration
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo data seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event ingestion & querying
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entity views
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        // Store stats & catalogs
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots & compaction
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Event replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Processing pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Polling consumers
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // Cluster coordination (exempt from read-only guard)
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // Query languages & misc read APIs
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication (exempt from read-only guard)
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal control plane (exempt from read-only guard)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));

    // Embedded-sync endpoints are compiled in only with the feature flag.
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    // Layer order matters: layers added later are outermost, so a request
    // flows trace -> CORS -> auth -> rate-limit -> read-only guard -> handler.
    let app = app
        .with_state(app_state.clone())
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    // NOTE(review): "π" looks like a mojibake'd emoji from an encoding
    // round-trip — confirm and restore the intended character.
    tracing::info!("π AllSource Core shutdown complete");
    Ok(())
}
501
/// Path prefixes whose POST/PUT/DELETE requests mutate state and are
/// therefore rejected on read-only followers (see `is_write_request`).
///
/// Matching is by `starts_with`, so each entry also covers all of its
/// sub-routes — e.g. "/api/v1/events" covers "/api/v1/events/batch"
/// (the previously-listed batch entry was redundant and has been removed).
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
517
518fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
520 use axum::http::Method;
521 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
523 return false;
524 }
525 WRITE_PATHS
526 .iter()
527 .any(|write_path| path.starts_with(write_path))
528}
529
/// Internal control-plane and cluster/geo coordination endpoints are exempt
/// from the read-only guard so failover operations keep working on followers.
fn is_internal_request(path: &str) -> bool {
    const INTERNAL_PREFIXES: [&str; 3] =
        ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    INTERNAL_PREFIXES
        .iter()
        .any(|prefix| path.starts_with(prefix))
}
536
537async fn read_only_middleware(
543 State(state): State<AppState>,
544 request: axum::extract::Request,
545 next: axum::middleware::Next,
546) -> axum::response::Response {
547 let path = request.uri().path();
548 if state.role.load().is_follower()
549 && is_write_request(request.method(), path)
550 && !is_internal_request(path)
551 {
552 return (
553 axum::http::StatusCode::CONFLICT,
554 axum::Json(serde_json::json!({
555 "error": "read_only",
556 "message": "This node is a read-only follower"
557 })),
558 )
559 .into_response();
560 }
561 next.run(request).await
562}
563
564async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
569 let has_system_repos = state.service_container.has_system_repositories();
570
571 let system_streams = if has_system_repos {
572 let (tenant_count, config_count, total_events) =
573 if let Some(store) = state.service_container.system_store() {
574 use crate::domain::value_objects::system_stream::SystemDomain;
575 (
576 store.count_stream(SystemDomain::Tenant),
577 store.count_stream(SystemDomain::Config),
578 store.total_events(),
579 )
580 } else {
581 (0, 0, 0)
582 };
583
584 serde_json::json!({
585 "status": "healthy",
586 "mode": "event-sourced",
587 "total_events": total_events,
588 "tenant_events": tenant_count,
589 "config_events": config_count,
590 })
591 } else {
592 serde_json::json!({
593 "status": "disabled",
594 "mode": "in-memory",
595 })
596 };
597
598 let replication = {
599 let shipper_guard = state.wal_shipper.read().await;
600 if let Some(ref shipper) = *shipper_guard {
601 serde_json::to_value(shipper.status()).unwrap_or_default()
602 } else if let Some(ref receiver) = state.wal_receiver {
603 serde_json::to_value(receiver.status()).unwrap_or_default()
604 } else {
605 serde_json::json!(null)
606 }
607 };
608
609 let current_role = state.role.load();
610
611 Json(serde_json::json!({
612 "status": "healthy",
613 "service": "allsource-core",
614 "version": env!("CARGO_PKG_VERSION"),
615 "role": current_role,
616 "system_streams": system_streams,
617 "replication": replication,
618 }))
619}
620
/// `POST /internal/promote` — switch this follower into the leader role.
///
/// Sequence: flip the role, stop the WAL receiver, then build and install
/// a WAL shipper and start serving replication on `replication_port`.
/// Idempotent when already leader (returns "already_leader").
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // NOTE(review): the role flips to Leader (writes become accepted) before
    // the WAL shipper below is installed and listening — confirm that writes
    // landing in this window are still captured for replication.
    state.role.store(NodeRole::Leader);

    // Stop ingesting the old leader's WAL stream, if we had one.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a shipper, tap the store's WAL into it, and wire up metrics.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish the shipper so /health and others can observe it.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve replication in the background; errors are logged, not fatal.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
685
686async fn repoint_handler(
691 State(state): State<AppState>,
692 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
693) -> impl IntoResponse {
694 let current_role = state.role.load();
695 if current_role != NodeRole::Follower {
696 return (
697 axum::http::StatusCode::CONFLICT,
698 Json(serde_json::json!({
699 "error": "not_follower",
700 "message": "Repoint only applies to follower nodes",
701 })),
702 );
703 }
704
705 let new_leader = match params.get("leader") {
706 Some(l) if !l.is_empty() => l.clone(),
707 _ => {
708 return (
709 axum::http::StatusCode::BAD_REQUEST,
710 Json(serde_json::json!({
711 "error": "missing_leader",
712 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
713 })),
714 );
715 }
716 };
717
718 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
719
720 if let Some(ref receiver) = state.wal_receiver {
721 receiver.repoint(&new_leader);
722 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
723 } else {
724 tracing::warn!("REPOINT: No WAL receiver to repoint");
725 }
726
727 (
728 axum::http::StatusCode::OK,
729 Json(serde_json::json!({
730 "status": "repointed",
731 "new_leader": new_leader,
732 })),
733 )
734}
735
736async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
742 let Some(ref cm) = state.cluster_manager else {
743 return (
744 axum::http::StatusCode::SERVICE_UNAVAILABLE,
745 Json(serde_json::json!({
746 "error": "cluster_not_enabled",
747 "message": "Cluster mode is not enabled on this node"
748 })),
749 );
750 };
751
752 let status = cm.status().await;
753 (
754 axum::http::StatusCode::OK,
755 Json(serde_json::to_value(status).unwrap()),
756 )
757}
758
759async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
761 let Some(ref cm) = state.cluster_manager else {
762 return (
763 axum::http::StatusCode::SERVICE_UNAVAILABLE,
764 Json(serde_json::json!({
765 "error": "cluster_not_enabled",
766 "message": "Cluster mode is not enabled on this node"
767 })),
768 );
769 };
770
771 let members = cm.all_members();
772 (
773 axum::http::StatusCode::OK,
774 Json(serde_json::json!({
775 "members": members,
776 "count": members.len(),
777 })),
778 )
779}
780
781async fn cluster_add_member_handler(
783 State(state): State<AppState>,
784 Json(member): Json<ClusterMember>,
785) -> impl IntoResponse {
786 let Some(ref cm) = state.cluster_manager else {
787 return (
788 axum::http::StatusCode::SERVICE_UNAVAILABLE,
789 Json(serde_json::json!({
790 "error": "cluster_not_enabled",
791 "message": "Cluster mode is not enabled on this node"
792 })),
793 );
794 };
795
796 let node_id = member.node_id;
797 cm.add_member(member).await;
798
799 tracing::info!("Cluster member {} added", node_id);
800 (
801 axum::http::StatusCode::OK,
802 Json(serde_json::json!({
803 "status": "added",
804 "node_id": node_id,
805 })),
806 )
807}
808
809async fn cluster_remove_member_handler(
811 State(state): State<AppState>,
812 Path(node_id): Path<u32>,
813) -> impl IntoResponse {
814 let Some(ref cm) = state.cluster_manager else {
815 return (
816 axum::http::StatusCode::SERVICE_UNAVAILABLE,
817 Json(serde_json::json!({
818 "error": "cluster_not_enabled",
819 "message": "Cluster mode is not enabled on this node"
820 })),
821 );
822 };
823
824 match cm.remove_member(node_id).await {
825 Some(_) => {
826 tracing::info!("Cluster member {} removed", node_id);
827 (
828 axum::http::StatusCode::OK,
829 Json(serde_json::json!({
830 "status": "removed",
831 "node_id": node_id,
832 })),
833 )
834 }
835 None => (
836 axum::http::StatusCode::NOT_FOUND,
837 Json(serde_json::json!({
838 "error": "not_found",
839 "message": format!("Node {} not found in cluster", node_id),
840 })),
841 ),
842 }
843}
844
/// Request body of `POST /api/v1/cluster/members/{node_id}/heartbeat`.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // Last WAL offset the reporting node has applied.
    wal_offset: u64,
    // Omitting the field means the node reports itself healthy.
    #[serde(default = "default_true")]
    healthy: bool,
}

/// Serde default helper: `healthy` defaults to `true` when absent.
fn default_true() -> bool {
    true
}
856
857async fn cluster_heartbeat_handler(
858 State(state): State<AppState>,
859 Path(node_id): Path<u32>,
860 Json(req): Json<HeartbeatRequest>,
861) -> impl IntoResponse {
862 let Some(ref cm) = state.cluster_manager else {
863 return (
864 axum::http::StatusCode::SERVICE_UNAVAILABLE,
865 Json(serde_json::json!({
866 "error": "cluster_not_enabled",
867 "message": "Cluster mode is not enabled on this node"
868 })),
869 );
870 };
871
872 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
873 (
874 axum::http::StatusCode::OK,
875 Json(serde_json::json!({
876 "status": "updated",
877 "node_id": node_id,
878 })),
879 )
880}
881
882async fn cluster_vote_handler(
884 State(state): State<AppState>,
885 Json(request): Json<VoteRequest>,
886) -> impl IntoResponse {
887 let Some(ref cm) = state.cluster_manager else {
888 return (
889 axum::http::StatusCode::SERVICE_UNAVAILABLE,
890 Json(serde_json::json!({
891 "error": "cluster_not_enabled",
892 "message": "Cluster mode is not enabled on this node"
893 })),
894 );
895 };
896
897 let response = cm.handle_vote_request(&request).await;
898 (
899 axum::http::StatusCode::OK,
900 Json(serde_json::to_value(response).unwrap()),
901 )
902}
903
/// `POST /api/v1/cluster/election` — trigger a leader election.
///
/// Picks a candidate, bumps the term, and — when this node is the chosen
/// candidate — promotes itself. 409 when no healthy candidate exists.
///
/// NOTE(review): no quorum/vote round is visible here — the candidate is
/// selected and promoted locally. Confirm that `select_leader_candidate`/
/// `become_leader` coordinate with peers elsewhere.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Candidate is chosen before the term is advanced by start_election.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // Self-promotion path: this node won the candidacy.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
954
955async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
957 let Some(ref cm) = state.cluster_manager else {
958 return (
959 axum::http::StatusCode::SERVICE_UNAVAILABLE,
960 Json(serde_json::json!({
961 "error": "cluster_not_enabled",
962 "message": "Cluster mode is not enabled on this node"
963 })),
964 );
965 };
966
967 let registry = cm.registry();
968 let distribution = registry.partition_distribution();
969 let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
970
971 (
972 axum::http::StatusCode::OK,
973 Json(serde_json::json!({
974 "total_partitions": total_partitions,
975 "node_count": registry.node_count(),
976 "healthy_node_count": registry.healthy_node_count(),
977 "distribution": distribution,
978 })),
979 )
980}
981
982async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
988 let Some(ref geo) = state.geo_replication else {
989 return (
990 axum::http::StatusCode::SERVICE_UNAVAILABLE,
991 Json(serde_json::json!({
992 "error": "geo_replication_not_enabled",
993 "message": "Geo-replication is not enabled on this node"
994 })),
995 );
996 };
997
998 let status = geo.status();
999 (
1000 axum::http::StatusCode::OK,
1001 Json(serde_json::to_value(status).unwrap()),
1002 )
1003}
1004
1005async fn geo_sync_handler(
1007 State(state): State<AppState>,
1008 Json(request): Json<GeoSyncRequest>,
1009) -> impl IntoResponse {
1010 let Some(ref geo) = state.geo_replication else {
1011 return (
1012 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1013 Json(serde_json::json!({
1014 "error": "geo_replication_not_enabled",
1015 "message": "Geo-replication is not enabled on this node"
1016 })),
1017 );
1018 };
1019
1020 tracing::info!(
1021 "Geo-sync received from region '{}': {} events",
1022 request.source_region,
1023 request.events.len(),
1024 );
1025
1026 let response = geo.receive_sync(&request);
1027 (
1028 axum::http::StatusCode::OK,
1029 Json(serde_json::to_value(response).unwrap()),
1030 )
1031}
1032
1033async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1035 let Some(ref geo) = state.geo_replication else {
1036 return (
1037 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1038 Json(serde_json::json!({
1039 "error": "geo_replication_not_enabled",
1040 "message": "Geo-replication is not enabled on this node"
1041 })),
1042 );
1043 };
1044
1045 let status = geo.status();
1046 (
1047 axum::http::StatusCode::OK,
1048 Json(serde_json::json!({
1049 "region_id": status.region_id,
1050 "peers": status.peers,
1051 })),
1052 )
1053}
1054
1055async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1057 let Some(ref geo) = state.geo_replication else {
1058 return (
1059 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1060 Json(serde_json::json!({
1061 "error": "geo_replication_not_enabled",
1062 "message": "Geo-replication is not enabled on this node"
1063 })),
1064 );
1065 };
1066
1067 match geo.select_failover_region() {
1068 Some(failover_region) => {
1069 tracing::info!(
1070 "Geo-failover: selected region '{}' as failover target",
1071 failover_region,
1072 );
1073 (
1074 axum::http::StatusCode::OK,
1075 Json(serde_json::json!({
1076 "status": "failover_target_selected",
1077 "failover_region": failover_region,
1078 "message": "Region selected for failover. DNS/routing update required externally.",
1079 })),
1080 )
1081 }
1082 None => (
1083 axum::http::StatusCode::CONFLICT,
1084 Json(serde_json::json!({
1085 "error": "no_healthy_peers",
1086 "message": "No healthy peer regions available for failover",
1087 })),
1088 ),
1089 }
1090}
1091
/// Resolves when a shutdown signal arrives: Ctrl+C on all platforms, plus
/// SIGTERM on Unix. Used as the graceful-shutdown trigger for `axum::serve`.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler")
    };

    // SIGTERM is what orchestrators (systemd, Kubernetes) send first.
    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // On non-Unix targets there is no SIGTERM; this branch never resolves.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // NOTE(review): "π€" in the messages below looks like a mojibake'd emoji
    // from an encoding round-trip — confirm and restore the intended glyph.
    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("π€ Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("π€ Received SIGTERM, initiating graceful shutdown...");
        }
    }
}