1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 application::services::tenant_service::TenantManager,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Role of this node in a leader/follower replication topology.
///
/// Serialized as lowercase (`"leader"` / `"follower"`) in JSON responses
/// such as `/health`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes and ships WAL entries to followers (see `promote_handler`).
    Leader,
    /// Read-only replica; mutating requests are rejected by `read_only_middleware`.
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// Cheaply cloneable, thread-safe cell holding the current [`NodeRole`].
///
/// The role is stored as a `u8` (see `NodeRole::to_u8`) so it can be flipped
/// at runtime — e.g. by `promote_handler` — without any locking.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared state handed to every v1 API handler and the read-only middleware.
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing the event/snapshot/query APIs.
    pub store: Arc<EventStore>,
    /// Authentication manager (also driving the auth middleware layer).
    pub auth_manager: Arc<AuthManager>,
    /// Tenant lifecycle manager for the tenant endpoints.
    pub tenant_manager: Arc<TenantManager>,
    /// DI container; exposes optional system repositories/stores (see `health_v1`).
    pub service_container: ServiceContainer,
    /// Current node role; mutable at runtime via `promote_handler`.
    pub role: AtomicNodeRole,
    /// WAL shipper, present on leaders. Behind a lock because promotion
    /// installs one at runtime (see `promote_handler`).
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver, present on followers; shut down on promotion.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// TCP port the WAL shipper serves on when this node is (or becomes) leader.
    pub replication_port: u16,
    /// Cluster membership/election manager, when cluster mode is enabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication manager, when geo mode is enabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
