1use crate::infrastructure::di::ServiceContainer;
3#[cfg(feature = "multi-tenant")]
4use crate::infrastructure::web::tenant_api::*;
5use crate::{
6 domain::repositories::TenantRepository,
7 infrastructure::{
8 cluster::{
9 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
10 },
11 replication::{WalReceiver, WalShipper},
12 security::{
13 auth::AuthManager,
14 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
15 rate_limit::RateLimiter,
16 },
17 web::{audit_api::*, auth_api::*, config_api::*},
18 },
19 store::EventStore,
20};
21use axum::{
22 Json, Router,
23 extract::{Path, State},
24 middleware,
25 response::IntoResponse,
26 routing::{delete, get, post, put},
27};
28use std::sync::Arc;
29use tower_http::{
30 cors::{Any, CorsLayer},
31 trace::TraceLayer,
32};
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
36#[serde(rename_all = "lowercase")]
37pub enum NodeRole {
38 Leader,
39 Follower,
40}
41
42impl NodeRole {
43 pub fn from_env() -> Self {
49 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
50 match role.to_lowercase().as_str() {
51 "follower" => return NodeRole::Follower,
52 "leader" => return NodeRole::Leader,
53 other => {
54 tracing::warn!(
55 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
56 other
57 );
58 return NodeRole::Leader;
59 }
60 }
61 }
62 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
63 && (read_only == "true" || read_only == "1")
64 {
65 return NodeRole::Follower;
66 }
67 NodeRole::Leader
68 }
69
70 pub fn is_follower(self) -> bool {
71 self == NodeRole::Follower
72 }
73
74 fn to_u8(self) -> u8 {
75 match self {
76 NodeRole::Leader => 0,
77 NodeRole::Follower => 1,
78 }
79 }
80
81 fn from_u8(v: u8) -> Self {
82 match v {
83 1 => NodeRole::Follower,
84 _ => NodeRole::Leader,
85 }
86 }
87}
88
89impl std::fmt::Display for NodeRole {
90 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
91 match self {
92 NodeRole::Leader => write!(f, "leader"),
93 NodeRole::Follower => write!(f, "follower"),
94 }
95 }
96}
97
98#[derive(Clone)]
103pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
104
105impl AtomicNodeRole {
106 pub fn new(role: NodeRole) -> Self {
107 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
108 }
109
110 pub fn load(&self) -> NodeRole {
111 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
112 }
113
114 pub fn store(&self, role: NodeRole) {
115 self.0
116 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
117 }
118}
119
120#[derive(Clone)]
122pub struct AppState {
123 pub store: Arc<EventStore>,
124 pub auth_manager: Arc<AuthManager>,
125 pub tenant_repo: Arc<dyn TenantRepository>,
126 pub service_container: ServiceContainer,
128 pub role: AtomicNodeRole,
130 pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
133 pub wal_receiver: Option<Arc<WalReceiver>>,
135 pub replication_port: u16,
137 pub cluster_manager: Option<Arc<ClusterManager>>,
139 pub geo_replication: Option<Arc<GeoReplicationManager>>,
141}
142
143impl axum::extract::FromRef<AppState> for Arc<EventStore> {
146 fn from_ref(state: &AppState) -> Self {
147 state.store.clone()
148 }
149}
150
151pub async fn serve_v1(
152 store: Arc<EventStore>,
153 auth_manager: Arc<AuthManager>,
154 tenant_repo: Arc<dyn TenantRepository>,
155 rate_limiter: Arc<RateLimiter>,
156 service_container: ServiceContainer,
157 addr: &str,
158 role: NodeRole,
159 wal_shipper: Option<Arc<WalShipper>>,
160 wal_receiver: Option<Arc<WalReceiver>>,
161 replication_port: u16,
162 cluster_manager: Option<Arc<ClusterManager>>,
163 geo_replication: Option<Arc<GeoReplicationManager>>,
164) -> anyhow::Result<()> {
165 let app_state = AppState {
166 store,
167 auth_manager: auth_manager.clone(),
168 tenant_repo,
169 service_container,
170 role: AtomicNodeRole::new(role),
171 wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
172 wal_receiver,
173 replication_port,
174 cluster_manager,
175 geo_replication,
176 };
177
178 let auth_state = AuthState {
179 auth_manager: auth_manager.clone(),
180 };
181
182 let rate_limit_state = RateLimitState { rate_limiter };
183
184 let app = Router::new()
185 .route("/health", get(health_v1))
187 .route("/metrics", get(super::api::prometheus_metrics))
188 .route("/api/v1/auth/register", post(register_handler))
190 .route("/api/v1/auth/login", post(login_handler))
191 .route("/api/v1/auth/me", get(me_handler))
192 .route("/api/v1/auth/api-keys", post(create_api_key_handler))
193 .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
194 .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
195 .route("/api/v1/auth/users", get(list_users_handler))
196 .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
197 ;
199 #[cfg(feature = "multi-tenant")]
200 let app = app
201 .route("/api/v1/tenants", post(create_tenant_handler))
202 .route("/api/v1/tenants", get(list_tenants_handler))
203 .route("/api/v1/tenants/{id}", get(get_tenant_handler))
204 .route("/api/v1/tenants/{id}", put(update_tenant_handler))
205 .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
206 .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
207 .route(
208 "/api/v1/tenants/{id}/deactivate",
209 post(deactivate_tenant_handler),
210 )
211 .route(
212 "/api/v1/tenants/{id}/activate",
213 post(activate_tenant_handler),
214 )
215 .route("/api/v1/tenants/{id}", delete(delete_tenant_handler));
216 let app = app
217 .route("/api/v1/audit/events", post(log_audit_event))
219 .route("/api/v1/audit/events", get(query_audit_events))
220 .route("/api/v1/config", get(list_configs))
222 .route("/api/v1/config", post(set_config))
223 .route("/api/v1/config/{key}", get(get_config))
224 .route("/api/v1/config/{key}", put(update_config))
225 .route("/api/v1/config/{key}", delete(delete_config))
226 .route(
228 "/api/v1/demo/seed",
229 post(super::demo_api::demo_seed_handler),
230 )
231 .route("/api/v1/events", post(super::api::ingest_event_v1))
233 .route(
234 "/api/v1/events/batch",
235 post(super::api::ingest_events_batch_v1),
236 )
237 .route("/api/v1/events/query", get(super::api::query_events))
238 .route(
239 "/api/v1/events/{event_id}",
240 get(super::api::get_event_by_id),
241 )
242 .route("/api/v1/events/stream", get(super::api::events_websocket))
243 .route("/api/v1/entities", get(super::api::list_entities))
244 .route(
245 "/api/v1/entities/duplicates",
246 get(super::api::detect_duplicates),
247 )
248 .route(
249 "/api/v1/entities/{entity_id}/state",
250 get(super::api::get_entity_state),
251 )
252 .route(
253 "/api/v1/entities/{entity_id}/snapshot",
254 get(super::api::get_entity_snapshot),
255 )
256 .route("/api/v1/stats", get(super::api::get_stats))
257 .route("/api/v1/streams", get(super::api::list_streams))
259 .route("/api/v1/event-types", get(super::api::list_event_types))
260 .route(
262 "/api/v1/analytics/frequency",
263 get(super::api::analytics_frequency),
264 )
265 .route(
266 "/api/v1/analytics/summary",
267 get(super::api::analytics_summary),
268 )
269 .route(
270 "/api/v1/analytics/correlation",
271 get(super::api::analytics_correlation),
272 )
273 .route("/api/v1/snapshots", post(super::api::create_snapshot))
275 .route("/api/v1/snapshots", get(super::api::list_snapshots))
276 .route(
277 "/api/v1/snapshots/{entity_id}/latest",
278 get(super::api::get_latest_snapshot),
279 )
280 .route(
282 "/api/v1/compaction/trigger",
283 post(super::api::trigger_compaction),
284 )
285 .route(
286 "/api/v1/compaction/stats",
287 get(super::api::compaction_stats),
288 )
289 .route("/api/v1/schemas", post(super::api::register_schema))
291 .route("/api/v1/schemas", get(super::api::list_subjects))
292 .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
293 .route(
294 "/api/v1/schemas/{subject}/versions",
295 get(super::api::list_schema_versions),
296 )
297 .route(
298 "/api/v1/schemas/validate",
299 post(super::api::validate_event_schema),
300 )
301 .route(
302 "/api/v1/schemas/{subject}/compatibility",
303 put(super::api::set_compatibility_mode),
304 )
305 .route("/api/v1/replay", post(super::api::start_replay))
307 .route("/api/v1/replay", get(super::api::list_replays))
308 .route(
309 "/api/v1/replay/{replay_id}",
310 get(super::api::get_replay_progress),
311 )
312 .route(
313 "/api/v1/replay/{replay_id}/cancel",
314 post(super::api::cancel_replay),
315 )
316 .route(
317 "/api/v1/replay/{replay_id}",
318 delete(super::api::delete_replay),
319 )
320 .route("/api/v1/pipelines", post(super::api::register_pipeline))
322 .route("/api/v1/pipelines", get(super::api::list_pipelines))
323 .route(
324 "/api/v1/pipelines/stats",
325 get(super::api::all_pipeline_stats),
326 )
327 .route(
328 "/api/v1/pipelines/{pipeline_id}",
329 get(super::api::get_pipeline),
330 )
331 .route(
332 "/api/v1/pipelines/{pipeline_id}",
333 delete(super::api::remove_pipeline),
334 )
335 .route(
336 "/api/v1/pipelines/{pipeline_id}/stats",
337 get(super::api::get_pipeline_stats),
338 )
339 .route(
340 "/api/v1/pipelines/{pipeline_id}/reset",
341 put(super::api::reset_pipeline),
342 )
343 .route("/api/v1/projections", get(super::api::list_projections))
345 .route(
346 "/api/v1/projections/{name}",
347 get(super::api::get_projection),
348 )
349 .route(
350 "/api/v1/projections/{name}",
351 delete(super::api::delete_projection),
352 )
353 .route(
354 "/api/v1/projections/{name}/state",
355 get(super::api::get_projection_state_summary),
356 )
357 .route(
358 "/api/v1/projections/{name}/reset",
359 post(super::api::reset_projection),
360 )
361 .route(
362 "/api/v1/projections/{name}/pause",
363 post(super::api::pause_projection),
364 )
365 .route(
366 "/api/v1/projections/{name}/start",
367 post(super::api::start_projection),
368 )
369 .route(
370 "/api/v1/projections/{name}/{entity_id}/state",
371 get(super::api::get_projection_state),
372 )
373 .route(
374 "/api/v1/projections/{name}/{entity_id}/state",
375 post(super::api::save_projection_state),
376 )
377 .route(
378 "/api/v1/projections/{name}/{entity_id}/state",
379 put(super::api::save_projection_state),
380 )
381 .route(
382 "/api/v1/projections/{name}/bulk",
383 post(super::api::bulk_get_projection_states),
384 )
385 .route(
386 "/api/v1/projections/{name}/bulk/save",
387 post(super::api::bulk_save_projection_states),
388 )
389 .route("/api/v1/webhooks", post(super::api::register_webhook))
391 .route("/api/v1/webhooks", get(super::api::list_webhooks))
392 .route(
393 "/api/v1/webhooks/{webhook_id}",
394 get(super::api::get_webhook),
395 )
396 .route(
397 "/api/v1/webhooks/{webhook_id}",
398 put(super::api::update_webhook),
399 )
400 .route(
401 "/api/v1/webhooks/{webhook_id}",
402 delete(super::api::delete_webhook),
403 )
404 .route(
405 "/api/v1/webhooks/{webhook_id}/deliveries",
406 get(super::api::list_webhook_deliveries),
407 )
408 .route("/api/v1/consumers", post(super::api::register_consumer))
410 .route(
411 "/api/v1/consumers/{consumer_id}",
412 get(super::api::get_consumer),
413 )
414 .route(
415 "/api/v1/consumers/{consumer_id}/events",
416 get(super::api::poll_consumer_events),
417 )
418 .route(
419 "/api/v1/consumers/{consumer_id}/ack",
420 post(super::api::ack_consumer),
421 )
422 .route("/api/v1/cluster/status", get(cluster_status_handler))
424 .route("/api/v1/cluster/members", get(cluster_list_members_handler))
425 .route("/api/v1/cluster/members", post(cluster_add_member_handler))
426 .route(
427 "/api/v1/cluster/members/{node_id}",
428 delete(cluster_remove_member_handler),
429 )
430 .route(
431 "/api/v1/cluster/members/{node_id}/heartbeat",
432 post(cluster_heartbeat_handler),
433 )
434 .route("/api/v1/cluster/vote", post(cluster_vote_handler))
435 .route("/api/v1/cluster/election", post(cluster_election_handler))
436 .route(
437 "/api/v1/cluster/partitions",
438 get(cluster_partitions_handler),
439 )
440 .route("/api/v1/graphql", post(super::api::graphql_query))
442 .route("/api/v1/geospatial/query", post(super::api::geo_query))
443 .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
444 .route(
445 "/api/v1/exactly-once/stats",
446 get(super::api::exactly_once_stats),
447 )
448 .route(
449 "/api/v1/schema-evolution/history/{event_type}",
450 get(super::api::schema_evolution_history),
451 )
452 .route(
453 "/api/v1/schema-evolution/schema/{event_type}",
454 get(super::api::schema_evolution_schema),
455 )
456 .route(
457 "/api/v1/schema-evolution/stats",
458 get(super::api::schema_evolution_stats),
459 )
460 .route("/api/v1/geo/status", get(geo_status_handler))
462 .route("/api/v1/geo/sync", post(geo_sync_handler))
463 .route("/api/v1/geo/peers", get(geo_peers_handler))
464 .route("/api/v1/geo/failover", post(geo_failover_handler));
465 #[cfg(feature = "replication")]
467 let app = app
468 .route("/internal/promote", post(promote_handler))
469 .route("/internal/repoint", post(repoint_handler));
470 let app = app;
471
472 #[cfg(feature = "embedded-sync")]
474 let app = app
475 .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
476 .route("/api/v1/sync/push", post(super::api::sync_push_handler));
477
478 let app = app
479 .with_state(app_state.clone())
480 .layer(middleware::from_fn_with_state(
483 app_state,
484 read_only_middleware,
485 ))
486 .layer(middleware::from_fn_with_state(
487 rate_limit_state,
488 rate_limit_middleware,
489 ))
490 .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
491 .layer(
492 CorsLayer::new()
493 .allow_origin(Any)
494 .allow_methods(Any)
495 .allow_headers(Any),
496 )
497 .layer(TraceLayer::new_for_http());
498
499 let listener = tokio::net::TcpListener::bind(addr).await?;
500
501 axum::serve(listener, app)
503 .with_graceful_shutdown(shutdown_signal())
504 .await?;
505
506 tracing::info!("π AllSource Core shutdown complete");
507 Ok(())
508}
509
510const WRITE_PATHS: &[&str] = &[
512 "/api/v1/events",
513 "/api/v1/events/batch",
514 "/api/v1/snapshots",
515 "/api/v1/projections/",
516 "/api/v1/schemas",
517 "/api/v1/replay",
518 "/api/v1/pipelines",
519 "/api/v1/compaction/trigger",
520 "/api/v1/audit/events",
521 "/api/v1/config",
522 "/api/v1/webhooks",
523 "/api/v1/demo/seed",
524];
525
526fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
528 use axum::http::Method;
529 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
531 return false;
532 }
533 WRITE_PATHS
534 .iter()
535 .any(|write_path| path.starts_with(write_path))
536}
537
538fn is_internal_request(path: &str) -> bool {
540 path.starts_with("/internal/")
541 || path.starts_with("/api/v1/cluster/")
542 || path.starts_with("/api/v1/geo/")
543}
544
545async fn read_only_middleware(
551 State(state): State<AppState>,
552 request: axum::extract::Request,
553 next: axum::middleware::Next,
554) -> axum::response::Response {
555 let path = request.uri().path();
556 if state.role.load().is_follower()
557 && is_write_request(request.method(), path)
558 && !is_internal_request(path)
559 {
560 return (
561 axum::http::StatusCode::CONFLICT,
562 axum::Json(serde_json::json!({
563 "error": "read_only",
564 "message": "This node is a read-only follower"
565 })),
566 )
567 .into_response();
568 }
569 next.run(request).await
570}
571
572async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
577 let has_system_repos = state.service_container.has_system_repositories();
578
579 let system_streams = if has_system_repos {
580 let (tenant_count, config_count, total_events) =
581 if let Some(store) = state.service_container.system_store() {
582 use crate::domain::value_objects::system_stream::SystemDomain;
583 (
584 store.count_stream(SystemDomain::Tenant),
585 store.count_stream(SystemDomain::Config),
586 store.total_events(),
587 )
588 } else {
589 (0, 0, 0)
590 };
591
592 serde_json::json!({
593 "status": "healthy",
594 "mode": "event-sourced",
595 "total_events": total_events,
596 "tenant_events": tenant_count,
597 "config_events": config_count,
598 })
599 } else {
600 serde_json::json!({
601 "status": "disabled",
602 "mode": "in-memory",
603 })
604 };
605
606 let replication = {
607 #[cfg(feature = "replication")]
608 {
609 let shipper_guard = state.wal_shipper.read().await;
610 if let Some(ref shipper) = *shipper_guard {
611 serde_json::to_value(shipper.status()).unwrap_or_default()
612 } else if let Some(ref receiver) = state.wal_receiver {
613 serde_json::to_value(receiver.status()).unwrap_or_default()
614 } else {
615 serde_json::json!(null)
616 }
617 }
618 #[cfg(not(feature = "replication"))]
619 serde_json::json!({"edition": "community", "status": "not_available"})
620 };
621
622 let current_role = state.role.load();
623
624 Json(serde_json::json!({
625 "status": "healthy",
626 "service": "allsource-core",
627 "version": env!("CARGO_PKG_VERSION"),
628 "role": current_role,
629 "system_streams": system_streams,
630 "replication": replication,
631 }))
632}
633
634#[cfg(feature = "replication")]
640async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
641 let current_role = state.role.load();
642 if current_role == NodeRole::Leader {
643 return (
644 axum::http::StatusCode::OK,
645 Json(serde_json::json!({
646 "status": "already_leader",
647 "message": "This node is already the leader",
648 })),
649 );
650 }
651
652 tracing::info!("PROMOTE: Switching role from follower to leader");
653
654 state.role.store(NodeRole::Leader);
656
657 if let Some(ref receiver) = state.wal_receiver {
659 receiver.shutdown();
660 tracing::info!("PROMOTE: WAL receiver shutdown signalled");
661 }
662
663 let replication_port = state.replication_port;
665 let (mut shipper, tx) = WalShipper::new();
666 state.store.enable_wal_replication(tx);
667 shipper.set_store(Arc::clone(&state.store));
668 shipper.set_metrics(state.store.metrics());
669 let shipper = Arc::new(shipper);
670
671 {
673 let mut shipper_guard = state.wal_shipper.write().await;
674 *shipper_guard = Some(Arc::clone(&shipper));
675 }
676
677 let shipper_clone = Arc::clone(&shipper);
679 tokio::spawn(async move {
680 if let Err(e) = shipper_clone.serve(replication_port).await {
681 tracing::error!("Promoted WAL shipper error: {}", e);
682 }
683 });
684
685 tracing::info!(
686 "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
687 replication_port,
688 );
689
690 (
691 axum::http::StatusCode::OK,
692 Json(serde_json::json!({
693 "status": "promoted",
694 "role": "leader",
695 "replication_port": replication_port,
696 })),
697 )
698}
699
700#[cfg(feature = "replication")]
705async fn repoint_handler(
706 State(state): State<AppState>,
707 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
708) -> impl IntoResponse {
709 let current_role = state.role.load();
710 if current_role != NodeRole::Follower {
711 return (
712 axum::http::StatusCode::CONFLICT,
713 Json(serde_json::json!({
714 "error": "not_follower",
715 "message": "Repoint only applies to follower nodes",
716 })),
717 );
718 }
719
720 let new_leader = match params.get("leader") {
721 Some(l) if !l.is_empty() => l.clone(),
722 _ => {
723 return (
724 axum::http::StatusCode::BAD_REQUEST,
725 Json(serde_json::json!({
726 "error": "missing_leader",
727 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
728 })),
729 );
730 }
731 };
732
733 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
734
735 if let Some(ref receiver) = state.wal_receiver {
736 receiver.repoint(&new_leader);
737 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
738 } else {
739 tracing::warn!("REPOINT: No WAL receiver to repoint");
740 }
741
742 (
743 axum::http::StatusCode::OK,
744 Json(serde_json::json!({
745 "status": "repointed",
746 "new_leader": new_leader,
747 })),
748 )
749}
750
751async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
757 let Some(ref cm) = state.cluster_manager else {
758 return (
759 axum::http::StatusCode::SERVICE_UNAVAILABLE,
760 Json(serde_json::json!({
761 "error": "cluster_not_enabled",
762 "message": "Cluster mode is not enabled on this node"
763 })),
764 );
765 };
766
767 let status = cm.status().await;
768 (
769 axum::http::StatusCode::OK,
770 Json(serde_json::to_value(status).unwrap()),
771 )
772}
773
774async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
776 let Some(ref cm) = state.cluster_manager else {
777 return (
778 axum::http::StatusCode::SERVICE_UNAVAILABLE,
779 Json(serde_json::json!({
780 "error": "cluster_not_enabled",
781 "message": "Cluster mode is not enabled on this node"
782 })),
783 );
784 };
785
786 let members = cm.all_members();
787 (
788 axum::http::StatusCode::OK,
789 Json(serde_json::json!({
790 "members": members,
791 "count": members.len(),
792 })),
793 )
794}
795
796async fn cluster_add_member_handler(
798 State(state): State<AppState>,
799 Json(member): Json<ClusterMember>,
800) -> impl IntoResponse {
801 let Some(ref cm) = state.cluster_manager else {
802 return (
803 axum::http::StatusCode::SERVICE_UNAVAILABLE,
804 Json(serde_json::json!({
805 "error": "cluster_not_enabled",
806 "message": "Cluster mode is not enabled on this node"
807 })),
808 );
809 };
810
811 let node_id = member.node_id;
812 cm.add_member(member).await;
813
814 tracing::info!("Cluster member {} added", node_id);
815 (
816 axum::http::StatusCode::OK,
817 Json(serde_json::json!({
818 "status": "added",
819 "node_id": node_id,
820 })),
821 )
822}
823
824async fn cluster_remove_member_handler(
826 State(state): State<AppState>,
827 Path(node_id): Path<u32>,
828) -> impl IntoResponse {
829 let Some(ref cm) = state.cluster_manager else {
830 return (
831 axum::http::StatusCode::SERVICE_UNAVAILABLE,
832 Json(serde_json::json!({
833 "error": "cluster_not_enabled",
834 "message": "Cluster mode is not enabled on this node"
835 })),
836 );
837 };
838
839 match cm.remove_member(node_id).await {
840 Some(_) => {
841 tracing::info!("Cluster member {} removed", node_id);
842 (
843 axum::http::StatusCode::OK,
844 Json(serde_json::json!({
845 "status": "removed",
846 "node_id": node_id,
847 })),
848 )
849 }
850 None => (
851 axum::http::StatusCode::NOT_FOUND,
852 Json(serde_json::json!({
853 "error": "not_found",
854 "message": format!("Node {} not found in cluster", node_id),
855 })),
856 ),
857 }
858}
859
860#[derive(serde::Deserialize)]
862struct HeartbeatRequest {
863 wal_offset: u64,
864 #[serde(default = "default_true")]
865 healthy: bool,
866}
867
868fn default_true() -> bool {
869 true
870}
871
872async fn cluster_heartbeat_handler(
873 State(state): State<AppState>,
874 Path(node_id): Path<u32>,
875 Json(req): Json<HeartbeatRequest>,
876) -> impl IntoResponse {
877 let Some(ref cm) = state.cluster_manager else {
878 return (
879 axum::http::StatusCode::SERVICE_UNAVAILABLE,
880 Json(serde_json::json!({
881 "error": "cluster_not_enabled",
882 "message": "Cluster mode is not enabled on this node"
883 })),
884 );
885 };
886
887 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
888 (
889 axum::http::StatusCode::OK,
890 Json(serde_json::json!({
891 "status": "updated",
892 "node_id": node_id,
893 })),
894 )
895}
896
897async fn cluster_vote_handler(
899 State(state): State<AppState>,
900 Json(request): Json<VoteRequest>,
901) -> impl IntoResponse {
902 let Some(ref cm) = state.cluster_manager else {
903 return (
904 axum::http::StatusCode::SERVICE_UNAVAILABLE,
905 Json(serde_json::json!({
906 "error": "cluster_not_enabled",
907 "message": "Cluster mode is not enabled on this node"
908 })),
909 );
910 };
911
912 let response = cm.handle_vote_request(&request).await;
913 (
914 axum::http::StatusCode::OK,
915 Json(serde_json::to_value(response).unwrap()),
916 )
917}
918
919async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
921 let Some(ref cm) = state.cluster_manager else {
922 return (
923 axum::http::StatusCode::SERVICE_UNAVAILABLE,
924 Json(serde_json::json!({
925 "error": "cluster_not_enabled",
926 "message": "Cluster mode is not enabled on this node"
927 })),
928 );
929 };
930
931 let candidate = cm.select_leader_candidate();
933
934 match candidate {
935 Some(candidate_id) => {
936 let new_term = cm.start_election().await;
937 tracing::info!(
938 "Cluster election started: term={}, candidate={}",
939 new_term,
940 candidate_id,
941 );
942
943 if candidate_id == cm.self_id() {
946 cm.become_leader(new_term).await;
947 tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
948 }
949
950 (
951 axum::http::StatusCode::OK,
952 Json(serde_json::json!({
953 "status": "election_started",
954 "term": new_term,
955 "candidate_id": candidate_id,
956 "self_is_leader": candidate_id == cm.self_id(),
957 })),
958 )
959 }
960 None => (
961 axum::http::StatusCode::CONFLICT,
962 Json(serde_json::json!({
963 "error": "no_candidates",
964 "message": "No healthy members available for leader election",
965 })),
966 ),
967 }
968}
969
970async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
972 let Some(ref cm) = state.cluster_manager else {
973 return (
974 axum::http::StatusCode::SERVICE_UNAVAILABLE,
975 Json(serde_json::json!({
976 "error": "cluster_not_enabled",
977 "message": "Cluster mode is not enabled on this node"
978 })),
979 );
980 };
981
982 let registry = cm.registry();
983 let distribution = registry.partition_distribution();
984 let total_partitions: usize = distribution.values().map(std::vec::Vec::len).sum();
985
986 (
987 axum::http::StatusCode::OK,
988 Json(serde_json::json!({
989 "total_partitions": total_partitions,
990 "node_count": registry.node_count(),
991 "healthy_node_count": registry.healthy_node_count(),
992 "distribution": distribution,
993 })),
994 )
995}
996
997async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
1003 let Some(ref geo) = state.geo_replication else {
1004 return (
1005 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1006 Json(serde_json::json!({
1007 "error": "geo_replication_not_enabled",
1008 "message": "Geo-replication is not enabled on this node"
1009 })),
1010 );
1011 };
1012
1013 let status = geo.status();
1014 (
1015 axum::http::StatusCode::OK,
1016 Json(serde_json::to_value(status).unwrap()),
1017 )
1018}
1019
1020async fn geo_sync_handler(
1022 State(state): State<AppState>,
1023 Json(request): Json<GeoSyncRequest>,
1024) -> impl IntoResponse {
1025 let Some(ref geo) = state.geo_replication else {
1026 return (
1027 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1028 Json(serde_json::json!({
1029 "error": "geo_replication_not_enabled",
1030 "message": "Geo-replication is not enabled on this node"
1031 })),
1032 );
1033 };
1034
1035 tracing::info!(
1036 "Geo-sync received from region '{}': {} events",
1037 request.source_region,
1038 request.events.len(),
1039 );
1040
1041 let response = geo.receive_sync(&request);
1042 (
1043 axum::http::StatusCode::OK,
1044 Json(serde_json::to_value(response).unwrap()),
1045 )
1046}
1047
1048async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1050 let Some(ref geo) = state.geo_replication else {
1051 return (
1052 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1053 Json(serde_json::json!({
1054 "error": "geo_replication_not_enabled",
1055 "message": "Geo-replication is not enabled on this node"
1056 })),
1057 );
1058 };
1059
1060 let status = geo.status();
1061 (
1062 axum::http::StatusCode::OK,
1063 Json(serde_json::json!({
1064 "region_id": status.region_id,
1065 "peers": status.peers,
1066 })),
1067 )
1068}
1069
1070async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1072 let Some(ref geo) = state.geo_replication else {
1073 return (
1074 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1075 Json(serde_json::json!({
1076 "error": "geo_replication_not_enabled",
1077 "message": "Geo-replication is not enabled on this node"
1078 })),
1079 );
1080 };
1081
1082 match geo.select_failover_region() {
1083 Some(failover_region) => {
1084 tracing::info!(
1085 "Geo-failover: selected region '{}' as failover target",
1086 failover_region,
1087 );
1088 (
1089 axum::http::StatusCode::OK,
1090 Json(serde_json::json!({
1091 "status": "failover_target_selected",
1092 "failover_region": failover_region,
1093 "message": "Region selected for failover. DNS/routing update required externally.",
1094 })),
1095 )
1096 }
1097 None => (
1098 axum::http::StatusCode::CONFLICT,
1099 Json(serde_json::json!({
1100 "error": "no_healthy_peers",
1101 "message": "No healthy peer regions available for failover",
1102 })),
1103 ),
1104 }
1105}
1106
1107async fn shutdown_signal() {
1109 let ctrl_c = async {
1110 tokio::signal::ctrl_c()
1111 .await
1112 .expect("failed to install Ctrl+C handler");
1113 };
1114
1115 #[cfg(unix)]
1116 let terminate = async {
1117 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1118 .expect("failed to install SIGTERM handler")
1119 .recv()
1120 .await;
1121 };
1122
1123 #[cfg(not(unix))]
1124 let terminate = std::future::pending::<()>();
1125
1126 tokio::select! {
1127 () = ctrl_c => {
1128 tracing::info!("π€ Received Ctrl+C, initiating graceful shutdown...");
1129 }
1130 () = terminate => {
1131 tracing::info!("π€ Received SIGTERM, initiating graceful shutdown...");
1132 }
1133 }
1134}