1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 domain::repositories::TenantRepository,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Replication role of this node; serialized in lowercase for API responses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; runs the WAL shipper so followers can replicate.
    Leader,
    /// Read-only; write requests are rejected by `read_only_middleware`.
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// Cheaply-cloneable, thread-safe cell holding the node's current role so
/// handlers (e.g. `/internal/promote`) can flip leader/follower at runtime.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared state handed to every v1 route handler.
#[derive(Clone)]
pub struct AppState {
    /// Primary event store.
    pub store: Arc<EventStore>,
    /// Authentication manager (also driving the auth middleware).
    pub auth_manager: Arc<AuthManager>,
    /// Tenant persistence.
    pub tenant_repo: Arc<dyn TenantRepository>,
    /// DI container exposing the system repositories/store (see `health_v1`).
    pub service_container: ServiceContainer,
    /// Mutable leader/follower role; flipped at runtime by `/internal/promote`.
    pub role: AtomicNodeRole,
    /// WAL shipper when acting as leader; `None` inside until promotion (or
    /// leader startup) populates it.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver when acting as follower.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// TCP port a promoted leader serves its WAL stream on.
    pub replication_port: u16,
    /// Cluster membership/election manager; `None` when clustering is disabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication manager; `None` when geo-replication is disabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
/// Build and serve the v1 HTTP API on `addr` until shutdown.
///
/// Wires up every REST route, then layers middleware and serves with
/// graceful shutdown (Ctrl+C / SIGTERM). Axum applies layers so that the
/// last `.layer(..)` added runs first; requests therefore pass
/// Trace → CORS → auth → rate-limit → read-only guard before any handler.
///
/// # Errors
/// Returns an error when the TCP listener cannot bind `addr` or the server
/// fails while running.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_repo: Arc<dyn TenantRepository>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_repo,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Liveness + metrics
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & user management
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant management
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Dynamic configuration
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event ingestion & queries
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entities
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        // Store introspection
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots & compaction
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Consumers
        .route("/api/v1/consumers", post(super::api::register_consumer))
        .route(
            "/api/v1/consumers/{consumer_id}",
            get(super::api::get_consumer),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/events",
            get(super::api::poll_consumer_events),
        )
        .route(
            "/api/v1/consumers/{consumer_id}/ack",
            post(super::api::ack_consumer),
        )
        // Cluster control plane (exempt from the read-only guard)
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // GraphQL & geospatial
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        // Schema evolution
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication control plane (exempt from the read-only guard)
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal failover endpoints (exempt from the read-only guard)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));

    // Embedded-sync routes are compiled in only when the feature is enabled.
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    let app = app
        .with_state(app_state.clone())
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        // NOTE(review): CORS is wide open (any origin/method/header) —
        // presumably intentional for an API service; confirm for production.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("π AllSource Core shutdown complete");
    Ok(())
}
500
/// Path roots that denote state-mutating endpoints; matched against the
/// request path by `is_write_request` so followers can reject writes.
/// NOTE(review): "/api/v1/events/batch" is redundant — it is already covered
/// by the "/api/v1/events" entry. Cluster/geo/internal endpoints are
/// deliberately absent (they are exempted via `is_internal_request`).
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
516
517fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
519 use axum::http::Method;
520 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
522 return false;
523 }
524 WRITE_PATHS
525 .iter()
526 .any(|write_path| path.starts_with(write_path))
527}
528
/// Paths exempt from the read-only guard: internal failover endpoints and
/// the cluster/geo control planes.
fn is_internal_request(path: &str) -> bool {
    const EXEMPT_PREFIXES: [&str; 3] = ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"];
    EXEMPT_PREFIXES.iter().any(|prefix| path.starts_with(prefix))
}
535
536async fn read_only_middleware(
542 State(state): State<AppState>,
543 request: axum::extract::Request,
544 next: axum::middleware::Next,
545) -> axum::response::Response {
546 let path = request.uri().path();
547 if state.role.load().is_follower()
548 && is_write_request(request.method(), path)
549 && !is_internal_request(path)
550 {
551 return (
552 axum::http::StatusCode::CONFLICT,
553 axum::Json(serde_json::json!({
554 "error": "read_only",
555 "message": "This node is a read-only follower"
556 })),
557 )
558 .into_response();
559 }
560 next.run(request).await
561}
562
/// GET /health — liveness plus a snapshot of system-stream and replication
/// state for this node.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // System-stream section: event counts when the event-sourced system
    // store is wired up, otherwise a "disabled"/in-memory marker.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // Repositories present but no store handle — report zeros.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Replication section: a node with a shipper reports shipper status,
    // one with a receiver reports receiver status, otherwise null.
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
619
/// POST /internal/promote — promote this follower to leader.
///
/// Idempotent for leaders (returns "already_leader"). Otherwise: flips the
/// role, signals the WAL receiver to stop, then builds and serves a WAL
/// shipper so downstream followers can replicate from this node.
///
/// NOTE(review): the role check and the `store` below are separate
/// operations — two concurrent promote calls could both pass the guard and
/// spawn two shippers. Consider a compare-and-swap on the role; confirm
/// whether concurrent promotion is possible in deployment.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first so the read-only guard starts admitting writes.
    state.role.store(NodeRole::Leader);

    // Stop pulling WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a shipper wired to this node's store and metrics.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish the shipper so /health reports leader-side replication status.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve the replication stream in the background; errors are only logged.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
