allsource_core/infrastructure/web/
api.rs

1use crate::application::dto::{
2    EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
3};
4use crate::application::services::analytics::{
5    AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
6    EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
7};
8use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
9use crate::application::services::replay::{
10    ReplayProgress, StartReplayRequest, StartReplayResponse,
11};
12use crate::application::services::schema::{
13    CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
14    ValidateEventResponse,
15};
16use crate::domain::entities::Event;
17use crate::error::Result;
18use crate::infrastructure::persistence::compaction::CompactionResult;
19use crate::infrastructure::persistence::snapshot::{
20    CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
21    SnapshotInfo,
22};
23use crate::store::EventStore;
24use axum::{
25    extract::{Path, Query, State, WebSocketUpgrade},
26    response::{IntoResponse, Response},
27    routing::{get, post, put},
28    Json, Router,
29};
30use serde::Deserialize;
31use std::sync::Arc;
32use tower_http::cors::{Any, CorsLayer};
33use tower_http::trace::TraceLayer;
34
35type SharedStore = Arc<EventStore>;
36
37pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
38    let app = Router::new()
39        .route("/health", get(health))
40        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
41        .route("/api/v1/events", post(ingest_event))
42        .route("/api/v1/events/query", get(query_events))
43        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
44        .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
45        .route(
46            "/api/v1/entities/:entity_id/snapshot",
47            get(get_entity_snapshot),
48        )
49        .route("/api/v1/stats", get(get_stats))
50        // v0.2: Advanced analytics endpoints
51        .route("/api/v1/analytics/frequency", get(analytics_frequency))
52        .route("/api/v1/analytics/summary", get(analytics_summary))
53        .route("/api/v1/analytics/correlation", get(analytics_correlation))
54        // v0.2: Snapshot management endpoints
55        .route("/api/v1/snapshots", post(create_snapshot))
56        .route("/api/v1/snapshots", get(list_snapshots))
57        .route(
58            "/api/v1/snapshots/:entity_id/latest",
59            get(get_latest_snapshot),
60        )
61        // v0.2: Compaction endpoints
62        .route("/api/v1/compaction/trigger", post(trigger_compaction))
63        .route("/api/v1/compaction/stats", get(compaction_stats))
64        // v0.5: Schema registry endpoints
65        .route("/api/v1/schemas", post(register_schema))
66        .route("/api/v1/schemas", get(list_subjects))
67        .route("/api/v1/schemas/:subject", get(get_schema))
68        .route(
69            "/api/v1/schemas/:subject/versions",
70            get(list_schema_versions),
71        )
72        .route("/api/v1/schemas/validate", post(validate_event_schema))
73        .route(
74            "/api/v1/schemas/:subject/compatibility",
75            put(set_compatibility_mode),
76        )
77        // v0.5: Replay and projection rebuild endpoints
78        .route("/api/v1/replay", post(start_replay))
79        .route("/api/v1/replay", get(list_replays))
80        .route("/api/v1/replay/:replay_id", get(get_replay_progress))
81        .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
82        .route(
83            "/api/v1/replay/:replay_id",
84            axum::routing::delete(delete_replay),
85        )
86        // v0.5: Stream processing pipeline endpoints
87        .route("/api/v1/pipelines", post(register_pipeline))
88        .route("/api/v1/pipelines", get(list_pipelines))
89        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
90        .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
91        .route(
92            "/api/v1/pipelines/:pipeline_id",
93            axum::routing::delete(remove_pipeline),
94        )
95        .route(
96            "/api/v1/pipelines/:pipeline_id/stats",
97            get(get_pipeline_stats),
98        )
99        .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
100        // v0.7: Projection State API for Query Service integration
101        .route("/api/v1/projections", get(list_projections))
102        .route("/api/v1/projections/:name", get(get_projection))
103        .route(
104            "/api/v1/projections/:name/:entity_id/state",
105            get(get_projection_state),
106        )
107        .route(
108            "/api/v1/projections/:name/:entity_id/state",
109            put(save_projection_state),
110        )
111        .route(
112            "/api/v1/projections/:name/bulk",
113            post(bulk_get_projection_states),
114        )
115        .layer(
116            CorsLayer::new()
117                .allow_origin(Any)
118                .allow_methods(Any)
119                .allow_headers(Any),
120        )
121        .layer(TraceLayer::new_for_http())
122        .with_state(store);
123
124    let listener = tokio::net::TcpListener::bind(addr).await?;
125    axum::serve(listener, app).await?;
126
127    Ok(())
128}
129
130pub async fn health() -> impl IntoResponse {
131    Json(serde_json::json!({
132        "status": "healthy",
133        "service": "allsource-core",
134        "version": env!("CARGO_PKG_VERSION")
135    }))
136}
137
138// v0.6: Prometheus metrics endpoint
139pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
140    let metrics = store.metrics();
141
142    match metrics.encode() {
143        Ok(encoded) => Response::builder()
144            .status(200)
145            .header("Content-Type", "text/plain; version=0.0.4")
146            .body(encoded)
147            .unwrap()
148            .into_response(),
149        Err(e) => Response::builder()
150            .status(500)
151            .body(format!("Error encoding metrics: {}", e))
152            .unwrap()
153            .into_response(),
154    }
155}
156
157pub async fn ingest_event(
158    State(store): State<SharedStore>,
159    Json(req): Json<IngestEventRequest>,
160) -> Result<Json<IngestEventResponse>> {
161    // Create event using from_strings with default tenant
162    let event = Event::from_strings(
163        req.event_type,
164        req.entity_id,
165        "default".to_string(),
166        req.payload,
167        req.metadata,
168    )?;
169
170    let event_id = event.id;
171    let timestamp = event.timestamp;
172
173    store.ingest(event)?;
174
175    tracing::info!("Event ingested: {}", event_id);
176
177    Ok(Json(IngestEventResponse {
178        event_id,
179        timestamp,
180    }))
181}
182
183pub async fn query_events(
184    State(store): State<SharedStore>,
185    Query(req): Query<QueryEventsRequest>,
186) -> Result<Json<QueryEventsResponse>> {
187    let domain_events = store.query(req)?;
188    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
189    let count = events.len();
190
191    tracing::debug!("Query returned {} events", count);
192
193    Ok(Json(QueryEventsResponse { events, count }))
194}
195
196#[derive(Deserialize)]
197pub struct EntityStateParams {
198    as_of: Option<chrono::DateTime<chrono::Utc>>,
199}
200
201pub async fn get_entity_state(
202    State(store): State<SharedStore>,
203    Path(entity_id): Path<String>,
204    Query(params): Query<EntityStateParams>,
205) -> Result<Json<serde_json::Value>> {
206    let state = store.reconstruct_state(&entity_id, params.as_of)?;
207
208    tracing::info!("State reconstructed for entity: {}", entity_id);
209
210    Ok(Json(state))
211}
212
213pub async fn get_entity_snapshot(
214    State(store): State<SharedStore>,
215    Path(entity_id): Path<String>,
216) -> Result<Json<serde_json::Value>> {
217    let snapshot = store.get_snapshot(&entity_id)?;
218
219    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
220
221    Ok(Json(snapshot))
222}
223
224pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
225    let stats = store.stats();
226    Json(stats)
227}
228
229// v0.2: WebSocket endpoint for real-time event streaming
230pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
231    let websocket_manager = store.websocket_manager();
232
233    ws.on_upgrade(move |socket| async move {
234        websocket_manager.handle_socket(socket).await;
235    })
236}
237
238// v0.2: Event frequency analytics endpoint
239pub async fn analytics_frequency(
240    State(store): State<SharedStore>,
241    Query(req): Query<EventFrequencyRequest>,
242) -> Result<Json<EventFrequencyResponse>> {
243    let response = AnalyticsEngine::event_frequency(&store, req)?;
244
245    tracing::debug!(
246        "Frequency analysis returned {} buckets",
247        response.buckets.len()
248    );
249
250    Ok(Json(response))
251}
252
253// v0.2: Statistical summary endpoint
254pub async fn analytics_summary(
255    State(store): State<SharedStore>,
256    Query(req): Query<StatsSummaryRequest>,
257) -> Result<Json<StatsSummaryResponse>> {
258    let response = AnalyticsEngine::stats_summary(&store, req)?;
259
260    tracing::debug!(
261        "Stats summary: {} events across {} entities",
262        response.total_events,
263        response.unique_entities
264    );
265
266    Ok(Json(response))
267}
268
269// v0.2: Event correlation analysis endpoint
270pub async fn analytics_correlation(
271    State(store): State<SharedStore>,
272    Query(req): Query<CorrelationRequest>,
273) -> Result<Json<CorrelationResponse>> {
274    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
275
276    tracing::debug!(
277        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
278        response.correlated_pairs,
279        response.total_a,
280        response.correlation_percentage
281    );
282
283    Ok(Json(response))
284}
285
286// v0.2: Create a snapshot for an entity
287pub async fn create_snapshot(
288    State(store): State<SharedStore>,
289    Json(req): Json<CreateSnapshotRequest>,
290) -> Result<Json<CreateSnapshotResponse>> {
291    store.create_snapshot(&req.entity_id)?;
292
293    let snapshot_manager = store.snapshot_manager();
294    let snapshot = snapshot_manager
295        .get_latest_snapshot(&req.entity_id)
296        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
297
298    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
299
300    Ok(Json(CreateSnapshotResponse {
301        snapshot_id: snapshot.id,
302        entity_id: snapshot.entity_id,
303        created_at: snapshot.created_at,
304        event_count: snapshot.event_count,
305        size_bytes: snapshot.metadata.size_bytes,
306    }))
307}
308
309// v0.2: List snapshots
310pub async fn list_snapshots(
311    State(store): State<SharedStore>,
312    Query(req): Query<ListSnapshotsRequest>,
313) -> Result<Json<ListSnapshotsResponse>> {
314    let snapshot_manager = store.snapshot_manager();
315
316    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
317        snapshot_manager
318            .get_all_snapshots(&entity_id)
319            .into_iter()
320            .map(SnapshotInfo::from)
321            .collect()
322    } else {
323        // List all entities with snapshots
324        let entities = snapshot_manager.list_entities();
325        entities
326            .iter()
327            .flat_map(|entity_id| {
328                snapshot_manager
329                    .get_all_snapshots(entity_id)
330                    .into_iter()
331                    .map(SnapshotInfo::from)
332            })
333            .collect()
334    };
335
336    let total = snapshots.len();
337
338    tracing::debug!("Listed {} snapshots", total);
339
340    Ok(Json(ListSnapshotsResponse { snapshots, total }))
341}
342
343// v0.2: Get latest snapshot for an entity
344pub async fn get_latest_snapshot(
345    State(store): State<SharedStore>,
346    Path(entity_id): Path<String>,
347) -> Result<Json<serde_json::Value>> {
348    let snapshot_manager = store.snapshot_manager();
349
350    let snapshot = snapshot_manager
351        .get_latest_snapshot(&entity_id)
352        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
353
354    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
355
356    Ok(Json(serde_json::json!({
357        "snapshot_id": snapshot.id,
358        "entity_id": snapshot.entity_id,
359        "created_at": snapshot.created_at,
360        "as_of": snapshot.as_of,
361        "event_count": snapshot.event_count,
362        "size_bytes": snapshot.metadata.size_bytes,
363        "snapshot_type": snapshot.metadata.snapshot_type,
364        "state": snapshot.state
365    })))
366}
367
368// v0.2: Trigger manual compaction
369pub async fn trigger_compaction(
370    State(store): State<SharedStore>,
371) -> Result<Json<CompactionResult>> {
372    let compaction_manager = store.compaction_manager().ok_or_else(|| {
373        crate::error::AllSourceError::InternalError(
374            "Compaction not enabled (no Parquet storage)".to_string(),
375        )
376    })?;
377
378    tracing::info!("📦 Manual compaction triggered via API");
379
380    let result = compaction_manager.compact_now()?;
381
382    Ok(Json(result))
383}
384
385// v0.2: Get compaction statistics
386pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
387    let compaction_manager = store.compaction_manager().ok_or_else(|| {
388        crate::error::AllSourceError::InternalError(
389            "Compaction not enabled (no Parquet storage)".to_string(),
390        )
391    })?;
392
393    let stats = compaction_manager.stats();
394    let config = compaction_manager.config();
395
396    Ok(Json(serde_json::json!({
397        "stats": stats,
398        "config": {
399            "min_files_to_compact": config.min_files_to_compact,
400            "target_file_size": config.target_file_size,
401            "max_file_size": config.max_file_size,
402            "small_file_threshold": config.small_file_threshold,
403            "compaction_interval_seconds": config.compaction_interval_seconds,
404            "auto_compact": config.auto_compact,
405            "strategy": config.strategy
406        }
407    })))
408}
409
410// v0.5: Register a new schema
411pub async fn register_schema(
412    State(store): State<SharedStore>,
413    Json(req): Json<RegisterSchemaRequest>,
414) -> Result<Json<RegisterSchemaResponse>> {
415    let schema_registry = store.schema_registry();
416
417    let response =
418        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
419
420    tracing::info!(
421        "📋 Schema registered: v{} for '{}'",
422        response.version,
423        response.subject
424    );
425
426    Ok(Json(response))
427}
428
429// v0.5: Get a schema by subject and optional version
430#[derive(Deserialize)]
431pub struct GetSchemaParams {
432    version: Option<u32>,
433}
434
435pub async fn get_schema(
436    State(store): State<SharedStore>,
437    Path(subject): Path<String>,
438    Query(params): Query<GetSchemaParams>,
439) -> Result<Json<serde_json::Value>> {
440    let schema_registry = store.schema_registry();
441
442    let schema = schema_registry.get_schema(&subject, params.version)?;
443
444    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
445
446    Ok(Json(serde_json::json!({
447        "id": schema.id,
448        "subject": schema.subject,
449        "version": schema.version,
450        "schema": schema.schema,
451        "created_at": schema.created_at,
452        "description": schema.description,
453        "tags": schema.tags
454    })))
455}
456
457// v0.5: List all versions of a schema subject
458pub async fn list_schema_versions(
459    State(store): State<SharedStore>,
460    Path(subject): Path<String>,
461) -> Result<Json<serde_json::Value>> {
462    let schema_registry = store.schema_registry();
463
464    let versions = schema_registry.list_versions(&subject)?;
465
466    Ok(Json(serde_json::json!({
467        "subject": subject,
468        "versions": versions
469    })))
470}
471
472// v0.5: List all schema subjects
473pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
474    let schema_registry = store.schema_registry();
475
476    let subjects = schema_registry.list_subjects();
477
478    Json(serde_json::json!({
479        "subjects": subjects,
480        "total": subjects.len()
481    }))
482}
483
484// v0.5: Validate an event against a schema
485pub async fn validate_event_schema(
486    State(store): State<SharedStore>,
487    Json(req): Json<ValidateEventRequest>,
488) -> Result<Json<ValidateEventResponse>> {
489    let schema_registry = store.schema_registry();
490
491    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
492
493    if response.valid {
494        tracing::debug!(
495            "✅ Event validated against schema '{}' v{}",
496            req.subject,
497            response.schema_version
498        );
499    } else {
500        tracing::warn!(
501            "❌ Event validation failed for '{}': {:?}",
502            req.subject,
503            response.errors
504        );
505    }
506
507    Ok(Json(response))
508}
509
510// v0.5: Set compatibility mode for a subject
511#[derive(Deserialize)]
512pub struct SetCompatibilityRequest {
513    compatibility: CompatibilityMode,
514}
515
516pub async fn set_compatibility_mode(
517    State(store): State<SharedStore>,
518    Path(subject): Path<String>,
519    Json(req): Json<SetCompatibilityRequest>,
520) -> Json<serde_json::Value> {
521    let schema_registry = store.schema_registry();
522
523    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
524
525    tracing::info!(
526        "🔧 Set compatibility mode for '{}' to {:?}",
527        subject,
528        req.compatibility
529    );
530
531    Json(serde_json::json!({
532        "subject": subject,
533        "compatibility": req.compatibility
534    }))
535}
536
537// v0.5: Start a replay operation
538pub async fn start_replay(
539    State(store): State<SharedStore>,
540    Json(req): Json<StartReplayRequest>,
541) -> Result<Json<StartReplayResponse>> {
542    let replay_manager = store.replay_manager();
543
544    let response = replay_manager.start_replay(store, req)?;
545
546    tracing::info!(
547        "🔄 Started replay {} with {} events",
548        response.replay_id,
549        response.total_events
550    );
551
552    Ok(Json(response))
553}
554
555// v0.5: Get replay progress
556pub async fn get_replay_progress(
557    State(store): State<SharedStore>,
558    Path(replay_id): Path<uuid::Uuid>,
559) -> Result<Json<ReplayProgress>> {
560    let replay_manager = store.replay_manager();
561
562    let progress = replay_manager.get_progress(replay_id)?;
563
564    Ok(Json(progress))
565}
566
567// v0.5: List all replay operations
568pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
569    let replay_manager = store.replay_manager();
570
571    let replays = replay_manager.list_replays();
572
573    Json(serde_json::json!({
574        "replays": replays,
575        "total": replays.len()
576    }))
577}
578
579// v0.5: Cancel a running replay
580pub async fn cancel_replay(
581    State(store): State<SharedStore>,
582    Path(replay_id): Path<uuid::Uuid>,
583) -> Result<Json<serde_json::Value>> {
584    let replay_manager = store.replay_manager();
585
586    replay_manager.cancel_replay(replay_id)?;
587
588    tracing::info!("🛑 Cancelled replay {}", replay_id);
589
590    Ok(Json(serde_json::json!({
591        "replay_id": replay_id,
592        "status": "cancelled"
593    })))
594}
595
596// v0.5: Delete a completed replay
597pub async fn delete_replay(
598    State(store): State<SharedStore>,
599    Path(replay_id): Path<uuid::Uuid>,
600) -> Result<Json<serde_json::Value>> {
601    let replay_manager = store.replay_manager();
602
603    let deleted = replay_manager.delete_replay(replay_id)?;
604
605    if deleted {
606        tracing::info!("🗑️  Deleted replay {}", replay_id);
607    }
608
609    Ok(Json(serde_json::json!({
610        "replay_id": replay_id,
611        "deleted": deleted
612    })))
613}
614
615// v0.5: Register a new pipeline
616pub async fn register_pipeline(
617    State(store): State<SharedStore>,
618    Json(config): Json<PipelineConfig>,
619) -> Result<Json<serde_json::Value>> {
620    let pipeline_manager = store.pipeline_manager();
621
622    let pipeline_id = pipeline_manager.register(config.clone());
623
624    tracing::info!(
625        "🔀 Pipeline registered: {} (name: {})",
626        pipeline_id,
627        config.name
628    );
629
630    Ok(Json(serde_json::json!({
631        "pipeline_id": pipeline_id,
632        "name": config.name,
633        "enabled": config.enabled
634    })))
635}
636
637// v0.5: List all pipelines
638pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
639    let pipeline_manager = store.pipeline_manager();
640
641    let pipelines = pipeline_manager.list();
642
643    tracing::debug!("Listed {} pipelines", pipelines.len());
644
645    Json(serde_json::json!({
646        "pipelines": pipelines,
647        "total": pipelines.len()
648    }))
649}
650
651// v0.5: Get a specific pipeline
652pub async fn get_pipeline(
653    State(store): State<SharedStore>,
654    Path(pipeline_id): Path<uuid::Uuid>,
655) -> Result<Json<PipelineConfig>> {
656    let pipeline_manager = store.pipeline_manager();
657
658    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
659        crate::error::AllSourceError::ValidationError(format!(
660            "Pipeline not found: {}",
661            pipeline_id
662        ))
663    })?;
664
665    Ok(Json(pipeline.config().clone()))
666}
667
668// v0.5: Remove a pipeline
669pub async fn remove_pipeline(
670    State(store): State<SharedStore>,
671    Path(pipeline_id): Path<uuid::Uuid>,
672) -> Result<Json<serde_json::Value>> {
673    let pipeline_manager = store.pipeline_manager();
674
675    let removed = pipeline_manager.remove(pipeline_id);
676
677    if removed {
678        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
679    }
680
681    Ok(Json(serde_json::json!({
682        "pipeline_id": pipeline_id,
683        "removed": removed
684    })))
685}
686
687// v0.5: Get statistics for all pipelines
688pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
689    let pipeline_manager = store.pipeline_manager();
690
691    let stats = pipeline_manager.all_stats();
692
693    Json(serde_json::json!({
694        "stats": stats,
695        "total": stats.len()
696    }))
697}
698
699// v0.5: Get statistics for a specific pipeline
700pub async fn get_pipeline_stats(
701    State(store): State<SharedStore>,
702    Path(pipeline_id): Path<uuid::Uuid>,
703) -> Result<Json<PipelineStats>> {
704    let pipeline_manager = store.pipeline_manager();
705
706    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
707        crate::error::AllSourceError::ValidationError(format!(
708            "Pipeline not found: {}",
709            pipeline_id
710        ))
711    })?;
712
713    Ok(Json(pipeline.stats()))
714}
715
716// v0.5: Reset a pipeline's state
717pub async fn reset_pipeline(
718    State(store): State<SharedStore>,
719    Path(pipeline_id): Path<uuid::Uuid>,
720) -> Result<Json<serde_json::Value>> {
721    let pipeline_manager = store.pipeline_manager();
722
723    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
724        crate::error::AllSourceError::ValidationError(format!(
725            "Pipeline not found: {}",
726            pipeline_id
727        ))
728    })?;
729
730    pipeline.reset();
731
732    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
733
734    Ok(Json(serde_json::json!({
735        "pipeline_id": pipeline_id,
736        "reset": true
737    })))
738}
739
740// =============================================================================
741// v0.7: Projection State API for Query Service Integration
742// =============================================================================
743
744/// List all registered projections
745pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
746    let projection_manager = store.projection_manager();
747
748    let projections: Vec<serde_json::Value> = projection_manager
749        .list_projections()
750        .iter()
751        .map(|(name, projection)| {
752            serde_json::json!({
753                "name": name,
754                "type": format!("{:?}", projection.name()),
755            })
756        })
757        .collect();
758
759    tracing::debug!("Listed {} projections", projections.len());
760
761    Json(serde_json::json!({
762        "projections": projections,
763        "total": projections.len()
764    }))
765}
766
767/// Get projection metadata by name
768pub async fn get_projection(
769    State(store): State<SharedStore>,
770    Path(name): Path<String>,
771) -> Result<Json<serde_json::Value>> {
772    let projection_manager = store.projection_manager();
773
774    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
775        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
776    })?;
777
778    Ok(Json(serde_json::json!({
779        "name": projection.name(),
780        "found": true
781    })))
782}
783
784/// Get projection state for a specific entity
785///
786/// This endpoint allows the Elixir Query Service to fetch projection state
787/// from the Rust Core for synchronization.
788pub async fn get_projection_state(
789    State(store): State<SharedStore>,
790    Path((name, entity_id)): Path<(String, String)>,
791) -> Result<Json<serde_json::Value>> {
792    let projection_manager = store.projection_manager();
793
794    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
795        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
796    })?;
797
798    let state = projection.get_state(&entity_id);
799
800    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
801
802    Ok(Json(serde_json::json!({
803        "projection": name,
804        "entity_id": entity_id,
805        "state": state,
806        "found": state.is_some()
807    })))
808}
809
810/// Request body for saving projection state
811#[derive(Debug, Deserialize)]
812pub struct SaveProjectionStateRequest {
813    pub state: serde_json::Value,
814}
815
816/// Save/update projection state for an entity
817///
818/// This endpoint allows external services (like Elixir Query Service) to
819/// store computed projection state back to the Core for persistence.
820pub async fn save_projection_state(
821    State(store): State<SharedStore>,
822    Path((name, entity_id)): Path<(String, String)>,
823    Json(req): Json<SaveProjectionStateRequest>,
824) -> Result<Json<serde_json::Value>> {
825    let projection_cache = store.projection_state_cache();
826
827    // Store in the projection state cache
828    projection_cache.insert(format!("{}:{}", name, entity_id), req.state.clone());
829
830    tracing::info!("Projection state saved: {} / {}", name, entity_id);
831
832    Ok(Json(serde_json::json!({
833        "projection": name,
834        "entity_id": entity_id,
835        "saved": true
836    })))
837}
838
839/// Bulk get projection states for multiple entities
840///
841/// Efficient endpoint for fetching multiple entity states in a single request.
842#[derive(Debug, Deserialize)]
843pub struct BulkGetStateRequest {
844    pub entity_ids: Vec<String>,
845}
846
847pub async fn bulk_get_projection_states(
848    State(store): State<SharedStore>,
849    Path(name): Path<String>,
850    Json(req): Json<BulkGetStateRequest>,
851) -> Result<Json<serde_json::Value>> {
852    let projection_manager = store.projection_manager();
853
854    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
855        crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
856    })?;
857
858    let states: Vec<serde_json::Value> = req
859        .entity_ids
860        .iter()
861        .map(|entity_id| {
862            let state = projection.get_state(entity_id);
863            serde_json::json!({
864                "entity_id": entity_id,
865                "state": state,
866                "found": state.is_some()
867            })
868        })
869        .collect();
870
871    tracing::debug!(
872        "Bulk projection state retrieved: {} entities from {}",
873        states.len(),
874        name
875    );
876
877    Ok(Json(serde_json::json!({
878        "projection": name,
879        "states": states,
880        "total": states.len()
881    })))
882}
883
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use crate::store::EventStore;

    /// Builds a new `EventStore` wrapped in an `Arc`, the same shape the
    /// handlers receive as state.
    fn create_test_store() -> Arc<EventStore> {
        let store = EventStore::new();
        Arc::new(store)
    }

    /// Builds an event for `entity_id` / `event_type` with a fixed payload of
    /// `{"name": "Test", "value": 42}` and no metadata.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test",
            "value": 42
        });

        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// A value inserted into the projection state cache can be read back.
    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Write one entry...
        cache.insert(
            "entity_snapshots:user-123".to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        // ...and read it back.
        let cached = cache.get("entity_snapshots:user-123");
        assert!(cached.is_some());
        let cached = cached.unwrap();
        assert_eq!(cached["name"], "Test User");
        assert_eq!(cached["age"], 30);
    }

    /// A new store exposes the built-in projections.
    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();

        let projection_manager = store.projection_manager();
        let projections = projection_manager.list_projections();

        // entity_snapshots and event_counters are expected at minimum.
        assert!(projections.len() >= 2);

        let is_registered =
            |wanted: &str| projections.iter().any(|(name, _)| name.as_str() == wanted);
        assert!(is_registered("entity_snapshots"));
        assert!(is_registered("event_counters"));
    }

    /// Ingesting an event makes its payload visible through the
    /// `entity_snapshots` projection.
    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        store
            .ingest(create_test_event("user-456", "user.created"))
            .unwrap();

        let projection_manager = store.projection_manager();
        let snapshots = projection_manager
            .get_projection("entity_snapshots")
            .unwrap();

        let entity_state = snapshots.get_state("user-456");
        assert!(entity_state.is_some());
        let entity_state = entity_state.unwrap();
        assert_eq!(entity_state["name"], "Test");
        assert_eq!(entity_state["value"], 42);
    }

    /// Ten distinct keys can be inserted and each read back independently.
    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            cache.insert(key, serde_json::json!({"id": i, "status": "active"}));
        }

        // Every insertion produced its own entry.
        assert_eq!(cache.len(), 10);

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            let cached = cache.get(&key);
            assert!(cached.is_some());
            assert_eq!(cached.unwrap()["id"], i);
        }
    }

    /// Inserting under an existing key replaces the previous value.
    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();
        let key = "entity_snapshots:user-789";

        // Initial value, then an overwrite under the same key.
        cache.insert(key.to_string(), serde_json::json!({"balance": 100}));
        cache.insert(key.to_string(), serde_json::json!({"balance": 150}));

        let cached = cache.get(key).unwrap();
        assert_eq!(cached["balance"], 150);
    }

    /// The `event_counters` projection counts ingested events per event type.
    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        // Two "user.created" events and one "user.updated" event.
        let ingested = [
            ("user-1", "user.created"),
            ("user-2", "user.created"),
            ("user-1", "user.updated"),
        ];
        for &(entity_id, event_type) in ingested.iter() {
            store
                .ingest(create_test_event(entity_id, event_type))
                .unwrap();
        }

        let projection_manager = store.projection_manager();
        let counters = projection_manager.get_projection("event_counters").unwrap();

        let created = counters.get_state("user.created");
        assert!(created.is_some());
        assert_eq!(created.unwrap()["count"], 2);

        let updated = counters.get_state("user.updated");
        assert!(updated.is_some());
        assert_eq!(updated.unwrap()["count"], 1);
    }
}