allsource_core/infrastructure/web/
api.rs

1use crate::application::services::analytics::{
2    AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
3    EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
4};
5use crate::application::dto::{
6    EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
7};
8use crate::infrastructure::persistence::compaction::CompactionResult;
9use crate::domain::entities::Event;
10use crate::error::Result;
11use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
12use crate::application::services::replay::{ReplayProgress, StartReplayRequest, StartReplayResponse};
13use crate::application::services::schema::{
14    CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
15    ValidateEventResponse,
16};
17use crate::infrastructure::persistence::snapshot::{
18    CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
19    SnapshotInfo,
20};
21use crate::store::EventStore;
22use axum::{
23    extract::{Path, Query, State, WebSocketUpgrade},
24    response::{IntoResponse, Response},
25    routing::{get, post, put},
26    Json, Router,
27};
28use serde::Deserialize;
29use std::sync::Arc;
30use tower_http::cors::{Any, CorsLayer};
31use tower_http::trace::TraceLayer;
32
33type SharedStore = Arc<EventStore>;
34
35pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
36    let app = Router::new()
37        .route("/health", get(health))
38        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
39        .route("/api/v1/events", post(ingest_event))
40        .route("/api/v1/events/query", get(query_events))
41        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
42        .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
43        .route(
44            "/api/v1/entities/:entity_id/snapshot",
45            get(get_entity_snapshot),
46        )
47        .route("/api/v1/stats", get(get_stats))
48        // v0.2: Advanced analytics endpoints
49        .route("/api/v1/analytics/frequency", get(analytics_frequency))
50        .route("/api/v1/analytics/summary", get(analytics_summary))
51        .route("/api/v1/analytics/correlation", get(analytics_correlation))
52        // v0.2: Snapshot management endpoints
53        .route("/api/v1/snapshots", post(create_snapshot))
54        .route("/api/v1/snapshots", get(list_snapshots))
55        .route(
56            "/api/v1/snapshots/:entity_id/latest",
57            get(get_latest_snapshot),
58        )
59        // v0.2: Compaction endpoints
60        .route("/api/v1/compaction/trigger", post(trigger_compaction))
61        .route("/api/v1/compaction/stats", get(compaction_stats))
62        // v0.5: Schema registry endpoints
63        .route("/api/v1/schemas", post(register_schema))
64        .route("/api/v1/schemas", get(list_subjects))
65        .route("/api/v1/schemas/:subject", get(get_schema))
66        .route(
67            "/api/v1/schemas/:subject/versions",
68            get(list_schema_versions),
69        )
70        .route("/api/v1/schemas/validate", post(validate_event_schema))
71        .route(
72            "/api/v1/schemas/:subject/compatibility",
73            put(set_compatibility_mode),
74        )
75        // v0.5: Replay and projection rebuild endpoints
76        .route("/api/v1/replay", post(start_replay))
77        .route("/api/v1/replay", get(list_replays))
78        .route("/api/v1/replay/:replay_id", get(get_replay_progress))
79        .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
80        .route(
81            "/api/v1/replay/:replay_id",
82            axum::routing::delete(delete_replay),
83        )
84        // v0.5: Stream processing pipeline endpoints
85        .route("/api/v1/pipelines", post(register_pipeline))
86        .route("/api/v1/pipelines", get(list_pipelines))
87        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
88        .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
89        .route(
90            "/api/v1/pipelines/:pipeline_id",
91            axum::routing::delete(remove_pipeline),
92        )
93        .route(
94            "/api/v1/pipelines/:pipeline_id/stats",
95            get(get_pipeline_stats),
96        )
97        .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
98        // v0.7: Projection State API for Query Service integration
99        .route("/api/v1/projections", get(list_projections))
100        .route("/api/v1/projections/:name", get(get_projection))
101        .route(
102            "/api/v1/projections/:name/:entity_id/state",
103            get(get_projection_state),
104        )
105        .route(
106            "/api/v1/projections/:name/:entity_id/state",
107            put(save_projection_state),
108        )
109        .route(
110            "/api/v1/projections/:name/bulk",
111            post(bulk_get_projection_states),
112        )
113        .layer(
114            CorsLayer::new()
115                .allow_origin(Any)
116                .allow_methods(Any)
117                .allow_headers(Any),
118        )
119        .layer(TraceLayer::new_for_http())
120        .with_state(store);
121
122    let listener = tokio::net::TcpListener::bind(addr).await?;
123    axum::serve(listener, app).await?;
124
125    Ok(())
126}
127
128pub async fn health() -> impl IntoResponse {
129    Json(serde_json::json!({
130        "status": "healthy",
131        "service": "allsource-core",
132        "version": env!("CARGO_PKG_VERSION")
133    }))
134}
135
136// v0.6: Prometheus metrics endpoint
137pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
138    let metrics = store.metrics();
139
140    match metrics.encode() {
141        Ok(encoded) => Response::builder()
142            .status(200)
143            .header("Content-Type", "text/plain; version=0.0.4")
144            .body(encoded)
145            .unwrap()
146            .into_response(),
147        Err(e) => Response::builder()
148            .status(500)
149            .body(format!("Error encoding metrics: {}", e))
150            .unwrap()
151            .into_response(),
152    }
153}
154
155pub async fn ingest_event(
156    State(store): State<SharedStore>,
157    Json(req): Json<IngestEventRequest>,
158) -> Result<Json<IngestEventResponse>> {
159    // Create event using from_strings with default tenant
160    let event = Event::from_strings(
161        req.event_type,
162        req.entity_id,
163        "default".to_string(),
164        req.payload,
165        req.metadata,
166    )?;
167
168    let event_id = event.id;
169    let timestamp = event.timestamp;
170
171    store.ingest(event)?;
172
173    tracing::info!("Event ingested: {}", event_id);
174
175    Ok(Json(IngestEventResponse {
176        event_id,
177        timestamp,
178    }))
179}
180
181pub async fn query_events(
182    State(store): State<SharedStore>,
183    Query(req): Query<QueryEventsRequest>,
184) -> Result<Json<QueryEventsResponse>> {
185    let domain_events = store.query(req)?;
186    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
187    let count = events.len();
188
189    tracing::debug!("Query returned {} events", count);
190
191    Ok(Json(QueryEventsResponse { events, count }))
192}
193
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp, passed through to the store's
    /// `reconstruct_state` call.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
