allsource_core/
api.rs

1use crate::analytics::{
2    AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
3    EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
4};
5use crate::compaction::CompactionResult;
6use crate::domain::entities::Event;
7use crate::error::Result;
8use crate::application::dto::{IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse, EventDto};
9use crate::pipeline::{PipelineConfig, PipelineStats};
10use crate::replay::{ReplayProgress, StartReplayRequest, StartReplayResponse};
11use crate::schema::{
12    CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
13    ValidateEventResponse,
14};
15use crate::snapshot::{
16    CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
17    SnapshotInfo,
18};
19use crate::store::EventStore;
20use axum::{
21    extract::{Path, Query, State, WebSocketUpgrade},
22    response::{IntoResponse, Response},
23    routing::{get, post, put},
24    Json, Router,
25};
26use serde::Deserialize;
27use std::sync::Arc;
28use tower_http::cors::{Any, CorsLayer};
29use tower_http::trace::TraceLayer;
30
31type SharedStore = Arc<EventStore>;
32
33pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
34    let app = Router::new()
35        .route("/health", get(health))
36        .route("/metrics", get(prometheus_metrics))  // v0.6: Prometheus metrics endpoint
37        .route("/api/v1/events", post(ingest_event))
38        .route("/api/v1/events/query", get(query_events))
39        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
40        .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
41        .route("/api/v1/entities/:entity_id/snapshot", get(get_entity_snapshot))
42        .route("/api/v1/stats", get(get_stats))
43        // v0.2: Advanced analytics endpoints
44        .route("/api/v1/analytics/frequency", get(analytics_frequency))
45        .route("/api/v1/analytics/summary", get(analytics_summary))
46        .route("/api/v1/analytics/correlation", get(analytics_correlation))
47        // v0.2: Snapshot management endpoints
48        .route("/api/v1/snapshots", post(create_snapshot))
49        .route("/api/v1/snapshots", get(list_snapshots))
50        .route("/api/v1/snapshots/:entity_id/latest", get(get_latest_snapshot))
51        // v0.2: Compaction endpoints
52        .route("/api/v1/compaction/trigger", post(trigger_compaction))
53        .route("/api/v1/compaction/stats", get(compaction_stats))
54        // v0.5: Schema registry endpoints
55        .route("/api/v1/schemas", post(register_schema))
56        .route("/api/v1/schemas", get(list_subjects))
57        .route("/api/v1/schemas/:subject", get(get_schema))
58        .route("/api/v1/schemas/:subject/versions", get(list_schema_versions))
59        .route("/api/v1/schemas/validate", post(validate_event_schema))
60        .route("/api/v1/schemas/:subject/compatibility", put(set_compatibility_mode))
61        // v0.5: Replay and projection rebuild endpoints
62        .route("/api/v1/replay", post(start_replay))
63        .route("/api/v1/replay", get(list_replays))
64        .route("/api/v1/replay/:replay_id", get(get_replay_progress))
65        .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
66        .route("/api/v1/replay/:replay_id", axum::routing::delete(delete_replay))
67        // v0.5: Stream processing pipeline endpoints
68        .route("/api/v1/pipelines", post(register_pipeline))
69        .route("/api/v1/pipelines", get(list_pipelines))
70        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
71        .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
72        .route("/api/v1/pipelines/:pipeline_id", axum::routing::delete(remove_pipeline))
73        .route("/api/v1/pipelines/:pipeline_id/stats", get(get_pipeline_stats))
74        .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
75        .layer(
76            CorsLayer::new()
77                .allow_origin(Any)
78                .allow_methods(Any)
79                .allow_headers(Any),
80        )
81        .layer(TraceLayer::new_for_http())
82        .with_state(store);
83
84    let listener = tokio::net::TcpListener::bind(addr).await?;
85    axum::serve(listener, app).await?;
86
87    Ok(())
88}
89
90pub async fn health() -> impl IntoResponse {
91    Json(serde_json::json!({
92        "status": "healthy",
93        "service": "allsource-core",
94        "version": env!("CARGO_PKG_VERSION")
95    }))
96}
97
98// v0.6: Prometheus metrics endpoint
99pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
100    let metrics = store.metrics();
101
102    match metrics.encode() {
103        Ok(encoded) => Response::builder()
104            .status(200)
105            .header("Content-Type", "text/plain; version=0.0.4")
106            .body(encoded)
107            .unwrap()
108            .into_response(),
109        Err(e) => Response::builder()
110            .status(500)
111            .body(format!("Error encoding metrics: {}", e))
112            .unwrap()
113            .into_response(),
114    }
115}
116
117pub async fn ingest_event(
118    State(store): State<SharedStore>,
119    Json(req): Json<IngestEventRequest>,
120) -> Result<Json<IngestEventResponse>> {
121    // Create event using from_strings with default tenant
122    let event = Event::from_strings(
123        req.event_type,
124        req.entity_id,
125        "default".to_string(),
126        req.payload,
127        req.metadata,
128    )?;
129
130    let event_id = event.id;
131    let timestamp = event.timestamp;
132
133    store.ingest(event)?;
134
135    tracing::info!("Event ingested: {}", event_id);
136
137    Ok(Json(IngestEventResponse {
138        event_id,
139        timestamp,
140    }))
141}
142
143pub async fn query_events(
144    State(store): State<SharedStore>,
145    Query(req): Query<QueryEventsRequest>,
146) -> Result<Json<QueryEventsResponse>> {
147    let domain_events = store.query(req)?;
148    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
149    let count = events.len();
150
151    tracing::debug!("Query returned {} events", count);
152
153    Ok(Json(QueryEventsResponse { events, count }))
154}
155
156#[derive(Deserialize)]
157pub struct EntityStateParams {
158    as_of: Option<chrono::DateTime<chrono::Utc>>,
159}
160
161pub async fn get_entity_state(
162    State(store): State<SharedStore>,
163    Path(entity_id): Path<String>,
164    Query(params): Query<EntityStateParams>,
165) -> Result<Json<serde_json::Value>> {
166    let state = store.reconstruct_state(&entity_id, params.as_of)?;
167
168    tracing::info!("State reconstructed for entity: {}", entity_id);
169
170    Ok(Json(state))
171}
172
173pub async fn get_entity_snapshot(
174    State(store): State<SharedStore>,
175    Path(entity_id): Path<String>,
176) -> Result<Json<serde_json::Value>> {
177    let snapshot = store.get_snapshot(&entity_id)?;
178
179    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
180
181    Ok(Json(snapshot))
182}
183
184pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
185    let stats = store.stats();
186    Json(stats)
187}
188
189// v0.2: WebSocket endpoint for real-time event streaming
190pub async fn events_websocket(
191    ws: WebSocketUpgrade,
192    State(store): State<SharedStore>,
193) -> Response {
194    let websocket_manager = store.websocket_manager();
195
196    ws.on_upgrade(move |socket| async move {
197        websocket_manager.handle_socket(socket).await;
198    })
199}
200
201// v0.2: Event frequency analytics endpoint
202pub async fn analytics_frequency(
203    State(store): State<SharedStore>,
204    Query(req): Query<EventFrequencyRequest>,
205) -> Result<Json<EventFrequencyResponse>> {
206    let response = AnalyticsEngine::event_frequency(&store, req)?;
207
208    tracing::debug!(
209        "Frequency analysis returned {} buckets",
210        response.buckets.len()
211    );
212
213    Ok(Json(response))
214}
215
216// v0.2: Statistical summary endpoint
217pub async fn analytics_summary(
218    State(store): State<SharedStore>,
219    Query(req): Query<StatsSummaryRequest>,
220) -> Result<Json<StatsSummaryResponse>> {
221    let response = AnalyticsEngine::stats_summary(&store, req)?;
222
223    tracing::debug!(
224        "Stats summary: {} events across {} entities",
225        response.total_events,
226        response.unique_entities
227    );
228
229    Ok(Json(response))
230}
231
232// v0.2: Event correlation analysis endpoint
233pub async fn analytics_correlation(
234    State(store): State<SharedStore>,
235    Query(req): Query<CorrelationRequest>,
236) -> Result<Json<CorrelationResponse>> {
237    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
238
239    tracing::debug!(
240        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
241        response.correlated_pairs,
242        response.total_a,
243        response.correlation_percentage
244    );
245
246    Ok(Json(response))
247}
248
249// v0.2: Create a snapshot for an entity
250pub async fn create_snapshot(
251    State(store): State<SharedStore>,
252    Json(req): Json<CreateSnapshotRequest>,
253) -> Result<Json<CreateSnapshotResponse>> {
254    store.create_snapshot(&req.entity_id)?;
255
256    let snapshot_manager = store.snapshot_manager();
257    let snapshot = snapshot_manager
258        .get_latest_snapshot(&req.entity_id)
259        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
260
261    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
262
263    Ok(Json(CreateSnapshotResponse {
264        snapshot_id: snapshot.id,
265        entity_id: snapshot.entity_id,
266        created_at: snapshot.created_at,
267        event_count: snapshot.event_count,
268        size_bytes: snapshot.metadata.size_bytes,
269    }))
270}
271
272// v0.2: List snapshots
273pub async fn list_snapshots(
274    State(store): State<SharedStore>,
275    Query(req): Query<ListSnapshotsRequest>,
276) -> Result<Json<ListSnapshotsResponse>> {
277    let snapshot_manager = store.snapshot_manager();
278
279    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
280        snapshot_manager
281            .get_all_snapshots(&entity_id)
282            .into_iter()
283            .map(SnapshotInfo::from)
284            .collect()
285    } else {
286        // List all entities with snapshots
287        let entities = snapshot_manager.list_entities();
288        entities
289            .iter()
290            .flat_map(|entity_id| {
291                snapshot_manager
292                    .get_all_snapshots(entity_id)
293                    .into_iter()
294                    .map(SnapshotInfo::from)
295            })
296            .collect()
297    };
298
299    let total = snapshots.len();
300
301    tracing::debug!("Listed {} snapshots", total);
302
303    Ok(Json(ListSnapshotsResponse { snapshots, total }))
304}
305
306// v0.2: Get latest snapshot for an entity
307pub async fn get_latest_snapshot(
308    State(store): State<SharedStore>,
309    Path(entity_id): Path<String>,
310) -> Result<Json<serde_json::Value>> {
311    let snapshot_manager = store.snapshot_manager();
312
313    let snapshot = snapshot_manager
314        .get_latest_snapshot(&entity_id)
315        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
316
317    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
318
319    Ok(Json(serde_json::json!({
320        "snapshot_id": snapshot.id,
321        "entity_id": snapshot.entity_id,
322        "created_at": snapshot.created_at,
323        "as_of": snapshot.as_of,
324        "event_count": snapshot.event_count,
325        "size_bytes": snapshot.metadata.size_bytes,
326        "snapshot_type": snapshot.metadata.snapshot_type,
327        "state": snapshot.state
328    })))
329}
330
331// v0.2: Trigger manual compaction
332pub async fn trigger_compaction(State(store): State<SharedStore>) -> Result<Json<CompactionResult>> {
333    let compaction_manager = store
334        .compaction_manager()
335        .ok_or_else(|| crate::error::AllSourceError::InternalError(
336            "Compaction not enabled (no Parquet storage)".to_string()
337        ))?;
338
339    tracing::info!("📦 Manual compaction triggered via API");
340
341    let result = compaction_manager.compact_now()?;
342
343    Ok(Json(result))
344}
345
346// v0.2: Get compaction statistics
347pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
348    let compaction_manager = store
349        .compaction_manager()
350        .ok_or_else(|| crate::error::AllSourceError::InternalError(
351            "Compaction not enabled (no Parquet storage)".to_string()
352        ))?;
353
354    let stats = compaction_manager.stats();
355    let config = compaction_manager.config();
356
357    Ok(Json(serde_json::json!({
358        "stats": stats,
359        "config": {
360            "min_files_to_compact": config.min_files_to_compact,
361            "target_file_size": config.target_file_size,
362            "max_file_size": config.max_file_size,
363            "small_file_threshold": config.small_file_threshold,
364            "compaction_interval_seconds": config.compaction_interval_seconds,
365            "auto_compact": config.auto_compact,
366            "strategy": config.strategy
367        }
368    })))
369}
370
371// v0.5: Register a new schema
372pub async fn register_schema(
373    State(store): State<SharedStore>,
374    Json(req): Json<RegisterSchemaRequest>,
375) -> Result<Json<RegisterSchemaResponse>> {
376    let schema_registry = store.schema_registry();
377
378    let response = schema_registry.register_schema(
379        req.subject,
380        req.schema,
381        req.description,
382        req.tags,
383    )?;
384
385    tracing::info!("📋 Schema registered: v{} for '{}'", response.version, response.subject);
386
387    Ok(Json(response))
388}
389
390// v0.5: Get a schema by subject and optional version
391#[derive(Deserialize)]
392pub struct GetSchemaParams {
393    version: Option<u32>,
394}
395
396pub async fn get_schema(
397    State(store): State<SharedStore>,
398    Path(subject): Path<String>,
399    Query(params): Query<GetSchemaParams>,
400) -> Result<Json<serde_json::Value>> {
401    let schema_registry = store.schema_registry();
402
403    let schema = schema_registry.get_schema(&subject, params.version)?;
404
405    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
406
407    Ok(Json(serde_json::json!({
408        "id": schema.id,
409        "subject": schema.subject,
410        "version": schema.version,
411        "schema": schema.schema,
412        "created_at": schema.created_at,
413        "description": schema.description,
414        "tags": schema.tags
415    })))
416}
417
418// v0.5: List all versions of a schema subject
419pub async fn list_schema_versions(
420    State(store): State<SharedStore>,
421    Path(subject): Path<String>,
422) -> Result<Json<serde_json::Value>> {
423    let schema_registry = store.schema_registry();
424
425    let versions = schema_registry.list_versions(&subject)?;
426
427    Ok(Json(serde_json::json!({
428        "subject": subject,
429        "versions": versions
430    })))
431}
432
433// v0.5: List all schema subjects
434pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
435    let schema_registry = store.schema_registry();
436
437    let subjects = schema_registry.list_subjects();
438
439    Json(serde_json::json!({
440        "subjects": subjects,
441        "total": subjects.len()
442    }))
443}
444
445// v0.5: Validate an event against a schema
446pub async fn validate_event_schema(
447    State(store): State<SharedStore>,
448    Json(req): Json<ValidateEventRequest>,
449) -> Result<Json<ValidateEventResponse>> {
450    let schema_registry = store.schema_registry();
451
452    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
453
454    if response.valid {
455        tracing::debug!("✅ Event validated against schema '{}' v{}", req.subject, response.schema_version);
456    } else {
457        tracing::warn!("❌ Event validation failed for '{}': {:?}", req.subject, response.errors);
458    }
459
460    Ok(Json(response))
461}
462
463// v0.5: Set compatibility mode for a subject
464#[derive(Deserialize)]
465pub struct SetCompatibilityRequest {
466    compatibility: CompatibilityMode,
467}
468
469pub async fn set_compatibility_mode(
470    State(store): State<SharedStore>,
471    Path(subject): Path<String>,
472    Json(req): Json<SetCompatibilityRequest>,
473) -> Json<serde_json::Value> {
474    let schema_registry = store.schema_registry();
475
476    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
477
478    tracing::info!("🔧 Set compatibility mode for '{}' to {:?}", subject, req.compatibility);
479
480    Json(serde_json::json!({
481        "subject": subject,
482        "compatibility": req.compatibility
483    }))
484}
485
486// v0.5: Start a replay operation
487pub async fn start_replay(
488    State(store): State<SharedStore>,
489    Json(req): Json<StartReplayRequest>,
490) -> Result<Json<StartReplayResponse>> {
491    let replay_manager = store.replay_manager();
492
493    let response = replay_manager.start_replay(store, req)?;
494
495    tracing::info!("🔄 Started replay {} with {} events", response.replay_id, response.total_events);
496
497    Ok(Json(response))
498}
499
500// v0.5: Get replay progress
501pub async fn get_replay_progress(
502    State(store): State<SharedStore>,
503    Path(replay_id): Path<uuid::Uuid>,
504) -> Result<Json<ReplayProgress>> {
505    let replay_manager = store.replay_manager();
506
507    let progress = replay_manager.get_progress(replay_id)?;
508
509    Ok(Json(progress))
510}
511
512// v0.5: List all replay operations
513pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
514    let replay_manager = store.replay_manager();
515
516    let replays = replay_manager.list_replays();
517
518    Json(serde_json::json!({
519        "replays": replays,
520        "total": replays.len()
521    }))
522}
523
524// v0.5: Cancel a running replay
525pub async fn cancel_replay(
526    State(store): State<SharedStore>,
527    Path(replay_id): Path<uuid::Uuid>,
528) -> Result<Json<serde_json::Value>> {
529    let replay_manager = store.replay_manager();
530
531    replay_manager.cancel_replay(replay_id)?;
532
533    tracing::info!("🛑 Cancelled replay {}", replay_id);
534
535    Ok(Json(serde_json::json!({
536        "replay_id": replay_id,
537        "status": "cancelled"
538    })))
539}
540
541// v0.5: Delete a completed replay
542pub async fn delete_replay(
543    State(store): State<SharedStore>,
544    Path(replay_id): Path<uuid::Uuid>,
545) -> Result<Json<serde_json::Value>> {
546    let replay_manager = store.replay_manager();
547
548    let deleted = replay_manager.delete_replay(replay_id)?;
549
550    if deleted {
551        tracing::info!("🗑️  Deleted replay {}", replay_id);
552    }
553
554    Ok(Json(serde_json::json!({
555        "replay_id": replay_id,
556        "deleted": deleted
557    })))
558}
559
560// v0.5: Register a new pipeline
561pub async fn register_pipeline(
562    State(store): State<SharedStore>,
563    Json(config): Json<PipelineConfig>,
564) -> Result<Json<serde_json::Value>> {
565    let pipeline_manager = store.pipeline_manager();
566
567    let pipeline_id = pipeline_manager.register(config.clone());
568
569    tracing::info!(
570        "🔀 Pipeline registered: {} (name: {})",
571        pipeline_id,
572        config.name
573    );
574
575    Ok(Json(serde_json::json!({
576        "pipeline_id": pipeline_id,
577        "name": config.name,
578        "enabled": config.enabled
579    })))
580}
581
582// v0.5: List all pipelines
583pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
584    let pipeline_manager = store.pipeline_manager();
585
586    let pipelines = pipeline_manager.list();
587
588    tracing::debug!("Listed {} pipelines", pipelines.len());
589
590    Json(serde_json::json!({
591        "pipelines": pipelines,
592        "total": pipelines.len()
593    }))
594}
595
596// v0.5: Get a specific pipeline
597pub async fn get_pipeline(
598    State(store): State<SharedStore>,
599    Path(pipeline_id): Path<uuid::Uuid>,
600) -> Result<Json<PipelineConfig>> {
601    let pipeline_manager = store.pipeline_manager();
602
603    let pipeline = pipeline_manager
604        .get(pipeline_id)
605        .ok_or_else(|| crate::error::AllSourceError::ValidationError(
606            format!("Pipeline not found: {}", pipeline_id)
607        ))?;
608
609    Ok(Json(pipeline.config().clone()))
610}
611
612// v0.5: Remove a pipeline
613pub async fn remove_pipeline(
614    State(store): State<SharedStore>,
615    Path(pipeline_id): Path<uuid::Uuid>,
616) -> Result<Json<serde_json::Value>> {
617    let pipeline_manager = store.pipeline_manager();
618
619    let removed = pipeline_manager.remove(pipeline_id);
620
621    if removed {
622        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
623    }
624
625    Ok(Json(serde_json::json!({
626        "pipeline_id": pipeline_id,
627        "removed": removed
628    })))
629}
630
631// v0.5: Get statistics for all pipelines
632pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
633    let pipeline_manager = store.pipeline_manager();
634
635    let stats = pipeline_manager.all_stats();
636
637    Json(serde_json::json!({
638        "stats": stats,
639        "total": stats.len()
640    }))
641}
642
643// v0.5: Get statistics for a specific pipeline
644pub async fn get_pipeline_stats(
645    State(store): State<SharedStore>,
646    Path(pipeline_id): Path<uuid::Uuid>,
647) -> Result<Json<PipelineStats>> {
648    let pipeline_manager = store.pipeline_manager();
649
650    let pipeline = pipeline_manager
651        .get(pipeline_id)
652        .ok_or_else(|| crate::error::AllSourceError::ValidationError(
653            format!("Pipeline not found: {}", pipeline_id)
654        ))?;
655
656    Ok(Json(pipeline.stats()))
657}
658
659// v0.5: Reset a pipeline's state
660pub async fn reset_pipeline(
661    State(store): State<SharedStore>,
662    Path(pipeline_id): Path<uuid::Uuid>,
663) -> Result<Json<serde_json::Value>> {
664    let pipeline_manager = store.pipeline_manager();
665
666    let pipeline = pipeline_manager
667        .get(pipeline_id)
668        .ok_or_else(|| crate::error::AllSourceError::ValidationError(
669            format!("Pipeline not found: {}", pipeline_id)
670        ))?;
671
672    pipeline.reset();
673
674    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
675
676    Ok(Json(serde_json::json!({
677        "pipeline_id": pipeline_id,
678        "reset": true
679    })))
680}