Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::application::dto::{
2    EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
3};
4use crate::application::services::analytics::{
5    AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
6    EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
7};
8use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
9use crate::application::services::replay::{
10    ReplayProgress, StartReplayRequest, StartReplayResponse,
11};
12use crate::application::services::schema::{
13    CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
14    ValidateEventResponse,
15};
16use crate::domain::entities::Event;
17use crate::error::Result;
18use crate::infrastructure::persistence::compaction::CompactionResult;
19use crate::infrastructure::persistence::snapshot::{
20    CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
21    SnapshotInfo,
22};
23use crate::store::EventStore;
24use axum::{
25    extract::{Path, Query, State, WebSocketUpgrade},
26    response::{IntoResponse, Response},
27    routing::{get, post, put},
28    Json, Router,
29};
30use serde::Deserialize;
31use std::sync::Arc;
32use tower_http::cors::{Any, CorsLayer};
33use tower_http::trace::TraceLayer;
34
35type SharedStore = Arc<EventStore>;
36
37pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
38    let app = Router::new()
39        .route("/health", get(health))
40        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
41        .route("/api/v1/events", post(ingest_event))
42        .route("/api/v1/events/query", get(query_events))
43        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
44        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
45        .route(
46            "/api/v1/entities/{entity_id}/snapshot",
47            get(get_entity_snapshot),
48        )
49        .route("/api/v1/stats", get(get_stats))
50        // v0.2: Advanced analytics endpoints
51        .route("/api/v1/analytics/frequency", get(analytics_frequency))
52        .route("/api/v1/analytics/summary", get(analytics_summary))
53        .route("/api/v1/analytics/correlation", get(analytics_correlation))
54        // v0.2: Snapshot management endpoints
55        .route("/api/v1/snapshots", post(create_snapshot))
56        .route("/api/v1/snapshots", get(list_snapshots))
57        .route(
58            "/api/v1/snapshots/{entity_id}/latest",
59            get(get_latest_snapshot),
60        )
61        // v0.2: Compaction endpoints
62        .route("/api/v1/compaction/trigger", post(trigger_compaction))
63        .route("/api/v1/compaction/stats", get(compaction_stats))
64        // v0.5: Schema registry endpoints
65        .route("/api/v1/schemas", post(register_schema))
66        .route("/api/v1/schemas", get(list_subjects))
67        .route("/api/v1/schemas/{subject}", get(get_schema))
68        .route(
69            "/api/v1/schemas/{subject}/versions",
70            get(list_schema_versions),
71        )
72        .route("/api/v1/schemas/validate", post(validate_event_schema))
73        .route(
74            "/api/v1/schemas/{subject}/compatibility",
75            put(set_compatibility_mode),
76        )
77        // v0.5: Replay and projection rebuild endpoints
78        .route("/api/v1/replay", post(start_replay))
79        .route("/api/v1/replay", get(list_replays))
80        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
81        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
82        .route(
83            "/api/v1/replay/{replay_id}",
84            axum::routing::delete(delete_replay),
85        )
86        // v0.5: Stream processing pipeline endpoints
87        .route("/api/v1/pipelines", post(register_pipeline))
88        .route("/api/v1/pipelines", get(list_pipelines))
89        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
90        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
91        .route(
92            "/api/v1/pipelines/{pipeline_id}",
93            axum::routing::delete(remove_pipeline),
94        )
95        .route(
96            "/api/v1/pipelines/{pipeline_id}/stats",
97            get(get_pipeline_stats),
98        )
99        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
100        // v0.7: Projection State API for Query Service integration
101        .route("/api/v1/projections", get(list_projections))
102        .route("/api/v1/projections/{name}", get(get_projection))
103        .route(
104            "/api/v1/projections/{name}/{entity_id}/state",
105            get(get_projection_state),
106        )
107        .route(
108            "/api/v1/projections/{name}/{entity_id}/state",
109            post(save_projection_state),
110        )
111        .route(
112            "/api/v1/projections/{name}/{entity_id}/state",
113            put(save_projection_state),
114        )
115        .route(
116            "/api/v1/projections/{name}/bulk",
117            post(bulk_get_projection_states),
118        )
119        .route(
120            "/api/v1/projections/{name}/bulk/save",
121            post(bulk_save_projection_states),
122        )
123        .layer(
124            CorsLayer::new()
125                .allow_origin(Any)
126                .allow_methods(Any)
127                .allow_headers(Any),
128        )
129        .layer(TraceLayer::new_for_http())
130        .with_state(store);
131
132    let listener = tokio::net::TcpListener::bind(addr).await?;
133    axum::serve(listener, app).await?;
134
135    Ok(())
136}
137
138pub async fn health() -> impl IntoResponse {
139    Json(serde_json::json!({
140        "status": "healthy",
141        "service": "allsource-core",
142        "version": env!("CARGO_PKG_VERSION")
143    }))
144}
145
146// v0.6: Prometheus metrics endpoint
147pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
148    let metrics = store.metrics();
149
150    match metrics.encode() {
151        Ok(encoded) => Response::builder()
152            .status(200)
153            .header("Content-Type", "text/plain; version=0.0.4")
154            .body(encoded)
155            .unwrap()
156            .into_response(),
157        Err(e) => Response::builder()
158            .status(500)
159            .body(format!("Error encoding metrics: {e}"))
160            .unwrap()
161            .into_response(),
162    }
163}
164
165pub async fn ingest_event(
166    State(store): State<SharedStore>,
167    Json(req): Json<IngestEventRequest>,
168) -> Result<Json<IngestEventResponse>> {
169    // Create event using from_strings with default tenant
170    let event = Event::from_strings(
171        req.event_type,
172        req.entity_id,
173        "default".to_string(),
174        req.payload,
175        req.metadata,
176    )?;
177
178    let event_id = event.id;
179    let timestamp = event.timestamp;
180
181    store.ingest(event)?;
182
183    tracing::info!("Event ingested: {}", event_id);
184
185    Ok(Json(IngestEventResponse {
186        event_id,
187        timestamp,
188    }))
189}
190
191pub async fn query_events(
192    State(store): State<SharedStore>,
193    Query(req): Query<QueryEventsRequest>,
194) -> Result<Json<QueryEventsResponse>> {
195    let domain_events = store.query(req)?;
196    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
197    let count = events.len();
198
199    tracing::debug!("Query returned {} events", count);
200
201    Ok(Json(QueryEventsResponse { events, count }))
202}
203
204#[derive(Deserialize)]
205pub struct EntityStateParams {
206    as_of: Option<chrono::DateTime<chrono::Utc>>,
207}
208
209pub async fn get_entity_state(
210    State(store): State<SharedStore>,
211    Path(entity_id): Path<String>,
212    Query(params): Query<EntityStateParams>,
213) -> Result<Json<serde_json::Value>> {
214    let state = store.reconstruct_state(&entity_id, params.as_of)?;
215
216    tracing::info!("State reconstructed for entity: {}", entity_id);
217
218    Ok(Json(state))
219}
220
221pub async fn get_entity_snapshot(
222    State(store): State<SharedStore>,
223    Path(entity_id): Path<String>,
224) -> Result<Json<serde_json::Value>> {
225    let snapshot = store.get_snapshot(&entity_id)?;
226
227    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
228
229    Ok(Json(snapshot))
230}
231
232pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
233    let stats = store.stats();
234    Json(stats)
235}
236
237// v0.2: WebSocket endpoint for real-time event streaming
238pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
239    let websocket_manager = store.websocket_manager();
240
241    ws.on_upgrade(move |socket| async move {
242        websocket_manager.handle_socket(socket).await;
243    })
244}
245
246// v0.2: Event frequency analytics endpoint
247pub async fn analytics_frequency(
248    State(store): State<SharedStore>,
249    Query(req): Query<EventFrequencyRequest>,
250) -> Result<Json<EventFrequencyResponse>> {
251    let response = AnalyticsEngine::event_frequency(&store, req)?;
252
253    tracing::debug!(
254        "Frequency analysis returned {} buckets",
255        response.buckets.len()
256    );
257
258    Ok(Json(response))
259}
260
261// v0.2: Statistical summary endpoint
262pub async fn analytics_summary(
263    State(store): State<SharedStore>,
264    Query(req): Query<StatsSummaryRequest>,
265) -> Result<Json<StatsSummaryResponse>> {
266    let response = AnalyticsEngine::stats_summary(&store, req)?;
267
268    tracing::debug!(
269        "Stats summary: {} events across {} entities",
270        response.total_events,
271        response.unique_entities
272    );
273
274    Ok(Json(response))
275}
276
277// v0.2: Event correlation analysis endpoint
278pub async fn analytics_correlation(
279    State(store): State<SharedStore>,
280    Query(req): Query<CorrelationRequest>,
281) -> Result<Json<CorrelationResponse>> {
282    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
283
284    tracing::debug!(
285        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
286        response.correlated_pairs,
287        response.total_a,
288        response.correlation_percentage
289    );
290
291    Ok(Json(response))
292}
293
294// v0.2: Create a snapshot for an entity
295pub async fn create_snapshot(
296    State(store): State<SharedStore>,
297    Json(req): Json<CreateSnapshotRequest>,
298) -> Result<Json<CreateSnapshotResponse>> {
299    store.create_snapshot(&req.entity_id)?;
300
301    let snapshot_manager = store.snapshot_manager();
302    let snapshot = snapshot_manager
303        .get_latest_snapshot(&req.entity_id)
304        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
305
306    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
307
308    Ok(Json(CreateSnapshotResponse {
309        snapshot_id: snapshot.id,
310        entity_id: snapshot.entity_id,
311        created_at: snapshot.created_at,
312        event_count: snapshot.event_count,
313        size_bytes: snapshot.metadata.size_bytes,
314    }))
315}
316
317// v0.2: List snapshots
318pub async fn list_snapshots(
319    State(store): State<SharedStore>,
320    Query(req): Query<ListSnapshotsRequest>,
321) -> Result<Json<ListSnapshotsResponse>> {
322    let snapshot_manager = store.snapshot_manager();
323
324    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
325        snapshot_manager
326            .get_all_snapshots(&entity_id)
327            .into_iter()
328            .map(SnapshotInfo::from)
329            .collect()
330    } else {
331        // List all entities with snapshots
332        let entities = snapshot_manager.list_entities();
333        entities
334            .iter()
335            .flat_map(|entity_id| {
336                snapshot_manager
337                    .get_all_snapshots(entity_id)
338                    .into_iter()
339                    .map(SnapshotInfo::from)
340            })
341            .collect()
342    };
343
344    let total = snapshots.len();
345
346    tracing::debug!("Listed {} snapshots", total);
347
348    Ok(Json(ListSnapshotsResponse { snapshots, total }))
349}
350
351// v0.2: Get latest snapshot for an entity
352pub async fn get_latest_snapshot(
353    State(store): State<SharedStore>,
354    Path(entity_id): Path<String>,
355) -> Result<Json<serde_json::Value>> {
356    let snapshot_manager = store.snapshot_manager();
357
358    let snapshot = snapshot_manager
359        .get_latest_snapshot(&entity_id)
360        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
361
362    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
363
364    Ok(Json(serde_json::json!({
365        "snapshot_id": snapshot.id,
366        "entity_id": snapshot.entity_id,
367        "created_at": snapshot.created_at,
368        "as_of": snapshot.as_of,
369        "event_count": snapshot.event_count,
370        "size_bytes": snapshot.metadata.size_bytes,
371        "snapshot_type": snapshot.metadata.snapshot_type,
372        "state": snapshot.state
373    })))
374}
375
376// v0.2: Trigger manual compaction
377pub async fn trigger_compaction(
378    State(store): State<SharedStore>,
379) -> Result<Json<CompactionResult>> {
380    let compaction_manager = store.compaction_manager().ok_or_else(|| {
381        crate::error::AllSourceError::InternalError(
382            "Compaction not enabled (no Parquet storage)".to_string(),
383        )
384    })?;
385
386    tracing::info!("📦 Manual compaction triggered via API");
387
388    let result = compaction_manager.compact_now()?;
389
390    Ok(Json(result))
391}
392
393// v0.2: Get compaction statistics
394pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
395    let compaction_manager = store.compaction_manager().ok_or_else(|| {
396        crate::error::AllSourceError::InternalError(
397            "Compaction not enabled (no Parquet storage)".to_string(),
398        )
399    })?;
400
401    let stats = compaction_manager.stats();
402    let config = compaction_manager.config();
403
404    Ok(Json(serde_json::json!({
405        "stats": stats,
406        "config": {
407            "min_files_to_compact": config.min_files_to_compact,
408            "target_file_size": config.target_file_size,
409            "max_file_size": config.max_file_size,
410            "small_file_threshold": config.small_file_threshold,
411            "compaction_interval_seconds": config.compaction_interval_seconds,
412            "auto_compact": config.auto_compact,
413            "strategy": config.strategy
414        }
415    })))
416}
417
418// v0.5: Register a new schema
419pub async fn register_schema(
420    State(store): State<SharedStore>,
421    Json(req): Json<RegisterSchemaRequest>,
422) -> Result<Json<RegisterSchemaResponse>> {
423    let schema_registry = store.schema_registry();
424
425    let response =
426        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
427
428    tracing::info!(
429        "📋 Schema registered: v{} for '{}'",
430        response.version,
431        response.subject
432    );
433
434    Ok(Json(response))
435}
436
437// v0.5: Get a schema by subject and optional version
438#[derive(Deserialize)]
439pub struct GetSchemaParams {
440    version: Option<u32>,
441}
442
443pub async fn get_schema(
444    State(store): State<SharedStore>,
445    Path(subject): Path<String>,
446    Query(params): Query<GetSchemaParams>,
447) -> Result<Json<serde_json::Value>> {
448    let schema_registry = store.schema_registry();
449
450    let schema = schema_registry.get_schema(&subject, params.version)?;
451
452    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
453
454    Ok(Json(serde_json::json!({
455        "id": schema.id,
456        "subject": schema.subject,
457        "version": schema.version,
458        "schema": schema.schema,
459        "created_at": schema.created_at,
460        "description": schema.description,
461        "tags": schema.tags
462    })))
463}
464
465// v0.5: List all versions of a schema subject
466pub async fn list_schema_versions(
467    State(store): State<SharedStore>,
468    Path(subject): Path<String>,
469) -> Result<Json<serde_json::Value>> {
470    let schema_registry = store.schema_registry();
471
472    let versions = schema_registry.list_versions(&subject)?;
473
474    Ok(Json(serde_json::json!({
475        "subject": subject,
476        "versions": versions
477    })))
478}
479
480// v0.5: List all schema subjects
481pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
482    let schema_registry = store.schema_registry();
483
484    let subjects = schema_registry.list_subjects();
485
486    Json(serde_json::json!({
487        "subjects": subjects,
488        "total": subjects.len()
489    }))
490}
491
492// v0.5: Validate an event against a schema
493pub async fn validate_event_schema(
494    State(store): State<SharedStore>,
495    Json(req): Json<ValidateEventRequest>,
496) -> Result<Json<ValidateEventResponse>> {
497    let schema_registry = store.schema_registry();
498
499    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
500
501    if response.valid {
502        tracing::debug!(
503            "✅ Event validated against schema '{}' v{}",
504            req.subject,
505            response.schema_version
506        );
507    } else {
508        tracing::warn!(
509            "❌ Event validation failed for '{}': {:?}",
510            req.subject,
511            response.errors
512        );
513    }
514
515    Ok(Json(response))
516}
517
518// v0.5: Set compatibility mode for a subject
519#[derive(Deserialize)]
520pub struct SetCompatibilityRequest {
521    compatibility: CompatibilityMode,
522}
523
524pub async fn set_compatibility_mode(
525    State(store): State<SharedStore>,
526    Path(subject): Path<String>,
527    Json(req): Json<SetCompatibilityRequest>,
528) -> Json<serde_json::Value> {
529    let schema_registry = store.schema_registry();
530
531    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
532
533    tracing::info!(
534        "🔧 Set compatibility mode for '{}' to {:?}",
535        subject,
536        req.compatibility
537    );
538
539    Json(serde_json::json!({
540        "subject": subject,
541        "compatibility": req.compatibility
542    }))
543}
544
545// v0.5: Start a replay operation
546pub async fn start_replay(
547    State(store): State<SharedStore>,
548    Json(req): Json<StartReplayRequest>,
549) -> Result<Json<StartReplayResponse>> {
550    let replay_manager = store.replay_manager();
551
552    let response = replay_manager.start_replay(store, req)?;
553
554    tracing::info!(
555        "🔄 Started replay {} with {} events",
556        response.replay_id,
557        response.total_events
558    );
559
560    Ok(Json(response))
561}
562
563// v0.5: Get replay progress
564pub async fn get_replay_progress(
565    State(store): State<SharedStore>,
566    Path(replay_id): Path<uuid::Uuid>,
567) -> Result<Json<ReplayProgress>> {
568    let replay_manager = store.replay_manager();
569
570    let progress = replay_manager.get_progress(replay_id)?;
571
572    Ok(Json(progress))
573}
574
575// v0.5: List all replay operations
576pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
577    let replay_manager = store.replay_manager();
578
579    let replays = replay_manager.list_replays();
580
581    Json(serde_json::json!({
582        "replays": replays,
583        "total": replays.len()
584    }))
585}
586
587// v0.5: Cancel a running replay
588pub async fn cancel_replay(
589    State(store): State<SharedStore>,
590    Path(replay_id): Path<uuid::Uuid>,
591) -> Result<Json<serde_json::Value>> {
592    let replay_manager = store.replay_manager();
593
594    replay_manager.cancel_replay(replay_id)?;
595
596    tracing::info!("🛑 Cancelled replay {}", replay_id);
597
598    Ok(Json(serde_json::json!({
599        "replay_id": replay_id,
600        "status": "cancelled"
601    })))
602}
603
604// v0.5: Delete a completed replay
605pub async fn delete_replay(
606    State(store): State<SharedStore>,
607    Path(replay_id): Path<uuid::Uuid>,
608) -> Result<Json<serde_json::Value>> {
609    let replay_manager = store.replay_manager();
610
611    let deleted = replay_manager.delete_replay(replay_id)?;
612
613    if deleted {
614        tracing::info!("🗑️  Deleted replay {}", replay_id);
615    }
616
617    Ok(Json(serde_json::json!({
618        "replay_id": replay_id,
619        "deleted": deleted
620    })))
621}
622
623// v0.5: Register a new pipeline
624pub async fn register_pipeline(
625    State(store): State<SharedStore>,
626    Json(config): Json<PipelineConfig>,
627) -> Result<Json<serde_json::Value>> {
628    let pipeline_manager = store.pipeline_manager();
629
630    let pipeline_id = pipeline_manager.register(config.clone());
631
632    tracing::info!(
633        "🔀 Pipeline registered: {} (name: {})",
634        pipeline_id,
635        config.name
636    );
637
638    Ok(Json(serde_json::json!({
639        "pipeline_id": pipeline_id,
640        "name": config.name,
641        "enabled": config.enabled
642    })))
643}
644
645// v0.5: List all pipelines
646pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
647    let pipeline_manager = store.pipeline_manager();
648
649    let pipelines = pipeline_manager.list();
650
651    tracing::debug!("Listed {} pipelines", pipelines.len());
652
653    Json(serde_json::json!({
654        "pipelines": pipelines,
655        "total": pipelines.len()
656    }))
657}
658
659// v0.5: Get a specific pipeline
660pub async fn get_pipeline(
661    State(store): State<SharedStore>,
662    Path(pipeline_id): Path<uuid::Uuid>,
663) -> Result<Json<PipelineConfig>> {
664    let pipeline_manager = store.pipeline_manager();
665
666    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
667        crate::error::AllSourceError::ValidationError(format!(
668            "Pipeline not found: {}",
669            pipeline_id
670        ))
671    })?;
672
673    Ok(Json(pipeline.config().clone()))
674}
675
676// v0.5: Remove a pipeline
677pub async fn remove_pipeline(
678    State(store): State<SharedStore>,
679    Path(pipeline_id): Path<uuid::Uuid>,
680) -> Result<Json<serde_json::Value>> {
681    let pipeline_manager = store.pipeline_manager();
682
683    let removed = pipeline_manager.remove(pipeline_id);
684
685    if removed {
686        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
687    }
688
689    Ok(Json(serde_json::json!({
690        "pipeline_id": pipeline_id,
691        "removed": removed
692    })))
693}
694
695// v0.5: Get statistics for all pipelines
696pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
697    let pipeline_manager = store.pipeline_manager();
698
699    let stats = pipeline_manager.all_stats();
700
701    Json(serde_json::json!({
702        "stats": stats,
703        "total": stats.len()
704    }))
705}
706
707// v0.5: Get statistics for a specific pipeline
708pub async fn get_pipeline_stats(
709    State(store): State<SharedStore>,
710    Path(pipeline_id): Path<uuid::Uuid>,
711) -> Result<Json<PipelineStats>> {
712    let pipeline_manager = store.pipeline_manager();
713
714    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
715        crate::error::AllSourceError::ValidationError(format!(
716            "Pipeline not found: {}",
717            pipeline_id
718        ))
719    })?;
720
721    Ok(Json(pipeline.stats()))
722}
723
724// v0.5: Reset a pipeline's state
725pub async fn reset_pipeline(
726    State(store): State<SharedStore>,
727    Path(pipeline_id): Path<uuid::Uuid>,
728) -> Result<Json<serde_json::Value>> {
729    let pipeline_manager = store.pipeline_manager();
730
731    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
732        crate::error::AllSourceError::ValidationError(format!(
733            "Pipeline not found: {}",
734            pipeline_id
735        ))
736    })?;
737
738    pipeline.reset();
739
740    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
741
742    Ok(Json(serde_json::json!({
743        "pipeline_id": pipeline_id,
744        "reset": true
745    })))
746}
747
748// =============================================================================
749// v0.7: Projection State API for Query Service Integration
750// =============================================================================
751
752/// List all registered projections
753pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
754    let projection_manager = store.projection_manager();
755
756    let projections: Vec<serde_json::Value> = projection_manager
757        .list_projections()
758        .iter()
759        .map(|(name, projection)| {
760            serde_json::json!({
761                "name": name,
762                "type": format!("{:?}", projection.name()),
763            })
764        })
765        .collect();
766
767    tracing::debug!("Listed {} projections", projections.len());
768
769    Json(serde_json::json!({
770        "projections": projections,
771        "total": projections.len()
772    }))
773}
774
775/// Get projection metadata by name
776pub async fn get_projection(
777    State(store): State<SharedStore>,
778    Path(name): Path<String>,
779) -> Result<Json<serde_json::Value>> {
780    let projection_manager = store.projection_manager();
781
782    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
783        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
784    })?;
785
786    Ok(Json(serde_json::json!({
787        "name": projection.name(),
788        "found": true
789    })))
790}
791
792/// Get projection state for a specific entity
793///
794/// This endpoint allows the Elixir Query Service to fetch projection state
795/// from the Rust Core for synchronization.
796pub async fn get_projection_state(
797    State(store): State<SharedStore>,
798    Path((name, entity_id)): Path<(String, String)>,
799) -> Result<Json<serde_json::Value>> {
800    let projection_manager = store.projection_manager();
801
802    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
803        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
804    })?;
805
806    let state = projection.get_state(&entity_id);
807
808    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
809
810    Ok(Json(serde_json::json!({
811        "projection": name,
812        "entity_id": entity_id,
813        "state": state,
814        "found": state.is_some()
815    })))
816}
817
818/// Request body for saving projection state
819#[derive(Debug, Deserialize)]
820pub struct SaveProjectionStateRequest {
821    pub state: serde_json::Value,
822}
823
824/// Save/update projection state for an entity
825///
826/// This endpoint allows external services (like Elixir Query Service) to
827/// store computed projection state back to the Core for persistence.
828pub async fn save_projection_state(
829    State(store): State<SharedStore>,
830    Path((name, entity_id)): Path<(String, String)>,
831    Json(req): Json<SaveProjectionStateRequest>,
832) -> Result<Json<serde_json::Value>> {
833    let projection_cache = store.projection_state_cache();
834
835    // Store in the projection state cache
836    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
837
838    tracing::info!("Projection state saved: {} / {}", name, entity_id);
839
840    Ok(Json(serde_json::json!({
841        "projection": name,
842        "entity_id": entity_id,
843        "saved": true
844    })))
845}
846
847/// Bulk get projection states for multiple entities
848///
849/// Efficient endpoint for fetching multiple entity states in a single request.
850#[derive(Debug, Deserialize)]
851pub struct BulkGetStateRequest {
852    pub entity_ids: Vec<String>,
853}
854
855/// Bulk save projection states for multiple entities
856///
857/// Efficient endpoint for saving multiple entity states in a single request.
858#[derive(Debug, Deserialize)]
859pub struct BulkSaveStateRequest {
860    pub states: Vec<BulkSaveStateItem>,
861}
862
863#[derive(Debug, Deserialize)]
864pub struct BulkSaveStateItem {
865    pub entity_id: String,
866    pub state: serde_json::Value,
867}
868
869pub async fn bulk_get_projection_states(
870    State(store): State<SharedStore>,
871    Path(name): Path<String>,
872    Json(req): Json<BulkGetStateRequest>,
873) -> Result<Json<serde_json::Value>> {
874    let projection_manager = store.projection_manager();
875
876    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
877        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
878    })?;
879
880    let states: Vec<serde_json::Value> = req
881        .entity_ids
882        .iter()
883        .map(|entity_id| {
884            let state = projection.get_state(entity_id);
885            serde_json::json!({
886                "entity_id": entity_id,
887                "state": state,
888                "found": state.is_some()
889            })
890        })
891        .collect();
892
893    tracing::debug!(
894        "Bulk projection state retrieved: {} entities from {}",
895        states.len(),
896        name
897    );
898
899    Ok(Json(serde_json::json!({
900        "projection": name,
901        "states": states,
902        "total": states.len()
903    })))
904}
905
906/// Bulk save projection states for multiple entities
907///
908/// This endpoint allows efficient batch saving of projection states,
909/// critical for high-throughput event processing pipelines.
910pub async fn bulk_save_projection_states(
911    State(store): State<SharedStore>,
912    Path(name): Path<String>,
913    Json(req): Json<BulkSaveStateRequest>,
914) -> Result<Json<serde_json::Value>> {
915    let projection_cache = store.projection_state_cache();
916
917    let mut saved_count = 0;
918    for item in &req.states {
919        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
920        saved_count += 1;
921    }
922
923    tracing::info!(
924        "Bulk projection state saved: {} entities for {}",
925        saved_count,
926        name
927    );
928
929    Ok(Json(serde_json::json!({
930        "projection": name,
931        "saved": saved_count,
932        "total": req.states.len()
933    })))
934}
935
#[cfg(test)]
mod tests {
    //! Tests for the projection state cache and the built-in projections.
    //!
    //! NOTE(review): the `test_bulk_save_*` cases replay the insert loop of
    //! `bulk_save_projection_states` directly against the cache; they do not
    //! invoke the HTTP handler itself. Consider adding handler-level tests.

