1use crate::analytics::{
2 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
3 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
4};
5use crate::compaction::CompactionResult;
6use crate::domain::entities::Event;
7use crate::error::Result;
8use crate::application::dto::{IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse, EventDto};
9use crate::pipeline::{PipelineConfig, PipelineStats};
10use crate::replay::{ReplayProgress, StartReplayRequest, StartReplayResponse};
11use crate::schema::{
12 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
13 ValidateEventResponse,
14};
15use crate::snapshot::{
16 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
17 SnapshotInfo,
18};
19use crate::store::EventStore;
20use axum::{
21 extract::{Path, Query, State, WebSocketUpgrade},
22 response::{IntoResponse, Response},
23 routing::{get, post, put},
24 Json, Router,
25};
26use serde::Deserialize;
27use std::sync::Arc;
28use tower_http::cors::{Any, CorsLayer};
29use tower_http::trace::TraceLayer;
30
31type SharedStore = Arc<EventStore>;
32
33pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
34 let app = Router::new()
35 .route("/health", get(health))
36 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
38 .route("/api/v1/events/query", get(query_events))
39 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
41 .route("/api/v1/entities/:entity_id/snapshot", get(get_entity_snapshot))
42 .route("/api/v1/stats", get(get_stats))
43 .route("/api/v1/analytics/frequency", get(analytics_frequency))
45 .route("/api/v1/analytics/summary", get(analytics_summary))
46 .route("/api/v1/analytics/correlation", get(analytics_correlation))
47 .route("/api/v1/snapshots", post(create_snapshot))
49 .route("/api/v1/snapshots", get(list_snapshots))
50 .route("/api/v1/snapshots/:entity_id/latest", get(get_latest_snapshot))
51 .route("/api/v1/compaction/trigger", post(trigger_compaction))
53 .route("/api/v1/compaction/stats", get(compaction_stats))
54 .route("/api/v1/schemas", post(register_schema))
56 .route("/api/v1/schemas", get(list_subjects))
57 .route("/api/v1/schemas/:subject", get(get_schema))
58 .route("/api/v1/schemas/:subject/versions", get(list_schema_versions))
59 .route("/api/v1/schemas/validate", post(validate_event_schema))
60 .route("/api/v1/schemas/:subject/compatibility", put(set_compatibility_mode))
61 .route("/api/v1/replay", post(start_replay))
63 .route("/api/v1/replay", get(list_replays))
64 .route("/api/v1/replay/:replay_id", get(get_replay_progress))
65 .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
66 .route("/api/v1/replay/:replay_id", axum::routing::delete(delete_replay))
67 .route("/api/v1/pipelines", post(register_pipeline))
69 .route("/api/v1/pipelines", get(list_pipelines))
70 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
71 .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
72 .route("/api/v1/pipelines/:pipeline_id", axum::routing::delete(remove_pipeline))
73 .route("/api/v1/pipelines/:pipeline_id/stats", get(get_pipeline_stats))
74 .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
75 .layer(
76 CorsLayer::new()
77 .allow_origin(Any)
78 .allow_methods(Any)
79 .allow_headers(Any),
80 )
81 .layer(TraceLayer::new_for_http())
82 .with_state(store);
83
84 let listener = tokio::net::TcpListener::bind(addr).await?;
85 axum::serve(listener, app).await?;
86
87 Ok(())
88}
89
90pub async fn health() -> impl IntoResponse {
91 Json(serde_json::json!({
92 "status": "healthy",
93 "service": "allsource-core",
94 "version": env!("CARGO_PKG_VERSION")
95 }))
96}
97
98pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
100 let metrics = store.metrics();
101
102 match metrics.encode() {
103 Ok(encoded) => Response::builder()
104 .status(200)
105 .header("Content-Type", "text/plain; version=0.0.4")
106 .body(encoded)
107 .unwrap()
108 .into_response(),
109 Err(e) => Response::builder()
110 .status(500)
111 .body(format!("Error encoding metrics: {}", e))
112 .unwrap()
113 .into_response(),
114 }
115}
116
117pub async fn ingest_event(
118 State(store): State<SharedStore>,
119 Json(req): Json<IngestEventRequest>,
120) -> Result<Json<IngestEventResponse>> {
121 let event = Event::from_strings(
123 req.event_type,
124 req.entity_id,
125 "default".to_string(),
126 req.payload,
127 req.metadata,
128 )?;
129
130 let event_id = event.id;
131 let timestamp = event.timestamp;
132
133 store.ingest(event)?;
134
135 tracing::info!("Event ingested: {}", event_id);
136
137 Ok(Json(IngestEventResponse {
138 event_id,
139 timestamp,
140 }))
141}
142
143pub async fn query_events(
144 State(store): State<SharedStore>,
145 Query(req): Query<QueryEventsRequest>,
146) -> Result<Json<QueryEventsResponse>> {
147 let domain_events = store.query(req)?;
148 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
149 let count = events.len();
150
151 tracing::debug!("Query returned {} events", count);
152
153 Ok(Json(QueryEventsResponse { events, count }))
154}
155
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp; when present it is forwarded to
    /// `reconstruct_state` as the point in time to reconstruct the state for.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