684
685async fn repoint_handler(
690 State(state): State<AppState>,
691 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
692) -> impl IntoResponse {
693 let current_role = state.role.load();
694 if current_role != NodeRole::Follower {
695 return (
696 axum::http::StatusCode::CONFLICT,
697 Json(serde_json::json!({
698 "error": "not_follower",
699 "message": "Repoint only applies to follower nodes",
700 })),
701 );
702 }
703
704 let new_leader = match params.get("leader") {
705 Some(l) if !l.is_empty() => l.clone(),
706 _ => {
707 return (
708 axum::http::StatusCode::BAD_REQUEST,
709 Json(serde_json::json!({
710 "error": "missing_leader",
711 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
712 })),
713 );
714 }
715 };
716
717 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
718
719 if let Some(ref receiver) = state.wal_receiver {
720 receiver.repoint(&new_leader);
721 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
722 } else {
723 tracing::warn!("REPOINT: No WAL receiver to repoint");
724 }
725
726 (
727 axum::http::StatusCode::OK,
728 Json(serde_json::json!({
729 "status": "repointed",
730 "new_leader": new_leader,
731 })),
732 )
733}
734
735async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
741 let Some(ref cm) = state.cluster_manager else {
742 return (
743 axum::http::StatusCode::SERVICE_UNAVAILABLE,
744 Json(serde_json::json!({
745 "error": "cluster_not_enabled",
746 "message": "Cluster mode is not enabled on this node"
747 })),
748 );
749 };
750
751 let status = cm.status().await;
752 (
753 axum::http::StatusCode::OK,
754 Json(serde_json::to_value(status).unwrap()),
755 )
756}
757
758async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
760 let Some(ref cm) = state.cluster_manager else {
761 return (
762 axum::http::StatusCode::SERVICE_UNAVAILABLE,
763 Json(serde_json::json!({
764 "error": "cluster_not_enabled",
765 "message": "Cluster mode is not enabled on this node"
766 })),
767 );
768 };
769
770 let members = cm.all_members();
771 (
772 axum::http::StatusCode::OK,
773 Json(serde_json::json!({
774 "members": members,
775 "count": members.len(),
776 })),
777 )
778}
779
780async fn cluster_add_member_handler(
782 State(state): State<AppState>,
783 Json(member): Json<ClusterMember>,
784) -> impl IntoResponse {
785 let Some(ref cm) = state.cluster_manager else {
786 return (
787 axum::http::StatusCode::SERVICE_UNAVAILABLE,
788 Json(serde_json::json!({
789 "error": "cluster_not_enabled",
790 "message": "Cluster mode is not enabled on this node"
791 })),
792 );
793 };
794
795 let node_id = member.node_id;
796 cm.add_member(member).await;
797
798 tracing::info!("Cluster member {} added", node_id);
799 (
800 axum::http::StatusCode::OK,
801 Json(serde_json::json!({
802 "status": "added",
803 "node_id": node_id,
804 })),
805 )
806}
807
808async fn cluster_remove_member_handler(
810 State(state): State<AppState>,
811 Path(node_id): Path<u32>,
812) -> impl IntoResponse {
813 let Some(ref cm) = state.cluster_manager else {
814 return (
815 axum::http::StatusCode::SERVICE_UNAVAILABLE,
816 Json(serde_json::json!({
817 "error": "cluster_not_enabled",
818 "message": "Cluster mode is not enabled on this node"
819 })),
820 );
821 };
822
823 match cm.remove_member(node_id).await {
824 Some(_) => {
825 tracing::info!("Cluster member {} removed", node_id);
826 (
827 axum::http::StatusCode::OK,
828 Json(serde_json::json!({
829 "status": "removed",
830 "node_id": node_id,
831 })),
832 )
833 }
834 None => (
835 axum::http::StatusCode::NOT_FOUND,
836 Json(serde_json::json!({
837 "error": "not_found",
838 "message": format!("Node {} not found in cluster", node_id),
839 })),
840 ),
841 }
842}
843
/// Body for POST /api/v1/cluster/members/{node_id}/heartbeat.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    // WAL offset reported by the member — presumably its replication
    // position; confirm against ClusterManager::update_member_heartbeat.
    wal_offset: u64,
    // Defaults to true when omitted, so a bare heartbeat counts as healthy.
    #[serde(default = "default_true")]
    healthy: bool,
}
851
/// Serde default for `HeartbeatRequest::healthy` (serde requires a fn path).
fn default_true() -> bool {
    true
}
855
856async fn cluster_heartbeat_handler(
857 State(state): State<AppState>,
858 Path(node_id): Path<u32>,
859 Json(req): Json<HeartbeatRequest>,
860) -> impl IntoResponse {
861 let Some(ref cm) = state.cluster_manager else {
862 return (
863 axum::http::StatusCode::SERVICE_UNAVAILABLE,
864 Json(serde_json::json!({
865 "error": "cluster_not_enabled",
866 "message": "Cluster mode is not enabled on this node"
867 })),
868 );
869 };
870
871 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
872 (
873 axum::http::StatusCode::OK,
874 Json(serde_json::json!({
875 "status": "updated",
876 "node_id": node_id,
877 })),
878 )
879}
880
881async fn cluster_vote_handler(
883 State(state): State<AppState>,
884 Json(request): Json<VoteRequest>,
885) -> impl IntoResponse {
886 let Some(ref cm) = state.cluster_manager else {
887 return (
888 axum::http::StatusCode::SERVICE_UNAVAILABLE,
889 Json(serde_json::json!({
890 "error": "cluster_not_enabled",
891 "message": "Cluster mode is not enabled on this node"
892 })),
893 );
894 };
895
896 let response = cm.handle_vote_request(&request).await;
897 (
898 axum::http::StatusCode::OK,
899 Json(serde_json::to_value(response).unwrap()),
900 )
901}
902
/// POST /api/v1/cluster/election — trigger a leader election.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Pick the preferred candidate before bumping the term.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // NOTE(review): when this node is its own candidate it becomes
            // leader immediately — no vote round-trip is visible here, so any
            // quorum rule must live inside ClusterManager. TODO confirm.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