198
199pub async fn get_entity_state(
200    State(store): State<SharedStore>,
201    Path(entity_id): Path<String>,
202    Query(params): Query<EntityStateParams>,
203) -> Result<Json<serde_json::Value>> {
204    let state = store.reconstruct_state(&entity_id, params.as_of)?;
205
206    tracing::info!("State reconstructed for entity: {}", entity_id);
207
208    Ok(Json(state))
209}
210
211pub async fn get_entity_snapshot(
212    State(store): State<SharedStore>,
213    Path(entity_id): Path<String>,
214) -> Result<Json<serde_json::Value>> {
215    let snapshot = store.get_snapshot(&entity_id)?;
216
217    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
218
219    Ok(Json(snapshot))
220}
221
222pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
223    let stats = store.stats();
224    Json(stats)
225}
226
227// v0.2: WebSocket endpoint for real-time event streaming
228pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
229    let websocket_manager = store.websocket_manager();
230
231    ws.on_upgrade(move |socket| async move {
232        websocket_manager.handle_socket(socket).await;
233    })
234}
235
236// v0.2: Event frequency analytics endpoint
237pub async fn analytics_frequency(
238    State(store): State<SharedStore>,
239    Query(req): Query<EventFrequencyRequest>,
240) -> Result<Json<EventFrequencyResponse>> {
241    let response = AnalyticsEngine::event_frequency(&store, req)?;
242
243    tracing::debug!(
244        "Frequency analysis returned {} buckets",
245        response.buckets.len()
246    );
247
248    Ok(Json(response))
249}
250
251// v0.2: Statistical summary endpoint
252pub async fn analytics_summary(
253    State(store): State<SharedStore>,
254    Query(req): Query<StatsSummaryRequest>,
255) -> Result<Json<StatsSummaryResponse>> {
256    let response = AnalyticsEngine::stats_summary(&store, req)?;
257
258    tracing::debug!(
259        "Stats summary: {} events across {} entities",
260        response.total_events,
261        response.unique_entities
262    );
263
264    Ok(Json(response))
265}
266
267// v0.2: Event correlation analysis endpoint
268pub async fn analytics_correlation(
269    State(store): State<SharedStore>,
270    Query(req): Query<CorrelationRequest>,
271) -> Result<Json<CorrelationResponse>> {
272    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
273
274    tracing::debug!(
275        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
276        response.correlated_pairs,
277        response.total_a,
278        response.correlation_percentage
279    );
280
281    Ok(Json(response))
282}
283
284// v0.2: Create a snapshot for an entity
285pub async fn create_snapshot(
286    State(store): State<SharedStore>,
287    Json(req): Json<CreateSnapshotRequest>,
288) -> Result<Json<CreateSnapshotResponse>> {
289    store.create_snapshot(&req.entity_id)?;
290
291    let snapshot_manager = store.snapshot_manager();
292    let snapshot = snapshot_manager
293        .get_latest_snapshot(&req.entity_id)
294        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
295
296    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
297
298    Ok(Json(CreateSnapshotResponse {
299        snapshot_id: snapshot.id,
300        entity_id: snapshot.entity_id,
301        created_at: snapshot.created_at,
302        event_count: snapshot.event_count,
303        size_bytes: snapshot.metadata.size_bytes,
304    }))
305}
306
307// v0.2: List snapshots
308pub async fn list_snapshots(
309    State(store): State<SharedStore>,
310    Query(req): Query<ListSnapshotsRequest>,
311) -> Result<Json<ListSnapshotsResponse>> {
312    let snapshot_manager = store.snapshot_manager();
313
314    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
315        snapshot_manager
316            .get_all_snapshots(&entity_id)
317            .into_iter()
318            .map(SnapshotInfo::from)
319            .collect()
320    } else {
321        // List all entities with snapshots
322        let entities = snapshot_manager.list_entities();
323        entities
324            .iter()
325            .flat_map(|entity_id| {
326                snapshot_manager
327                    .get_all_snapshots(entity_id)
328                    .into_iter()
329                    .map(SnapshotInfo::from)
330            })
331            .collect()
332    };
333
334    let total = snapshots.len();
335
336    tracing::debug!("Listed {} snapshots", total);
337
338    Ok(Json(ListSnapshotsResponse { snapshots, total }))
339}
340
341// v0.2: Get latest snapshot for an entity
342pub async fn get_latest_snapshot(
343    State(store): State<SharedStore>,
344    Path(entity_id): Path<String>,
345) -> Result<Json<serde_json::Value>> {
346    let snapshot_manager = store.snapshot_manager();
347
348    let snapshot = snapshot_manager
349        .get_latest_snapshot(&entity_id)
350        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
351
352    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
353
354    Ok(Json(serde_json::json!({
355        "snapshot_id": snapshot.id,
356        "entity_id": snapshot.entity_id,
357        "created_at": snapshot.created_at,
358        "as_of": snapshot.as_of,
359        "event_count": snapshot.event_count,
360        "size_bytes": snapshot.metadata.size_bytes,
361        "snapshot_type": snapshot.metadata.snapshot_type,
362        "state": snapshot.state
363    })))
364}
365
366// v0.2: Trigger manual compaction
367pub async fn trigger_compaction(
368    State(store): State<SharedStore>,
369) -> Result<Json<CompactionResult>> {
370    let compaction_manager = store.compaction_manager().ok_or_else(|| {
371        crate::error::AllSourceError::InternalError(
372            "Compaction not enabled (no Parquet storage)".to_string(),
373        )
374    })?;
375
376    tracing::info!("📦 Manual compaction triggered via API");
377
378    let result = compaction_manager.compact_now()?;
379
380    Ok(Json(result))
381}
382
383// v0.2: Get compaction statistics
384pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
385    let compaction_manager = store.compaction_manager().ok_or_else(|| {
386        crate::error::AllSourceError::InternalError(
387            "Compaction not enabled (no Parquet storage)".to_string(),
388        )
389    })?;
390
391    let stats = compaction_manager.stats();
392    let config = compaction_manager.config();
393
394    Ok(Json(serde_json::json!({
395        "stats": stats,
396        "config": {
397            "min_files_to_compact": config.min_files_to_compact,
398            "target_file_size": config.target_file_size,
399            "max_file_size": config.max_file_size,
400            "small_file_threshold": config.small_file_threshold,
401            "compaction_interval_seconds": config.compaction_interval_seconds,
402            "auto_compact": config.auto_compact,
403            "strategy": config.strategy
404        }
405    })))
406}
407
408// v0.5: Register a new schema
409pub async fn register_schema(
410    State(store): State<SharedStore>,
411    Json(req): Json<RegisterSchemaRequest>,
412) -> Result<Json<RegisterSchemaResponse>> {
413    let schema_registry = store.schema_registry();
414
415    let response =
416        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
417
418    tracing::info!(
419        "📋 Schema registered: v{} for '{}'",
420        response.version,
421        response.subject
422    );
423
424    Ok(Json(response))
425}
426
/// v0.5: Query-string parameters accepted by `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version, passed through to the schema registry's
    /// `get_schema` lookup.
    version: Option<u32>,
}
432
433pub async fn get_schema(
434    State(store): State<SharedStore>,
435    Path(subject): Path<String>,
436    Query(params): Query<GetSchemaParams>,
437) -> Result<Json<serde_json::Value>> {
438    let schema_registry = store.schema_registry();
439
440    let schema = schema_registry.get_schema(&subject, params.version)?;
441
442    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
443
444    Ok(Json(serde_json::json!({
445        "id": schema.id,
446        "subject": schema.subject,
447        "version": schema.version,
448        "schema": schema.schema,
449        "created_at": schema.created_at,
450        "description": schema.description,
451        "tags": schema.tags
452    })))
453}
454
455// v0.5: List all versions of a schema subject
456pub async fn list_schema_versions(
457    State(store): State<SharedStore>,
458    Path(subject): Path<String>,
459) -> Result<Json<serde_json::Value>> {
460    let schema_registry = store.schema_registry();
461
462    let versions = schema_registry.list_versions(&subject)?;
463
464    Ok(Json(serde_json::json!({
465        "subject": subject,
466        "versions": versions
467    })))
468}
469
470// v0.5: List all schema subjects
471pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
472    let schema_registry = store.schema_registry();
473
474    let subjects = schema_registry.list_subjects();
475
476    Json(serde_json::json!({
477        "subjects": subjects,
478        "total": subjects.len()
479    }))
480}
481
482// v0.5: Validate an event against a schema
483pub async fn validate_event_schema(
484    State(store): State<SharedStore>,
485    Json(req): Json<ValidateEventRequest>,
486) -> Result<Json<ValidateEventResponse>> {
487    let schema_registry = store.schema_registry();
488
489    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
490
491    if response.valid {
492        tracing::debug!(
493            "✅ Event validated against schema '{}' v{}",
494            req.subject,
495            response.schema_version
496        );
497    } else {
498        tracing::warn!(
499            "❌ Event validation failed for '{}': {:?}",
500            req.subject,
501            response.errors
502        );
503    }
504
505    Ok(Json(response))
506}
507
/// v0.5: Request body accepted by `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
513
514pub async fn set_compatibility_mode(
515    State(store): State<SharedStore>,
516    Path(subject): Path<String>,
517    Json(req): Json<SetCompatibilityRequest>,
518) -> Json<serde_json::Value> {
519    let schema_registry = store.schema_registry();
520
521    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
522
523    tracing::info!(
524        "🔧 Set compatibility mode for '{}' to {:?}",
525        subject,
526        req.compatibility
527    );
528
529    Json(serde_json::json!({
530        "subject": subject,
531        "compatibility": req.compatibility
532    }))
533}
534
535// v0.5: Start a replay operation
536pub async fn start_replay(
537    State(store): State<SharedStore>,
538    Json(req): Json<StartReplayRequest>,
539) -> Result<Json<StartReplayResponse>> {
540    let replay_manager = store.replay_manager();
541
542    let response = replay_manager.start_replay(store, req)?;
543
544    tracing::info!(
545        "🔄 Started replay {} with {} events",
546        response.replay_id,
547        response.total_events
548    );
549
550    Ok(Json(response))
551}
552
553// v0.5: Get replay progress
554pub async fn get_replay_progress(
555    State(store): State<SharedStore>,
556    Path(replay_id): Path<uuid::Uuid>,
557) -> Result<Json<ReplayProgress>> {
558    let replay_manager = store.replay_manager();
559
560    let progress = replay_manager.get_progress(replay_id)?;
561
562    Ok(Json(progress))
563}
564
565// v0.5: List all replay operations
566pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
567    let replay_manager = store.replay_manager();
568
569    let replays = replay_manager.list_replays();
570
571    Json(serde_json::json!({
572        "replays": replays,
573        "total": replays.len()
574    }))
575}
576
577// v0.5: Cancel a running replay
578pub async fn cancel_replay(
579    State(store): State<SharedStore>,
580    Path(replay_id): Path<uuid::Uuid>,
581) -> Result<Json<serde_json::Value>> {
582    let replay_manager = store.replay_manager();
583
584    replay_manager.cancel_replay(replay_id)?;
585
586    tracing::info!("🛑 Cancelled replay {}", replay_id);
587
588    Ok(Json(serde_json::json!({
589        "replay_id": replay_id,
590        "status": "cancelled"
591    })))
592}
593
594// v0.5: Delete a completed replay
595pub async fn delete_replay(
596    State(store): State<SharedStore>,
597    Path(replay_id): Path<uuid::Uuid>,
598) -> Result<Json<serde_json::Value>> {
599    let replay_manager = store.replay_manager();
600
601    let deleted = replay_manager.delete_replay(replay_id)?;
602
603    if deleted {
604        tracing::info!("🗑️  Deleted replay {}", replay_id);
605    }
606
607    Ok(Json(serde_json::json!({
608        "replay_id": replay_id,
609        "deleted": deleted
610    })))
611}
612
613// v0.5: Register a new pipeline
614pub async fn register_pipeline(
615    State(store): State<SharedStore>,
616    Json(config): Json<PipelineConfig>,
617) -> Result<Json<serde_json::Value>> {
618    let pipeline_manager = store.pipeline_manager();
619
620    let pipeline_id = pipeline_manager.register(config.clone());
621
622    tracing::info!(
623        "🔀 Pipeline registered: {} (name: {})",
624        pipeline_id,
625        config.name
626    );
627
628    Ok(Json(serde_json::json!({
629        "pipeline_id": pipeline_id,
630        "name": config.name,
631        "enabled": config.enabled
632    })))
633}
634
635// v0.5: List all pipelines
636pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
637    let pipeline_manager = store.pipeline_manager();
638
639    let pipelines = pipeline_manager.list();
640
641    tracing::debug!("Listed {} pipelines", pipelines.len());
642
643    Json(serde_json::json!({
644        "pipelines": pipelines,
645        "total": pipelines.len()
646    }))
647}
648
649// v0.5: Get a specific pipeline
650pub async fn get_pipeline(
651    State(store): State<SharedStore>,
652    Path(pipeline_id): Path<uuid::Uuid>,
653) -> Result<Json<PipelineConfig>> {
654    let pipeline_manager = store.pipeline_manager();
655
656    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
657        crate::error::AllSourceError::ValidationError(format!(
658            "Pipeline not found: {}",
659            pipeline_id
660        ))
661    })?;
662
663    Ok(Json(pipeline.config().clone()))
664}
665
666// v0.5: Remove a pipeline
667pub async fn remove_pipeline(
668    State(store): State<SharedStore>,
669    Path(pipeline_id): Path<uuid::Uuid>,
670) -> Result<Json<serde_json::Value>> {
671    let pipeline_manager = store.pipeline_manager();
672
673    let removed = pipeline_manager.remove(pipeline_id);
674
675    if removed {
676        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
677    }
678
679    Ok(Json(serde_json::json!({
680        "pipeline_id": pipeline_id,
681        "removed": removed
682    })))
683}
684
685// v0.5: Get statistics for all pipelines
686pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
687    let pipeline_manager = store.pipeline_manager();
688
689    let stats = pipeline_manager.all_stats();
690
691    Json(serde_json::json!({
692        "stats": stats,
693        "total": stats.len()
694    }))
695}
696
697// v0.5: Get statistics for a specific pipeline
698pub async fn get_pipeline_stats(
699    State(store): State<SharedStore>,
700    Path(pipeline_id): Path<uuid::Uuid>,
701) -> Result<Json<PipelineStats>> {
702    let pipeline_manager = store.pipeline_manager();
703
704    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
705        crate::error::AllSourceError::ValidationError(format!(
706            "Pipeline not found: {}",
707            pipeline_id
708        ))
709    })?;
710
711    Ok(Json(pipeline.stats()))
712}
713
714// v0.5: Reset a pipeline's state
715pub async fn reset_pipeline(
716    State(store): State<SharedStore>,
717    Path(pipeline_id): Path<uuid::Uuid>,
718) -> Result<Json<serde_json::Value>> {
719    let pipeline_manager = store.pipeline_manager();
720
721    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
722        crate::error::AllSourceError::ValidationError(format!(
723            "Pipeline not found: {}",
724            pipeline_id
725        ))
726    })?;
727
728    pipeline.reset();
729
730    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
731
732    Ok(Json(serde_json::json!({
733        "pipeline_id": pipeline_id,
734        "reset": true
735    })))
736}
737
738// =============================================================================
739// v0.7: Projection State API for Query Service Integration
740// =============================================================================
741
742/// List all registered projections
743pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
744    let projection_manager = store.projection_manager();
745
746    let projections: Vec<serde_json::Value> = projection_manager
747        .list_projections()
748        .iter()
749        .map(|(name, projection)| {
750            serde_json::json!({
751                "name": name,
752                "type": format!("{:?}", projection.name()),
753            })
754        })
755        .collect();
756
757    tracing::debug!("Listed {} projections", projections.len());
758
759    Json(serde_json::json!({
760        "projections": projections,
761        "total": projections.len()
762    }))
763}
764
765/// Get projection metadata by name
766pub async fn get_projection(
767    State(store): State<SharedStore>,
768    Path(name): Path<String>,
769) -> Result<Json<serde_json::Value>> {
770    let projection_manager = store.projection_manager();
771
772    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
773        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
774    })?;
775
776    Ok(Json(serde_json::json!({
777        "name": projection.name(),
778        "found": true
779    })))
780}
781
782/// Get projection state for a specific entity
783///
784/// This endpoint allows the Elixir Query Service to fetch projection state
785/// from the Rust Core for synchronization.
786pub async fn get_projection_state(
787    State(store): State<SharedStore>,
788    Path((name, entity_id)): Path<(String, String)>,
789) -> Result<Json<serde_json::Value>> {
790    let projection_manager = store.projection_manager();
791
792    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
793        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
794    })?;
795
796    let state = projection.get_state(&entity_id);
797
798    tracing::debug!(
799        "Projection state retrieved: {} / {}",
800        name,
801        entity_id
802    );
803
804    Ok(Json(serde_json::json!({
805        "projection": name,
806        "entity_id": entity_id,
807        "state": state,
808        "found": state.is_some()
809    })))
810}
811
/// Request body for saving projection state.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state to store for the projection/entity pair named in
    /// the request path.
    pub state: serde_json::Value,
}
817
818/// Save/update projection state for an entity
819///
820/// This endpoint allows external services (like Elixir Query Service) to
821/// store computed projection state back to the Core for persistence.
822pub async fn save_projection_state(
823    State(store): State<SharedStore>,
824    Path((name, entity_id)): Path<(String, String)>,
825    Json(req): Json<SaveProjectionStateRequest>,
826) -> Result<Json<serde_json::Value>> {
827    let projection_cache = store.projection_state_cache();
828
829    // Store in the projection state cache
830    projection_cache.insert(
831        format!("{}:{}", name, entity_id),
832        req.state.clone(),
833    );
834
835    tracing::info!(
836        "Projection state saved: {} / {}",
837        name,
838        entity_id
839    );
840
841    Ok(Json(serde_json::json!({
842        "projection": name,
843        "entity_id": entity_id,
844        "saved": true
845    })))
846}
847
/// Request body for `bulk_get_projection_states`: the entities whose
/// projection state should be returned in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Ids of the entities to look up.
    pub entity_ids: Vec<String>,
}
855
856pub async fn bulk_get_projection_states(
857    State(store): State<SharedStore>,
858    Path(name): Path<String>,
859    Json(req): Json<BulkGetStateRequest>,
860) -> Result<Json<serde_json::Value>> {
861    let projection_manager = store.projection_manager();
862
863    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
864        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
865    })?;
866
867    let states: Vec<serde_json::Value> = req
868        .entity_ids
869        .iter()
870        .map(|entity_id| {
871            let state = projection.get_state(entity_id);
872            serde_json::json!({
873                "entity_id": entity_id,
874                "state": state,
875                "found": state.is_some()
876            })
877        })
878        .collect();
879
880    tracing::debug!(
881        "Bulk projection state retrieved: {} entities from {}",
882        states.len(),
883        name
884    );
885
886    Ok(Json(serde_json::json!({
887        "projection": name,
888        "states": states,
889        "total": states.len()
890    })))
891}
892
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use crate::store::EventStore;

    /// Fresh store wrapped in an `Arc`, matching `SharedStore`.
    fn create_test_store() -> Arc<EventStore> {
        let store = EventStore::new();
        Arc::new(store)
    }

    /// Builds an event with a fixed `{"name": "Test", "value": 42}` payload
    /// and no metadata.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test",
            "value": 42
        });

        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// A value inserted into the projection state cache can be read back.
    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        let key = "entity_snapshots:user-123";
        cache.insert(
            key.to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        let cached = cache.get(key);
        assert!(cached.is_some());
        let cached = cached.unwrap();
        assert_eq!(cached["name"], "Test User");
        assert_eq!(cached["age"], 30);
    }

    /// The built-in projections are registered on a fresh store.
    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();

        let manager = store.projection_manager();
        let registered = manager.list_projections();

        // Expect at least entity_snapshots and event_counters.
        assert!(registered.len() >= 2);

        let names: Vec<&str> = registered.iter().map(|(name, _)| name.as_str()).collect();
        assert!(names.contains(&"entity_snapshots"));
        assert!(names.contains(&"event_counters"));
    }

    /// Ingesting an event makes its payload visible through the
    /// `entity_snapshots` projection.
    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        store
            .ingest(create_test_event("user-456", "user.created"))
            .unwrap();

        let manager = store.projection_manager();
        let snapshots = manager.get_projection("entity_snapshots").unwrap();

        let projected = snapshots.get_state("user-456");
        assert!(projected.is_some());
        let projected = projected.unwrap();
        assert_eq!(projected["name"], "Test");
        assert_eq!(projected["value"], 42);
    }

    /// Ten distinct keys can be stored and each read back independently.
    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        for n in 0..10 {
            cache.insert(
                format!("entity_snapshots:entity-{}", n),
                serde_json::json!({"id": n, "status": "active"}),
            );
        }

        assert_eq!(cache.len(), 10);

        for n in 0..10 {
            let key = format!("entity_snapshots:entity-{}", n);
            let cached = cache.get(&key);
            assert!(cached.is_some());
            assert_eq!(cached.unwrap()["id"], n);
        }
    }

    /// Inserting under an existing key replaces the previous value.
    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        let key = "entity_snapshots:user-789";

        // First write, then overwrite under the same key.
        cache.insert(key.to_string(), serde_json::json!({"balance": 100}));
        cache.insert(key.to_string(), serde_json::json!({"balance": 150}));

        let cached = cache.get(key).unwrap();
        assert_eq!(cached["balance"], 150);
    }

    /// The `event_counters` projection counts ingested events per event type.
    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        // Two `user.created` events and one `user.updated` event.
        store.ingest(create_test_event("user-1", "user.created")).unwrap();
        store.ingest(create_test_event("user-2", "user.created")).unwrap();
        store.ingest(create_test_event("user-1", "user.updated")).unwrap();

        let manager = store.projection_manager();
        let counters = manager.get_projection("event_counters").unwrap();

        let created = counters.get_state("user.created");
        assert!(created.is_some());
        assert_eq!(created.unwrap()["count"], 2);

        let updated = counters.get_state("user.updated");
        assert!(updated.is_some());
        assert_eq!(updated.unwrap()["count"], 1);
    }
}