160
161pub async fn get_entity_state(
162 State(store): State<SharedStore>,
163 Path(entity_id): Path<String>,
164 Query(params): Query<EntityStateParams>,
165) -> Result<Json<serde_json::Value>> {
166 let state = store.reconstruct_state(&entity_id, params.as_of)?;
167
168 tracing::info!("State reconstructed for entity: {}", entity_id);
169
170 Ok(Json(state))
171}
172
173pub async fn get_entity_snapshot(
174 State(store): State<SharedStore>,
175 Path(entity_id): Path<String>,
176) -> Result<Json<serde_json::Value>> {
177 let snapshot = store.get_snapshot(&entity_id)?;
178
179 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
180
181 Ok(Json(snapshot))
182}
183
184pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
185 let stats = store.stats();
186 Json(stats)
187}
188
189pub async fn events_websocket(
191 ws: WebSocketUpgrade,
192 State(store): State<SharedStore>,
193) -> Response {
194 let websocket_manager = store.websocket_manager();
195
196 ws.on_upgrade(move |socket| async move {
197 websocket_manager.handle_socket(socket).await;
198 })
199}
200
201pub async fn analytics_frequency(
203 State(store): State<SharedStore>,
204 Query(req): Query<EventFrequencyRequest>,
205) -> Result<Json<EventFrequencyResponse>> {
206 let response = AnalyticsEngine::event_frequency(&store, req)?;
207
208 tracing::debug!(
209 "Frequency analysis returned {} buckets",
210 response.buckets.len()
211 );
212
213 Ok(Json(response))
214}
215
216pub async fn analytics_summary(
218 State(store): State<SharedStore>,
219 Query(req): Query<StatsSummaryRequest>,
220) -> Result<Json<StatsSummaryResponse>> {
221 let response = AnalyticsEngine::stats_summary(&store, req)?;
222
223 tracing::debug!(
224 "Stats summary: {} events across {} entities",
225 response.total_events,
226 response.unique_entities
227 );
228
229 Ok(Json(response))
230}
231
232pub async fn analytics_correlation(
234 State(store): State<SharedStore>,
235 Query(req): Query<CorrelationRequest>,
236) -> Result<Json<CorrelationResponse>> {
237 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
238
239 tracing::debug!(
240 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
241 response.correlated_pairs,
242 response.total_a,
243 response.correlation_percentage
244 );
245
246 Ok(Json(response))
247}
248
249pub async fn create_snapshot(
251 State(store): State<SharedStore>,
252 Json(req): Json<CreateSnapshotRequest>,
253) -> Result<Json<CreateSnapshotResponse>> {
254 store.create_snapshot(&req.entity_id)?;
255
256 let snapshot_manager = store.snapshot_manager();
257 let snapshot = snapshot_manager
258 .get_latest_snapshot(&req.entity_id)
259 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
260
261 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
262
263 Ok(Json(CreateSnapshotResponse {
264 snapshot_id: snapshot.id,
265 entity_id: snapshot.entity_id,
266 created_at: snapshot.created_at,
267 event_count: snapshot.event_count,
268 size_bytes: snapshot.metadata.size_bytes,
269 }))
270}
271
272pub async fn list_snapshots(
274 State(store): State<SharedStore>,
275 Query(req): Query<ListSnapshotsRequest>,
276) -> Result<Json<ListSnapshotsResponse>> {
277 let snapshot_manager = store.snapshot_manager();
278
279 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
280 snapshot_manager
281 .get_all_snapshots(&entity_id)
282 .into_iter()
283 .map(SnapshotInfo::from)
284 .collect()
285 } else {
286 let entities = snapshot_manager.list_entities();
288 entities
289 .iter()
290 .flat_map(|entity_id| {
291 snapshot_manager
292 .get_all_snapshots(entity_id)
293 .into_iter()
294 .map(SnapshotInfo::from)
295 })
296 .collect()
297 };
298
299 let total = snapshots.len();
300
301 tracing::debug!("Listed {} snapshots", total);
302
303 Ok(Json(ListSnapshotsResponse { snapshots, total }))
304}
305
306pub async fn get_latest_snapshot(
308 State(store): State<SharedStore>,
309 Path(entity_id): Path<String>,
310) -> Result<Json<serde_json::Value>> {
311 let snapshot_manager = store.snapshot_manager();
312
313 let snapshot = snapshot_manager
314 .get_latest_snapshot(&entity_id)
315 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
316
317 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
318
319 Ok(Json(serde_json::json!({
320 "snapshot_id": snapshot.id,
321 "entity_id": snapshot.entity_id,
322 "created_at": snapshot.created_at,
323 "as_of": snapshot.as_of,
324 "event_count": snapshot.event_count,
325 "size_bytes": snapshot.metadata.size_bytes,
326 "snapshot_type": snapshot.metadata.snapshot_type,
327 "state": snapshot.state
328 })))
329}
330
331pub async fn trigger_compaction(State(store): State<SharedStore>) -> Result<Json<CompactionResult>> {
333 let compaction_manager = store
334 .compaction_manager()
335 .ok_or_else(|| crate::error::AllSourceError::InternalError(
336 "Compaction not enabled (no Parquet storage)".to_string()
337 ))?;
338
339 tracing::info!("📦 Manual compaction triggered via API");
340
341 let result = compaction_manager.compact_now()?;
342
343 Ok(Json(result))
344}
345
346pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
348 let compaction_manager = store
349 .compaction_manager()
350 .ok_or_else(|| crate::error::AllSourceError::InternalError(
351 "Compaction not enabled (no Parquet storage)".to_string()
352 ))?;
353
354 let stats = compaction_manager.stats();
355 let config = compaction_manager.config();
356
357 Ok(Json(serde_json::json!({
358 "stats": stats,
359 "config": {
360 "min_files_to_compact": config.min_files_to_compact,
361 "target_file_size": config.target_file_size,
362 "max_file_size": config.max_file_size,
363 "small_file_threshold": config.small_file_threshold,
364 "compaction_interval_seconds": config.compaction_interval_seconds,
365 "auto_compact": config.auto_compact,
366 "strategy": config.strategy
367 }
368 })))
369}
370
371pub async fn register_schema(
373 State(store): State<SharedStore>,
374 Json(req): Json<RegisterSchemaRequest>,
375) -> Result<Json<RegisterSchemaResponse>> {
376 let schema_registry = store.schema_registry();
377
378 let response = schema_registry.register_schema(
379 req.subject,
380 req.schema,
381 req.description,
382 req.tags,
383 )?;
384
385 tracing::info!("📋 Schema registered: v{} for '{}'", response.version, response.subject);
386
387 Ok(Json(response))
388}
389
/// Query-string parameters accepted by `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version; forwarded as-is to the registry's
    /// `get_schema` lookup.
    version: Option<u32>,
}
395
396pub async fn get_schema(
397 State(store): State<SharedStore>,
398 Path(subject): Path<String>,
399 Query(params): Query<GetSchemaParams>,
400) -> Result<Json<serde_json::Value>> {
401 let schema_registry = store.schema_registry();
402
403 let schema = schema_registry.get_schema(&subject, params.version)?;
404
405 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
406
407 Ok(Json(serde_json::json!({
408 "id": schema.id,
409 "subject": schema.subject,
410 "version": schema.version,
411 "schema": schema.schema,
412 "created_at": schema.created_at,
413 "description": schema.description,
414 "tags": schema.tags
415 })))
416}
417
418pub async fn list_schema_versions(
420 State(store): State<SharedStore>,
421 Path(subject): Path<String>,
422) -> Result<Json<serde_json::Value>> {
423 let schema_registry = store.schema_registry();
424
425 let versions = schema_registry.list_versions(&subject)?;
426
427 Ok(Json(serde_json::json!({
428 "subject": subject,
429 "versions": versions
430 })))
431}
432
433pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
435 let schema_registry = store.schema_registry();
436
437 let subjects = schema_registry.list_subjects();
438
439 Json(serde_json::json!({
440 "subjects": subjects,
441 "total": subjects.len()
442 }))
443}
444
445pub async fn validate_event_schema(
447 State(store): State<SharedStore>,
448 Json(req): Json<ValidateEventRequest>,
449) -> Result<Json<ValidateEventResponse>> {
450 let schema_registry = store.schema_registry();
451
452 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
453
454 if response.valid {
455 tracing::debug!("✅ Event validated against schema '{}' v{}", req.subject, response.schema_version);
456 } else {
457 tracing::warn!("❌ Event validation failed for '{}': {:?}", req.subject, response.errors);
458 }
459
460 Ok(Json(response))
461}
462
/// JSON request body accepted by `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
468
469pub async fn set_compatibility_mode(
470 State(store): State<SharedStore>,
471 Path(subject): Path<String>,
472 Json(req): Json<SetCompatibilityRequest>,
473) -> Json<serde_json::Value> {
474 let schema_registry = store.schema_registry();
475
476 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
477
478 tracing::info!("🔧 Set compatibility mode for '{}' to {:?}", subject, req.compatibility);
479
480 Json(serde_json::json!({
481 "subject": subject,
482 "compatibility": req.compatibility
483 }))
484}
485
486pub async fn start_replay(
488 State(store): State<SharedStore>,
489 Json(req): Json<StartReplayRequest>,
490) -> Result<Json<StartReplayResponse>> {
491 let replay_manager = store.replay_manager();
492
493 let response = replay_manager.start_replay(store, req)?;
494
495 tracing::info!("🔄 Started replay {} with {} events", response.replay_id, response.total_events);
496
497 Ok(Json(response))
498}
499
500pub async fn get_replay_progress(
502 State(store): State<SharedStore>,
503 Path(replay_id): Path<uuid::Uuid>,
504) -> Result<Json<ReplayProgress>> {
505 let replay_manager = store.replay_manager();
506
507 let progress = replay_manager.get_progress(replay_id)?;
508
509 Ok(Json(progress))
510}
511
512pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
514 let replay_manager = store.replay_manager();
515
516 let replays = replay_manager.list_replays();
517
518 Json(serde_json::json!({
519 "replays": replays,
520 "total": replays.len()
521 }))
522}
523
524pub async fn cancel_replay(
526 State(store): State<SharedStore>,
527 Path(replay_id): Path<uuid::Uuid>,
528) -> Result<Json<serde_json::Value>> {
529 let replay_manager = store.replay_manager();
530
531 replay_manager.cancel_replay(replay_id)?;
532
533 tracing::info!("🛑 Cancelled replay {}", replay_id);
534
535 Ok(Json(serde_json::json!({
536 "replay_id": replay_id,
537 "status": "cancelled"
538 })))
539}
540
541pub async fn delete_replay(
543 State(store): State<SharedStore>,
544 Path(replay_id): Path<uuid::Uuid>,
545) -> Result<Json<serde_json::Value>> {
546 let replay_manager = store.replay_manager();
547
548 let deleted = replay_manager.delete_replay(replay_id)?;
549
550 if deleted {
551 tracing::info!("🗑️ Deleted replay {}", replay_id);
552 }
553
554 Ok(Json(serde_json::json!({
555 "replay_id": replay_id,
556 "deleted": deleted
557 })))
558}
559
560pub async fn register_pipeline(
562 State(store): State<SharedStore>,
563 Json(config): Json<PipelineConfig>,
564) -> Result<Json<serde_json::Value>> {
565 let pipeline_manager = store.pipeline_manager();
566
567 let pipeline_id = pipeline_manager.register(config.clone());
568
569 tracing::info!(
570 "🔀 Pipeline registered: {} (name: {})",
571 pipeline_id,
572 config.name
573 );
574
575 Ok(Json(serde_json::json!({
576 "pipeline_id": pipeline_id,
577 "name": config.name,
578 "enabled": config.enabled
579 })))
580}
581
582pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
584 let pipeline_manager = store.pipeline_manager();
585
586 let pipelines = pipeline_manager.list();
587
588 tracing::debug!("Listed {} pipelines", pipelines.len());
589
590 Json(serde_json::json!({
591 "pipelines": pipelines,
592 "total": pipelines.len()
593 }))
594}
595
596pub async fn get_pipeline(
598 State(store): State<SharedStore>,
599 Path(pipeline_id): Path<uuid::Uuid>,
600) -> Result<Json<PipelineConfig>> {
601 let pipeline_manager = store.pipeline_manager();
602
603 let pipeline = pipeline_manager
604 .get(pipeline_id)
605 .ok_or_else(|| crate::error::AllSourceError::ValidationError(
606 format!("Pipeline not found: {}", pipeline_id)
607 ))?;
608
609 Ok(Json(pipeline.config().clone()))
610}
611
612pub async fn remove_pipeline(
614 State(store): State<SharedStore>,
615 Path(pipeline_id): Path<uuid::Uuid>,
616) -> Result<Json<serde_json::Value>> {
617 let pipeline_manager = store.pipeline_manager();
618
619 let removed = pipeline_manager.remove(pipeline_id);
620
621 if removed {
622 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
623 }
624
625 Ok(Json(serde_json::json!({
626 "pipeline_id": pipeline_id,
627 "removed": removed
628 })))
629}
630
631pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
633 let pipeline_manager = store.pipeline_manager();
634
635 let stats = pipeline_manager.all_stats();
636
637 Json(serde_json::json!({
638 "stats": stats,
639 "total": stats.len()
640 }))
641}
642
643pub async fn get_pipeline_stats(
645 State(store): State<SharedStore>,
646 Path(pipeline_id): Path<uuid::Uuid>,
647) -> Result<Json<PipelineStats>> {
648 let pipeline_manager = store.pipeline_manager();
649
650 let pipeline = pipeline_manager
651 .get(pipeline_id)
652 .ok_or_else(|| crate::error::AllSourceError::ValidationError(
653 format!("Pipeline not found: {}", pipeline_id)
654 ))?;
655
656 Ok(Json(pipeline.stats()))
657}
658
659pub async fn reset_pipeline(
661 State(store): State<SharedStore>,
662 Path(pipeline_id): Path<uuid::Uuid>,
663) -> Result<Json<serde_json::Value>> {
664 let pipeline_manager = store.pipeline_manager();
665
666 let pipeline = pipeline_manager
667 .get(pipeline_id)
668 .ok_or_else(|| crate::error::AllSourceError::ValidationError(
669 format!("Pipeline not found: {}", pipeline_id)
670 ))?;
671
672 pipeline.reset();
673
674 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
675
676 Ok(Json(serde_json::json!({
677 "pipeline_id": pipeline_id,
678 "reset": true
679 })))
680}