/// Build the complete v1 HTTP router and serve it on `addr` until graceful
/// shutdown (Ctrl+C / SIGTERM).
///
/// Middleware runs outermost-first: tracing → CORS → auth → rate limit →
/// read-only guard → handler. Replication/cluster/geo components are optional
/// and their endpoints respond 503 when the component is absent.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
    cluster_manager: Option<Arc<ClusterManager>>,
    geo_replication: Option<Arc<GeoReplicationManager>>,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
        cluster_manager,
        geo_replication,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Health & metrics
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & user management
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant management
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}", put(update_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Dynamic configuration
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Demo data seeding
        .route(
            "/api/v1/demo/seed",
            post(super::demo_api::demo_seed_handler),
        )
        // Event ingestion & querying
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route(
            "/api/v1/events/{event_id}",
            get(super::api::get_event_by_id),
        )
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entities
        .route("/api/v1/entities", get(super::api::list_entities))
        .route(
            "/api/v1/entities/duplicates",
            get(super::api::detect_duplicates),
        )
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        // Store introspection
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshots & compaction
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}",
            delete(super::api::delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(super::api::get_projection_state_summary),
        )
        .route(
            "/api/v1/projections/{name}/reset",
            post(super::api::reset_projection),
        )
        .route(
            "/api/v1/projections/{name}/pause",
            post(super::api::pause_projection),
        )
        .route(
            "/api/v1/projections/{name}/start",
            post(super::api::start_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(super::api::register_webhook))
        .route("/api/v1/webhooks", get(super::api::list_webhooks))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            get(super::api::get_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            put(super::api::update_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}",
            delete(super::api::delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(super::api::list_webhook_deliveries),
        )
        // Cluster coordination (503 unless cluster mode is enabled)
        .route("/api/v1/cluster/status", get(cluster_status_handler))
        .route("/api/v1/cluster/members", get(cluster_list_members_handler))
        .route("/api/v1/cluster/members", post(cluster_add_member_handler))
        .route(
            "/api/v1/cluster/members/{node_id}",
            delete(cluster_remove_member_handler),
        )
        .route(
            "/api/v1/cluster/members/{node_id}/heartbeat",
            post(cluster_heartbeat_handler),
        )
        .route("/api/v1/cluster/vote", post(cluster_vote_handler))
        .route("/api/v1/cluster/election", post(cluster_election_handler))
        .route(
            "/api/v1/cluster/partitions",
            get(cluster_partitions_handler),
        )
        // Query languages & geospatial
        .route("/api/v1/eventql", post(super::api::eventql_query))
        .route("/api/v1/graphql", post(super::api::graphql_query))
        .route("/api/v1/geospatial/query", post(super::api::geo_query))
        .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
        .route(
            "/api/v1/exactly-once/stats",
            get(super::api::exactly_once_stats),
        )
        // Schema evolution
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(super::api::schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(super::api::schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(super::api::schema_evolution_stats),
        )
        // Geo-replication (503 unless geo mode is enabled)
        .route("/api/v1/geo/status", get(geo_status_handler))
        .route("/api/v1/geo/sync", post(geo_sync_handler))
        .route("/api/v1/geo/peers", get(geo_peers_handler))
        .route("/api/v1/geo/failover", post(geo_failover_handler))
        // Internal failover controls (exempt from the read-only guard)
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler));

    // Optional embedded-sync endpoints, compiled in behind a feature flag.
    #[cfg(feature = "embedded-sync")]
    let app = app
        .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
        .route("/api/v1/sync/push", post(super::api::sync_push_handler));

    // Layers are applied bottom-up: the LAST .layer() call is outermost, so a
    // request flows trace → CORS → auth → rate limit → read-only guard.
    let app = app
        .with_state(app_state.clone())
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    // NOTE(review): "π" below appears to be a mis-encoded emoji — confirm and
    // fix the source encoding rather than the log text.
    tracing::info!("π AllSource Core shutdown complete");
    Ok(())
}
487
/// URL path prefixes whose mutating requests are rejected on follower nodes.
///
/// Matching is by prefix (`str::starts_with` in `is_write_request`), so
/// `/api/v1/events` also covers `/api/v1/events/batch` etc. The trailing
/// slash on `/api/v1/projections/` guards per-projection writes while leaving
/// the bare `/api/v1/projections` collection path unmatched.
// NOTE(review): tenant and auth mutation endpoints are not listed here, so a
// follower will accept e.g. POST /api/v1/tenants — confirm that is intended.
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
503
504fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
506 use axum::http::Method;
507 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
509 return false;
510 }
511 WRITE_PATHS
512 .iter()
513 .any(|write_path| path.starts_with(write_path))
514}
515
/// Paths exempt from the read-only guard: internal control endpoints plus the
/// cluster and geo coordination APIs.
fn is_internal_request(path: &str) -> bool {
    ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"]
        .iter()
        .any(|prefix| path.starts_with(prefix))
}
522
523async fn read_only_middleware(
529 State(state): State<AppState>,
530 request: axum::extract::Request,
531 next: axum::middleware::Next,
532) -> axum::response::Response {
533 let path = request.uri().path();
534 if state.role.load().is_follower()
535 && is_write_request(request.method(), path)
536 && !is_internal_request(path)
537 {
538 return (
539 axum::http::StatusCode::CONFLICT,
540 axum::Json(serde_json::json!({
541 "error": "read_only",
542 "message": "This node is a read-only follower"
543 })),
544 )
545 .into_response();
546 }
547 next.run(request).await
548}
549
/// `GET /health` — liveness plus a snapshot of system-stream and replication
/// status for this node.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // System streams are optional; report event counts only when the
    // event-sourced system store is wired up.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // Repositories exist but no store handle — report zeros.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Replication status comes from whichever side is active: the shipper on a
    // leader, the receiver on a follower, or JSON null when replication is off.
    // The read guard is scoped to this block so it is dropped before responding.
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
606
/// `POST /internal/promote` — switch this node from follower to leader.
///
/// Sequence: flip the role flag (so the read-only guard starts admitting
/// writes), signal the WAL receiver to stop, then build and publish a WAL
/// shipper serving `replication_port` for downstream followers. Idempotent
/// when already leader.
// NOTE(review): the role check and the later store are not atomic, so two
// concurrent promote calls could both run the promotion path — confirm this
// endpoint is only driven by a single orchestrator.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first; read_only_middleware reads this flag per request.
    state.role.store(NodeRole::Leader);

    // Stop pulling WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a WAL shipper and hook it into the store's replication channel.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    {
        // Publish the shipper so /health reports leader-side replication status.
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve the replication stream in the background; errors are logged only.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
671
672async fn repoint_handler(
677 State(state): State<AppState>,
678 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
679) -> impl IntoResponse {
680 let current_role = state.role.load();
681 if current_role != NodeRole::Follower {
682 return (
683 axum::http::StatusCode::CONFLICT,
684 Json(serde_json::json!({
685 "error": "not_follower",
686 "message": "Repoint only applies to follower nodes",
687 })),
688 );
689 }
690
691 let new_leader = match params.get("leader") {
692 Some(l) if !l.is_empty() => l.clone(),
693 _ => {
694 return (
695 axum::http::StatusCode::BAD_REQUEST,
696 Json(serde_json::json!({
697 "error": "missing_leader",
698 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
699 })),
700 );
701 }
702 };
703
704 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
705
706 if let Some(ref receiver) = state.wal_receiver {
707 receiver.repoint(&new_leader);
708 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
709 } else {
710 tracing::warn!("REPOINT: No WAL receiver to repoint");
711 }
712
713 (
714 axum::http::StatusCode::OK,
715 Json(serde_json::json!({
716 "status": "repointed",
717 "new_leader": new_leader,
718 })),
719 )
720}
721
722async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
728 let Some(ref cm) = state.cluster_manager else {
729 return (
730 axum::http::StatusCode::SERVICE_UNAVAILABLE,
731 Json(serde_json::json!({
732 "error": "cluster_not_enabled",
733 "message": "Cluster mode is not enabled on this node"
734 })),
735 );
736 };
737
738 let status = cm.status().await;
739 (
740 axum::http::StatusCode::OK,
741 Json(serde_json::to_value(status).unwrap()),
742 )
743}
744
745async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
747 let Some(ref cm) = state.cluster_manager else {
748 return (
749 axum::http::StatusCode::SERVICE_UNAVAILABLE,
750 Json(serde_json::json!({
751 "error": "cluster_not_enabled",
752 "message": "Cluster mode is not enabled on this node"
753 })),
754 );
755 };
756
757 let members = cm.all_members();
758 (
759 axum::http::StatusCode::OK,
760 Json(serde_json::json!({
761 "members": members,
762 "count": members.len(),
763 })),
764 )
765}
766
767async fn cluster_add_member_handler(
769 State(state): State<AppState>,
770 Json(member): Json<ClusterMember>,
771) -> impl IntoResponse {
772 let Some(ref cm) = state.cluster_manager else {
773 return (
774 axum::http::StatusCode::SERVICE_UNAVAILABLE,
775 Json(serde_json::json!({
776 "error": "cluster_not_enabled",
777 "message": "Cluster mode is not enabled on this node"
778 })),
779 );
780 };
781
782 let node_id = member.node_id;
783 cm.add_member(member).await;
784
785 tracing::info!("Cluster member {} added", node_id);
786 (
787 axum::http::StatusCode::OK,
788 Json(serde_json::json!({
789 "status": "added",
790 "node_id": node_id,
791 })),
792 )
793}
794
795async fn cluster_remove_member_handler(
797 State(state): State<AppState>,
798 Path(node_id): Path<u32>,
799) -> impl IntoResponse {
800 let Some(ref cm) = state.cluster_manager else {
801 return (
802 axum::http::StatusCode::SERVICE_UNAVAILABLE,
803 Json(serde_json::json!({
804 "error": "cluster_not_enabled",
805 "message": "Cluster mode is not enabled on this node"
806 })),
807 );
808 };
809
810 match cm.remove_member(node_id).await {
811 Some(_) => {
812 tracing::info!("Cluster member {} removed", node_id);
813 (
814 axum::http::StatusCode::OK,
815 Json(serde_json::json!({
816 "status": "removed",
817 "node_id": node_id,
818 })),
819 )
820 }
821 None => (
822 axum::http::StatusCode::NOT_FOUND,
823 Json(serde_json::json!({
824 "error": "not_found",
825 "message": format!("Node {} not found in cluster", node_id),
826 })),
827 ),
828 }
829}
830
/// Body of `POST /api/v1/cluster/members/{node_id}/heartbeat`.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    /// WAL offset reported by the member (forwarded to
    /// `ClusterManager::update_member_heartbeat`).
    wal_offset: u64,
    /// Whether the member reports itself healthy; omitted fields default to
    /// `true` via `default_true`.
    #[serde(default = "default_true")]
    healthy: bool,
}