953
954async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
956 let Some(ref cm) = state.cluster_manager else {
957 return (
958 axum::http::StatusCode::SERVICE_UNAVAILABLE,
959 Json(serde_json::json!({
960 "error": "cluster_not_enabled",
961 "message": "Cluster mode is not enabled on this node"
962 })),
963 );
964 };
965
966 let registry = cm.registry();
967 let distribution = registry.partition_distribution();
968 let total_partitions: usize = distribution.values().map(std::vec::Vec::len).sum();
969
970 (
971 axum::http::StatusCode::OK,
972 Json(serde_json::json!({
973 "total_partitions": total_partitions,
974 "node_count": registry.node_count(),
975 "healthy_node_count": registry.healthy_node_count(),
976 "distribution": distribution,
977 })),
978 )
979}
980
981async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
987 let Some(ref geo) = state.geo_replication else {
988 return (
989 axum::http::StatusCode::SERVICE_UNAVAILABLE,
990 Json(serde_json::json!({
991 "error": "geo_replication_not_enabled",
992 "message": "Geo-replication is not enabled on this node"
993 })),
994 );
995 };
996
997 let status = geo.status();
998 (
999 axum::http::StatusCode::OK,
1000 Json(serde_json::to_value(status).unwrap()),
1001 )
1002}
1003
1004async fn geo_sync_handler(
1006 State(state): State<AppState>,
1007 Json(request): Json<GeoSyncRequest>,
1008) -> impl IntoResponse {
1009 let Some(ref geo) = state.geo_replication else {
1010 return (
1011 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1012 Json(serde_json::json!({
1013 "error": "geo_replication_not_enabled",
1014 "message": "Geo-replication is not enabled on this node"
1015 })),
1016 );
1017 };
1018
1019 tracing::info!(
1020 "Geo-sync received from region '{}': {} events",
1021 request.source_region,
1022 request.events.len(),
1023 );
1024
1025 let response = geo.receive_sync(&request);
1026 (
1027 axum::http::StatusCode::OK,
1028 Json(serde_json::to_value(response).unwrap()),
1029 )
1030}
1031
1032async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1034 let Some(ref geo) = state.geo_replication else {
1035 return (
1036 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1037 Json(serde_json::json!({
1038 "error": "geo_replication_not_enabled",
1039 "message": "Geo-replication is not enabled on this node"
1040 })),
1041 );
1042 };
1043
1044 let status = geo.status();
1045 (
1046 axum::http::StatusCode::OK,
1047 Json(serde_json::json!({
1048 "region_id": status.region_id,
1049 "peers": status.peers,
1050 })),
1051 )
1052}
1053
1054async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1056 let Some(ref geo) = state.geo_replication else {
1057 return (
1058 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1059 Json(serde_json::json!({
1060 "error": "geo_replication_not_enabled",
1061 "message": "Geo-replication is not enabled on this node"
1062 })),
1063 );
1064 };
1065
1066 match geo.select_failover_region() {
1067 Some(failover_region) => {
1068 tracing::info!(
1069 "Geo-failover: selected region '{}' as failover target",
1070 failover_region,
1071 );
1072 (
1073 axum::http::StatusCode::OK,
1074 Json(serde_json::json!({
1075 "status": "failover_target_selected",
1076 "failover_region": failover_region,
1077 "message": "Region selected for failover. DNS/routing update required externally.",
1078 })),
1079 )
1080 }
1081 None => (
1082 axum::http::StatusCode::CONFLICT,
1083 Json(serde_json::json!({
1084 "error": "no_healthy_peers",
1085 "message": "No healthy peer regions available for failover",
1086 })),
1087 ),
1088 }
1089}
1090
/// Resolves when the process receives Ctrl+C or (on Unix) SIGTERM; passed to
/// `axum::serve(..).with_graceful_shutdown` to drive graceful shutdown.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    // SIGTERM is what orchestrators (Kubernetes, systemd) send on shutdown.
    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // Non-Unix targets only support Ctrl+C; this branch never resolves.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        () = ctrl_c => {
            tracing::info!("π€ Received Ctrl+C, initiating graceful shutdown...");
        }
        () = terminate => {
            tracing::info!("π€ Received SIGTERM, initiating graceful shutdown...");
        }
    }
}