1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 application::services::tenant_service::TenantManager,
5 infrastructure::{
6 cluster::{
7 ClusterManager, ClusterMember, GeoReplicationManager, GeoSyncRequest, VoteRequest,
8 },
9 replication::{WalReceiver, WalShipper},
10 security::{
11 auth::AuthManager,
12 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
13 rate_limit::RateLimiter,
14 },
15 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
16 },
17 store::EventStore,
18};
19use axum::{
20 Json, Router,
21 extract::{Path, State},
22 middleware,
23 response::IntoResponse,
24 routing::{delete, get, post, put},
25};
26use std::sync::Arc;
27use tower_http::{
28 cors::{Any, CorsLayer},
29 trace::TraceLayer,
30};
31
/// Role of this node in a leader/follower replication topology.
///
/// Serialized in lowercase (e.g. the `"role"` field of the `/health`
/// payload) via the serde derive below.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; a promoted leader also runs a WAL shipper.
    Leader,
    /// Read-only replica; write requests are rejected by middleware.
    Follower,
}
39
40impl NodeRole {
41 pub fn from_env() -> Self {
47 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
48 match role.to_lowercase().as_str() {
49 "follower" => return NodeRole::Follower,
50 "leader" => return NodeRole::Leader,
51 other => {
52 tracing::warn!(
53 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
54 other
55 );
56 return NodeRole::Leader;
57 }
58 }
59 }
60 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
61 && (read_only == "true" || read_only == "1")
62 {
63 return NodeRole::Follower;
64 }
65 NodeRole::Leader
66 }
67
68 pub fn is_follower(self) -> bool {
69 self == NodeRole::Follower
70 }
71
72 fn to_u8(self) -> u8 {
73 match self {
74 NodeRole::Leader => 0,
75 NodeRole::Follower => 1,
76 }
77 }
78
79 fn from_u8(v: u8) -> Self {
80 match v {
81 1 => NodeRole::Follower,
82 _ => NodeRole::Leader,
83 }
84 }
85}
86
87impl std::fmt::Display for NodeRole {
88 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
89 match self {
90 NodeRole::Leader => write!(f, "leader"),
91 NodeRole::Follower => write!(f, "follower"),
92 }
93 }
94}
95
/// Thread-safe, cheaply cloneable handle to the node's current role.
///
/// Stores the role's `u8` encoding in an `AtomicU8` behind an `Arc`, so
/// middleware and handlers can read it without locking and
/// `promote_handler` can flip it at runtime.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
102
103impl AtomicNodeRole {
104 pub fn new(role: NodeRole) -> Self {
105 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
106 }
107
108 pub fn load(&self) -> NodeRole {
109 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
110 }
111
112 pub fn store(&self, role: NodeRole) {
113 self.0
114 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
115 }
116}
117
/// Shared application state handed to every HTTP handler.
#[derive(Clone)]
pub struct AppState {
    pub store: Arc<EventStore>,
    pub auth_manager: Arc<AuthManager>,
    pub tenant_manager: Arc<TenantManager>,
    pub service_container: ServiceContainer,
    /// Current node role; mutable at runtime via `/internal/promote`.
    pub role: AtomicNodeRole,
    /// WAL shipper (present on leaders). Behind a lock because
    /// promotion installs a shipper after startup.
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver (present on followers).
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// TCP port the WAL shipper serves on; reused when promoting.
    pub replication_port: u16,
    /// Cluster membership/election manager, when cluster mode is enabled.
    pub cluster_manager: Option<Arc<ClusterManager>>,
    /// Cross-region replication manager, when geo mode is enabled.
    pub geo_replication: Option<Arc<GeoReplicationManager>>,
}
140
141impl axum::extract::FromRef<AppState> for Arc<EventStore> {
144 fn from_ref(state: &AppState) -> Self {
145 state.store.clone()
146 }
147}
148
149pub async fn serve_v1(
150 store: Arc<EventStore>,
151 auth_manager: Arc<AuthManager>,
152 tenant_manager: Arc<TenantManager>,
153 rate_limiter: Arc<RateLimiter>,
154 service_container: ServiceContainer,
155 addr: &str,
156 role: NodeRole,
157 wal_shipper: Option<Arc<WalShipper>>,
158 wal_receiver: Option<Arc<WalReceiver>>,
159 replication_port: u16,
160 cluster_manager: Option<Arc<ClusterManager>>,
161 geo_replication: Option<Arc<GeoReplicationManager>>,
162) -> anyhow::Result<()> {
163 let app_state = AppState {
164 store,
165 auth_manager: auth_manager.clone(),
166 tenant_manager,
167 service_container,
168 role: AtomicNodeRole::new(role),
169 wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
170 wal_receiver,
171 replication_port,
172 cluster_manager,
173 geo_replication,
174 };
175
176 let auth_state = AuthState {
177 auth_manager: auth_manager.clone(),
178 };
179
180 let rate_limit_state = RateLimitState { rate_limiter };
181
182 let app = Router::new()
183 .route("/health", get(health_v1))
185 .route("/metrics", get(super::api::prometheus_metrics))
186 .route("/api/v1/auth/register", post(register_handler))
188 .route("/api/v1/auth/login", post(login_handler))
189 .route("/api/v1/auth/me", get(me_handler))
190 .route("/api/v1/auth/api-keys", post(create_api_key_handler))
191 .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
192 .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
193 .route("/api/v1/auth/users", get(list_users_handler))
194 .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
195 .route("/api/v1/tenants", post(create_tenant_handler))
197 .route("/api/v1/tenants", get(list_tenants_handler))
198 .route("/api/v1/tenants/{id}", get(get_tenant_handler))
199 .route("/api/v1/tenants/{id}", put(update_tenant_handler))
200 .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
201 .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
202 .route(
203 "/api/v1/tenants/{id}/deactivate",
204 post(deactivate_tenant_handler),
205 )
206 .route(
207 "/api/v1/tenants/{id}/activate",
208 post(activate_tenant_handler),
209 )
210 .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
211 .route("/api/v1/audit/events", post(log_audit_event))
213 .route("/api/v1/audit/events", get(query_audit_events))
214 .route("/api/v1/config", get(list_configs))
216 .route("/api/v1/config", post(set_config))
217 .route("/api/v1/config/{key}", get(get_config))
218 .route("/api/v1/config/{key}", put(update_config))
219 .route("/api/v1/config/{key}", delete(delete_config))
220 .route(
222 "/api/v1/demo/seed",
223 post(super::demo_api::demo_seed_handler),
224 )
225 .route("/api/v1/events", post(super::api::ingest_event_v1))
227 .route(
228 "/api/v1/events/batch",
229 post(super::api::ingest_events_batch_v1),
230 )
231 .route("/api/v1/events/query", get(super::api::query_events))
232 .route(
233 "/api/v1/events/{event_id}",
234 get(super::api::get_event_by_id),
235 )
236 .route("/api/v1/events/stream", get(super::api::events_websocket))
237 .route("/api/v1/entities", get(super::api::list_entities))
238 .route(
239 "/api/v1/entities/duplicates",
240 get(super::api::detect_duplicates),
241 )
242 .route(
243 "/api/v1/entities/{entity_id}/state",
244 get(super::api::get_entity_state),
245 )
246 .route(
247 "/api/v1/entities/{entity_id}/snapshot",
248 get(super::api::get_entity_snapshot),
249 )
250 .route("/api/v1/stats", get(super::api::get_stats))
251 .route("/api/v1/streams", get(super::api::list_streams))
253 .route("/api/v1/event-types", get(super::api::list_event_types))
254 .route(
256 "/api/v1/analytics/frequency",
257 get(super::api::analytics_frequency),
258 )
259 .route(
260 "/api/v1/analytics/summary",
261 get(super::api::analytics_summary),
262 )
263 .route(
264 "/api/v1/analytics/correlation",
265 get(super::api::analytics_correlation),
266 )
267 .route("/api/v1/snapshots", post(super::api::create_snapshot))
269 .route("/api/v1/snapshots", get(super::api::list_snapshots))
270 .route(
271 "/api/v1/snapshots/{entity_id}/latest",
272 get(super::api::get_latest_snapshot),
273 )
274 .route(
276 "/api/v1/compaction/trigger",
277 post(super::api::trigger_compaction),
278 )
279 .route(
280 "/api/v1/compaction/stats",
281 get(super::api::compaction_stats),
282 )
283 .route("/api/v1/schemas", post(super::api::register_schema))
285 .route("/api/v1/schemas", get(super::api::list_subjects))
286 .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
287 .route(
288 "/api/v1/schemas/{subject}/versions",
289 get(super::api::list_schema_versions),
290 )
291 .route(
292 "/api/v1/schemas/validate",
293 post(super::api::validate_event_schema),
294 )
295 .route(
296 "/api/v1/schemas/{subject}/compatibility",
297 put(super::api::set_compatibility_mode),
298 )
299 .route("/api/v1/replay", post(super::api::start_replay))
301 .route("/api/v1/replay", get(super::api::list_replays))
302 .route(
303 "/api/v1/replay/{replay_id}",
304 get(super::api::get_replay_progress),
305 )
306 .route(
307 "/api/v1/replay/{replay_id}/cancel",
308 post(super::api::cancel_replay),
309 )
310 .route(
311 "/api/v1/replay/{replay_id}",
312 delete(super::api::delete_replay),
313 )
314 .route("/api/v1/pipelines", post(super::api::register_pipeline))
316 .route("/api/v1/pipelines", get(super::api::list_pipelines))
317 .route(
318 "/api/v1/pipelines/stats",
319 get(super::api::all_pipeline_stats),
320 )
321 .route(
322 "/api/v1/pipelines/{pipeline_id}",
323 get(super::api::get_pipeline),
324 )
325 .route(
326 "/api/v1/pipelines/{pipeline_id}",
327 delete(super::api::remove_pipeline),
328 )
329 .route(
330 "/api/v1/pipelines/{pipeline_id}/stats",
331 get(super::api::get_pipeline_stats),
332 )
333 .route(
334 "/api/v1/pipelines/{pipeline_id}/reset",
335 put(super::api::reset_pipeline),
336 )
337 .route("/api/v1/projections", get(super::api::list_projections))
339 .route(
340 "/api/v1/projections/{name}",
341 get(super::api::get_projection),
342 )
343 .route(
344 "/api/v1/projections/{name}",
345 delete(super::api::delete_projection),
346 )
347 .route(
348 "/api/v1/projections/{name}/state",
349 get(super::api::get_projection_state_summary),
350 )
351 .route(
352 "/api/v1/projections/{name}/reset",
353 post(super::api::reset_projection),
354 )
355 .route(
356 "/api/v1/projections/{name}/{entity_id}/state",
357 get(super::api::get_projection_state),
358 )
359 .route(
360 "/api/v1/projections/{name}/{entity_id}/state",
361 post(super::api::save_projection_state),
362 )
363 .route(
364 "/api/v1/projections/{name}/{entity_id}/state",
365 put(super::api::save_projection_state),
366 )
367 .route(
368 "/api/v1/projections/{name}/bulk",
369 post(super::api::bulk_get_projection_states),
370 )
371 .route(
372 "/api/v1/projections/{name}/bulk/save",
373 post(super::api::bulk_save_projection_states),
374 )
375 .route("/api/v1/webhooks", post(super::api::register_webhook))
377 .route("/api/v1/webhooks", get(super::api::list_webhooks))
378 .route(
379 "/api/v1/webhooks/{webhook_id}",
380 get(super::api::get_webhook),
381 )
382 .route(
383 "/api/v1/webhooks/{webhook_id}",
384 put(super::api::update_webhook),
385 )
386 .route(
387 "/api/v1/webhooks/{webhook_id}",
388 delete(super::api::delete_webhook),
389 )
390 .route(
391 "/api/v1/webhooks/{webhook_id}/deliveries",
392 get(super::api::list_webhook_deliveries),
393 )
394 .route("/api/v1/cluster/status", get(cluster_status_handler))
396 .route("/api/v1/cluster/members", get(cluster_list_members_handler))
397 .route("/api/v1/cluster/members", post(cluster_add_member_handler))
398 .route(
399 "/api/v1/cluster/members/{node_id}",
400 delete(cluster_remove_member_handler),
401 )
402 .route(
403 "/api/v1/cluster/members/{node_id}/heartbeat",
404 post(cluster_heartbeat_handler),
405 )
406 .route("/api/v1/cluster/vote", post(cluster_vote_handler))
407 .route("/api/v1/cluster/election", post(cluster_election_handler))
408 .route(
409 "/api/v1/cluster/partitions",
410 get(cluster_partitions_handler),
411 )
412 .route("/api/v1/eventql", post(super::api::eventql_query))
414 .route("/api/v1/graphql", post(super::api::graphql_query))
415 .route("/api/v1/geospatial/query", post(super::api::geo_query))
416 .route("/api/v1/geospatial/stats", get(super::api::geo_stats))
417 .route(
418 "/api/v1/exactly-once/stats",
419 get(super::api::exactly_once_stats),
420 )
421 .route(
422 "/api/v1/schema-evolution/history/{event_type}",
423 get(super::api::schema_evolution_history),
424 )
425 .route(
426 "/api/v1/schema-evolution/schema/{event_type}",
427 get(super::api::schema_evolution_schema),
428 )
429 .route(
430 "/api/v1/schema-evolution/stats",
431 get(super::api::schema_evolution_stats),
432 )
433 .route("/api/v1/geo/status", get(geo_status_handler))
435 .route("/api/v1/geo/sync", post(geo_sync_handler))
436 .route("/api/v1/geo/peers", get(geo_peers_handler))
437 .route("/api/v1/geo/failover", post(geo_failover_handler))
438 .route("/internal/promote", post(promote_handler))
440 .route("/internal/repoint", post(repoint_handler));
441
442 #[cfg(feature = "embedded-sync")]
444 let app = app
445 .route("/api/v1/sync/pull", post(super::api::sync_pull_handler))
446 .route("/api/v1/sync/push", post(super::api::sync_push_handler));
447
448 let app = app
449 .with_state(app_state.clone())
450 .layer(middleware::from_fn_with_state(
453 app_state,
454 read_only_middleware,
455 ))
456 .layer(middleware::from_fn_with_state(
457 rate_limit_state,
458 rate_limit_middleware,
459 ))
460 .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
461 .layer(
462 CorsLayer::new()
463 .allow_origin(Any)
464 .allow_methods(Any)
465 .allow_headers(Any),
466 )
467 .layer(TraceLayer::new_for_http());
468
469 let listener = tokio::net::TcpListener::bind(addr).await?;
470
471 axum::serve(listener, app)
473 .with_graceful_shutdown(shutdown_signal())
474 .await?;
475
476 tracing::info!("π AllSource Core shutdown complete");
477 Ok(())
478}
479
/// URL prefixes considered state-mutating. POST/PUT/DELETE requests
/// whose path starts with one of these are rejected on follower nodes
/// (see `read_only_middleware`). Prefix matching means "/api/v1/events"
/// also covers sub-paths like "/api/v1/events/batch".
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
    "/api/v1/webhooks",
    "/api/v1/demo/seed",
];
495
496fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
498 use axum::http::Method;
499 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
501 return false;
502 }
503 WRITE_PATHS
504 .iter()
505 .any(|write_path| path.starts_with(write_path))
506}
507
/// True for internal control-plane paths (replication control, cluster
/// management, geo-replication) that must stay reachable even on a
/// read-only follower.
fn is_internal_request(path: &str) -> bool {
    ["/internal/", "/api/v1/cluster/", "/api/v1/geo/"]
        .iter()
        .any(|prefix| path.starts_with(prefix))
}
514
515async fn read_only_middleware(
521 State(state): State<AppState>,
522 request: axum::extract::Request,
523 next: axum::middleware::Next,
524) -> axum::response::Response {
525 let path = request.uri().path();
526 if state.role.load().is_follower()
527 && is_write_request(request.method(), path)
528 && !is_internal_request(path)
529 {
530 return (
531 axum::http::StatusCode::CONFLICT,
532 axum::Json(serde_json::json!({
533 "error": "read_only",
534 "message": "This node is a read-only follower"
535 })),
536 )
537 .into_response();
538 }
539 next.run(request).await
540}
541
/// `GET /health` — liveness plus current role, system-stream counters,
/// and replication status.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // Summarize the event-sourced system streams when the system store
    // is wired up; otherwise report the in-memory fallback mode.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                // has_system_repositories() was true but no store handle;
                // report zeros rather than failing the health check.
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Shipper status on leaders, receiver status on followers, JSON
    // null when replication is not configured. The read guard is held
    // only for this scope.
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
598
/// `POST /internal/promote` — switch this follower into the leader role.
///
/// Sequence matters here: the role flag flips first (so the read-only
/// middleware starts accepting writes immediately), then the WAL
/// receiver is shut down, then a WAL shipper is created and spawned so
/// downstream followers can replicate from this node.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        // Idempotent: promoting a leader is a no-op, not an error.
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first so writes are accepted as soon as possible.
    state.role.store(NodeRole::Leader);

    // A leader no longer consumes WAL from anyone.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Stand up a shipper wired to this node's store so followers can
    // replicate from it.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish the shipper in shared state so /health reports it.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve the shipper on a background task; errors are logged, not
    // propagated to the HTTP response (promotion already happened).
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
663
664async fn repoint_handler(
669 State(state): State<AppState>,
670 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
671) -> impl IntoResponse {
672 let current_role = state.role.load();
673 if current_role != NodeRole::Follower {
674 return (
675 axum::http::StatusCode::CONFLICT,
676 Json(serde_json::json!({
677 "error": "not_follower",
678 "message": "Repoint only applies to follower nodes",
679 })),
680 );
681 }
682
683 let new_leader = match params.get("leader") {
684 Some(l) if !l.is_empty() => l.clone(),
685 _ => {
686 return (
687 axum::http::StatusCode::BAD_REQUEST,
688 Json(serde_json::json!({
689 "error": "missing_leader",
690 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
691 })),
692 );
693 }
694 };
695
696 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
697
698 if let Some(ref receiver) = state.wal_receiver {
699 receiver.repoint(&new_leader);
700 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
701 } else {
702 tracing::warn!("REPOINT: No WAL receiver to repoint");
703 }
704
705 (
706 axum::http::StatusCode::OK,
707 Json(serde_json::json!({
708 "status": "repointed",
709 "new_leader": new_leader,
710 })),
711 )
712}
713
714async fn cluster_status_handler(State(state): State<AppState>) -> impl IntoResponse {
720 let Some(ref cm) = state.cluster_manager else {
721 return (
722 axum::http::StatusCode::SERVICE_UNAVAILABLE,
723 Json(serde_json::json!({
724 "error": "cluster_not_enabled",
725 "message": "Cluster mode is not enabled on this node"
726 })),
727 );
728 };
729
730 let status = cm.status().await;
731 (
732 axum::http::StatusCode::OK,
733 Json(serde_json::to_value(status).unwrap()),
734 )
735}
736
737async fn cluster_list_members_handler(State(state): State<AppState>) -> impl IntoResponse {
739 let Some(ref cm) = state.cluster_manager else {
740 return (
741 axum::http::StatusCode::SERVICE_UNAVAILABLE,
742 Json(serde_json::json!({
743 "error": "cluster_not_enabled",
744 "message": "Cluster mode is not enabled on this node"
745 })),
746 );
747 };
748
749 let members = cm.all_members();
750 (
751 axum::http::StatusCode::OK,
752 Json(serde_json::json!({
753 "members": members,
754 "count": members.len(),
755 })),
756 )
757}
758
759async fn cluster_add_member_handler(
761 State(state): State<AppState>,
762 Json(member): Json<ClusterMember>,
763) -> impl IntoResponse {
764 let Some(ref cm) = state.cluster_manager else {
765 return (
766 axum::http::StatusCode::SERVICE_UNAVAILABLE,
767 Json(serde_json::json!({
768 "error": "cluster_not_enabled",
769 "message": "Cluster mode is not enabled on this node"
770 })),
771 );
772 };
773
774 let node_id = member.node_id;
775 cm.add_member(member).await;
776
777 tracing::info!("Cluster member {} added", node_id);
778 (
779 axum::http::StatusCode::OK,
780 Json(serde_json::json!({
781 "status": "added",
782 "node_id": node_id,
783 })),
784 )
785}
786
787async fn cluster_remove_member_handler(
789 State(state): State<AppState>,
790 Path(node_id): Path<u32>,
791) -> impl IntoResponse {
792 let Some(ref cm) = state.cluster_manager else {
793 return (
794 axum::http::StatusCode::SERVICE_UNAVAILABLE,
795 Json(serde_json::json!({
796 "error": "cluster_not_enabled",
797 "message": "Cluster mode is not enabled on this node"
798 })),
799 );
800 };
801
802 match cm.remove_member(node_id).await {
803 Some(_) => {
804 tracing::info!("Cluster member {} removed", node_id);
805 (
806 axum::http::StatusCode::OK,
807 Json(serde_json::json!({
808 "status": "removed",
809 "node_id": node_id,
810 })),
811 )
812 }
813 None => (
814 axum::http::StatusCode::NOT_FOUND,
815 Json(serde_json::json!({
816 "error": "not_found",
817 "message": format!("Node {} not found in cluster", node_id),
818 })),
819 ),
820 }
821}
822
/// Request body for `POST /api/v1/cluster/members/{node_id}/heartbeat`.
#[derive(serde::Deserialize)]
struct HeartbeatRequest {
    /// The reporting node's current WAL offset.
    wal_offset: u64,
    /// Liveness flag; omitting it in the payload means healthy.
    #[serde(default = "default_true")]
    healthy: bool,
}
830
/// Serde default for `HeartbeatRequest::healthy` (missing field => true).
fn default_true() -> bool {
    true
}
834
835async fn cluster_heartbeat_handler(
836 State(state): State<AppState>,
837 Path(node_id): Path<u32>,
838 Json(req): Json<HeartbeatRequest>,
839) -> impl IntoResponse {
840 let Some(ref cm) = state.cluster_manager else {
841 return (
842 axum::http::StatusCode::SERVICE_UNAVAILABLE,
843 Json(serde_json::json!({
844 "error": "cluster_not_enabled",
845 "message": "Cluster mode is not enabled on this node"
846 })),
847 );
848 };
849
850 cm.update_member_heartbeat(node_id, req.wal_offset, req.healthy);
851 (
852 axum::http::StatusCode::OK,
853 Json(serde_json::json!({
854 "status": "updated",
855 "node_id": node_id,
856 })),
857 )
858}
859
860async fn cluster_vote_handler(
862 State(state): State<AppState>,
863 Json(request): Json<VoteRequest>,
864) -> impl IntoResponse {
865 let Some(ref cm) = state.cluster_manager else {
866 return (
867 axum::http::StatusCode::SERVICE_UNAVAILABLE,
868 Json(serde_json::json!({
869 "error": "cluster_not_enabled",
870 "message": "Cluster mode is not enabled on this node"
871 })),
872 );
873 };
874
875 let response = cm.handle_vote_request(&request).await;
876 (
877 axum::http::StatusCode::OK,
878 Json(serde_json::to_value(response).unwrap()),
879 )
880}
881
/// `POST /api/v1/cluster/election` — trigger a leader election.
///
/// Selects a candidate first; only when one exists is the term bumped
/// via `start_election`, and when the candidate is this node it assumes
/// leadership immediately. Returns 409 when no candidate is available.
async fn cluster_election_handler(State(state): State<AppState>) -> impl IntoResponse {
    let Some(ref cm) = state.cluster_manager else {
        return (
            axum::http::StatusCode::SERVICE_UNAVAILABLE,
            Json(serde_json::json!({
                "error": "cluster_not_enabled",
                "message": "Cluster mode is not enabled on this node"
            })),
        );
    };

    // Candidate selection happens before the term bump so a fruitless
    // election leaves the term untouched.
    let candidate = cm.select_leader_candidate();

    match candidate {
        Some(candidate_id) => {
            let new_term = cm.start_election().await;
            tracing::info!(
                "Cluster election started: term={}, candidate={}",
                new_term,
                candidate_id,
            );

            // If this node won the candidacy it becomes leader at the
            // new term right away.
            if candidate_id == cm.self_id() {
                cm.become_leader(new_term).await;
                tracing::info!("Node {} became leader at term {}", candidate_id, new_term);
            }

            (
                axum::http::StatusCode::OK,
                Json(serde_json::json!({
                    "status": "election_started",
                    "term": new_term,
                    "candidate_id": candidate_id,
                    "self_is_leader": candidate_id == cm.self_id(),
                })),
            )
        }
        None => (
            axum::http::StatusCode::CONFLICT,
            Json(serde_json::json!({
                "error": "no_candidates",
                "message": "No healthy members available for leader election",
            })),
        ),
    }
}
932
933async fn cluster_partitions_handler(State(state): State<AppState>) -> impl IntoResponse {
935 let Some(ref cm) = state.cluster_manager else {
936 return (
937 axum::http::StatusCode::SERVICE_UNAVAILABLE,
938 Json(serde_json::json!({
939 "error": "cluster_not_enabled",
940 "message": "Cluster mode is not enabled on this node"
941 })),
942 );
943 };
944
945 let registry = cm.registry();
946 let distribution = registry.partition_distribution();
947 let total_partitions: usize = distribution.values().map(|v| v.len()).sum();
948
949 (
950 axum::http::StatusCode::OK,
951 Json(serde_json::json!({
952 "total_partitions": total_partitions,
953 "node_count": registry.node_count(),
954 "healthy_node_count": registry.healthy_node_count(),
955 "distribution": distribution,
956 })),
957 )
958}
959
960async fn geo_status_handler(State(state): State<AppState>) -> impl IntoResponse {
966 let Some(ref geo) = state.geo_replication else {
967 return (
968 axum::http::StatusCode::SERVICE_UNAVAILABLE,
969 Json(serde_json::json!({
970 "error": "geo_replication_not_enabled",
971 "message": "Geo-replication is not enabled on this node"
972 })),
973 );
974 };
975
976 let status = geo.status();
977 (
978 axum::http::StatusCode::OK,
979 Json(serde_json::to_value(status).unwrap()),
980 )
981}
982
983async fn geo_sync_handler(
985 State(state): State<AppState>,
986 Json(request): Json<GeoSyncRequest>,
987) -> impl IntoResponse {
988 let Some(ref geo) = state.geo_replication else {
989 return (
990 axum::http::StatusCode::SERVICE_UNAVAILABLE,
991 Json(serde_json::json!({
992 "error": "geo_replication_not_enabled",
993 "message": "Geo-replication is not enabled on this node"
994 })),
995 );
996 };
997
998 tracing::info!(
999 "Geo-sync received from region '{}': {} events",
1000 request.source_region,
1001 request.events.len(),
1002 );
1003
1004 let response = geo.receive_sync(&request);
1005 (
1006 axum::http::StatusCode::OK,
1007 Json(serde_json::to_value(response).unwrap()),
1008 )
1009}
1010
1011async fn geo_peers_handler(State(state): State<AppState>) -> impl IntoResponse {
1013 let Some(ref geo) = state.geo_replication else {
1014 return (
1015 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1016 Json(serde_json::json!({
1017 "error": "geo_replication_not_enabled",
1018 "message": "Geo-replication is not enabled on this node"
1019 })),
1020 );
1021 };
1022
1023 let status = geo.status();
1024 (
1025 axum::http::StatusCode::OK,
1026 Json(serde_json::json!({
1027 "region_id": status.region_id,
1028 "peers": status.peers,
1029 })),
1030 )
1031}
1032
1033async fn geo_failover_handler(State(state): State<AppState>) -> impl IntoResponse {
1035 let Some(ref geo) = state.geo_replication else {
1036 return (
1037 axum::http::StatusCode::SERVICE_UNAVAILABLE,
1038 Json(serde_json::json!({
1039 "error": "geo_replication_not_enabled",
1040 "message": "Geo-replication is not enabled on this node"
1041 })),
1042 );
1043 };
1044
1045 match geo.select_failover_region() {
1046 Some(failover_region) => {
1047 tracing::info!(
1048 "Geo-failover: selected region '{}' as failover target",
1049 failover_region,
1050 );
1051 (
1052 axum::http::StatusCode::OK,
1053 Json(serde_json::json!({
1054 "status": "failover_target_selected",
1055 "failover_region": failover_region,
1056 "message": "Region selected for failover. DNS/routing update required externally.",
1057 })),
1058 )
1059 }
1060 None => (
1061 axum::http::StatusCode::CONFLICT,
1062 Json(serde_json::json!({
1063 "error": "no_healthy_peers",
1064 "message": "No healthy peer regions available for failover",
1065 })),
1066 ),
1067 }
1068}
1069
1070async fn shutdown_signal() {
1072 let ctrl_c = async {
1073 tokio::signal::ctrl_c()
1074 .await
1075 .expect("failed to install Ctrl+C handler");
1076 };
1077
1078 #[cfg(unix)]
1079 let terminate = async {
1080 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1081 .expect("failed to install SIGTERM handler")
1082 .recv()
1083 .await;
1084 };
1085
1086 #[cfg(not(unix))]
1087 let terminate = std::future::pending::<()>();
1088
1089 tokio::select! {
1090 _ = ctrl_c => {
1091 tracing::info!("π€ Received Ctrl+C, initiating graceful shutdown...");
1092 }
1093 _ = terminate => {
1094 tracing::info!("π€ Received SIGTERM, initiating graceful shutdown...");
1095 }
1096 }
1097}