    use super::*;
    use crate::domain::entities::Event;
    use crate::store::EventStore;

    /// Builds a fresh store for a single test.
    fn create_test_store() -> Arc<EventStore> {
        Arc::new(EventStore::new())
    }

    /// Builds an event on `test-stream` with a fixed `{"name", "value"}` payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            serde_json::json!({
                "name": "Test",
                "value": 42
            }),
            None,
        )
        .unwrap()
    }

    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();

        // Test cache insertion
        let cache = store.projection_state_cache();
        cache.insert(
            "entity_snapshots:user-123".to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        // Test cache retrieval
        let state = cache
            .get("entity_snapshots:user-123")
            .expect("inserted state should be retrievable");
        assert_eq!(state["name"], "Test User");
        assert_eq!(state["age"], 30);
    }

    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();

        // List projections (built-in projections should be available)
        let projection_manager = store.projection_manager();
        let projections = projection_manager.list_projections();

        // Should have entity_snapshots and event_counters
        assert!(projections.len() >= 2);

        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
        assert!(names.contains(&"entity_snapshots"));
        assert!(names.contains(&"event_counters"));
    }

    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        // Ingest an event
        let event = create_test_event("user-456", "user.created");
        store.ingest(event).unwrap();

        // Get projection state
        let projection_manager = store.projection_manager();
        let snapshot_projection = projection_manager
            .get_projection("entity_snapshots")
            .unwrap();

        let state = snapshot_projection
            .get_state("user-456")
            .expect("ingested entity should have a snapshot state");
        assert_eq!(state["name"], "Test");
        assert_eq!(state["value"], 42);
    }

    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert multiple entities
        for i in 0..10 {
            cache.insert(
                format!("entity_snapshots:entity-{i}"),
                serde_json::json!({"id": i, "status": "active"}),
            );
        }

        // Verify all insertions
        assert_eq!(cache.len(), 10);

        // Verify each entity
        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{i}");
            let state = cache.get(&key).expect("every inserted key should exist");
            assert_eq!(state["id"], i);
        }
    }

    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Initial state
        cache.insert(
            "entity_snapshots:user-789".to_string(),
            serde_json::json!({"balance": 100}),
        );

        // Update state
        cache.insert(
            "entity_snapshots:user-789".to_string(),
            serde_json::json!({"balance": 150}),
        );

        // Verify update
        let state = cache.get("entity_snapshots:user-789").unwrap();
        assert_eq!(state["balance"], 150);
    }

    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        // Ingest events of different types
        store
            .ingest(create_test_event("user-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("user-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("user-1", "user.updated"))
            .unwrap();

        // Get event counter projection
        let projection_manager = store.projection_manager();
        let counter_projection = projection_manager.get_projection("event_counters").unwrap();

        // Check counts
        let created_state = counter_projection
            .get_state("user.created")
            .expect("user.created should have a counter");
        assert_eq!(created_state["count"], 2);

        let updated_state = counter_projection
            .get_state("user.updated")
            .expect("user.updated should have a counter");
        assert_eq!(updated_state["count"], 1);
    }

    #[tokio::test]
    async fn test_projection_state_cache_key_format() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Test standard key format: {projection_name}:{entity_id}
        let key = "orders:order-12345".to_string();
        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));

        let state = cache.get(&key).unwrap();
        assert_eq!(state["total"], 99.99);
    }

    #[tokio::test]
    async fn test_projection_state_cache_removal() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert and then remove
        cache.insert(
            "test:entity-1".to_string(),
            serde_json::json!({"data": "value"}),
        );
        assert_eq!(cache.len(), 1);

        cache.remove("test:entity-1");
        assert_eq!(cache.len(), 0);
        assert!(cache.get("test:entity-1").is_none());
    }

    #[tokio::test]
    async fn test_get_nonexistent_projection() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Requesting a non-existent projection should return None
        let projection = projection_manager.get_projection("nonexistent_projection");
        assert!(projection.is_none());
    }

    #[tokio::test]
    async fn test_get_nonexistent_entity_state() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get state for non-existent entity
        let snapshot_projection = projection_manager.get_projection("entity_snapshots").unwrap();
        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
        assert!(state.is_none());
    }

    #[tokio::test]
    async fn test_projection_state_cache_concurrent_access() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Simulate concurrent writes, one spawned task per entity
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let cache_clone = cache.clone();
                tokio::spawn(async move {
                    cache_clone.insert(
                        format!("concurrent:entity-{i}"),
                        serde_json::json!({"thread": i}),
                    );
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        // All 10 entries should be present
        assert_eq!(cache.len(), 10);
    }

    #[tokio::test]
    async fn test_projection_state_large_payload() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Create a large JSON payload: 1000 objects, tens of kilobytes once serialized
        let large_array: Vec<serde_json::Value> = (0..1000)
            .map(|i| {
                serde_json::json!({
                    "item": i,
                    "description": "test item with some padding data to increase size"
                })
            })
            .collect();

        cache.insert(
            "large:entity-1".to_string(),
            serde_json::json!({"items": large_array}),
        );

        let state = cache.get("large:entity-1").unwrap();
        let items = state["items"].as_array().unwrap();
        assert_eq!(items.len(), 1000);
    }

    #[tokio::test]
    async fn test_projection_state_complex_json() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Complex nested JSON structure
        let complex_state = serde_json::json!({
            "user": {
                "id": "user-123",
                "profile": {
                    "name": "John Doe",
                    "email": "john@example.com",
                    "settings": {
                        "theme": "dark",
                        "notifications": true
                    }
                },
                "roles": ["admin", "user"],
                "metadata": {
                    "created_at": "2025-01-01T00:00:00Z",
                    "last_login": null
                }
            }
        });

        cache.insert("complex:user-123".to_string(), complex_state);

        let state = cache.get("complex:user-123").unwrap();
        assert_eq!(state["user"]["profile"]["name"], "John Doe");
        assert_eq!(state["user"]["roles"][0], "admin");
        assert!(state["user"]["metadata"]["last_login"].is_null());
    }

    #[tokio::test]
    async fn test_projection_state_cache_iteration() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert entries
        for i in 0..5 {
            cache.insert(
                format!("iter:entity-{i}"),
                serde_json::json!({"index": i}),
            );
        }

        // Iterate over all entries
        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
        assert_eq!(entries.len(), 5);
    }

    #[tokio::test]
    async fn test_projection_manager_get_entity_snapshots() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get entity_snapshots projection specifically
        let projection = projection_manager
            .get_projection("entity_snapshots")
            .expect("entity_snapshots projection should be registered");
        assert_eq!(projection.name(), "entity_snapshots");
    }

    #[tokio::test]
    async fn test_projection_manager_get_event_counters() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get event_counters projection specifically
        let projection = projection_manager
            .get_projection("event_counters")
            .expect("event_counters projection should be registered");
        assert_eq!(projection.name(), "event_counters");
    }

    #[tokio::test]
    async fn test_projection_state_cache_overwrite() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Write the same key three times; each insert replaces the previous value
        for version in 1..=3 {
            cache.insert(
                "overwrite:entity-1".to_string(),
                serde_json::json!({"version": version}),
            );
        }

        let state = cache.get("overwrite:entity-1").unwrap();
        assert_eq!(state["version"], 3);

        // Should still be only 1 entry
        assert_eq!(cache.len(), 1);
    }

    #[tokio::test]
    async fn test_projection_state_multiple_projections() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Store states for different projections
        cache.insert(
            "entity_snapshots:user-1".to_string(),
            serde_json::json!({"name": "Alice"}),
        );
        cache.insert(
            "event_counters:user.created".to_string(),
            serde_json::json!({"count": 5}),
        );
        cache.insert(
            "custom_projection:order-1".to_string(),
            serde_json::json!({"total": 150.0}),
        );

        // Verify each projection's state
        assert_eq!(cache.get("entity_snapshots:user-1").unwrap()["name"], "Alice");
        assert_eq!(cache.get("event_counters:user.created").unwrap()["count"], 5);
        assert_eq!(cache.get("custom_projection:order-1").unwrap()["total"], 150.0);
    }

    #[tokio::test]
    async fn test_bulk_projection_state_access() {
        let store = create_test_store();

        // Ingest multiple events for different entities
        for i in 0..5 {
            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
            store.ingest(event).unwrap();
        }

        // Get projection and verify bulk access
        let projection_manager = store.projection_manager();
        let snapshot_projection = projection_manager.get_projection("entity_snapshots").unwrap();

        // Verify we can access all entities
        for i in 0..5 {
            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
        }
    }

    #[tokio::test]
    async fn test_bulk_save_projection_states() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Simulate bulk save request
        let states = vec![
            BulkSaveStateItem {
                entity_id: "bulk-entity-1".to_string(),
                state: serde_json::json!({"name": "Entity 1", "value": 100}),
            },
            BulkSaveStateItem {
                entity_id: "bulk-entity-2".to_string(),
                state: serde_json::json!({"name": "Entity 2", "value": 200}),
            },
            BulkSaveStateItem {
                entity_id: "bulk-entity-3".to_string(),
                state: serde_json::json!({"name": "Entity 3", "value": 300}),
            },
        ];

        let projection_name = "test_projection";

        // Save states to cache (simulating bulk_save_projection_states handler)
        for item in &states {
            cache.insert(
                format!("{projection_name}:{}", item.entity_id),
                item.state.clone(),
            );
        }

        // Verify all states were saved
        assert_eq!(cache.len(), 3);

        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
        assert_eq!(state1["name"], "Entity 1");
        assert_eq!(state1["value"], 100);

        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
        assert_eq!(state2["name"], "Entity 2");
        assert_eq!(state2["value"], 200);

        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
        assert_eq!(state3["name"], "Entity 3");
        assert_eq!(state3["value"], 300);
    }

    #[tokio::test]
    async fn test_bulk_save_empty_states() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Start from a known-empty cache
        cache.clear();

        // Run the same insert loop the handler uses, over an empty batch
        let states: Vec<BulkSaveStateItem> = vec![];
        let projection_name = "test_projection";
        for item in &states {
            cache.insert(
                format!("{projection_name}:{}", item.entity_id),
                item.state.clone(),
            );
        }

        // Nothing was saved, so the cache must remain empty
        assert!(states.is_empty());
        assert_eq!(cache.len(), 0);
    }

    #[tokio::test]
    async fn test_bulk_save_overwrites_existing() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert initial state
        cache.insert(
            "test:entity-1".to_string(),
            serde_json::json!({"version": 1, "data": "initial"}),
        );

        // Bulk save with updated state
        let new_state = serde_json::json!({"version": 2, "data": "updated"});
        cache.insert("test:entity-1".to_string(), new_state);

        // Verify overwrite
        let state = cache.get("test:entity-1").unwrap();
        assert_eq!(state["version"], 2);
        assert_eq!(state["data"], "updated");
    }

    #[tokio::test]
    async fn test_bulk_save_high_volume() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Simulate high volume save (1000 entities)
        for i in 0..1000 {
            cache.insert(
                format!("volume_test:entity-{i}"),
                serde_json::json!({"index": i, "status": "active"}),
            );
        }

        // Verify count
        assert_eq!(cache.len(), 1000);

        // Spot check the first, a middle, and the last entry
        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
    }

    #[tokio::test]
    async fn test_bulk_save_different_projections() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Save to multiple projections in bulk
        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];

        for proj in &projections {
            for i in 0..5 {
                cache.insert(
                    format!("{proj}:entity-{i}"),
                    serde_json::json!({"projection": proj, "id": i}),
                );
            }
        }

        // Verify total count (3 projections * 5 entities)
        assert_eq!(cache.len(), 15);

        // Verify each projection
        for proj in &projections {
            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
            assert_eq!(state["projection"], *proj);
        }
    }
}