Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::{
2    application::{
3        dto::{
4            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
5            IngestEventsBatchResponse, QueryEventsRequest, QueryEventsResponse,
6        },
7        services::{
8            analytics::{
9                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
10                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
11            },
12            pipeline::{PipelineConfig, PipelineStats},
13            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
14            schema::{
15                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
16                ValidateEventRequest, ValidateEventResponse,
17            },
18            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
19        },
20    },
21    domain::entities::Event,
22    error::Result,
23    infrastructure::{
24        persistence::{
25            compaction::CompactionResult,
26            snapshot::{
27                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
28                ListSnapshotsResponse, SnapshotInfo,
29            },
30        },
31        query::{
32            eventql::EventQLRequest,
33            geospatial::GeoQueryRequest,
34            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
35        },
36        replication::ReplicationMode,
37        web::api_v1::AppState,
38    },
39    store::{EventStore, EventTypeInfo, StreamInfo},
40};
41use axum::{
42    Json, Router,
43    extract::{Path, Query, State, WebSocketUpgrade},
44    response::{IntoResponse, Response},
45    routing::{get, post, put},
46};
47use serde::Deserialize;
48use std::sync::Arc;
49use tower_http::{
50    cors::{Any, CorsLayer},
51    trace::TraceLayer,
52};
53
/// Reference-counted handle to the [`EventStore`]; this is the router state
/// type that the `State<SharedStore>` handlers in this file extract.
type SharedStore = Arc<EventStore>;
55
56/// Wait for follower ACK(s) in semi-sync/sync replication modes.
57///
58/// In async mode (default), returns immediately. In semi-sync mode, waits for
59/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
60/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
61async fn await_replication_ack(state: &AppState) {
62    let shipper_guard = state.wal_shipper.read().await;
63    if let Some(ref shipper) = *shipper_guard {
64        let mode = shipper.replication_mode();
65        if mode == ReplicationMode::Async {
66            return;
67        }
68
69        let target_offset = shipper.current_leader_offset();
70        if target_offset == 0 {
71            return;
72        }
73
74        let shipper = Arc::clone(shipper);
75        // Drop the read guard before the async wait to avoid holding it across await
76        drop(shipper_guard);
77
78        let timer = state
79            .store
80            .metrics()
81            .replication_ack_wait_seconds
82            .start_timer();
83        let acked = shipper.wait_for_ack(target_offset).await;
84        timer.observe_duration();
85
86        if !acked {
87            tracing::warn!(
88                "Replication ACK timeout in {} mode (offset {}). \
89                 Write succeeded locally but follower confirmation pending.",
90                mode,
91                target_offset,
92            );
93        }
94    }
95}
96
/// Builds the HTTP router for the event store and serves it on `addr`.
///
/// Every route below is registered with `store` as the shared state. A CORS
/// layer that allows any origin, method and header, followed by an HTTP trace
/// layer, is applied after all routes have been added.
///
/// # Errors
///
/// Returns an error if the TCP listener cannot be bound to `addr`, or if
/// `axum::serve` itself returns an error.
pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
    let app = Router::new()
        .route("/health", get(health))
        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
        .route("/api/v1/events", post(ingest_event))
        .route("/api/v1/events/batch", post(ingest_events_batch))
        .route("/api/v1/events/query", get(query_events))
        .route("/api/v1/events/{event_id}", get(get_event_by_id))
        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
        // v0.10: Stream and event type discovery endpoints
        .route("/api/v1/streams", get(list_streams))
        .route("/api/v1/event-types", get(list_event_types))
        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(get_entity_snapshot),
        )
        .route("/api/v1/stats", get(get_stats))
        // v0.2: Advanced analytics endpoints
        .route("/api/v1/analytics/frequency", get(analytics_frequency))
        .route("/api/v1/analytics/summary", get(analytics_summary))
        .route("/api/v1/analytics/correlation", get(analytics_correlation))
        // v0.2: Snapshot management endpoints
        .route("/api/v1/snapshots", post(create_snapshot))
        .route("/api/v1/snapshots", get(list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(get_latest_snapshot),
        )
        // v0.2: Compaction endpoints
        .route("/api/v1/compaction/trigger", post(trigger_compaction))
        .route("/api/v1/compaction/stats", get(compaction_stats))
        // v0.5: Schema registry endpoints
        .route("/api/v1/schemas", post(register_schema))
        .route("/api/v1/schemas", get(list_subjects))
        .route("/api/v1/schemas/{subject}", get(get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(list_schema_versions),
        )
        .route("/api/v1/schemas/validate", post(validate_event_schema))
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(set_compatibility_mode),
        )
        // v0.5: Replay and projection rebuild endpoints
        .route("/api/v1/replay", post(start_replay))
        .route("/api/v1/replay", get(list_replays))
        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
        .route(
            "/api/v1/replay/{replay_id}",
            axum::routing::delete(delete_replay),
        )
        // v0.5: Stream processing pipeline endpoints
        .route("/api/v1/pipelines", post(register_pipeline))
        .route("/api/v1/pipelines", get(list_pipelines))
        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            axum::routing::delete(remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(get_pipeline_stats),
        )
        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
        // v0.7: Projection State API for Query Service integration
        .route("/api/v1/projections", get(list_projections))
        .route("/api/v1/projections/{name}", get(get_projection))
        .route(
            "/api/v1/projections/{name}",
            axum::routing::delete(delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(get_projection_state_summary),
        )
        .route("/api/v1/projections/{name}/reset", post(reset_projection))
        // GET, POST and PUT on the per-entity state path are registered as
        // three separate `.route` calls; POST and PUT share one handler.
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(bulk_save_projection_states),
        )
        // v0.11: Webhook management endpoints
        .route("/api/v1/webhooks", post(register_webhook))
        .route("/api/v1/webhooks", get(list_webhooks))
        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            axum::routing::delete(delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(list_webhook_deliveries),
        )
        // v2.0: Advanced query features
        .route("/api/v1/eventql", post(eventql_query))
        .route("/api/v1/graphql", post(graphql_query))
        .route("/api/v1/geospatial/query", post(geo_query))
        .route("/api/v1/geospatial/stats", get(geo_stats))
        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(schema_evolution_stats),
        )
        // NOTE(review): CORS is fully permissive (any origin/method/header);
        // confirm this is intended for every deployment of this router.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http())
        .with_state(store);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;

    Ok(())
}
242
243pub async fn health() -> impl IntoResponse {
244    Json(serde_json::json!({
245        "status": "healthy",
246        "service": "allsource-core",
247        "version": env!("CARGO_PKG_VERSION")
248    }))
249}
250
251// v0.6: Prometheus metrics endpoint
252pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
253    let metrics = store.metrics();
254
255    match metrics.encode() {
256        Ok(encoded) => Response::builder()
257            .status(200)
258            .header("Content-Type", "text/plain; version=0.0.4")
259            .body(encoded)
260            .unwrap()
261            .into_response(),
262        Err(e) => Response::builder()
263            .status(500)
264            .body(format!("Error encoding metrics: {e}"))
265            .unwrap()
266            .into_response(),
267    }
268}
269
270pub async fn ingest_event(
271    State(store): State<SharedStore>,
272    Json(req): Json<IngestEventRequest>,
273) -> Result<Json<IngestEventResponse>> {
274    // Create event using from_strings with default tenant
275    let event = Event::from_strings(
276        req.event_type,
277        req.entity_id,
278        "default".to_string(),
279        req.payload,
280        req.metadata,
281    )?;
282
283    let event_id = event.id;
284    let timestamp = event.timestamp;
285
286    store.ingest(event)?;
287
288    tracing::info!("Event ingested: {}", event_id);
289
290    Ok(Json(IngestEventResponse {
291        event_id,
292        timestamp,
293    }))
294}
295
296/// Ingest a single event with semi-sync/sync replication ACK waiting.
297///
298/// Used by the v1 API (with auth and replication support).
299pub async fn ingest_event_v1(
300    State(state): State<AppState>,
301    Json(req): Json<IngestEventRequest>,
302) -> Result<Json<IngestEventResponse>> {
303    let event = Event::from_strings(
304        req.event_type,
305        req.entity_id,
306        "default".to_string(),
307        req.payload,
308        req.metadata,
309    )?;
310
311    let event_id = event.id;
312    let timestamp = event.timestamp;
313
314    state.store.ingest(event)?;
315
316    // Semi-sync/sync: wait for follower ACK(s) before returning
317    await_replication_ack(&state).await;
318
319    tracing::info!("Event ingested: {}", event_id);
320
321    Ok(Json(IngestEventResponse {
322        event_id,
323        timestamp,
324    }))
325}
326
327/// Batch ingest multiple events in a single request
328///
329/// This endpoint allows ingesting multiple events atomically, which is more
330/// efficient than making individual requests for each event.
331pub async fn ingest_events_batch(
332    State(store): State<SharedStore>,
333    Json(req): Json<IngestEventsBatchRequest>,
334) -> Result<Json<IngestEventsBatchResponse>> {
335    let total = req.events.len();
336    let mut ingested_events = Vec::with_capacity(total);
337
338    for event_req in req.events {
339        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
340
341        let event = Event::from_strings(
342            event_req.event_type,
343            event_req.entity_id,
344            tenant_id,
345            event_req.payload,
346            event_req.metadata,
347        )?;
348
349        let event_id = event.id;
350        let timestamp = event.timestamp;
351
352        store.ingest(event)?;
353
354        ingested_events.push(IngestEventResponse {
355            event_id,
356            timestamp,
357        });
358    }
359
360    let ingested = ingested_events.len();
361    tracing::info!("Batch ingested {} events", ingested);
362
363    Ok(Json(IngestEventsBatchResponse {
364        total,
365        ingested,
366        events: ingested_events,
367    }))
368}
369
370/// Batch ingest with semi-sync/sync replication ACK waiting.
371///
372/// Used by the v1 API (with auth and replication support).
373pub async fn ingest_events_batch_v1(
374    State(state): State<AppState>,
375    Json(req): Json<IngestEventsBatchRequest>,
376) -> Result<Json<IngestEventsBatchResponse>> {
377    let total = req.events.len();
378    let mut ingested_events = Vec::with_capacity(total);
379
380    for event_req in req.events {
381        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
382
383        let event = Event::from_strings(
384            event_req.event_type,
385            event_req.entity_id,
386            tenant_id,
387            event_req.payload,
388            event_req.metadata,
389        )?;
390
391        let event_id = event.id;
392        let timestamp = event.timestamp;
393
394        state.store.ingest(event)?;
395
396        ingested_events.push(IngestEventResponse {
397            event_id,
398            timestamp,
399        });
400    }
401
402    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
403    await_replication_ack(&state).await;
404
405    let ingested = ingested_events.len();
406    tracing::info!("Batch ingested {} events", ingested);
407
408    Ok(Json(IngestEventsBatchResponse {
409        total,
410        ingested,
411        events: ingested_events,
412    }))
413}
414
415pub async fn query_events(
416    State(store): State<SharedStore>,
417    Query(req): Query<QueryEventsRequest>,
418) -> Result<Json<QueryEventsResponse>> {
419    let domain_events = store.query(req)?;
420    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
421    let count = events.len();
422
423    tracing::debug!("Query returned {} events", count);
424
425    Ok(Json(QueryEventsResponse { events, count }))
426}
427
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp forwarded to `reconstruct_state`.
    /// NOTE(review): presumably a point-in-time cutoff for the
    /// reconstruction — confirm against the store implementation.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