/// Serde default helper for `HeartbeatRequest::healthy`.
fn default_true() -> bool {
    true
}
842
843async fn cluster_heartbeat_handler(
844 State(state): State<AppState>,
845 Path(node_id): Path<u32>,
846 Json(req): Json<HeartbeatRequest>,
847) -> impl IntoResponse {
848 let Some(ref cm) = state.cluster_manager else {
849 return (
850 axum::http::StatusCode::SERVICE_UNAVAILABLE,
851 Json(serde_json::json!({
852 "error": "cluster_not_enabled",
853 "message": "Cluster mode is not enabled on this node"
854 })),
855 );
856 };
857
858 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
859 (
860 axum::http::StatusCode::OK,
861 Json(serde_json::json!({
862 "status": "updated",
863 "node_id": node_id,
864 })),
865 )
866}
867
868async fn cluster_vote_handler(
870 State(state): State<AppState>,
871 Json(request): Json<VoteRequest>,
872) -> impl IntoResponse {
873 let Some(ref cm) = state.cluster_manager else {
874 return (
875 axum::http::StatusCode::SERVICE_UNAVAILABLE,
876 Json(serde_json::json!({
877 "error": "cluster_not_enabled",
878 "message": "Cluster mode is not enabled on this node"
879 })),
880 );
881 };
882
883 let response = cm.handle_vote_request(&request).await;
884 (
885 axum::http::StatusCode::OK,
886 Json(serde_json::to_value(response).unwrap()),
887 )
888}
889
/// `POST /api/v1/cluster/election` — select a leader candidate and start a new
/// election term. 409 when no healthy candidate exists.
// NOTE(review): the candidate is selected *before* the term is incremented,
// and only a self-elected candidate is promoted here — confirm how a remote
// candidate learns it won.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Candidate selection criteria live in ClusterManager.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // If this node elected itself, assume leadership immediately.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
940
941async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
943 let Some(ref cm) = state.cluster_manager else {
944 return (
945 axum::http::StatusCode::SERVICE_UNAVAILABLE,
946 Json(serde_json::json!({
947 "error": "cluster_not_enabled",
948 "message": "Cluster mode is not enabled on this node"
949 })),
950 );
951 };
952
953 let registry = cm.registry();
954 let distribution = registry.partition_distribution();
955 let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
956
957 (
958 axum::http::StatusCode::OK,
959 Json(serde_json::json!({
960 "total_partitions": total_partitions,
961 "node_count": registry.node_count(),
962 "healthy_node_count": registry.healthy_node_count(),
963 "distribution": distribution,
964 })),
965 )
966}
967
968async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
974 let Some(ref geo) = state.geo_replication else {
975 return (
976 axum::http::StatusCode::SERVICE_UNAVAILABLE,
977 Json(serde_json::json!({
978 "error": "geo_replication_not_enabled",
979 "message": "Geo-replication is not enabled on this node"
980 })),
981 );
982 };
983
984 let status = geo.status();
985 (
986 axum::http::StatusCode::OK,
987 Json(serde_json::to_value(status).unwrap()),
988 )
989}
990
991async fn geo_sync_handler(
993 State(state): State<AppState>,
994 Json(request): Json<GeoSyncRequest>,
995) -> impl IntoResponse {
996 let Some(ref geo) = state.geo_replication else {
997 return (
998 axum::http::StatusCode::SERVICE_UNAVAILABLE,
999 Json(serde_json::json!({
1000 "error": "geo_replication_not_enabled",
1001 "message": "Geo-replication is not enabled on this node"
1002 })),
1003 );
1004 };
1005
1006 tracing::info!(
1007 "Geo-sync received from region '{}': {} events",
1008 request.source_region,
1009 request.events.len(),
1010 );
1011
1012 let response = geo.receive_sync(&request);
1013 (
1014 axum::http::StatusCode::OK,
1015 Json(serde_json::to_value(response).unwrap()),
1016 )
1017}
1018
1019async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1021 let Some(ref geo) = state.geo_replication else {
1022 return (
1023 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1024 Json(serde_json::json!({
1025 "error": "geo_replication_not_enabled",
1026 "message": "Geo-replication is not enabled on this node"
1027 })),
1028 );
1029 };
1030
1031 let status = geo.status();
1032 (
1033 axum::http::StatusCode::OK,
1034 Json(serde_json::json!({
1035 "region_id": status.region_id,
1036 "peers": status.peers,
1037 })),
1038 )
1039}
1040
1041async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1043 let Some(ref geo) = state.geo_replication else {
1044 return (
1045 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1046 Json(serde_json::json!({
1047 "error": "geo_replication_not_enabled",
1048 "message": "Geo-replication is not enabled on this node"
1049 })),
1050 );
1051 };
1052
1053 match geo.select_failover_region() {
1054 Some(failover_region) => {
1055 tracing::info!(
1056 "Geo-failover: selected region '{}' as failover target",
1057 failover_region,
1058 );
1059 (
1060 axum::http::StatusCode::OK,
1061 Json(serde_json::json!({
1062 "status": "failover_target_selected",
1063 "failover_region": failover_region,
1064 "message": "Region selected for failover. DNS/routing update required externally.",
1065 })),
1066 )
1067 }
1068 None => (
1069 axum::http::StatusCode::CONFLICT,
1070 Json(serde_json::json!({
1071 "error": "no_healthy_peers",
1072 "message": "No healthy peer regions available for failover",
1073 })),
1074 ),
1075 }
1076}
1077
/// Resolves when the process receives Ctrl+C or (on Unix) SIGTERM, driving
/// axum's graceful shutdown in `serve_v1`.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    // SIGTERM is what container orchestrators send on shutdown.
    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // Non-Unix targets have no SIGTERM: this branch never resolves.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // NOTE(review): the "π€" in the log messages below looks like a
    // mis-encoded emoji — confirm the intended glyph and fix the encoding.
    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("π€ Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("π€ Received SIGTERM, initiating graceful shutdown...");
        }
    }
}