1use crate::infrastructure::di::ServiceContainer;
3use crate::{
4 application::services::tenant_service::TenantManager,
5 infrastructure::{
6 replication::{WalReceiver, WalShipper},
7 security::{
8 auth::AuthManager,
9 middleware::{AuthState, RateLimitState, auth_middleware, rate_limit_middleware},
10 rate_limit::RateLimiter,
11 },
12 web::{audit_api::*, auth_api::*, config_api::*, tenant_api::*},
13 },
14 store::EventStore,
15};
16use axum::{
17 Json, Router,
18 extract::State,
19 middleware,
20 response::IntoResponse,
21 routing::{delete, get, post, put},
22};
23use std::sync::Arc;
24use tower_http::{
25 cors::{Any, CorsLayer},
26 trace::TraceLayer,
27};
28
/// Role of this node in a leader/follower replication pair.
///
/// Serialized in lowercase (e.g. in the `/health` payload produced by
/// `health_v1`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "lowercase")]
pub enum NodeRole {
    /// Accepts writes; ships WAL to followers (see `promote_handler`).
    Leader,
    /// Read-only replica; write requests are rejected by `read_only_middleware`.
    Follower,
}
36
37impl NodeRole {
38 pub fn from_env() -> Self {
44 if let Ok(role) = std::env::var("ALLSOURCE_ROLE") {
45 match role.to_lowercase().as_str() {
46 "follower" => return NodeRole::Follower,
47 "leader" => return NodeRole::Leader,
48 other => {
49 tracing::warn!(
50 "Unknown ALLSOURCE_ROLE value '{}', defaulting to leader",
51 other
52 );
53 return NodeRole::Leader;
54 }
55 }
56 }
57 if let Ok(read_only) = std::env::var("ALLSOURCE_READ_ONLY")
58 && (read_only == "true" || read_only == "1")
59 {
60 return NodeRole::Follower;
61 }
62 NodeRole::Leader
63 }
64
65 pub fn is_follower(self) -> bool {
66 self == NodeRole::Follower
67 }
68
69 fn to_u8(self) -> u8 {
70 match self {
71 NodeRole::Leader => 0,
72 NodeRole::Follower => 1,
73 }
74 }
75
76 fn from_u8(v: u8) -> Self {
77 match v {
78 1 => NodeRole::Follower,
79 _ => NodeRole::Leader,
80 }
81 }
82}
83
84impl std::fmt::Display for NodeRole {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 match self {
87 NodeRole::Leader => write!(f, "leader"),
88 NodeRole::Follower => write!(f, "follower"),
89 }
90 }
91}
92
/// Cheaply-cloneable, thread-safe cell holding the current [`NodeRole`],
/// so the role can be flipped at runtime (e.g. by `promote_handler`) while
/// request handlers read it concurrently.
#[derive(Clone)]
pub struct AtomicNodeRole(Arc<std::sync::atomic::AtomicU8>);
99
100impl AtomicNodeRole {
101 pub fn new(role: NodeRole) -> Self {
102 Self(Arc::new(std::sync::atomic::AtomicU8::new(role.to_u8())))
103 }
104
105 pub fn load(&self) -> NodeRole {
106 NodeRole::from_u8(self.0.load(std::sync::atomic::Ordering::Relaxed))
107 }
108
109 pub fn store(&self, role: NodeRole) {
110 self.0
111 .store(role.to_u8(), std::sync::atomic::Ordering::Relaxed);
112 }
113}
114
/// Shared application state handed to every axum route handler.
#[derive(Clone)]
pub struct AppState {
    /// Primary event store backing the event/snapshot/analytics routes.
    pub store: Arc<EventStore>,
    /// Authentication and API-key management.
    pub auth_manager: Arc<AuthManager>,
    /// Multi-tenant administration.
    pub tenant_manager: Arc<TenantManager>,
    /// DI container exposing system repositories (queried by `health_v1`).
    pub service_container: ServiceContainer,
    /// Current node role; mutable at runtime via `/internal/promote`.
    pub role: AtomicNodeRole,
    /// WAL shipper; `None` on followers until promotion installs one
    /// (see `promote_handler`).
    pub wal_shipper: Arc<tokio::sync::RwLock<Option<Arc<WalShipper>>>>,
    /// WAL receiver; present only on nodes started as followers.
    pub wal_receiver: Option<Arc<WalReceiver>>,
    /// Port a promoted node will serve WAL shipping on.
    pub replication_port: u16,
}
133
134impl axum::extract::FromRef<AppState> for Arc<EventStore> {
137 fn from_ref(state: &AppState) -> Self {
138 state.store.clone()
139 }
140}
141
/// Build and serve the v1 HTTP API on `addr` until a shutdown signal arrives.
///
/// Wires together auth, tenancy, rate limiting, replication state and every
/// REST route, then runs the axum server with graceful shutdown
/// (see `shutdown_signal`).
///
/// # Arguments
/// * `role` - initial node role; stored atomically so `/internal/promote`
///   can flip it at runtime.
/// * `wal_shipper` - `Some` on leaders that stream WAL to followers.
/// * `wal_receiver` - `Some` on followers applying WAL from a leader.
/// * `replication_port` - port a promoted follower will serve WAL on.
///
/// # Errors
/// Returns an error if binding `addr` fails or the server errors while running.
pub async fn serve_v1(
    store: Arc<EventStore>,
    auth_manager: Arc<AuthManager>,
    tenant_manager: Arc<TenantManager>,
    rate_limiter: Arc<RateLimiter>,
    service_container: ServiceContainer,
    addr: &str,
    role: NodeRole,
    wal_shipper: Option<Arc<WalShipper>>,
    wal_receiver: Option<Arc<WalReceiver>>,
    replication_port: u16,
) -> anyhow::Result<()> {
    let app_state = AppState {
        store,
        auth_manager: auth_manager.clone(),
        tenant_manager,
        service_container,
        role: AtomicNodeRole::new(role),
        // Shipper lives behind a lock so promotion can install one later.
        wal_shipper: Arc::new(tokio::sync::RwLock::new(wal_shipper)),
        wal_receiver,
        replication_port,
    };

    let auth_state = AuthState {
        auth_manager: auth_manager.clone(),
    };

    let rate_limit_state = RateLimitState { rate_limiter };

    let app = Router::new()
        // Health & metrics.
        .route("/health", get(health_v1))
        .route("/metrics", get(super::api::prometheus_metrics))
        // Authentication & user management.
        .route("/api/v1/auth/register", post(register_handler))
        .route("/api/v1/auth/login", post(login_handler))
        .route("/api/v1/auth/me", get(me_handler))
        .route("/api/v1/auth/api-keys", post(create_api_key_handler))
        .route("/api/v1/auth/api-keys", get(list_api_keys_handler))
        .route("/api/v1/auth/api-keys/{id}", delete(revoke_api_key_handler))
        .route("/api/v1/auth/users", get(list_users_handler))
        .route("/api/v1/auth/users/{id}", delete(delete_user_handler))
        // Tenant administration.
        .route("/api/v1/tenants", post(create_tenant_handler))
        .route("/api/v1/tenants", get(list_tenants_handler))
        .route("/api/v1/tenants/{id}", get(get_tenant_handler))
        .route("/api/v1/tenants/{id}/stats", get(get_tenant_stats_handler))
        .route("/api/v1/tenants/{id}/quotas", put(update_quotas_handler))
        .route(
            "/api/v1/tenants/{id}/deactivate",
            post(deactivate_tenant_handler),
        )
        .route(
            "/api/v1/tenants/{id}/activate",
            post(activate_tenant_handler),
        )
        .route("/api/v1/tenants/{id}", delete(delete_tenant_handler))
        // Audit log.
        .route("/api/v1/audit/events", post(log_audit_event))
        .route("/api/v1/audit/events", get(query_audit_events))
        // Dynamic configuration.
        .route("/api/v1/config", get(list_configs))
        .route("/api/v1/config", post(set_config))
        .route("/api/v1/config/{key}", get(get_config))
        .route("/api/v1/config/{key}", put(update_config))
        .route("/api/v1/config/{key}", delete(delete_config))
        // Event ingestion & querying.
        .route("/api/v1/events", post(super::api::ingest_event_v1))
        .route(
            "/api/v1/events/batch",
            post(super::api::ingest_events_batch_v1),
        )
        .route("/api/v1/events/query", get(super::api::query_events))
        .route("/api/v1/events/stream", get(super::api::events_websocket))
        // Entity state & snapshots.
        .route(
            "/api/v1/entities/{entity_id}/state",
            get(super::api::get_entity_state),
        )
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(super::api::get_entity_snapshot),
        )
        .route("/api/v1/stats", get(super::api::get_stats))
        .route("/api/v1/streams", get(super::api::list_streams))
        .route("/api/v1/event-types", get(super::api::list_event_types))
        // Analytics.
        .route(
            "/api/v1/analytics/frequency",
            get(super::api::analytics_frequency),
        )
        .route(
            "/api/v1/analytics/summary",
            get(super::api::analytics_summary),
        )
        .route(
            "/api/v1/analytics/correlation",
            get(super::api::analytics_correlation),
        )
        // Snapshot management.
        .route("/api/v1/snapshots", post(super::api::create_snapshot))
        .route("/api/v1/snapshots", get(super::api::list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(super::api::get_latest_snapshot),
        )
        // Compaction.
        .route(
            "/api/v1/compaction/trigger",
            post(super::api::trigger_compaction),
        )
        .route(
            "/api/v1/compaction/stats",
            get(super::api::compaction_stats),
        )
        // Schema registry.
        .route("/api/v1/schemas", post(super::api::register_schema))
        .route("/api/v1/schemas", get(super::api::list_subjects))
        .route("/api/v1/schemas/{subject}", get(super::api::get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(super::api::list_schema_versions),
        )
        .route(
            "/api/v1/schemas/validate",
            post(super::api::validate_event_schema),
        )
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(super::api::set_compatibility_mode),
        )
        // Event replay.
        .route("/api/v1/replay", post(super::api::start_replay))
        .route("/api/v1/replay", get(super::api::list_replays))
        .route(
            "/api/v1/replay/{replay_id}",
            get(super::api::get_replay_progress),
        )
        .route(
            "/api/v1/replay/{replay_id}/cancel",
            post(super::api::cancel_replay),
        )
        .route(
            "/api/v1/replay/{replay_id}",
            delete(super::api::delete_replay),
        )
        // Processing pipelines.
        .route("/api/v1/pipelines", post(super::api::register_pipeline))
        .route("/api/v1/pipelines", get(super::api::list_pipelines))
        .route(
            "/api/v1/pipelines/stats",
            get(super::api::all_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            get(super::api::get_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            delete(super::api::remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(super::api::get_pipeline_stats),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/reset",
            put(super::api::reset_pipeline),
        )
        // Projections.
        .route("/api/v1/projections", get(super::api::list_projections))
        .route(
            "/api/v1/projections/{name}",
            get(super::api::get_projection),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(super::api::get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(super::api::save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(super::api::bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(super::api::bulk_save_projection_states),
        )
        // Internal control plane (exempt from the read-only guard).
        .route("/internal/promote", post(promote_handler))
        .route("/internal/repoint", post(repoint_handler))
        .with_state(app_state.clone())
        // Layers added later wrap the ones added earlier, so a request flows:
        // trace -> CORS -> auth -> rate limit -> read-only guard -> handler.
        .layer(middleware::from_fn_with_state(
            app_state,
            read_only_middleware,
        ))
        .layer(middleware::from_fn_with_state(
            rate_limit_state,
            rate_limit_middleware,
        ))
        .layer(middleware::from_fn_with_state(auth_state, auth_middleware))
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http());

    let listener = tokio::net::TcpListener::bind(addr).await?;

    // Serve until Ctrl+C / SIGTERM triggers graceful shutdown.
    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    tracing::info!("🛑 AllSource Core shutdown complete");
    Ok(())
}
370
/// URL prefixes whose POST/PUT/DELETE requests mutate state and must be
/// rejected on read-only followers. Matched by prefix in `is_write_request`,
/// so e.g. "/api/v1/events" also covers "/api/v1/events/batch".
const WRITE_PATHS: &[&str] = &[
    "/api/v1/events",
    "/api/v1/events/batch",
    "/api/v1/snapshots",
    "/api/v1/projections/",
    "/api/v1/schemas",
    "/api/v1/replay",
    "/api/v1/pipelines",
    "/api/v1/compaction/trigger",
    "/api/v1/audit/events",
    "/api/v1/config",
];
384
385fn is_write_request(method: &axum::http::Method, path: &str) -> bool {
387 use axum::http::Method;
388 if method != Method::POST && method != Method::PUT && method != Method::DELETE {
390 return false;
391 }
392 WRITE_PATHS
393 .iter()
394 .any(|write_path| path.starts_with(write_path))
395}
396
/// True for control-plane endpoints under "/internal/", which must stay
/// reachable even when the node rejects ordinary writes.
fn is_internal_request(path: &str) -> bool {
    path.strip_prefix("/internal/").is_some()
}
401
402async fn read_only_middleware(
408 State(state): State<AppState>,
409 request: axum::extract::Request,
410 next: axum::middleware::Next,
411) -> axum::response::Response {
412 let path = request.uri().path();
413 if state.role.load().is_follower()
414 && is_write_request(request.method(), path)
415 && !is_internal_request(path)
416 {
417 return (
418 axum::http::StatusCode::CONFLICT,
419 axum::Json(serde_json::json!({
420 "error": "read_only",
421 "message": "This node is a read-only follower"
422 })),
423 )
424 .into_response();
425 }
426 next.run(request).await
427}
428
/// GET /health — liveness report including system-stream counters,
/// replication status and the node's current role.
async fn health_v1(State(state): State<AppState>) -> impl IntoResponse {
    let has_system_repos = state.service_container.has_system_repositories();

    // System-stream counters are only meaningful when event-sourced system
    // repositories are configured; otherwise report the in-memory mode.
    let system_streams = if has_system_repos {
        let (tenant_count, config_count, total_events) =
            if let Some(store) = state.service_container.system_store() {
                use crate::domain::value_objects::system_stream::SystemDomain;
                (
                    store.count_stream(SystemDomain::Tenant),
                    store.count_stream(SystemDomain::Config),
                    store.total_events(),
                )
            } else {
                (0, 0, 0)
            };

        serde_json::json!({
            "status": "healthy",
            "mode": "event-sourced",
            "total_events": total_events,
            "tenant_events": tenant_count,
            "config_events": config_count,
        })
    } else {
        serde_json::json!({
            "status": "disabled",
            "mode": "in-memory",
        })
    };

    // Report whichever replication role is active: shipper (leader),
    // receiver (follower), or null when replication is not configured.
    // The read lock is dropped at the end of this block.
    let replication = {
        let shipper_guard = state.wal_shipper.read().await;
        if let Some(ref shipper) = *shipper_guard {
            serde_json::to_value(shipper.status()).unwrap_or_default()
        } else if let Some(ref receiver) = state.wal_receiver {
            serde_json::to_value(receiver.status()).unwrap_or_default()
        } else {
            serde_json::json!(null)
        }
    };

    let current_role = state.role.load();

    Json(serde_json::json!({
        "status": "healthy",
        "service": "allsource-core",
        "version": env!("CARGO_PKG_VERSION"),
        "role": current_role,
        "system_streams": system_streams,
        "replication": replication,
    }))
}
485
/// POST /internal/promote — switch this follower to leader.
///
/// Stops the WAL receiver, starts a WAL shipper on `replication_port`, and
/// flips the role so write requests are accepted. Idempotent: a node that is
/// already leader returns "already_leader".
///
/// NOTE(review): the role check and `store` below are not one atomic step —
/// two concurrent promote calls could both pass the leader check and each
/// spawn a shipper. Presumably promotion is driven by a single orchestrator;
/// confirm, or serialize this handler.
async fn promote_handler(State(state): State<AppState>) -> impl IntoResponse {
    let current_role = state.role.load();
    if current_role == NodeRole::Leader {
        return (
            axum::http::StatusCode::OK,
            Json(serde_json::json!({
                "status": "already_leader",
                "message": "This node is already the leader",
            })),
        );
    }

    tracing::info!("PROMOTE: Switching role from follower to leader");

    // Flip the role first so read_only_middleware starts admitting writes.
    state.role.store(NodeRole::Leader);

    // Stop consuming WAL from the old leader.
    if let Some(ref receiver) = state.wal_receiver {
        receiver.shutdown();
        tracing::info!("PROMOTE: WAL receiver shutdown signalled");
    }

    // Build a WAL shipper wired to the local store so new followers can sync.
    let replication_port = state.replication_port;
    let (mut shipper, tx) = WalShipper::new();
    state.store.enable_wal_replication(tx);
    shipper.set_store(Arc::clone(&state.store));
    shipper.set_metrics(state.store.metrics());
    let shipper = Arc::new(shipper);

    // Publish the shipper in AppState so /health reports leader replication
    // status. Write lock is dropped at the end of the block.
    {
        let mut shipper_guard = state.wal_shipper.write().await;
        *shipper_guard = Some(Arc::clone(&shipper));
    }

    // Serve WAL shipping in the background; errors are logged, not fatal.
    let shipper_clone = Arc::clone(&shipper);
    tokio::spawn(async move {
        if let Err(e) = shipper_clone.serve(replication_port).await {
            tracing::error!("Promoted WAL shipper error: {}", e);
        }
    });

    tracing::info!(
        "PROMOTE: Now accepting writes. WAL shipper listening on port {}",
        replication_port,
    );

    (
        axum::http::StatusCode::OK,
        Json(serde_json::json!({
            "status": "promoted",
            "role": "leader",
            "replication_port": replication_port,
        })),
    )
}
550
551async fn repoint_handler(
556 State(state): State<AppState>,
557 axum::extract::Query(params): axum::extract::Query<std::collections::HashMap<String, String>>,
558) -> impl IntoResponse {
559 let current_role = state.role.load();
560 if current_role != NodeRole::Follower {
561 return (
562 axum::http::StatusCode::CONFLICT,
563 Json(serde_json::json!({
564 "error": "not_follower",
565 "message": "Repoint only applies to follower nodes",
566 })),
567 );
568 }
569
570 let new_leader = match params.get("leader") {
571 Some(l) if !l.is_empty() => l.clone(),
572 _ => {
573 return (
574 axum::http::StatusCode::BAD_REQUEST,
575 Json(serde_json::json!({
576 "error": "missing_leader",
577 "message": "Query parameter 'leader' is required (e.g. ?leader=new-leader:3910)",
578 })),
579 );
580 }
581 };
582
583 tracing::info!("REPOINT: Switching replication target to {}", new_leader);
584
585 if let Some(ref receiver) = state.wal_receiver {
586 receiver.repoint(&new_leader);
587 tracing::info!("REPOINT: WAL receiver repointed to {}", new_leader);
588 } else {
589 tracing::warn!("REPOINT: No WAL receiver to repoint");
590 }
591
592 (
593 axum::http::StatusCode::OK,
594 Json(serde_json::json!({
595 "status": "repointed",
596 "new_leader": new_leader,
597 })),
598 )
599}
600
/// Resolve when a shutdown signal arrives: Ctrl+C on every platform, plus
/// SIGTERM on Unix. Used by `serve_v1` for axum's graceful shutdown.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    // SIGTERM only exists on Unix; elsewhere this branch never completes.
    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    // Race both signal sources; whichever fires first triggers shutdown.
    tokio::select! {
        _ = ctrl_c => {
            tracing::info!("📤 Received Ctrl+C, initiating graceful shutdown...");
        }
        _ = terminate => {
            tracing::info!("📤 Received SIGTERM, initiating graceful shutdown...");
        }
    }
}