432
433pub async fn get_entity_state(
434    State(store): State<SharedStore>,
435    Path(entity_id): Path<String>,
436    Query(params): Query<EntityStateParams>,
437) -> Result<Json<serde_json::Value>> {
438    let state = store.reconstruct_state(&entity_id, params.as_of)?;
439
440    tracing::info!("State reconstructed for entity: {}", entity_id);
441
442    Ok(Json(state))
443}
444
445pub async fn get_entity_snapshot(
446    State(store): State<SharedStore>,
447    Path(entity_id): Path<String>,
448) -> Result<Json<serde_json::Value>> {
449    let snapshot = store.get_snapshot(&entity_id)?;
450
451    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
452
453    Ok(Json(snapshot))
454}
455
456pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
457    let stats = store.stats();
458    Json(stats)
459}
460
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// Both fields are optional; `list_streams` applies `offset` first and then
/// `limit` to the sorted list.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Optional maximum number of streams to return
    pub limit: Option<usize>,
    /// Optional number of leading streams to skip (pagination)
    pub offset: Option<usize>,
}
470
/// Response for listing streams
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The page of streams remaining after `offset`/`limit` are applied
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before pagination was applied
    pub total: usize,
}
477
478pub async fn list_streams(
479    State(store): State<SharedStore>,
480    Query(params): Query<ListStreamsParams>,
481) -> Json<ListStreamsResponse> {
482    let mut streams = store.list_streams();
483    let total = streams.len();
484
485    // Sort by last_event_at descending (most recent first)
486    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
487
488    // Apply pagination
489    if let Some(offset) = params.offset {
490        if offset < streams.len() {
491            streams = streams[offset..].to_vec();
492        } else {
493            streams = vec![];
494        }
495    }
496
497    if let Some(limit) = params.limit {
498        streams.truncate(limit);
499    }
500
501    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
502
503    Json(ListStreamsResponse { streams, total })
504}
505
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// Both fields are optional; `list_event_types` applies `offset` first and
/// then `limit` to the sorted list.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Optional maximum number of event types to return
    pub limit: Option<usize>,
    /// Optional number of leading event types to skip (pagination)
    pub offset: Option<usize>,
}
515
/// Response for listing event types
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The page of event types remaining after `offset`/`limit` are applied
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before pagination was applied
    pub total: usize,
}
522
523pub async fn list_event_types(
524    State(store): State<SharedStore>,
525    Query(params): Query<ListEventTypesParams>,
526) -> Json<ListEventTypesResponse> {
527    let mut event_types = store.list_event_types();
528    let total = event_types.len();
529
530    // Sort by event_count descending (most used first)
531    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
532
533    // Apply pagination
534    if let Some(offset) = params.offset {
535        if offset < event_types.len() {
536            event_types = event_types[offset..].to_vec();
537        } else {
538            event_types = vec![];
539        }
540    }
541
542    if let Some(limit) = params.limit {
543        event_types.truncate(limit);
544    }
545
546    tracing::debug!(
547        "Listed {} event types (total: {})",
548        event_types.len(),
549        total
550    );
551
552    Json(ListEventTypesResponse { event_types, total })
553}
554
555// v0.2: WebSocket endpoint for real-time event streaming
556pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
557    let websocket_manager = store.websocket_manager();
558
559    ws.on_upgrade(move |socket| async move {
560        websocket_manager.handle_socket(socket).await;
561    })
562}
563
564// v0.2: Event frequency analytics endpoint
565pub async fn analytics_frequency(
566    State(store): State<SharedStore>,
567    Query(req): Query<EventFrequencyRequest>,
568) -> Result<Json<EventFrequencyResponse>> {
569    let response = AnalyticsEngine::event_frequency(&store, req)?;
570
571    tracing::debug!(
572        "Frequency analysis returned {} buckets",
573        response.buckets.len()
574    );
575
576    Ok(Json(response))
577}
578
579// v0.2: Statistical summary endpoint
580pub async fn analytics_summary(
581    State(store): State<SharedStore>,
582    Query(req): Query<StatsSummaryRequest>,
583) -> Result<Json<StatsSummaryResponse>> {
584    let response = AnalyticsEngine::stats_summary(&store, req)?;
585
586    tracing::debug!(
587        "Stats summary: {} events across {} entities",
588        response.total_events,
589        response.unique_entities
590    );
591
592    Ok(Json(response))
593}
594
595// v0.2: Event correlation analysis endpoint
596pub async fn analytics_correlation(
597    State(store): State<SharedStore>,
598    Query(req): Query<CorrelationRequest>,
599) -> Result<Json<CorrelationResponse>> {
600    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
601
602    tracing::debug!(
603        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
604        response.correlated_pairs,
605        response.total_a,
606        response.correlation_percentage
607    );
608
609    Ok(Json(response))
610}
611
612// v0.2: Create a snapshot for an entity
613pub async fn create_snapshot(
614    State(store): State<SharedStore>,
615    Json(req): Json<CreateSnapshotRequest>,
616) -> Result<Json<CreateSnapshotResponse>> {
617    store.create_snapshot(&req.entity_id)?;
618
619    let snapshot_manager = store.snapshot_manager();
620    let snapshot = snapshot_manager
621        .get_latest_snapshot(&req.entity_id)
622        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
623
624    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
625
626    Ok(Json(CreateSnapshotResponse {
627        snapshot_id: snapshot.id,
628        entity_id: snapshot.entity_id,
629        created_at: snapshot.created_at,
630        event_count: snapshot.event_count,
631        size_bytes: snapshot.metadata.size_bytes,
632    }))
633}
634
635// v0.2: List snapshots
636pub async fn list_snapshots(
637    State(store): State<SharedStore>,
638    Query(req): Query<ListSnapshotsRequest>,
639) -> Result<Json<ListSnapshotsResponse>> {
640    let snapshot_manager = store.snapshot_manager();
641
642    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
643        snapshot_manager
644            .get_all_snapshots(&entity_id)
645            .into_iter()
646            .map(SnapshotInfo::from)
647            .collect()
648    } else {
649        // List all entities with snapshots
650        let entities = snapshot_manager.list_entities();
651        entities
652            .iter()
653            .flat_map(|entity_id| {
654                snapshot_manager
655                    .get_all_snapshots(entity_id)
656                    .into_iter()
657                    .map(SnapshotInfo::from)
658            })
659            .collect()
660    };
661
662    let total = snapshots.len();
663
664    tracing::debug!("Listed {} snapshots", total);
665
666    Ok(Json(ListSnapshotsResponse { snapshots, total }))
667}
668
669// v0.2: Get latest snapshot for an entity
670pub async fn get_latest_snapshot(
671    State(store): State<SharedStore>,
672    Path(entity_id): Path<String>,
673) -> Result<Json<serde_json::Value>> {
674    let snapshot_manager = store.snapshot_manager();
675
676    let snapshot = snapshot_manager
677        .get_latest_snapshot(&entity_id)
678        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
679
680    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
681
682    Ok(Json(serde_json::json!({
683        "snapshot_id": snapshot.id,
684        "entity_id": snapshot.entity_id,
685        "created_at": snapshot.created_at,
686        "as_of": snapshot.as_of,
687        "event_count": snapshot.event_count,
688        "size_bytes": snapshot.metadata.size_bytes,
689        "snapshot_type": snapshot.metadata.snapshot_type,
690        "state": snapshot.state
691    })))
692}
693
694// v0.2: Trigger manual compaction
695pub async fn trigger_compaction(
696    State(store): State<SharedStore>,
697) -> Result<Json<CompactionResult>> {
698    let compaction_manager = store.compaction_manager().ok_or_else(|| {
699        crate::error::AllSourceError::InternalError(
700            "Compaction not enabled (no Parquet storage)".to_string(),
701        )
702    })?;
703
704    tracing::info!("📦 Manual compaction triggered via API");
705
706    let result = compaction_manager.compact_now()?;
707
708    Ok(Json(result))
709}
710
711// v0.2: Get compaction statistics
712pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
713    let compaction_manager = store.compaction_manager().ok_or_else(|| {
714        crate::error::AllSourceError::InternalError(
715            "Compaction not enabled (no Parquet storage)".to_string(),
716        )
717    })?;
718
719    let stats = compaction_manager.stats();
720    let config = compaction_manager.config();
721
722    Ok(Json(serde_json::json!({
723        "stats": stats,
724        "config": {
725            "min_files_to_compact": config.min_files_to_compact,
726            "target_file_size": config.target_file_size,
727            "max_file_size": config.max_file_size,
728            "small_file_threshold": config.small_file_threshold,
729            "compaction_interval_seconds": config.compaction_interval_seconds,
730            "auto_compact": config.auto_compact,
731            "strategy": config.strategy
732        }
733    })))
734}
735
736// v0.5: Register a new schema
737pub async fn register_schema(
738    State(store): State<SharedStore>,
739    Json(req): Json<RegisterSchemaRequest>,
740) -> Result<Json<RegisterSchemaResponse>> {
741    let schema_registry = store.schema_registry();
742
743    let response =
744        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
745
746    tracing::info!(
747        "📋 Schema registered: v{} for '{}'",
748        response.version,
749        response.subject
750    );
751
752    Ok(Json(response))
753}
754
// v0.5: Get a schema by subject and optional version
/// Query-string parameters accepted by `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version forwarded to the schema registry.
    /// NOTE(review): presumably `None` selects the latest version — confirm
    /// against the registry implementation.
    version: Option<u32>,
}
760
761pub async fn get_schema(
762    State(store): State<SharedStore>,
763    Path(subject): Path<String>,
764    Query(params): Query<GetSchemaParams>,
765) -> Result<Json<serde_json::Value>> {
766    let schema_registry = store.schema_registry();
767
768    let schema = schema_registry.get_schema(&subject, params.version)?;
769
770    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
771
772    Ok(Json(serde_json::json!({
773        "id": schema.id,
774        "subject": schema.subject,
775        "version": schema.version,
776        "schema": schema.schema,
777        "created_at": schema.created_at,
778        "description": schema.description,
779        "tags": schema.tags
780    })))
781}
782
783// v0.5: List all versions of a schema subject
784pub async fn list_schema_versions(
785    State(store): State<SharedStore>,
786    Path(subject): Path<String>,
787) -> Result<Json<serde_json::Value>> {
788    let schema_registry = store.schema_registry();
789
790    let versions = schema_registry.list_versions(&subject)?;
791
792    Ok(Json(serde_json::json!({
793        "subject": subject,
794        "versions": versions
795    })))
796}
797
798// v0.5: List all schema subjects
799pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
800    let schema_registry = store.schema_registry();
801
802    let subjects = schema_registry.list_subjects();
803
804    Json(serde_json::json!({
805        "subjects": subjects,
806        "total": subjects.len()
807    }))
808}
809
810// v0.5: Validate an event against a schema
811pub async fn validate_event_schema(
812    State(store): State<SharedStore>,
813    Json(req): Json<ValidateEventRequest>,
814) -> Result<Json<ValidateEventResponse>> {
815    let schema_registry = store.schema_registry();
816
817    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
818
819    if response.valid {
820        tracing::debug!(
821            "✅ Event validated against schema '{}' v{}",
822            req.subject,
823            response.schema_version
824        );
825    } else {
826        tracing::warn!(
827            "❌ Event validation failed for '{}': {:?}",
828            req.subject,
829            response.errors
830        );
831    }
832
833    Ok(Json(response))
834}
835
// v0.5: Set compatibility mode for a subject
/// JSON request body accepted by `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the URL path.
    compatibility: CompatibilityMode,
}
841
842pub async fn set_compatibility_mode(
843    State(store): State<SharedStore>,
844    Path(subject): Path<String>,
845    Json(req): Json<SetCompatibilityRequest>,
846) -> Json<serde_json::Value> {
847    let schema_registry = store.schema_registry();
848
849    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
850
851    tracing::info!(
852        "🔧 Set compatibility mode for '{}' to {:?}",
853        subject,
854        req.compatibility
855    );
856
857    Json(serde_json::json!({
858        "subject": subject,
859        "compatibility": req.compatibility
860    }))
861}
862
863// v0.5: Start a replay operation
864pub async fn start_replay(
865    State(store): State<SharedStore>,
866    Json(req): Json<StartReplayRequest>,
867) -> Result<Json<StartReplayResponse>> {
868    let replay_manager = store.replay_manager();
869
870    let response = replay_manager.start_replay(store, req)?;
871
872    tracing::info!(
873        "🔄 Started replay {} with {} events",
874        response.replay_id,
875        response.total_events
876    );
877
878    Ok(Json(response))
879}
880
881// v0.5: Get replay progress
882pub async fn get_replay_progress(
883    State(store): State<SharedStore>,
884    Path(replay_id): Path<uuid::Uuid>,
885) -> Result<Json<ReplayProgress>> {
886    let replay_manager = store.replay_manager();
887
888    let progress = replay_manager.get_progress(replay_id)?;
889
890    Ok(Json(progress))
891}
892
893// v0.5: List all replay operations
894pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
895    let replay_manager = store.replay_manager();
896
897    let replays = replay_manager.list_replays();
898
899    Json(serde_json::json!({
900        "replays": replays,
901        "total": replays.len()
902    }))
903}
904
905// v0.5: Cancel a running replay
906pub async fn cancel_replay(
907    State(store): State<SharedStore>,
908    Path(replay_id): Path<uuid::Uuid>,
909) -> Result<Json<serde_json::Value>> {
910    let replay_manager = store.replay_manager();
911
912    replay_manager.cancel_replay(replay_id)?;
913
914    tracing::info!("🛑 Cancelled replay {}", replay_id);
915
916    Ok(Json(serde_json::json!({
917        "replay_id": replay_id,
918        "status": "cancelled"
919    })))
920}
921
922// v0.5: Delete a completed replay
923pub async fn delete_replay(
924    State(store): State<SharedStore>,
925    Path(replay_id): Path<uuid::Uuid>,
926) -> Result<Json<serde_json::Value>> {
927    let replay_manager = store.replay_manager();
928
929    let deleted = replay_manager.delete_replay(replay_id)?;
930
931    if deleted {
932        tracing::info!("🗑️  Deleted replay {}", replay_id);
933    }
934
935    Ok(Json(serde_json::json!({
936        "replay_id": replay_id,
937        "deleted": deleted
938    })))
939}
940
941// v0.5: Register a new pipeline
942pub async fn register_pipeline(
943    State(store): State<SharedStore>,
944    Json(config): Json<PipelineConfig>,
945) -> Result<Json<serde_json::Value>> {
946    let pipeline_manager = store.pipeline_manager();
947
948    let pipeline_id = pipeline_manager.register(config.clone());
949
950    tracing::info!(
951        "🔀 Pipeline registered: {} (name: {})",
952        pipeline_id,
953        config.name
954    );
955
956    Ok(Json(serde_json::json!({
957        "pipeline_id": pipeline_id,
958        "name": config.name,
959        "enabled": config.enabled
960    })))
961}
962
963// v0.5: List all pipelines
964pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
965    let pipeline_manager = store.pipeline_manager();
966
967    let pipelines = pipeline_manager.list();
968
969    tracing::debug!("Listed {} pipelines", pipelines.len());
970
971    Json(serde_json::json!({
972        "pipelines": pipelines,
973        "total": pipelines.len()
974    }))
975}
976
977// v0.5: Get a specific pipeline
978pub async fn get_pipeline(
979    State(store): State<SharedStore>,
980    Path(pipeline_id): Path<uuid::Uuid>,
981) -> Result<Json<PipelineConfig>> {
982    let pipeline_manager = store.pipeline_manager();
983
984    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
985        crate::error::AllSourceError::ValidationError(format!(
986            "Pipeline not found: {}",
987            pipeline_id
988        ))
989    })?;
990
991    Ok(Json(pipeline.config().clone()))
992}
993
994// v0.5: Remove a pipeline
995pub async fn remove_pipeline(
996    State(store): State<SharedStore>,
997    Path(pipeline_id): Path<uuid::Uuid>,
998) -> Result<Json<serde_json::Value>> {
999    let pipeline_manager = store.pipeline_manager();
1000
1001    let removed = pipeline_manager.remove(pipeline_id);
1002
1003    if removed {
1004        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1005    }
1006
1007    Ok(Json(serde_json::json!({
1008        "pipeline_id": pipeline_id,
1009        "removed": removed
1010    })))
1011}
1012
1013// v0.5: Get statistics for all pipelines
1014pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1015    let pipeline_manager = store.pipeline_manager();
1016
1017    let stats = pipeline_manager.all_stats();
1018
1019    Json(serde_json::json!({
1020        "stats": stats,
1021        "total": stats.len()
1022    }))
1023}
1024
1025// v0.5: Get statistics for a specific pipeline
1026pub async fn get_pipeline_stats(
1027    State(store): State<SharedStore>,
1028    Path(pipeline_id): Path<uuid::Uuid>,
1029) -> Result<Json<PipelineStats>> {
1030    let pipeline_manager = store.pipeline_manager();
1031
1032    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1033        crate::error::AllSourceError::ValidationError(format!(
1034            "Pipeline not found: {}",
1035            pipeline_id
1036        ))
1037    })?;
1038
1039    Ok(Json(pipeline.stats()))
1040}
1041
1042// v0.5: Reset a pipeline's state
1043pub async fn reset_pipeline(
1044    State(store): State<SharedStore>,
1045    Path(pipeline_id): Path<uuid::Uuid>,
1046) -> Result<Json<serde_json::Value>> {
1047    let pipeline_manager = store.pipeline_manager();
1048
1049    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1050        crate::error::AllSourceError::ValidationError(format!(
1051            "Pipeline not found: {}",
1052            pipeline_id
1053        ))
1054    })?;
1055
1056    pipeline.reset();
1057
1058    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1059
1060    Ok(Json(serde_json::json!({
1061        "pipeline_id": pipeline_id,
1062        "reset": true
1063    })))
1064}
1065
1066// =============================================================================
1067// v0.11: Single Event Lookup by ID
1068// =============================================================================
1069
1070/// Get a single event by UUID
1071pub async fn get_event_by_id(
1072    State(store): State<SharedStore>,
1073    Path(event_id): Path<uuid::Uuid>,
1074) -> Result<Json<serde_json::Value>> {
1075    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1076        crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1077    })?;
1078
1079    let dto = EventDto::from(&event);
1080
1081    tracing::debug!("Event retrieved by ID: {}", event_id);
1082
1083    Ok(Json(serde_json::json!({
1084        "event": dto,
1085        "found": true
1086    })))
1087}
1088
1089// =============================================================================
1090// v0.7: Projection State API for Query Service Integration
1091// =============================================================================
1092
1093/// List all registered projections
1094pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1095    let projection_manager = store.projection_manager();
1096
1097    let projections: Vec<serde_json::Value> = projection_manager
1098        .list_projections()
1099        .iter()
1100        .map(|(name, projection)| {
1101            serde_json::json!({
1102                "name": name,
1103                "type": format!("{:?}", projection.name()),
1104            })
1105        })
1106        .collect();
1107
1108    tracing::debug!("Listed {} projections", projections.len());
1109
1110    Json(serde_json::json!({
1111        "projections": projections,
1112        "total": projections.len()
1113    }))
1114}
1115
1116/// Get projection metadata by name
1117pub async fn get_projection(
1118    State(store): State<SharedStore>,
1119    Path(name): Path<String>,
1120) -> Result<Json<serde_json::Value>> {
1121    let projection_manager = store.projection_manager();
1122
1123    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1124        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1125    })?;
1126
1127    Ok(Json(serde_json::json!({
1128        "name": projection.name(),
1129        "found": true
1130    })))
1131}
1132
1133/// Get projection state for a specific entity
1134///
1135/// This endpoint allows the Elixir Query Service to fetch projection state
1136/// from the Rust Core for synchronization.
1137pub async fn get_projection_state(
1138    State(store): State<SharedStore>,
1139    Path((name, entity_id)): Path<(String, String)>,
1140) -> Result<Json<serde_json::Value>> {
1141    let projection_manager = store.projection_manager();
1142
1143    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1144        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1145    })?;
1146
1147    let state = projection.get_state(&entity_id);
1148
1149    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1150
1151    Ok(Json(serde_json::json!({
1152        "projection": name,
1153        "entity_id": entity_id,
1154        "state": state,
1155        "found": state.is_some()
1156    })))
1157}
1158
1159/// Delete (clear) a projection by name
1160///
1161/// Removes all state from the projection. The projection definition remains
1162/// registered but its accumulated state is cleared.
1163pub async fn delete_projection(
1164    State(store): State<SharedStore>,
1165    Path(name): Path<String>,
1166) -> Result<Json<serde_json::Value>> {
1167    let projection_manager = store.projection_manager();
1168
1169    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1170        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1171    })?;
1172
1173    projection.clear();
1174
1175    // Also clear any cached state for this projection
1176    let cache = store.projection_state_cache();
1177    let prefix = format!("{name}:");
1178    let keys_to_remove: Vec<String> = cache
1179        .iter()
1180        .filter(|entry| entry.key().starts_with(&prefix))
1181        .map(|entry| entry.key().clone())
1182        .collect();
1183    for key in keys_to_remove {
1184        cache.remove(&key);
1185    }
1186
1187    tracing::info!("Projection deleted (cleared): {}", name);
1188
1189    Ok(Json(serde_json::json!({
1190        "projection": name,
1191        "deleted": true
1192    })))
1193}
1194
1195/// Get aggregate projection state (all entities)
1196///
1197/// Returns summary information about a projection's state across all entities.
1198pub async fn get_projection_state_summary(
1199    State(store): State<SharedStore>,
1200    Path(name): Path<String>,
1201) -> Result<Json<serde_json::Value>> {
1202    let projection_manager = store.projection_manager();
1203
1204    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1205        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1206    })?;
1207
1208    // Collect cached states for this projection
1209    let cache = store.projection_state_cache();
1210    let prefix = format!("{name}:");
1211    let states: Vec<serde_json::Value> = cache
1212        .iter()
1213        .filter(|entry| entry.key().starts_with(&prefix))
1214        .map(|entry| {
1215            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1216            serde_json::json!({
1217                "entity_id": entity_id,
1218                "state": entry.value().clone()
1219            })
1220        })
1221        .collect();
1222
1223    let total = states.len();
1224
1225    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1226
1227    Ok(Json(serde_json::json!({
1228        "projection": name,
1229        "states": states,
1230        "total": total
1231    })))
1232}
1233
1234/// Reset a projection to its initial state
1235///
1236/// Clears all accumulated state and reprocesses events from the beginning.
1237pub async fn reset_projection(
1238    State(store): State<SharedStore>,
1239    Path(name): Path<String>,
1240) -> Result<Json<serde_json::Value>> {
1241    let reprocessed = store.reset_projection(&name)?;
1242
1243    tracing::info!(
1244        "Projection reset: {} ({} events reprocessed)",
1245        name,
1246        reprocessed
1247    );
1248
1249    Ok(Json(serde_json::json!({
1250        "projection": name,
1251        "reset": true,
1252        "events_reprocessed": reprocessed
1253    })))
1254}
1255
1256/// Request body for saving projection state
1257#[derive(Debug, Deserialize)]
1258pub struct SaveProjectionStateRequest {
1259    pub state: serde_json::Value,
1260}
1261
1262/// Save/update projection state for an entity
1263///
1264/// This endpoint allows external services (like Elixir Query Service) to
1265/// store computed projection state back to the Core for persistence.
1266pub async fn save_projection_state(
1267    State(store): State<SharedStore>,
1268    Path((name, entity_id)): Path<(String, String)>,
1269    Json(req): Json<SaveProjectionStateRequest>,
1270) -> Result<Json<serde_json::Value>> {
1271    let projection_cache = store.projection_state_cache();
1272
1273    // Store in the projection state cache
1274    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1275
1276    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1277
1278    Ok(Json(serde_json::json!({
1279        "projection": name,
1280        "entity_id": entity_id,
1281        "saved": true
1282    })))
1283}
1284
1285/// Bulk get projection states for multiple entities
1286///
1287/// Efficient endpoint for fetching multiple entity states in a single request.
1288#[derive(Debug, Deserialize)]
1289pub struct BulkGetStateRequest {
1290    pub entity_ids: Vec<String>,
1291}
1292
1293/// Bulk save projection states for multiple entities
1294///
1295/// Efficient endpoint for saving multiple entity states in a single request.
1296#[derive(Debug, Deserialize)]
1297pub struct BulkSaveStateRequest {
1298    pub states: Vec<BulkSaveStateItem>,
1299}
1300
1301#[derive(Debug, Deserialize)]
1302pub struct BulkSaveStateItem {
1303    pub entity_id: String,
1304    pub state: serde_json::Value,
1305}
1306
1307pub async fn bulk_get_projection_states(
1308    State(store): State<SharedStore>,
1309    Path(name): Path<String>,
1310    Json(req): Json<BulkGetStateRequest>,
1311) -> Result<Json<serde_json::Value>> {
1312    let projection_manager = store.projection_manager();
1313
1314    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1315        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1316    })?;
1317
1318    let states: Vec<serde_json::Value> = req
1319        .entity_ids
1320        .iter()
1321        .map(|entity_id| {
1322            let state = projection.get_state(entity_id);
1323            serde_json::json!({
1324                "entity_id": entity_id,
1325                "state": state,
1326                "found": state.is_some()
1327            })
1328        })
1329        .collect();
1330
1331    tracing::debug!(
1332        "Bulk projection state retrieved: {} entities from {}",
1333        states.len(),
1334        name
1335    );
1336
1337    Ok(Json(serde_json::json!({
1338        "projection": name,
1339        "states": states,
1340        "total": states.len()
1341    })))
1342}
1343
1344/// Bulk save projection states for multiple entities
1345///
1346/// This endpoint allows efficient batch saving of projection states,
1347/// critical for high-throughput event processing pipelines.
1348pub async fn bulk_save_projection_states(
1349    State(store): State<SharedStore>,
1350    Path(name): Path<String>,
1351    Json(req): Json<BulkSaveStateRequest>,
1352) -> Result<Json<serde_json::Value>> {
1353    let projection_cache = store.projection_state_cache();
1354
1355    let mut saved_count = 0;
1356    for item in &req.states {
1357        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1358        saved_count += 1;
1359    }
1360
1361    tracing::info!(
1362        "Bulk projection state saved: {} entities for {}",
1363        saved_count,
1364        name
1365    );
1366
1367    Ok(Json(serde_json::json!({
1368        "projection": name,
1369        "saved": saved_count,
1370        "total": req.states.len()
1371    })))
1372}
1373
1374// =============================================================================
1375// v0.11: Webhook Management API
1376// =============================================================================
1377
1378/// Query parameters for listing webhooks
1379#[derive(Debug, Deserialize)]
1380pub struct ListWebhooksParams {
1381    pub tenant_id: Option<String>,
1382}
1383
1384/// Register a new webhook subscription
1385pub async fn register_webhook(
1386    State(store): State<SharedStore>,
1387    Json(req): Json<RegisterWebhookRequest>,
1388) -> Json<serde_json::Value> {
1389    let registry = store.webhook_registry();
1390    let webhook = registry.register(req);
1391
1392    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1393
1394    Json(serde_json::json!({
1395        "webhook": webhook,
1396        "created": true
1397    }))
1398}
1399
1400/// List webhooks, optionally filtered by tenant_id
1401pub async fn list_webhooks(
1402    State(store): State<SharedStore>,
1403    Query(params): Query<ListWebhooksParams>,
1404) -> Json<serde_json::Value> {
1405    let registry = store.webhook_registry();
1406
1407    let webhooks = if let Some(tenant_id) = params.tenant_id {
1408        registry.list_by_tenant(&tenant_id)
1409    } else {
1410        // Without tenant filter, return empty (tenants should always filter)
1411        vec![]
1412    };
1413
1414    let total = webhooks.len();
1415
1416    Json(serde_json::json!({
1417        "webhooks": webhooks,
1418        "total": total
1419    }))
1420}
1421
1422/// Get a specific webhook by ID
1423pub async fn get_webhook(
1424    State(store): State<SharedStore>,
1425    Path(webhook_id): Path<uuid::Uuid>,
1426) -> Result<Json<serde_json::Value>> {
1427    let registry = store.webhook_registry();
1428
1429    let webhook = registry.get(webhook_id).ok_or_else(|| {
1430        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1431    })?;
1432
1433    Ok(Json(serde_json::json!({
1434        "webhook": webhook,
1435        "found": true
1436    })))
1437}
1438
1439/// Update a webhook subscription
1440pub async fn update_webhook(
1441    State(store): State<SharedStore>,
1442    Path(webhook_id): Path<uuid::Uuid>,
1443    Json(req): Json<UpdateWebhookRequest>,
1444) -> Result<Json<serde_json::Value>> {
1445    let registry = store.webhook_registry();
1446
1447    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1448        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1449    })?;
1450
1451    tracing::info!("Webhook updated: {}", webhook_id);
1452
1453    Ok(Json(serde_json::json!({
1454        "webhook": webhook,
1455        "updated": true
1456    })))
1457}
1458
1459/// Delete a webhook subscription
1460pub async fn delete_webhook(
1461    State(store): State<SharedStore>,
1462    Path(webhook_id): Path<uuid::Uuid>,
1463) -> Result<Json<serde_json::Value>> {
1464    let registry = store.webhook_registry();
1465
1466    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1467        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1468    })?;
1469
1470    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1471
1472    Ok(Json(serde_json::json!({
1473        "webhook_id": webhook_id,
1474        "deleted": true
1475    })))
1476}
1477
1478/// Query parameters for listing webhook deliveries
1479#[derive(Debug, Deserialize)]
1480pub struct ListDeliveriesParams {
1481    pub limit: Option<usize>,
1482}
1483
1484/// List delivery history for a webhook
1485pub async fn list_webhook_deliveries(
1486    State(store): State<SharedStore>,
1487    Path(webhook_id): Path<uuid::Uuid>,
1488    Query(params): Query<ListDeliveriesParams>,
1489) -> Result<Json<serde_json::Value>> {
1490    let registry = store.webhook_registry();
1491
1492    // Verify webhook exists
1493    registry.get(webhook_id).ok_or_else(|| {
1494        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1495    })?;
1496
1497    let limit = params.limit.unwrap_or(50);
1498    let deliveries = registry.get_deliveries(webhook_id, limit);
1499    let total = deliveries.len();
1500
1501    Ok(Json(serde_json::json!({
1502        "webhook_id": webhook_id,
1503        "deliveries": deliveries,
1504        "total": total
1505    })))
1506}
1507
1508// =============================================================================
1509// v2.0: Advanced Query Features
1510// =============================================================================
1511
1512/// EventQL: Execute SQL queries over events using DataFusion
1513pub async fn eventql_query(
1514    State(store): State<SharedStore>,
1515    Json(req): Json<EventQLRequest>,
1516) -> Result<Json<serde_json::Value>> {
1517    let events = store.snapshot_events();
1518    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1519        Ok(response) => Ok(Json(serde_json::json!({
1520            "columns": response.columns,
1521            "rows": response.rows,
1522            "row_count": response.row_count,
1523        }))),
1524        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1525    }
1526}
1527
1528/// GraphQL: Execute GraphQL queries
1529pub async fn graphql_query(
1530    State(store): State<SharedStore>,
1531    Json(req): Json<GraphQLRequest>,
1532) -> Json<serde_json::Value> {
1533    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1534        Ok(f) => f,
1535        Err(e) => {
1536            return Json(
1537                serde_json::to_value(GraphQLResponse {
1538                    data: None,
1539                    errors: vec![GraphQLError { message: e }],
1540                })
1541                .unwrap(),
1542            );
1543        }
1544    };
1545
1546    let mut data = serde_json::Map::new();
1547    let mut errors = Vec::new();
1548
1549    for field in &fields {
1550        match field.name.as_str() {
1551            "events" => {
1552                let request = crate::application::dto::QueryEventsRequest {
1553                    entity_id: field.arguments.get("entity_id").cloned(),
1554                    event_type: field.arguments.get("event_type").cloned(),
1555                    tenant_id: field.arguments.get("tenant_id").cloned(),
1556                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1557                    as_of: None,
1558                    since: None,
1559                    until: None,
1560                };
1561                match store.query(request) {
1562                    Ok(events) => {
1563                        let json_events: Vec<serde_json::Value> = events
1564                            .iter()
1565                            .map(|e| {
1566                                crate::infrastructure::query::graphql::event_to_json(
1567                                    e,
1568                                    &field.fields,
1569                                )
1570                            })
1571                            .collect();
1572                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1573                    }
1574                    Err(e) => errors.push(GraphQLError {
1575                        message: format!("events query failed: {e}"),
1576                    }),
1577                }
1578            }
1579            "event" => {
1580                if let Some(id_str) = field.arguments.get("id") {
1581                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1582                        match store.get_event_by_id(&id) {
1583                            Ok(Some(event)) => {
1584                                data.insert(
1585                                    "event".to_string(),
1586                                    crate::infrastructure::query::graphql::event_to_json(
1587                                        &event,
1588                                        &field.fields,
1589                                    ),
1590                                );
1591                            }
1592                            Ok(None) => {
1593                                data.insert("event".to_string(), serde_json::Value::Null);
1594                            }
1595                            Err(e) => errors.push(GraphQLError {
1596                                message: format!("event lookup failed: {e}"),
1597                            }),
1598                        }
1599                    } else {
1600                        errors.push(GraphQLError {
1601                            message: format!("Invalid UUID: {id_str}"),
1602                        });
1603                    }
1604                } else {
1605                    errors.push(GraphQLError {
1606                        message: "event query requires 'id' argument".to_string(),
1607                    });
1608                }
1609            }
1610            "projections" => {
1611                let pm = store.projection_manager();
1612                let names: Vec<serde_json::Value> = pm
1613                    .list_projections()
1614                    .iter()
1615                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1616                    .collect();
1617                data.insert("projections".to_string(), serde_json::Value::Array(names));
1618            }
1619            "stats" => {
1620                let stats = store.stats();
1621                data.insert(
1622                    "stats".to_string(),
1623                    serde_json::json!({
1624                        "total_events": stats.total_events,
1625                        "total_entities": stats.total_entities,
1626                        "total_event_types": stats.total_event_types,
1627                    }),
1628                );
1629            }
1630            "__schema" => {
1631                data.insert(
1632                    "__schema".to_string(),
1633                    crate::infrastructure::query::graphql::introspection_schema(),
1634                );
1635            }
1636            other => {
1637                errors.push(GraphQLError {
1638                    message: format!("Unknown field: {other}"),
1639                });
1640            }
1641        }
1642    }
1643
1644    Json(
1645        serde_json::to_value(GraphQLResponse {
1646            data: Some(serde_json::Value::Object(data)),
1647            errors,
1648        })
1649        .unwrap(),
1650    )
1651}
1652
1653/// Geospatial: Query events by location
1654pub async fn geo_query(
1655    State(store): State<SharedStore>,
1656    Json(req): Json<GeoQueryRequest>,
1657) -> Json<serde_json::Value> {
1658    let events = store.snapshot_events();
1659    let geo_index = store.geo_index();
1660    let results =
1661        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1662    let total = results.len();
1663    Json(serde_json::json!({
1664        "results": results,
1665        "total": total,
1666    }))
1667}
1668
1669/// Geospatial index stats
1670pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1671    let stats = store.geo_index().stats();
1672    Json(serde_json::json!(stats))
1673}
1674
1675/// Exactly-once processing stats
1676pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1677    let stats = store.exactly_once().stats();
1678    Json(serde_json::json!(stats))
1679}
1680
1681/// Schema evolution history for an event type
1682pub async fn schema_evolution_history(
1683    State(store): State<SharedStore>,
1684    Path(event_type): Path<String>,
1685) -> Json<serde_json::Value> {
1686    let mgr = store.schema_evolution();
1687    let history = mgr.get_history(&event_type);
1688    let version = mgr.get_version(&event_type);
1689    Json(serde_json::json!({
1690        "event_type": event_type,
1691        "current_version": version,
1692        "history": history,
1693    }))
1694}
1695
1696/// Current inferred schema for an event type
1697pub async fn schema_evolution_schema(
1698    State(store): State<SharedStore>,
1699    Path(event_type): Path<String>,
1700) -> Json<serde_json::Value> {
1701    let mgr = store.schema_evolution();
1702    if let Some(schema) = mgr.get_schema(&event_type) {
1703        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1704        Json(serde_json::json!({
1705            "event_type": event_type,
1706            "version": mgr.get_version(&event_type),
1707            "inferred_schema": schema,
1708            "json_schema": json_schema,
1709        }))
1710    } else {
1711        Json(serde_json::json!({
1712            "event_type": event_type,
1713            "error": "No schema inferred for this event type"
1714        }))
1715    }
1716}
1717
1718/// Schema evolution stats
1719pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1720    let stats = store.schema_evolution().stats();
1721    let event_types = store.schema_evolution().list_event_types();
1722    Json(serde_json::json!({
1723        "stats": stats,
1724        "tracked_event_types": event_types,
1725    }))
1726}
1727
1728#[cfg(test)]
1729mod tests {
1730    use super::*;
1731    use crate::{domain::entities::Event, store::EventStore};
1732
1733    fn create_test_store() -> Arc<EventStore> {
1734        Arc::new(EventStore::new())
1735    }
1736
1737    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1738        Event::from_strings(
1739            event_type.to_string(),
1740            entity_id.to_string(),
1741            "test-stream".to_string(),
1742            serde_json::json!({
1743                "name": "Test",
1744                "value": 42
1745            }),
1746            None,
1747        )
1748        .unwrap()
1749    }
1750
1751    #[tokio::test]
1752    async fn test_projection_state_cache() {
1753        let store = create_test_store();
1754
1755        // Test cache insertion
1756        let cache = store.projection_state_cache();
1757        cache.insert(
1758            "entity_snapshots:user-123".to_string(),
1759            serde_json::json!({"name": "Test User", "age": 30}),
1760        );
1761
1762        // Test cache retrieval
1763        let state = cache.get("entity_snapshots:user-123");
1764        assert!(state.is_some());
1765        let state = state.unwrap();
1766        assert_eq!(state["name"], "Test User");
1767        assert_eq!(state["age"], 30);
1768    }
1769
1770    #[tokio::test]
1771    async fn test_projection_manager_list_projections() {
1772        let store = create_test_store();
1773
1774        // List projections (built-in projections should be available)
1775        let projection_manager = store.projection_manager();
1776        let projections = projection_manager.list_projections();
1777
1778        // Should have entity_snapshots and event_counters
1779        assert!(projections.len() >= 2);
1780
1781        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
1782        assert!(names.contains(&"entity_snapshots"));
1783        assert!(names.contains(&"event_counters"));
1784    }
1785
1786    #[tokio::test]
1787    async fn test_projection_state_after_event_ingestion() {
1788        let store = create_test_store();
1789
1790        // Ingest an event
1791        let event = create_test_event("user-456", "user.created");
1792        store.ingest(event).unwrap();
1793
1794        // Get projection state
1795        let projection_manager = store.projection_manager();
1796        let snapshot_projection = projection_manager
1797            .get_projection("entity_snapshots")
1798            .unwrap();
1799
1800        let state = snapshot_projection.get_state("user-456");
1801        assert!(state.is_some());
1802        let state = state.unwrap();
1803        assert_eq!(state["name"], "Test");
1804        assert_eq!(state["value"], 42);
1805    }
1806
1807    #[tokio::test]
1808    async fn test_projection_state_cache_multiple_entities() {
1809        let store = create_test_store();
1810        let cache = store.projection_state_cache();
1811
1812        // Insert multiple entities
1813        for i in 0..10 {
1814            cache.insert(
1815                format!("entity_snapshots:entity-{}", i),
1816                serde_json::json!({"id": i, "status": "active"}),
1817            );
1818        }
1819
1820        // Verify all insertions
1821        assert_eq!(cache.len(), 10);
1822
1823        // Verify each entity
1824        for i in 0..10 {
1825            let key = format!("entity_snapshots:entity-{}", i);
1826            let state = cache.get(&key);
1827            assert!(state.is_some());
1828            assert_eq!(state.unwrap()["id"], i);
1829        }
1830    }
1831
1832    #[tokio::test]
1833    async fn test_projection_state_update() {
1834        let store = create_test_store();
1835        let cache = store.projection_state_cache();
1836
1837        // Initial state
1838        cache.insert(
1839            "entity_snapshots:user-789".to_string(),
1840            serde_json::json!({"balance": 100}),
1841        );
1842
1843        // Update state
1844        cache.insert(
1845            "entity_snapshots:user-789".to_string(),
1846            serde_json::json!({"balance": 150}),
1847        );
1848
1849        // Verify update
1850        let state = cache.get("entity_snapshots:user-789").unwrap();
1851        assert_eq!(state["balance"], 150);
1852    }
1853
1854    #[tokio::test]
1855    async fn test_event_counter_projection() {
1856        let store = create_test_store();
1857
1858        // Ingest events of different types
1859        store
1860            .ingest(create_test_event("user-1", "user.created"))
1861            .unwrap();
1862        store
1863            .ingest(create_test_event("user-2", "user.created"))
1864            .unwrap();
1865        store
1866            .ingest(create_test_event("user-1", "user.updated"))
1867            .unwrap();
1868
1869        // Get event counter projection
1870        let projection_manager = store.projection_manager();
1871        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
1872
1873        // Check counts
1874        let created_state = counter_projection.get_state("user.created");
1875        assert!(created_state.is_some());
1876        assert_eq!(created_state.unwrap()["count"], 2);
1877
1878        let updated_state = counter_projection.get_state("user.updated");
1879        assert!(updated_state.is_some());
1880        assert_eq!(updated_state.unwrap()["count"], 1);
1881    }
1882
1883    #[tokio::test]
1884    async fn test_projection_state_cache_key_format() {
1885        let store = create_test_store();
1886        let cache = store.projection_state_cache();
1887
1888        // Test standard key format: {projection_name}:{entity_id}
1889        let key = "orders:order-12345".to_string();
1890        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
1891
1892        let state = cache.get(&key).unwrap();
1893        assert_eq!(state["total"], 99.99);
1894    }
1895
1896    #[tokio::test]
1897    async fn test_projection_state_cache_removal() {
1898        let store = create_test_store();
1899        let cache = store.projection_state_cache();
1900
1901        // Insert and then remove
1902        cache.insert(
1903            "test:entity-1".to_string(),
1904            serde_json::json!({"data": "value"}),
1905        );
1906        assert_eq!(cache.len(), 1);
1907
1908        cache.remove("test:entity-1");
1909        assert_eq!(cache.len(), 0);
1910        assert!(cache.get("test:entity-1").is_none());
1911    }
1912
1913    #[tokio::test]
1914    async fn test_get_nonexistent_projection() {
1915        let store = create_test_store();
1916        let projection_manager = store.projection_manager();
1917
1918        // Requesting a non-existent projection should return None
1919        let projection = projection_manager.get_projection("nonexistent_projection");
1920        assert!(projection.is_none());
1921    }
1922
1923    #[tokio::test]
1924    async fn test_get_nonexistent_entity_state() {
1925        let store = create_test_store();
1926        let projection_manager = store.projection_manager();
1927
1928        // Get state for non-existent entity
1929        let snapshot_projection = projection_manager
1930            .get_projection("entity_snapshots")
1931            .unwrap();
1932        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
1933        assert!(state.is_none());
1934    }
1935
1936    #[tokio::test]
1937    async fn test_projection_state_cache_concurrent_access() {
1938        let store = create_test_store();
1939        let cache = store.projection_state_cache();
1940
1941        // Simulate concurrent writes
1942        let handles: Vec<_> = (0..10)
1943            .map(|i| {
1944                let cache_clone = cache.clone();
1945                tokio::spawn(async move {
1946                    cache_clone.insert(
1947                        format!("concurrent:entity-{}", i),
1948                        serde_json::json!({"thread": i}),
1949                    );
1950                })
1951            })
1952            .collect();
1953
1954        for handle in handles {
1955            handle.await.unwrap();
1956        }
1957
1958        // All 10 entries should be present
1959        assert_eq!(cache.len(), 10);
1960    }
1961
1962    #[tokio::test]
1963    async fn test_projection_state_large_payload() {
1964        let store = create_test_store();
1965        let cache = store.projection_state_cache();
1966
1967        // Create a large JSON payload (~10KB)
1968        let large_array: Vec<serde_json::Value> = (0..1000)
1969            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
1970            .collect();
1971
1972        cache.insert(
1973            "large:entity-1".to_string(),
1974            serde_json::json!({"items": large_array}),
1975        );
1976
1977        let state = cache.get("large:entity-1").unwrap();
1978        let items = state["items"].as_array().unwrap();
1979        assert_eq!(items.len(), 1000);
1980    }
1981
1982    #[tokio::test]
1983    async fn test_projection_state_complex_json() {
1984        let store = create_test_store();
1985        let cache = store.projection_state_cache();
1986
1987        // Complex nested JSON structure
1988        let complex_state = serde_json::json!({
1989            "user": {
1990                "id": "user-123",
1991                "profile": {
1992                    "name": "John Doe",
1993                    "email": "john@example.com",
1994                    "settings": {
1995                        "theme": "dark",
1996                        "notifications": true
1997                    }
1998                },
1999                "roles": ["admin", "user"],
2000                "metadata": {
2001                    "created_at": "2025-01-01T00:00:00Z",
2002                    "last_login": null
2003                }
2004            }
2005        });
2006
2007        cache.insert("complex:user-123".to_string(), complex_state);
2008
2009        let state = cache.get("complex:user-123").unwrap();
2010        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2011        assert_eq!(state["user"]["roles"][0], "admin");
2012        assert!(state["user"]["metadata"]["last_login"].is_null());
2013    }
2014
2015    #[tokio::test]
2016    async fn test_projection_state_cache_iteration() {
2017        let store = create_test_store();
2018        let cache = store.projection_state_cache();
2019
2020        // Insert entries
2021        for i in 0..5 {
2022            cache.insert(
2023                format!("iter:entity-{}", i),
2024                serde_json::json!({"index": i}),
2025            );
2026        }
2027
2028        // Iterate over all entries
2029        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2030        assert_eq!(entries.len(), 5);
2031    }
2032
2033    #[tokio::test]
2034    async fn test_projection_manager_get_entity_snapshots() {
2035        let store = create_test_store();
2036        let projection_manager = store.projection_manager();
2037
2038        // Get entity_snapshots projection specifically
2039        let projection = projection_manager.get_projection("entity_snapshots");
2040        assert!(projection.is_some());
2041        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2042    }
2043
2044    #[tokio::test]
2045    async fn test_projection_manager_get_event_counters() {
2046        let store = create_test_store();
2047        let projection_manager = store.projection_manager();
2048
2049        // Get event_counters projection specifically
2050        let projection = projection_manager.get_projection("event_counters");
2051        assert!(projection.is_some());
2052        assert_eq!(projection.unwrap().name(), "event_counters");
2053    }
2054
2055    #[tokio::test]
2056    async fn test_projection_state_cache_overwrite() {
2057        let store = create_test_store();
2058        let cache = store.projection_state_cache();
2059
2060        // Initial value
2061        cache.insert(
2062            "overwrite:entity-1".to_string(),
2063            serde_json::json!({"version": 1}),
2064        );
2065
2066        // Overwrite with new value
2067        cache.insert(
2068            "overwrite:entity-1".to_string(),
2069            serde_json::json!({"version": 2}),
2070        );
2071
2072        // Overwrite again
2073        cache.insert(
2074            "overwrite:entity-1".to_string(),
2075            serde_json::json!({"version": 3}),
2076        );
2077
2078        let state = cache.get("overwrite:entity-1").unwrap();
2079        assert_eq!(state["version"], 3);
2080
2081        // Should still be only 1 entry
2082        assert_eq!(cache.len(), 1);
2083    }
2084
2085    #[tokio::test]
2086    async fn test_projection_state_multiple_projections() {
2087        let store = create_test_store();
2088        let cache = store.projection_state_cache();
2089
2090        // Store states for different projections
2091        cache.insert(
2092            "entity_snapshots:user-1".to_string(),
2093            serde_json::json!({"name": "Alice"}),
2094        );
2095        cache.insert(
2096            "event_counters:user.created".to_string(),
2097            serde_json::json!({"count": 5}),
2098        );
2099        cache.insert(
2100            "custom_projection:order-1".to_string(),
2101            serde_json::json!({"total": 150.0}),
2102        );
2103
2104        // Verify each projection's state
2105        assert_eq!(
2106            cache.get("entity_snapshots:user-1").unwrap()["name"],
2107            "Alice"
2108        );
2109        assert_eq!(
2110            cache.get("event_counters:user.created").unwrap()["count"],
2111            5
2112        );
2113        assert_eq!(
2114            cache.get("custom_projection:order-1").unwrap()["total"],
2115            150.0
2116        );
2117    }
2118
2119    #[tokio::test]
2120    async fn test_bulk_projection_state_access() {
2121        let store = create_test_store();
2122
2123        // Ingest multiple events for different entities
2124        for i in 0..5 {
2125            let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
2126            store.ingest(event).unwrap();
2127        }
2128
2129        // Get projection and verify bulk access
2130        let projection_manager = store.projection_manager();
2131        let snapshot_projection = projection_manager
2132            .get_projection("entity_snapshots")
2133            .unwrap();
2134
2135        // Verify we can access all entities
2136        for i in 0..5 {
2137            let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
2138            assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
2139        }
2140    }
2141
2142    #[tokio::test]
2143    async fn test_bulk_save_projection_states() {
2144        let store = create_test_store();
2145        let cache = store.projection_state_cache();
2146
2147        // Simulate bulk save request
2148        let states = vec![
2149            BulkSaveStateItem {
2150                entity_id: "bulk-entity-1".to_string(),
2151                state: serde_json::json!({"name": "Entity 1", "value": 100}),
2152            },
2153            BulkSaveStateItem {
2154                entity_id: "bulk-entity-2".to_string(),
2155                state: serde_json::json!({"name": "Entity 2", "value": 200}),
2156            },
2157            BulkSaveStateItem {
2158                entity_id: "bulk-entity-3".to_string(),
2159                state: serde_json::json!({"name": "Entity 3", "value": 300}),
2160            },
2161        ];
2162
2163        let projection_name = "test_projection";
2164
2165        // Save states to cache (simulating bulk_save_projection_states handler)
2166        for item in &states {
2167            cache.insert(
2168                format!("{projection_name}:{}", item.entity_id),
2169                item.state.clone(),
2170            );
2171        }
2172
2173        // Verify all states were saved
2174        assert_eq!(cache.len(), 3);
2175
2176        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
2177        assert_eq!(state1["name"], "Entity 1");
2178        assert_eq!(state1["value"], 100);
2179
2180        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
2181        assert_eq!(state2["name"], "Entity 2");
2182        assert_eq!(state2["value"], 200);
2183
2184        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
2185        assert_eq!(state3["name"], "Entity 3");
2186        assert_eq!(state3["value"], 300);
2187    }
2188
2189    #[tokio::test]
2190    async fn test_bulk_save_empty_states() {
2191        let store = create_test_store();
2192        let cache = store.projection_state_cache();
2193
2194        // Clear cache
2195        cache.clear();
2196
2197        // Empty states should work fine
2198        let states: Vec<BulkSaveStateItem> = vec![];
2199        assert_eq!(states.len(), 0);
2200
2201        // Cache should remain empty
2202        assert_eq!(cache.len(), 0);
2203    }
2204
2205    #[tokio::test]
2206    async fn test_bulk_save_overwrites_existing() {
2207        let store = create_test_store();
2208        let cache = store.projection_state_cache();
2209
2210        // Insert initial state
2211        cache.insert(
2212            "test:entity-1".to_string(),
2213            serde_json::json!({"version": 1, "data": "initial"}),
2214        );
2215
2216        // Bulk save with updated state
2217        let new_state = serde_json::json!({"version": 2, "data": "updated"});
2218        cache.insert("test:entity-1".to_string(), new_state);
2219
2220        // Verify overwrite
2221        let state = cache.get("test:entity-1").unwrap();
2222        assert_eq!(state["version"], 2);
2223        assert_eq!(state["data"], "updated");
2224    }
2225
2226    #[tokio::test]
2227    async fn test_bulk_save_high_volume() {
2228        let store = create_test_store();
2229        let cache = store.projection_state_cache();
2230
2231        // Simulate high volume save (1000 entities)
2232        for i in 0..1000 {
2233            cache.insert(
2234                format!("volume_test:entity-{}", i),
2235                serde_json::json!({"index": i, "status": "active"}),
2236            );
2237        }
2238
2239        // Verify count
2240        assert_eq!(cache.len(), 1000);
2241
2242        // Spot check some entries
2243        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
2244        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
2245        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
2246    }
2247
2248    #[tokio::test]
2249    async fn test_bulk_save_different_projections() {
2250        let store = create_test_store();
2251        let cache = store.projection_state_cache();
2252
2253        // Save to multiple projections in bulk
2254        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
2255
2256        for proj in projections.iter() {
2257            for i in 0..5 {
2258                cache.insert(
2259                    format!("{proj}:entity-{i}"),
2260                    serde_json::json!({"projection": proj, "id": i}),
2261                );
2262            }
2263        }
2264
2265        // Verify total count (3 projections * 5 entities)
2266        assert_eq!(cache.len(), 15);
2267
2268        // Verify each projection
2269        for proj in projections.iter() {
2270            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
2271            assert_eq!(state["projection"], *proj);
2272        }
2273    }
2274}