Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::{
2    application::{
3        dto::{
4            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
5            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
6            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
7            QueryEventsRequest, QueryEventsResponse,
8        },
9        services::{
10            analytics::{
11                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
12                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
13            },
14            pipeline::{PipelineConfig, PipelineStats},
15            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
16            schema::{
17                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
18                ValidateEventRequest, ValidateEventResponse,
19            },
20            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
21        },
22    },
23    domain::entities::Event,
24    error::Result,
25    infrastructure::{
26        persistence::{
27            compaction::CompactionResult,
28            snapshot::{
29                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
30                ListSnapshotsResponse, SnapshotInfo,
31            },
32        },
33        query::{
34            eventql::EventQLRequest,
35            geospatial::GeoQueryRequest,
36            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
37        },
38        replication::ReplicationMode,
39        web::api_v1::AppState,
40    },
41    store::{EventStore, EventTypeInfo, StreamInfo},
42};
43use axum::{
44    Json, Router,
45    extract::{Path, Query, State, WebSocketUpgrade},
46    response::{IntoResponse, Response},
47    routing::{get, post, put},
48};
49use serde::Deserialize;
50use std::sync::Arc;
51use tower_http::{
52    cors::{Any, CorsLayer},
53    trace::TraceLayer,
54};
55
/// Reference-counted handle to the [`EventStore`]; this is the router state that the
/// `State<SharedStore>` handlers in this module extract.
type SharedStore = Arc<EventStore>;
57
58/// Wait for follower ACK(s) in semi-sync/sync replication modes.
59///
60/// In async mode (default), returns immediately. In semi-sync mode, waits for
61/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
62/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
63async fn await_replication_ack(state: &AppState) {
64    let shipper_guard = state.wal_shipper.read().await;
65    if let Some(ref shipper) = *shipper_guard {
66        let mode = shipper.replication_mode();
67        if mode == ReplicationMode::Async {
68            return;
69        }
70
71        let target_offset = shipper.current_leader_offset();
72        if target_offset == 0 {
73            return;
74        }
75
76        let shipper = Arc::clone(shipper);
77        // Drop the read guard before the async wait to avoid holding it across await
78        drop(shipper_guard);
79
80        let timer = state
81            .store
82            .metrics()
83            .replication_ack_wait_seconds
84            .start_timer();
85        let acked = shipper.wait_for_ack(target_offset).await;
86        timer.observe_duration();
87
88        if !acked {
89            tracing::warn!(
90                "Replication ACK timeout in {} mode (offset {}). \
91                 Write succeeded locally but follower confirmation pending.",
92                mode,
93                target_offset,
94            );
95        }
96    }
97}
98
99pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
100    let app = Router::new()
101        .route("/health", get(health))
102        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
103        .route("/api/v1/events", post(ingest_event))
104        .route("/api/v1/events/batch", post(ingest_events_batch))
105        .route("/api/v1/events/query", get(query_events))
106        .route("/api/v1/events/{event_id}", get(get_event_by_id))
107        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
108        // v0.10: Stream and event type discovery endpoints
109        .route("/api/v1/streams", get(list_streams))
110        .route("/api/v1/event-types", get(list_event_types))
111        .route("/api/v1/entities/duplicates", get(detect_duplicates))
112        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
113        .route(
114            "/api/v1/entities/{entity_id}/snapshot",
115            get(get_entity_snapshot),
116        )
117        .route("/api/v1/stats", get(get_stats))
118        // v0.2: Advanced analytics endpoints
119        .route("/api/v1/analytics/frequency", get(analytics_frequency))
120        .route("/api/v1/analytics/summary", get(analytics_summary))
121        .route("/api/v1/analytics/correlation", get(analytics_correlation))
122        // v0.2: Snapshot management endpoints
123        .route("/api/v1/snapshots", post(create_snapshot))
124        .route("/api/v1/snapshots", get(list_snapshots))
125        .route(
126            "/api/v1/snapshots/{entity_id}/latest",
127            get(get_latest_snapshot),
128        )
129        // v0.2: Compaction endpoints
130        .route("/api/v1/compaction/trigger", post(trigger_compaction))
131        .route("/api/v1/compaction/stats", get(compaction_stats))
132        // v0.5: Schema registry endpoints
133        .route("/api/v1/schemas", post(register_schema))
134        .route("/api/v1/schemas", get(list_subjects))
135        .route("/api/v1/schemas/{subject}", get(get_schema))
136        .route(
137            "/api/v1/schemas/{subject}/versions",
138            get(list_schema_versions),
139        )
140        .route("/api/v1/schemas/validate", post(validate_event_schema))
141        .route(
142            "/api/v1/schemas/{subject}/compatibility",
143            put(set_compatibility_mode),
144        )
145        // v0.5: Replay and projection rebuild endpoints
146        .route("/api/v1/replay", post(start_replay))
147        .route("/api/v1/replay", get(list_replays))
148        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
149        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
150        .route(
151            "/api/v1/replay/{replay_id}",
152            axum::routing::delete(delete_replay),
153        )
154        // v0.5: Stream processing pipeline endpoints
155        .route("/api/v1/pipelines", post(register_pipeline))
156        .route("/api/v1/pipelines", get(list_pipelines))
157        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
158        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
159        .route(
160            "/api/v1/pipelines/{pipeline_id}",
161            axum::routing::delete(remove_pipeline),
162        )
163        .route(
164            "/api/v1/pipelines/{pipeline_id}/stats",
165            get(get_pipeline_stats),
166        )
167        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
168        // v0.7: Projection State API for Query Service integration
169        .route("/api/v1/projections", get(list_projections))
170        .route("/api/v1/projections/{name}", get(get_projection))
171        .route(
172            "/api/v1/projections/{name}",
173            axum::routing::delete(delete_projection),
174        )
175        .route(
176            "/api/v1/projections/{name}/state",
177            get(get_projection_state_summary),
178        )
179        .route("/api/v1/projections/{name}/reset", post(reset_projection))
180        .route(
181            "/api/v1/projections/{name}/{entity_id}/state",
182            get(get_projection_state),
183        )
184        .route(
185            "/api/v1/projections/{name}/{entity_id}/state",
186            post(save_projection_state),
187        )
188        .route(
189            "/api/v1/projections/{name}/{entity_id}/state",
190            put(save_projection_state),
191        )
192        .route(
193            "/api/v1/projections/{name}/bulk",
194            post(bulk_get_projection_states),
195        )
196        .route(
197            "/api/v1/projections/{name}/bulk/save",
198            post(bulk_save_projection_states),
199        )
200        // v0.11: Webhook management endpoints
201        .route("/api/v1/webhooks", post(register_webhook))
202        .route("/api/v1/webhooks", get(list_webhooks))
203        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
204        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
205        .route(
206            "/api/v1/webhooks/{webhook_id}",
207            axum::routing::delete(delete_webhook),
208        )
209        .route(
210            "/api/v1/webhooks/{webhook_id}/deliveries",
211            get(list_webhook_deliveries),
212        )
213        // v2.0: Advanced query features
214        .route("/api/v1/eventql", post(eventql_query))
215        .route("/api/v1/graphql", post(graphql_query))
216        .route("/api/v1/geospatial/query", post(geo_query))
217        .route("/api/v1/geospatial/stats", get(geo_stats))
218        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
219        .route(
220            "/api/v1/schema-evolution/history/{event_type}",
221            get(schema_evolution_history),
222        )
223        .route(
224            "/api/v1/schema-evolution/schema/{event_type}",
225            get(schema_evolution_schema),
226        )
227        .route(
228            "/api/v1/schema-evolution/stats",
229            get(schema_evolution_stats),
230        )
231        .layer(
232            CorsLayer::new()
233                .allow_origin(Any)
234                .allow_methods(Any)
235                .allow_headers(Any),
236        )
237        .layer(TraceLayer::new_for_http())
238        .with_state(store);
239
240    let listener = tokio::net::TcpListener::bind(addr).await?;
241    axum::serve(listener, app).await?;
242
243    Ok(())
244}
245
246pub async fn health() -> impl IntoResponse {
247    Json(serde_json::json!({
248        "status": "healthy",
249        "service": "allsource-core",
250        "version": env!("CARGO_PKG_VERSION")
251    }))
252}
253
254// v0.6: Prometheus metrics endpoint
255pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
256    let metrics = store.metrics();
257
258    match metrics.encode() {
259        Ok(encoded) => Response::builder()
260            .status(200)
261            .header("Content-Type", "text/plain; version=0.0.4")
262            .body(encoded)
263            .unwrap()
264            .into_response(),
265        Err(e) => Response::builder()
266            .status(500)
267            .body(format!("Error encoding metrics: {e}"))
268            .unwrap()
269            .into_response(),
270    }
271}
272
273pub async fn ingest_event(
274    State(store): State<SharedStore>,
275    Json(req): Json<IngestEventRequest>,
276) -> Result<Json<IngestEventResponse>> {
277    // Create event using from_strings with default tenant
278    let event = Event::from_strings(
279        req.event_type,
280        req.entity_id,
281        "default".to_string(),
282        req.payload,
283        req.metadata,
284    )?;
285
286    let event_id = event.id;
287    let timestamp = event.timestamp;
288
289    store.ingest(event)?;
290
291    tracing::info!("Event ingested: {}", event_id);
292
293    Ok(Json(IngestEventResponse {
294        event_id,
295        timestamp,
296    }))
297}
298
299/// Ingest a single event with semi-sync/sync replication ACK waiting.
300///
301/// Used by the v1 API (with auth and replication support).
302pub async fn ingest_event_v1(
303    State(state): State<AppState>,
304    Json(req): Json<IngestEventRequest>,
305) -> Result<Json<IngestEventResponse>> {
306    let event = Event::from_strings(
307        req.event_type,
308        req.entity_id,
309        "default".to_string(),
310        req.payload,
311        req.metadata,
312    )?;
313
314    let event_id = event.id;
315    let timestamp = event.timestamp;
316
317    state.store.ingest(event)?;
318
319    // Semi-sync/sync: wait for follower ACK(s) before returning
320    await_replication_ack(&state).await;
321
322    tracing::info!("Event ingested: {}", event_id);
323
324    Ok(Json(IngestEventResponse {
325        event_id,
326        timestamp,
327    }))
328}
329
330/// Batch ingest multiple events in a single request
331///
332/// This endpoint allows ingesting multiple events atomically, which is more
333/// efficient than making individual requests for each event.
334pub async fn ingest_events_batch(
335    State(store): State<SharedStore>,
336    Json(req): Json<IngestEventsBatchRequest>,
337) -> Result<Json<IngestEventsBatchResponse>> {
338    let total = req.events.len();
339    let mut ingested_events = Vec::with_capacity(total);
340
341    for event_req in req.events {
342        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
343
344        let event = Event::from_strings(
345            event_req.event_type,
346            event_req.entity_id,
347            tenant_id,
348            event_req.payload,
349            event_req.metadata,
350        )?;
351
352        let event_id = event.id;
353        let timestamp = event.timestamp;
354
355        store.ingest(event)?;
356
357        ingested_events.push(IngestEventResponse {
358            event_id,
359            timestamp,
360        });
361    }
362
363    let ingested = ingested_events.len();
364    tracing::info!("Batch ingested {} events", ingested);
365
366    Ok(Json(IngestEventsBatchResponse {
367        total,
368        ingested,
369        events: ingested_events,
370    }))
371}
372
373/// Batch ingest with semi-sync/sync replication ACK waiting.
374///
375/// Used by the v1 API (with auth and replication support).
376pub async fn ingest_events_batch_v1(
377    State(state): State<AppState>,
378    Json(req): Json<IngestEventsBatchRequest>,
379) -> Result<Json<IngestEventsBatchResponse>> {
380    let total = req.events.len();
381    let mut ingested_events = Vec::with_capacity(total);
382
383    for event_req in req.events {
384        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
385
386        let event = Event::from_strings(
387            event_req.event_type,
388            event_req.entity_id,
389            tenant_id,
390            event_req.payload,
391            event_req.metadata,
392        )?;
393
394        let event_id = event.id;
395        let timestamp = event.timestamp;
396
397        state.store.ingest(event)?;
398
399        ingested_events.push(IngestEventResponse {
400            event_id,
401            timestamp,
402        });
403    }
404
405    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
406    await_replication_ack(&state).await;
407
408    let ingested = ingested_events.len();
409    tracing::info!("Batch ingested {} events", ingested);
410
411    Ok(Json(IngestEventsBatchResponse {
412        total,
413        ingested,
414        events: ingested_events,
415    }))
416}
417
418pub async fn query_events(
419    State(store): State<SharedStore>,
420    Query(req): Query<QueryEventsRequest>,
421) -> Result<Json<QueryEventsResponse>> {
422    let requested_limit = req.limit;
423
424    // Query without limit to get total count
425    let unlimited_req = QueryEventsRequest {
426        entity_id: req.entity_id,
427        event_type: req.event_type,
428        tenant_id: req.tenant_id,
429        as_of: req.as_of,
430        since: req.since,
431        until: req.until,
432        limit: None,
433        event_type_prefix: req.event_type_prefix,
434        payload_filter: req.payload_filter,
435    };
436    let all_events = store.query(unlimited_req)?;
437    let total_count = all_events.len();
438
439    // Apply limit
440    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
441        all_events.into_iter().take(limit).collect()
442    } else {
443        all_events
444    };
445
446    let count = limited_events.len();
447    let has_more = count < total_count;
448    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
449
450    tracing::debug!("Query returned {} events (total: {})", count, total_count);
451
452    Ok(Json(QueryEventsResponse {
453        events,
454        count,
455        total_count,
456        has_more,
457    }))
458}
459
460pub async fn list_entities(
461    State(store): State<SharedStore>,
462    Query(req): Query<ListEntitiesRequest>,
463) -> Result<Json<ListEntitiesResponse>> {
464    use std::collections::HashMap;
465
466    // Get all events matching the filters
467    let query_req = QueryEventsRequest {
468        entity_id: None,
469        event_type: None,
470        tenant_id: None,
471        as_of: None,
472        since: None,
473        until: None,
474        limit: None,
475        event_type_prefix: req.event_type_prefix,
476        payload_filter: req.payload_filter,
477    };
478    let events = store.query(query_req)?;
479
480    // Group by entity_id
481    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
482    for event in &events {
483        entity_map
484            .entry(event.entity_id().to_string())
485            .or_default()
486            .push(event);
487    }
488
489    // Build entity summaries sorted by last event time (descending)
490    let mut summaries: Vec<EntitySummary> = entity_map
491        .into_iter()
492        .map(|(entity_id, events)| {
493            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
494            EntitySummary {
495                entity_id,
496                event_count: events.len(),
497                last_event_type: last.event_type_str().to_string(),
498                last_event_at: last.timestamp(),
499            }
500        })
501        .collect();
502    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
503
504    let total = summaries.len();
505
506    // Apply offset and limit
507    let offset = req.offset.unwrap_or(0);
508    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
509    let summaries = if let Some(limit) = req.limit {
510        let has_more = summaries.len() > limit;
511        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
512        return Ok(Json(ListEntitiesResponse {
513            entities: truncated,
514            total,
515            has_more,
516        }));
517    } else {
518        summaries
519    };
520
521    Ok(Json(ListEntitiesResponse {
522        entities: summaries,
523        total,
524        has_more: false,
525    }))
526}
527
528pub async fn detect_duplicates(
529    State(store): State<SharedStore>,
530    Query(req): Query<DetectDuplicatesRequest>,
531) -> Result<Json<DetectDuplicatesResponse>> {
532    use std::collections::HashMap;
533
534    let group_by_fields: Vec<&str> = req.group_by.split(',').map(|s| s.trim()).collect();
535
536    // Query events scoped by the required prefix
537    let query_req = QueryEventsRequest {
538        entity_id: None,
539        event_type: None,
540        tenant_id: None,
541        as_of: None,
542        since: None,
543        until: None,
544        limit: None,
545        event_type_prefix: Some(req.event_type_prefix),
546        payload_filter: None,
547    };
548    let events = store.query(query_req)?;
549
550    // For each entity, extract the latest event's payload fields specified by group_by
551    // Then group entities by those field values
552    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
553    for event in &events {
554        let eid = event.entity_id().to_string();
555        entity_latest
556            .entry(eid)
557            .and_modify(|existing| {
558                if event.timestamp() > existing.timestamp() {
559                    *existing = event;
560                }
561            })
562            .or_insert(event);
563    }
564
565    // Group entities by their payload field values
566    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
567    for (entity_id, event) in &entity_latest {
568        let payload = event.payload();
569        let mut key_parts = serde_json::Map::new();
570        for field in &group_by_fields {
571            let value = payload
572                .get(*field)
573                .cloned()
574                .unwrap_or(serde_json::Value::Null);
575            key_parts.insert(field.to_string(), value);
576        }
577        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
578        groups.entry(key_str).or_default().push(entity_id.clone());
579    }
580
581    // Filter to groups with count > 1 (actual duplicates)
582    let mut duplicate_groups: Vec<DuplicateGroup> = groups
583        .into_iter()
584        .filter(|(_, ids)| ids.len() > 1)
585        .map(|(key_str, mut ids)| {
586            ids.sort();
587            let key: serde_json::Value =
588                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
589            let count = ids.len();
590            DuplicateGroup {
591                key,
592                entity_ids: ids,
593                count,
594            }
595        })
596        .collect();
597
598    // Sort by count descending for consistent output
599    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
600
601    let total = duplicate_groups.len();
602
603    // Apply offset and limit
604    let offset = req.offset.unwrap_or(0);
605    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
606
607    if let Some(limit) = req.limit {
608        let has_more = duplicate_groups.len() > limit;
609        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
610        return Ok(Json(DetectDuplicatesResponse {
611            duplicates: truncated,
612            total,
613            has_more,
614        }));
615    }
616
617    Ok(Json(DetectDuplicatesResponse {
618        duplicates: duplicate_groups,
619        total,
620        has_more: false,
621    }))
622}
623
624#[derive(Deserialize)]
625pub struct EntityStateParams {
626    as_of: Option<chrono::DateTime<chrono::Utc>>,
627}
628
629pub async fn get_entity_state(
630    State(store): State<SharedStore>,
631    Path(entity_id): Path<String>,
632    Query(params): Query<EntityStateParams>,
633) -> Result<Json<serde_json::Value>> {
634    let state = store.reconstruct_state(&entity_id, params.as_of)?;
635
636    tracing::info!("State reconstructed for entity: {}", entity_id);
637
638    Ok(Json(state))
639}
640
641pub async fn get_entity_snapshot(
642    State(store): State<SharedStore>,
643    Path(entity_id): Path<String>,
644) -> Result<Json<serde_json::Value>> {
645    let snapshot = store.get_snapshot(&entity_id)?;
646
647    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
648
649    Ok(Json(snapshot))
650}
651
652pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
653    let stats = store.stats();
654    Json(stats)
655}
656
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// Both fields are optional; `list_streams` applies `offset` first and `limit` second.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
666
/// Response for listing streams.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The page of streams that remains after offset/limit are applied.
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before pagination.
    pub total: usize,
}
673
674pub async fn list_streams(
675    State(store): State<SharedStore>,
676    Query(params): Query<ListStreamsParams>,
677) -> Json<ListStreamsResponse> {
678    let mut streams = store.list_streams();
679    let total = streams.len();
680
681    // Sort by last_event_at descending (most recent first)
682    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
683
684    // Apply pagination
685    if let Some(offset) = params.offset {
686        if offset < streams.len() {
687            streams = streams[offset..].to_vec();
688        } else {
689            streams = vec![];
690        }
691    }
692
693    if let Some(limit) = params.limit {
694        streams.truncate(limit);
695    }
696
697    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
698
699    Json(ListStreamsResponse { streams, total })
700}
701
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// Both fields are optional; `list_event_types` applies `offset` first and `limit` second.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
711
/// Response for listing event types.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The page of event types that remains after offset/limit are applied.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before pagination.
    pub total: usize,
}
718
719pub async fn list_event_types(
720    State(store): State<SharedStore>,
721    Query(params): Query<ListEventTypesParams>,
722) -> Json<ListEventTypesResponse> {
723    let mut event_types = store.list_event_types();
724    let total = event_types.len();
725
726    // Sort by event_count descending (most used first)
727    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
728
729    // Apply pagination
730    if let Some(offset) = params.offset {
731        if offset < event_types.len() {
732            event_types = event_types[offset..].to_vec();
733        } else {
734            event_types = vec![];
735        }
736    }
737
738    if let Some(limit) = params.limit {
739        event_types.truncate(limit);
740    }
741
742    tracing::debug!(
743        "Listed {} event types (total: {})",
744        event_types.len(),
745        total
746    );
747
748    Json(ListEventTypesResponse { event_types, total })
749}
750
751// v0.2: WebSocket endpoint for real-time event streaming
752pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
753    let websocket_manager = store.websocket_manager();
754
755    ws.on_upgrade(move |socket| async move {
756        websocket_manager.handle_socket(socket).await;
757    })
758}
759
760// v0.2: Event frequency analytics endpoint
761pub async fn analytics_frequency(
762    State(store): State<SharedStore>,
763    Query(req): Query<EventFrequencyRequest>,
764) -> Result<Json<EventFrequencyResponse>> {
765    let response = AnalyticsEngine::event_frequency(&store, req)?;
766
767    tracing::debug!(
768        "Frequency analysis returned {} buckets",
769        response.buckets.len()
770    );
771
772    Ok(Json(response))
773}
774
775// v0.2: Statistical summary endpoint
776pub async fn analytics_summary(
777    State(store): State<SharedStore>,
778    Query(req): Query<StatsSummaryRequest>,
779) -> Result<Json<StatsSummaryResponse>> {
780    let response = AnalyticsEngine::stats_summary(&store, req)?;
781
782    tracing::debug!(
783        "Stats summary: {} events across {} entities",
784        response.total_events,
785        response.unique_entities
786    );
787
788    Ok(Json(response))
789}
790
791// v0.2: Event correlation analysis endpoint
792pub async fn analytics_correlation(
793    State(store): State<SharedStore>,
794    Query(req): Query<CorrelationRequest>,
795) -> Result<Json<CorrelationResponse>> {
796    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
797
798    tracing::debug!(
799        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
800        response.correlated_pairs,
801        response.total_a,
802        response.correlation_percentage
803    );
804
805    Ok(Json(response))
806}
807
808// v0.2: Create a snapshot for an entity
809pub async fn create_snapshot(
810    State(store): State<SharedStore>,
811    Json(req): Json<CreateSnapshotRequest>,
812) -> Result<Json<CreateSnapshotResponse>> {
813    store.create_snapshot(&req.entity_id)?;
814
815    let snapshot_manager = store.snapshot_manager();
816    let snapshot = snapshot_manager
817        .get_latest_snapshot(&req.entity_id)
818        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
819
820    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
821
822    Ok(Json(CreateSnapshotResponse {
823        snapshot_id: snapshot.id,
824        entity_id: snapshot.entity_id,
825        created_at: snapshot.created_at,
826        event_count: snapshot.event_count,
827        size_bytes: snapshot.metadata.size_bytes,
828    }))
829}
830
831// v0.2: List snapshots
832pub async fn list_snapshots(
833    State(store): State<SharedStore>,
834    Query(req): Query<ListSnapshotsRequest>,
835) -> Result<Json<ListSnapshotsResponse>> {
836    let snapshot_manager = store.snapshot_manager();
837
838    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
839        snapshot_manager
840            .get_all_snapshots(&entity_id)
841            .into_iter()
842            .map(SnapshotInfo::from)
843            .collect()
844    } else {
845        // List all entities with snapshots
846        let entities = snapshot_manager.list_entities();
847        entities
848            .iter()
849            .flat_map(|entity_id| {
850                snapshot_manager
851                    .get_all_snapshots(entity_id)
852                    .into_iter()
853                    .map(SnapshotInfo::from)
854            })
855            .collect()
856    };
857
858    let total = snapshots.len();
859
860    tracing::debug!("Listed {} snapshots", total);
861
862    Ok(Json(ListSnapshotsResponse { snapshots, total }))
863}
864
865// v0.2: Get latest snapshot for an entity
866pub async fn get_latest_snapshot(
867    State(store): State<SharedStore>,
868    Path(entity_id): Path<String>,
869) -> Result<Json<serde_json::Value>> {
870    let snapshot_manager = store.snapshot_manager();
871
872    let snapshot = snapshot_manager
873        .get_latest_snapshot(&entity_id)
874        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
875
876    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
877
878    Ok(Json(serde_json::json!({
879        "snapshot_id": snapshot.id,
880        "entity_id": snapshot.entity_id,
881        "created_at": snapshot.created_at,
882        "as_of": snapshot.as_of,
883        "event_count": snapshot.event_count,
884        "size_bytes": snapshot.metadata.size_bytes,
885        "snapshot_type": snapshot.metadata.snapshot_type,
886        "state": snapshot.state
887    })))
888}
889
890// v0.2: Trigger manual compaction
891pub async fn trigger_compaction(
892    State(store): State<SharedStore>,
893) -> Result<Json<CompactionResult>> {
894    let compaction_manager = store.compaction_manager().ok_or_else(|| {
895        crate::error::AllSourceError::InternalError(
896            "Compaction not enabled (no Parquet storage)".to_string(),
897        )
898    })?;
899
900    tracing::info!("📦 Manual compaction triggered via API");
901
902    let result = compaction_manager.compact_now()?;
903
904    Ok(Json(result))
905}
906
907// v0.2: Get compaction statistics
908pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
909    let compaction_manager = store.compaction_manager().ok_or_else(|| {
910        crate::error::AllSourceError::InternalError(
911            "Compaction not enabled (no Parquet storage)".to_string(),
912        )
913    })?;
914
915    let stats = compaction_manager.stats();
916    let config = compaction_manager.config();
917
918    Ok(Json(serde_json::json!({
919        "stats": stats,
920        "config": {
921            "min_files_to_compact": config.min_files_to_compact,
922            "target_file_size": config.target_file_size,
923            "max_file_size": config.max_file_size,
924            "small_file_threshold": config.small_file_threshold,
925            "compaction_interval_seconds": config.compaction_interval_seconds,
926            "auto_compact": config.auto_compact,
927            "strategy": config.strategy
928        }
929    })))
930}
931
932// v0.5: Register a new schema
933pub async fn register_schema(
934    State(store): State<SharedStore>,
935    Json(req): Json<RegisterSchemaRequest>,
936) -> Result<Json<RegisterSchemaResponse>> {
937    let schema_registry = store.schema_registry();
938
939    let response =
940        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
941
942    tracing::info!(
943        "📋 Schema registered: v{} for '{}'",
944        response.version,
945        response.subject
946    );
947
948    Ok(Json(response))
949}
950
// v0.5: Get a schema by subject and optional version
/// Query-string parameters accepted by [`get_schema`].
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Requested schema version; forwarded unchanged to the schema registry's
    /// `get_schema` lookup (how `None` is resolved is decided there).
    version: Option<u32>,
}
956
957pub async fn get_schema(
958    State(store): State<SharedStore>,
959    Path(subject): Path<String>,
960    Query(params): Query<GetSchemaParams>,
961) -> Result<Json<serde_json::Value>> {
962    let schema_registry = store.schema_registry();
963
964    let schema = schema_registry.get_schema(&subject, params.version)?;
965
966    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
967
968    Ok(Json(serde_json::json!({
969        "id": schema.id,
970        "subject": schema.subject,
971        "version": schema.version,
972        "schema": schema.schema,
973        "created_at": schema.created_at,
974        "description": schema.description,
975        "tags": schema.tags
976    })))
977}
978
979// v0.5: List all versions of a schema subject
980pub async fn list_schema_versions(
981    State(store): State<SharedStore>,
982    Path(subject): Path<String>,
983) -> Result<Json<serde_json::Value>> {
984    let schema_registry = store.schema_registry();
985
986    let versions = schema_registry.list_versions(&subject)?;
987
988    Ok(Json(serde_json::json!({
989        "subject": subject,
990        "versions": versions
991    })))
992}
993
994// v0.5: List all schema subjects
995pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
996    let schema_registry = store.schema_registry();
997
998    let subjects = schema_registry.list_subjects();
999
1000    Json(serde_json::json!({
1001        "subjects": subjects,
1002        "total": subjects.len()
1003    }))
1004}
1005
1006// v0.5: Validate an event against a schema
1007pub async fn validate_event_schema(
1008    State(store): State<SharedStore>,
1009    Json(req): Json<ValidateEventRequest>,
1010) -> Result<Json<ValidateEventResponse>> {
1011    let schema_registry = store.schema_registry();
1012
1013    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1014
1015    if response.valid {
1016        tracing::debug!(
1017            "✅ Event validated against schema '{}' v{}",
1018            req.subject,
1019            response.schema_version
1020        );
1021    } else {
1022        tracing::warn!(
1023            "❌ Event validation failed for '{}': {:?}",
1024            req.subject,
1025            response.errors
1026        );
1027    }
1028
1029    Ok(Json(response))
1030}
1031
// v0.5: Set compatibility mode for a subject
/// JSON request body accepted by [`set_compatibility_mode`].
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
1037
1038pub async fn set_compatibility_mode(
1039    State(store): State<SharedStore>,
1040    Path(subject): Path<String>,
1041    Json(req): Json<SetCompatibilityRequest>,
1042) -> Json<serde_json::Value> {
1043    let schema_registry = store.schema_registry();
1044
1045    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1046
1047    tracing::info!(
1048        "🔧 Set compatibility mode for '{}' to {:?}",
1049        subject,
1050        req.compatibility
1051    );
1052
1053    Json(serde_json::json!({
1054        "subject": subject,
1055        "compatibility": req.compatibility
1056    }))
1057}
1058
1059// v0.5: Start a replay operation
1060pub async fn start_replay(
1061    State(store): State<SharedStore>,
1062    Json(req): Json<StartReplayRequest>,
1063) -> Result<Json<StartReplayResponse>> {
1064    let replay_manager = store.replay_manager();
1065
1066    let response = replay_manager.start_replay(store, req)?;
1067
1068    tracing::info!(
1069        "🔄 Started replay {} with {} events",
1070        response.replay_id,
1071        response.total_events
1072    );
1073
1074    Ok(Json(response))
1075}
1076
1077// v0.5: Get replay progress
1078pub async fn get_replay_progress(
1079    State(store): State<SharedStore>,
1080    Path(replay_id): Path<uuid::Uuid>,
1081) -> Result<Json<ReplayProgress>> {
1082    let replay_manager = store.replay_manager();
1083
1084    let progress = replay_manager.get_progress(replay_id)?;
1085
1086    Ok(Json(progress))
1087}
1088
1089// v0.5: List all replay operations
1090pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1091    let replay_manager = store.replay_manager();
1092
1093    let replays = replay_manager.list_replays();
1094
1095    Json(serde_json::json!({
1096        "replays": replays,
1097        "total": replays.len()
1098    }))
1099}
1100
1101// v0.5: Cancel a running replay
1102pub async fn cancel_replay(
1103    State(store): State<SharedStore>,
1104    Path(replay_id): Path<uuid::Uuid>,
1105) -> Result<Json<serde_json::Value>> {
1106    let replay_manager = store.replay_manager();
1107
1108    replay_manager.cancel_replay(replay_id)?;
1109
1110    tracing::info!("🛑 Cancelled replay {}", replay_id);
1111
1112    Ok(Json(serde_json::json!({
1113        "replay_id": replay_id,
1114        "status": "cancelled"
1115    })))
1116}
1117
1118// v0.5: Delete a completed replay
1119pub async fn delete_replay(
1120    State(store): State<SharedStore>,
1121    Path(replay_id): Path<uuid::Uuid>,
1122) -> Result<Json<serde_json::Value>> {
1123    let replay_manager = store.replay_manager();
1124
1125    let deleted = replay_manager.delete_replay(replay_id)?;
1126
1127    if deleted {
1128        tracing::info!("🗑️  Deleted replay {}", replay_id);
1129    }
1130
1131    Ok(Json(serde_json::json!({
1132        "replay_id": replay_id,
1133        "deleted": deleted
1134    })))
1135}
1136
1137// v0.5: Register a new pipeline
1138pub async fn register_pipeline(
1139    State(store): State<SharedStore>,
1140    Json(config): Json<PipelineConfig>,
1141) -> Result<Json<serde_json::Value>> {
1142    let pipeline_manager = store.pipeline_manager();
1143
1144    let pipeline_id = pipeline_manager.register(config.clone());
1145
1146    tracing::info!(
1147        "🔀 Pipeline registered: {} (name: {})",
1148        pipeline_id,
1149        config.name
1150    );
1151
1152    Ok(Json(serde_json::json!({
1153        "pipeline_id": pipeline_id,
1154        "name": config.name,
1155        "enabled": config.enabled
1156    })))
1157}
1158
1159// v0.5: List all pipelines
1160pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1161    let pipeline_manager = store.pipeline_manager();
1162
1163    let pipelines = pipeline_manager.list();
1164
1165    tracing::debug!("Listed {} pipelines", pipelines.len());
1166
1167    Json(serde_json::json!({
1168        "pipelines": pipelines,
1169        "total": pipelines.len()
1170    }))
1171}
1172
1173// v0.5: Get a specific pipeline
1174pub async fn get_pipeline(
1175    State(store): State<SharedStore>,
1176    Path(pipeline_id): Path<uuid::Uuid>,
1177) -> Result<Json<PipelineConfig>> {
1178    let pipeline_manager = store.pipeline_manager();
1179
1180    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1181        crate::error::AllSourceError::ValidationError(format!(
1182            "Pipeline not found: {}",
1183            pipeline_id
1184        ))
1185    })?;
1186
1187    Ok(Json(pipeline.config().clone()))
1188}
1189
1190// v0.5: Remove a pipeline
1191pub async fn remove_pipeline(
1192    State(store): State<SharedStore>,
1193    Path(pipeline_id): Path<uuid::Uuid>,
1194) -> Result<Json<serde_json::Value>> {
1195    let pipeline_manager = store.pipeline_manager();
1196
1197    let removed = pipeline_manager.remove(pipeline_id);
1198
1199    if removed {
1200        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1201    }
1202
1203    Ok(Json(serde_json::json!({
1204        "pipeline_id": pipeline_id,
1205        "removed": removed
1206    })))
1207}
1208
1209// v0.5: Get statistics for all pipelines
1210pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1211    let pipeline_manager = store.pipeline_manager();
1212
1213    let stats = pipeline_manager.all_stats();
1214
1215    Json(serde_json::json!({
1216        "stats": stats,
1217        "total": stats.len()
1218    }))
1219}
1220
1221// v0.5: Get statistics for a specific pipeline
1222pub async fn get_pipeline_stats(
1223    State(store): State<SharedStore>,
1224    Path(pipeline_id): Path<uuid::Uuid>,
1225) -> Result<Json<PipelineStats>> {
1226    let pipeline_manager = store.pipeline_manager();
1227
1228    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1229        crate::error::AllSourceError::ValidationError(format!(
1230            "Pipeline not found: {}",
1231            pipeline_id
1232        ))
1233    })?;
1234
1235    Ok(Json(pipeline.stats()))
1236}
1237
1238// v0.5: Reset a pipeline's state
1239pub async fn reset_pipeline(
1240    State(store): State<SharedStore>,
1241    Path(pipeline_id): Path<uuid::Uuid>,
1242) -> Result<Json<serde_json::Value>> {
1243    let pipeline_manager = store.pipeline_manager();
1244
1245    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1246        crate::error::AllSourceError::ValidationError(format!(
1247            "Pipeline not found: {}",
1248            pipeline_id
1249        ))
1250    })?;
1251
1252    pipeline.reset();
1253
1254    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1255
1256    Ok(Json(serde_json::json!({
1257        "pipeline_id": pipeline_id,
1258        "reset": true
1259    })))
1260}
1261
1262// =============================================================================
1263// v0.11: Single Event Lookup by ID
1264// =============================================================================
1265
1266/// Get a single event by UUID
1267pub async fn get_event_by_id(
1268    State(store): State<SharedStore>,
1269    Path(event_id): Path<uuid::Uuid>,
1270) -> Result<Json<serde_json::Value>> {
1271    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1272        crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1273    })?;
1274
1275    let dto = EventDto::from(&event);
1276
1277    tracing::debug!("Event retrieved by ID: {}", event_id);
1278
1279    Ok(Json(serde_json::json!({
1280        "event": dto,
1281        "found": true
1282    })))
1283}
1284
1285// =============================================================================
1286// v0.7: Projection State API for Query Service Integration
1287// =============================================================================
1288
1289/// List all registered projections
1290pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1291    let projection_manager = store.projection_manager();
1292    let status_map = store.projection_status();
1293
1294    let projections: Vec<serde_json::Value> = projection_manager
1295        .list_projections()
1296        .iter()
1297        .map(|(name, projection)| {
1298            let status = status_map
1299                .get(name)
1300                .map(|s| s.value().clone())
1301                .unwrap_or_else(|| "running".to_string());
1302            serde_json::json!({
1303                "name": name,
1304                "type": format!("{:?}", projection.name()),
1305                "status": status,
1306            })
1307        })
1308        .collect();
1309
1310    tracing::debug!("Listed {} projections", projections.len());
1311
1312    Json(serde_json::json!({
1313        "projections": projections,
1314        "total": projections.len()
1315    }))
1316}
1317
1318/// Get projection metadata by name
1319pub async fn get_projection(
1320    State(store): State<SharedStore>,
1321    Path(name): Path<String>,
1322) -> Result<Json<serde_json::Value>> {
1323    let projection_manager = store.projection_manager();
1324
1325    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1326        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1327    })?;
1328
1329    Ok(Json(serde_json::json!({
1330        "name": projection.name(),
1331        "found": true
1332    })))
1333}
1334
1335/// Get projection state for a specific entity
1336///
1337/// This endpoint allows the Elixir Query Service to fetch projection state
1338/// from the Rust Core for synchronization.
1339pub async fn get_projection_state(
1340    State(store): State<SharedStore>,
1341    Path((name, entity_id)): Path<(String, String)>,
1342) -> Result<Json<serde_json::Value>> {
1343    let projection_manager = store.projection_manager();
1344
1345    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1346        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1347    })?;
1348
1349    let state = projection.get_state(&entity_id);
1350
1351    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1352
1353    Ok(Json(serde_json::json!({
1354        "projection": name,
1355        "entity_id": entity_id,
1356        "state": state,
1357        "found": state.is_some()
1358    })))
1359}
1360
1361/// Delete (clear) a projection by name
1362///
1363/// Removes all state from the projection. The projection definition remains
1364/// registered but its accumulated state is cleared.
1365pub async fn delete_projection(
1366    State(store): State<SharedStore>,
1367    Path(name): Path<String>,
1368) -> Result<Json<serde_json::Value>> {
1369    let projection_manager = store.projection_manager();
1370
1371    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1372        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1373    })?;
1374
1375    projection.clear();
1376
1377    // Also clear any cached state for this projection
1378    let cache = store.projection_state_cache();
1379    let prefix = format!("{name}:");
1380    let keys_to_remove: Vec<String> = cache
1381        .iter()
1382        .filter(|entry| entry.key().starts_with(&prefix))
1383        .map(|entry| entry.key().clone())
1384        .collect();
1385    for key in keys_to_remove {
1386        cache.remove(&key);
1387    }
1388
1389    tracing::info!("Projection deleted (cleared): {}", name);
1390
1391    Ok(Json(serde_json::json!({
1392        "projection": name,
1393        "deleted": true
1394    })))
1395}
1396
1397/// Get aggregate projection state (all entities)
1398///
1399/// Returns summary information about a projection's state across all entities.
1400pub async fn get_projection_state_summary(
1401    State(store): State<SharedStore>,
1402    Path(name): Path<String>,
1403) -> Result<Json<serde_json::Value>> {
1404    let projection_manager = store.projection_manager();
1405
1406    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1407        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1408    })?;
1409
1410    // Collect cached states for this projection
1411    let cache = store.projection_state_cache();
1412    let prefix = format!("{name}:");
1413    let states: Vec<serde_json::Value> = cache
1414        .iter()
1415        .filter(|entry| entry.key().starts_with(&prefix))
1416        .map(|entry| {
1417            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1418            serde_json::json!({
1419                "entity_id": entity_id,
1420                "state": entry.value().clone()
1421            })
1422        })
1423        .collect();
1424
1425    let total = states.len();
1426
1427    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1428
1429    Ok(Json(serde_json::json!({
1430        "projection": name,
1431        "states": states,
1432        "total": total
1433    })))
1434}
1435
1436/// Reset a projection to its initial state
1437///
1438/// Clears all accumulated state and reprocesses events from the beginning.
1439pub async fn reset_projection(
1440    State(store): State<SharedStore>,
1441    Path(name): Path<String>,
1442) -> Result<Json<serde_json::Value>> {
1443    let reprocessed = store.reset_projection(&name)?;
1444
1445    tracing::info!(
1446        "Projection reset: {} ({} events reprocessed)",
1447        name,
1448        reprocessed
1449    );
1450
1451    Ok(Json(serde_json::json!({
1452        "projection": name,
1453        "reset": true,
1454        "events_reprocessed": reprocessed
1455    })))
1456}
1457
1458/// Pause a projection
1459///
1460/// Sets the projection status to "paused" so it stops processing new events.
1461pub async fn pause_projection(
1462    State(store): State<SharedStore>,
1463    Path(name): Path<String>,
1464) -> Result<Json<serde_json::Value>> {
1465    let projection_manager = store.projection_manager();
1466
1467    // Verify projection exists
1468    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1469        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1470    })?;
1471
1472    store
1473        .projection_status()
1474        .insert(name.clone(), "paused".to_string());
1475
1476    tracing::info!("Projection paused: {}", name);
1477
1478    Ok(Json(serde_json::json!({
1479        "projection": name,
1480        "status": "paused"
1481    })))
1482}
1483
1484/// Start (resume) a projection
1485///
1486/// Sets the projection status to "running" so it resumes processing events.
1487pub async fn start_projection(
1488    State(store): State<SharedStore>,
1489    Path(name): Path<String>,
1490) -> Result<Json<serde_json::Value>> {
1491    let projection_manager = store.projection_manager();
1492
1493    // Verify projection exists
1494    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1495        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1496    })?;
1497
1498    store
1499        .projection_status()
1500        .insert(name.clone(), "running".to_string());
1501
1502    tracing::info!("Projection started: {}", name);
1503
1504    Ok(Json(serde_json::json!({
1505        "projection": name,
1506        "status": "running"
1507    })))
1508}
1509
/// Request body for saving projection state for a single entity
/// (see [`save_projection_state`]).
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state to store for the entity; it is cached as-is.
    pub state: serde_json::Value,
}
1515
1516/// Save/update projection state for an entity
1517///
1518/// This endpoint allows external services (like Elixir Query Service) to
1519/// store computed projection state back to the Core for persistence.
1520pub async fn save_projection_state(
1521    State(store): State<SharedStore>,
1522    Path((name, entity_id)): Path<(String, String)>,
1523    Json(req): Json<SaveProjectionStateRequest>,
1524) -> Result<Json<serde_json::Value>> {
1525    let projection_cache = store.projection_state_cache();
1526
1527    // Store in the projection state cache
1528    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1529
1530    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1531
1532    Ok(Json(serde_json::json!({
1533        "projection": name,
1534        "entity_id": entity_id,
1535        "saved": true
1536    })))
1537}
1538
/// Request body for [`bulk_get_projection_states`].
///
/// Lets a client fetch the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Ids of the entities whose projection state should be returned.
    pub entity_ids: Vec<String>,
}
1546
/// Request body for [`bulk_save_projection_states`].
///
/// Lets a client save the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// One entry per entity whose state should be saved.
    pub states: Vec<BulkSaveStateItem>,
}
1554
/// A single entity/state pair inside a [`BulkSaveStateRequest`].
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Id of the entity the state belongs to.
    pub entity_id: String,
    /// Arbitrary JSON state to store for that entity.
    pub state: serde_json::Value,
}
1560
1561pub async fn bulk_get_projection_states(
1562    State(store): State<SharedStore>,
1563    Path(name): Path<String>,
1564    Json(req): Json<BulkGetStateRequest>,
1565) -> Result<Json<serde_json::Value>> {
1566    let projection_manager = store.projection_manager();
1567
1568    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1569        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1570    })?;
1571
1572    let states: Vec<serde_json::Value> = req
1573        .entity_ids
1574        .iter()
1575        .map(|entity_id| {
1576            let state = projection.get_state(entity_id);
1577            serde_json::json!({
1578                "entity_id": entity_id,
1579                "state": state,
1580                "found": state.is_some()
1581            })
1582        })
1583        .collect();
1584
1585    tracing::debug!(
1586        "Bulk projection state retrieved: {} entities from {}",
1587        states.len(),
1588        name
1589    );
1590
1591    Ok(Json(serde_json::json!({
1592        "projection": name,
1593        "states": states,
1594        "total": states.len()
1595    })))
1596}
1597
1598/// Bulk save projection states for multiple entities
1599///
1600/// This endpoint allows efficient batch saving of projection states,
1601/// critical for high-throughput event processing pipelines.
1602pub async fn bulk_save_projection_states(
1603    State(store): State<SharedStore>,
1604    Path(name): Path<String>,
1605    Json(req): Json<BulkSaveStateRequest>,
1606) -> Result<Json<serde_json::Value>> {
1607    let projection_cache = store.projection_state_cache();
1608
1609    let mut saved_count = 0;
1610    for item in &req.states {
1611        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1612        saved_count += 1;
1613    }
1614
1615    tracing::info!(
1616        "Bulk projection state saved: {} entities for {}",
1617        saved_count,
1618        name
1619    );
1620
1621    Ok(Json(serde_json::json!({
1622        "projection": name,
1623        "saved": saved_count,
1624        "total": req.states.len()
1625    })))
1626}
1627
1628// =============================================================================
1629// v0.11: Webhook Management API
1630// =============================================================================
1631
/// Query parameters for listing webhooks
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When absent, [`list_webhooks`]
    /// returns an empty list rather than all webhooks.
    pub tenant_id: Option<String>,
}
1637
1638/// Register a new webhook subscription
1639pub async fn register_webhook(
1640    State(store): State<SharedStore>,
1641    Json(req): Json<RegisterWebhookRequest>,
1642) -> Json<serde_json::Value> {
1643    let registry = store.webhook_registry();
1644    let webhook = registry.register(req);
1645
1646    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1647
1648    Json(serde_json::json!({
1649        "webhook": webhook,
1650        "created": true
1651    }))
1652}
1653
1654/// List webhooks, optionally filtered by tenant_id
1655pub async fn list_webhooks(
1656    State(store): State<SharedStore>,
1657    Query(params): Query<ListWebhooksParams>,
1658) -> Json<serde_json::Value> {
1659    let registry = store.webhook_registry();
1660
1661    let webhooks = if let Some(tenant_id) = params.tenant_id {
1662        registry.list_by_tenant(&tenant_id)
1663    } else {
1664        // Without tenant filter, return empty (tenants should always filter)
1665        vec![]
1666    };
1667
1668    let total = webhooks.len();
1669
1670    Json(serde_json::json!({
1671        "webhooks": webhooks,
1672        "total": total
1673    }))
1674}
1675
1676/// Get a specific webhook by ID
1677pub async fn get_webhook(
1678    State(store): State<SharedStore>,
1679    Path(webhook_id): Path<uuid::Uuid>,
1680) -> Result<Json<serde_json::Value>> {
1681    let registry = store.webhook_registry();
1682
1683    let webhook = registry.get(webhook_id).ok_or_else(|| {
1684        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1685    })?;
1686
1687    Ok(Json(serde_json::json!({
1688        "webhook": webhook,
1689        "found": true
1690    })))
1691}
1692
1693/// Update a webhook subscription
1694pub async fn update_webhook(
1695    State(store): State<SharedStore>,
1696    Path(webhook_id): Path<uuid::Uuid>,
1697    Json(req): Json<UpdateWebhookRequest>,
1698) -> Result<Json<serde_json::Value>> {
1699    let registry = store.webhook_registry();
1700
1701    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1702        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1703    })?;
1704
1705    tracing::info!("Webhook updated: {}", webhook_id);
1706
1707    Ok(Json(serde_json::json!({
1708        "webhook": webhook,
1709        "updated": true
1710    })))
1711}
1712
1713/// Delete a webhook subscription
1714pub async fn delete_webhook(
1715    State(store): State<SharedStore>,
1716    Path(webhook_id): Path<uuid::Uuid>,
1717) -> Result<Json<serde_json::Value>> {
1718    let registry = store.webhook_registry();
1719
1720    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1721        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1722    })?;
1723
1724    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1725
1726    Ok(Json(serde_json::json!({
1727        "webhook_id": webhook_id,
1728        "deleted": true
1729    })))
1730}
1731
/// Query parameters for listing webhook deliveries
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Limit passed to the registry's delivery lookup;
    /// [`list_webhook_deliveries`] uses 50 when this is absent.
    pub limit: Option<usize>,
}
1737
1738/// List delivery history for a webhook
1739pub async fn list_webhook_deliveries(
1740    State(store): State<SharedStore>,
1741    Path(webhook_id): Path<uuid::Uuid>,
1742    Query(params): Query<ListDeliveriesParams>,
1743) -> Result<Json<serde_json::Value>> {
1744    let registry = store.webhook_registry();
1745
1746    // Verify webhook exists
1747    registry.get(webhook_id).ok_or_else(|| {
1748        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1749    })?;
1750
1751    let limit = params.limit.unwrap_or(50);
1752    let deliveries = registry.get_deliveries(webhook_id, limit);
1753    let total = deliveries.len();
1754
1755    Ok(Json(serde_json::json!({
1756        "webhook_id": webhook_id,
1757        "deliveries": deliveries,
1758        "total": total
1759    })))
1760}
1761
1762// =============================================================================
1763// v2.0: Advanced Query Features
1764// =============================================================================
1765
1766/// EventQL: Execute SQL queries over events using DataFusion
1767pub async fn eventql_query(
1768    State(store): State<SharedStore>,
1769    Json(req): Json<EventQLRequest>,
1770) -> Result<Json<serde_json::Value>> {
1771    let events = store.snapshot_events();
1772    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1773        Ok(response) => Ok(Json(serde_json::json!({
1774            "columns": response.columns,
1775            "rows": response.rows,
1776            "row_count": response.row_count,
1777        }))),
1778        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1779    }
1780}
1781
1782/// GraphQL: Execute GraphQL queries
1783pub async fn graphql_query(
1784    State(store): State<SharedStore>,
1785    Json(req): Json<GraphQLRequest>,
1786) -> Json<serde_json::Value> {
1787    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1788        Ok(f) => f,
1789        Err(e) => {
1790            return Json(
1791                serde_json::to_value(GraphQLResponse {
1792                    data: None,
1793                    errors: vec![GraphQLError { message: e }],
1794                })
1795                .unwrap(),
1796            );
1797        }
1798    };
1799
1800    let mut data = serde_json::Map::new();
1801    let mut errors = Vec::new();
1802
1803    for field in &fields {
1804        match field.name.as_str() {
1805            "events" => {
1806                let request = crate::application::dto::QueryEventsRequest {
1807                    entity_id: field.arguments.get("entity_id").cloned(),
1808                    event_type: field.arguments.get("event_type").cloned(),
1809                    tenant_id: field.arguments.get("tenant_id").cloned(),
1810                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1811                    as_of: None,
1812                    since: None,
1813                    until: None,
1814                    event_type_prefix: None,
1815                    payload_filter: None,
1816                };
1817                match store.query(request) {
1818                    Ok(events) => {
1819                        let json_events: Vec<serde_json::Value> = events
1820                            .iter()
1821                            .map(|e| {
1822                                crate::infrastructure::query::graphql::event_to_json(
1823                                    e,
1824                                    &field.fields,
1825                                )
1826                            })
1827                            .collect();
1828                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1829                    }
1830                    Err(e) => errors.push(GraphQLError {
1831                        message: format!("events query failed: {e}"),
1832                    }),
1833                }
1834            }
1835            "event" => {
1836                if let Some(id_str) = field.arguments.get("id") {
1837                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1838                        match store.get_event_by_id(&id) {
1839                            Ok(Some(event)) => {
1840                                data.insert(
1841                                    "event".to_string(),
1842                                    crate::infrastructure::query::graphql::event_to_json(
1843                                        &event,
1844                                        &field.fields,
1845                                    ),
1846                                );
1847                            }
1848                            Ok(None) => {
1849                                data.insert("event".to_string(), serde_json::Value::Null);
1850                            }
1851                            Err(e) => errors.push(GraphQLError {
1852                                message: format!("event lookup failed: {e}"),
1853                            }),
1854                        }
1855                    } else {
1856                        errors.push(GraphQLError {
1857                            message: format!("Invalid UUID: {id_str}"),
1858                        });
1859                    }
1860                } else {
1861                    errors.push(GraphQLError {
1862                        message: "event query requires 'id' argument".to_string(),
1863                    });
1864                }
1865            }
1866            "projections" => {
1867                let pm = store.projection_manager();
1868                let names: Vec<serde_json::Value> = pm
1869                    .list_projections()
1870                    .iter()
1871                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1872                    .collect();
1873                data.insert("projections".to_string(), serde_json::Value::Array(names));
1874            }
1875            "stats" => {
1876                let stats = store.stats();
1877                data.insert(
1878                    "stats".to_string(),
1879                    serde_json::json!({
1880                        "total_events": stats.total_events,
1881                        "total_entities": stats.total_entities,
1882                        "total_event_types": stats.total_event_types,
1883                    }),
1884                );
1885            }
1886            "__schema" => {
1887                data.insert(
1888                    "__schema".to_string(),
1889                    crate::infrastructure::query::graphql::introspection_schema(),
1890                );
1891            }
1892            other => {
1893                errors.push(GraphQLError {
1894                    message: format!("Unknown field: {other}"),
1895                });
1896            }
1897        }
1898    }
1899
1900    Json(
1901        serde_json::to_value(GraphQLResponse {
1902            data: Some(serde_json::Value::Object(data)),
1903            errors,
1904        })
1905        .unwrap(),
1906    )
1907}
1908
1909/// Geospatial: Query events by location
1910pub async fn geo_query(
1911    State(store): State<SharedStore>,
1912    Json(req): Json<GeoQueryRequest>,
1913) -> Json<serde_json::Value> {
1914    let events = store.snapshot_events();
1915    let geo_index = store.geo_index();
1916    let results =
1917        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1918    let total = results.len();
1919    Json(serde_json::json!({
1920        "results": results,
1921        "total": total,
1922    }))
1923}
1924
1925/// Geospatial index stats
1926pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1927    let stats = store.geo_index().stats();
1928    Json(serde_json::json!(stats))
1929}
1930
1931/// Exactly-once processing stats
1932pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1933    let stats = store.exactly_once().stats();
1934    Json(serde_json::json!(stats))
1935}
1936
1937/// Schema evolution history for an event type
1938pub async fn schema_evolution_history(
1939    State(store): State<SharedStore>,
1940    Path(event_type): Path<String>,
1941) -> Json<serde_json::Value> {
1942    let mgr = store.schema_evolution();
1943    let history = mgr.get_history(&event_type);
1944    let version = mgr.get_version(&event_type);
1945    Json(serde_json::json!({
1946        "event_type": event_type,
1947        "current_version": version,
1948        "history": history,
1949    }))
1950}
1951
1952/// Current inferred schema for an event type
1953pub async fn schema_evolution_schema(
1954    State(store): State<SharedStore>,
1955    Path(event_type): Path<String>,
1956) -> Json<serde_json::Value> {
1957    let mgr = store.schema_evolution();
1958    if let Some(schema) = mgr.get_schema(&event_type) {
1959        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1960        Json(serde_json::json!({
1961            "event_type": event_type,
1962            "version": mgr.get_version(&event_type),
1963            "inferred_schema": schema,
1964            "json_schema": json_schema,
1965        }))
1966    } else {
1967        Json(serde_json::json!({
1968            "event_type": event_type,
1969            "error": "No schema inferred for this event type"
1970        }))
1971    }
1972}
1973
1974/// Schema evolution stats
1975pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1976    let stats = store.schema_evolution().stats();
1977    let event_types = store.schema_evolution().list_event_types();
1978    Json(serde_json::json!({
1979        "stats": stats,
1980        "tracked_event_types": event_types,
1981    }))
1982}
1983
1984// =============================================================================
1985// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
1986// =============================================================================
1987
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The smallest `physical_ms` in the client's version vector becomes the
/// `since` bound of an otherwise unfiltered store query; an empty vector
/// means no bound. Each returned event is wrapped in a `ReplicatedEvent`
/// with origin region `"server"`.
///
/// The HLC timestamp is derived from the event's own timestamp: consecutive
/// events with the same millisecond get an increasing logical counter, which
/// resets to 0 whenever the millisecond changes.
///
/// The `version_vector` in the response is always empty.
///
/// # Errors
///
/// Propagates any error returned by `store.query`.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::{
        embedded::sync_types::SyncPullResponse,
        infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp},
    };

    let store = &state.store;

    // Oldest physical time the client reports across all entries of its vector.
    let oldest_seen_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since = oldest_seen_ms.and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let delta = store.query(crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Running state for the logical counter: the previously seen millisecond
    // and how many events in a row have shared it.
    let mut prev_ms = 0u64;
    let mut logical = 0u32;

    // Convert domain events to the ReplicatedEvent wire format.
    let replicated: Vec<ReplicatedEvent> = delta
        .iter()
        .map(|event| {
            let event_ms = event.timestamp().timestamp_millis() as u64;
            logical = if event_ms == prev_ms { logical + 1 } else { 0 };
            prev_ms = event_ms;

            ReplicatedEvent {
                event_id: event.id().to_string(),
                hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
                origin_region: "server".to_string(),
                event_data: serde_json::json!({
                    "event_type": event.event_type_str(),
                    "entity_id": event.entity_id_str(),
                    "tenant_id": event.tenant_id_str(),
                    "payload": event.payload,
                    "metadata": event.metadata,
                }),
            }
        })
        .collect();

    Ok(Json(SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2052
/// POST /api/v1/sync/push — Client pushes events, server ingests them.
///
/// For every pushed event the handler reads `event_type`, `entity_id`,
/// `tenant_id`, `payload` and `metadata` out of `event_data`. Missing or
/// non-string values fall back to `"unknown"`, `"unknown"` and `"default"`
/// for the three string fields; a missing payload becomes `{}`.
///
/// Events rejected by `Event::from_strings` are counted as `skipped`; every
/// successfully ingested event is counted as `accepted`. The returned
/// `version_vector` is always empty.
///
/// NOTE(review): no CRDT conflict resolution is visible in this handler —
/// confirm whether it happens inside `store.ingest`.
///
/// # Errors
///
/// A failing `store.ingest` aborts the loop and returns that error; events
/// ingested earlier in the same request stay ingested.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for pushed in &request.events {
        let event_data = &pushed.event_data;

        // Reads a string field, substituting `fallback` when it is absent or not a string.
        let text_field = |key: &str, fallback: &str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(fallback)
                .to_string()
        };

        let event_type = text_field("event_type", "unknown");
        let entity_id = text_field("entity_id", "unknown");
        let tenant_id = text_field("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        let parsed = Event::from_strings(event_type, entity_id, tenant_id, payload, metadata);
        match parsed {
            Err(_) => skipped += 1,
            Ok(domain_event) => {
                store.ingest(domain_event)?;
                accepted += 1;
            }
        }
    }

    let response = crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    };
    Ok(Json(response))
}
2104
2105#[cfg(test)]
2106mod tests {
2107    use super::*;
2108    use crate::{domain::entities::Event, store::EventStore};
2109
2110    fn create_test_store() -> Arc<EventStore> {
2111        Arc::new(EventStore::new())
2112    }
2113
2114    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2115        Event::from_strings(
2116            event_type.to_string(),
2117            entity_id.to_string(),
2118            "test-stream".to_string(),
2119            serde_json::json!({
2120                "name": "Test",
2121                "value": 42
2122            }),
2123            None,
2124        )
2125        .unwrap()
2126    }
2127
2128    #[tokio::test]
2129    async fn test_query_events_has_more_and_total_count() {
2130        let store = create_test_store();
2131
2132        // Ingest 50 events
2133        for i in 0..50 {
2134            store
2135                .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2136                .unwrap();
2137        }
2138
2139        // Query with limit=10 — should get has_more=true, total_count=50
2140        let req = QueryEventsRequest {
2141            entity_id: None,
2142            event_type: None,
2143            tenant_id: None,
2144            as_of: None,
2145            since: None,
2146            until: None,
2147            limit: Some(10),
2148            event_type_prefix: None,
2149            payload_filter: None,
2150        };
2151
2152        let requested_limit = req.limit;
2153        let unlimited_req = QueryEventsRequest {
2154            limit: None,
2155            ..QueryEventsRequest {
2156                entity_id: req.entity_id,
2157                event_type: req.event_type,
2158                tenant_id: req.tenant_id,
2159                as_of: req.as_of,
2160                since: req.since,
2161                until: req.until,
2162                limit: None,
2163                event_type_prefix: req.event_type_prefix,
2164                payload_filter: req.payload_filter,
2165            }
2166        };
2167        let all_events = store.query(unlimited_req).unwrap();
2168        let total_count = all_events.len();
2169        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2170            all_events.into_iter().take(limit).collect()
2171        } else {
2172            all_events
2173        };
2174        let count = limited_events.len();
2175        let has_more = count < total_count;
2176
2177        assert_eq!(count, 10);
2178        assert_eq!(total_count, 50);
2179        assert!(has_more);
2180    }
2181
2182    #[tokio::test]
2183    async fn test_query_events_no_more_results() {
2184        let store = create_test_store();
2185
2186        // Ingest 5 events
2187        for i in 0..5 {
2188            store
2189                .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2190                .unwrap();
2191        }
2192
2193        // Query with limit=100 — should get has_more=false, total_count=5
2194        let all_events = store
2195            .query(QueryEventsRequest {
2196                entity_id: None,
2197                event_type: None,
2198                tenant_id: None,
2199                as_of: None,
2200                since: None,
2201                until: None,
2202                limit: None,
2203                event_type_prefix: None,
2204                payload_filter: None,
2205            })
2206            .unwrap();
2207        let total_count = all_events.len();
2208        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2209        let count = limited_events.len();
2210        let has_more = count < total_count;
2211
2212        assert_eq!(count, 5);
2213        assert_eq!(total_count, 5);
2214        assert!(!has_more);
2215    }
2216
2217    #[tokio::test]
2218    async fn test_list_entities_by_type_prefix() {
2219        let store = create_test_store();
2220
2221        // 3 index entities
2222        store
2223            .ingest(create_test_event("idx-1", "index.created"))
2224            .unwrap();
2225        store
2226            .ingest(create_test_event("idx-1", "index.updated"))
2227            .unwrap();
2228        store
2229            .ingest(create_test_event("idx-2", "index.created"))
2230            .unwrap();
2231        store
2232            .ingest(create_test_event("idx-3", "index.created"))
2233            .unwrap();
2234        // 2 trade entities
2235        store
2236            .ingest(create_test_event("trade-1", "trade.created"))
2237            .unwrap();
2238        store
2239            .ingest(create_test_event("trade-2", "trade.created"))
2240            .unwrap();
2241
2242        // List entities for index.*
2243        let req = ListEntitiesRequest {
2244            event_type_prefix: Some("index.".to_string()),
2245            payload_filter: None,
2246            limit: None,
2247            offset: None,
2248        };
2249        let query_req = QueryEventsRequest {
2250            entity_id: None,
2251            event_type: None,
2252            tenant_id: None,
2253            as_of: None,
2254            since: None,
2255            until: None,
2256            limit: None,
2257            event_type_prefix: req.event_type_prefix,
2258            payload_filter: req.payload_filter,
2259        };
2260        let events = store.query(query_req).unwrap();
2261
2262        // Group and verify
2263        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2264            std::collections::HashMap::new();
2265        for event in &events {
2266            entity_map
2267                .entry(event.entity_id().to_string())
2268                .or_default()
2269                .push(event);
2270        }
2271
2272        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2273        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2274        assert_eq!(entity_map["idx-2"].len(), 1);
2275        assert_eq!(entity_map["idx-3"].len(), 1);
2276    }
2277
2278    fn create_test_event_with_payload(
2279        entity_id: &str,
2280        event_type: &str,
2281        payload: serde_json::Value,
2282    ) -> Event {
2283        Event::from_strings(
2284            event_type.to_string(),
2285            entity_id.to_string(),
2286            "test-stream".to_string(),
2287            payload,
2288            None,
2289        )
2290        .unwrap()
2291    }
2292
2293    #[tokio::test]
2294    async fn test_detect_duplicates_by_payload_fields() {
2295        let store = create_test_store();
2296
2297        // Create entities with duplicate "name" field values
2298        store
2299            .ingest(create_test_event_with_payload(
2300                "idx-1",
2301                "index.created",
2302                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2303            ))
2304            .unwrap();
2305        store
2306            .ingest(create_test_event_with_payload(
2307                "idx-2",
2308                "index.created",
2309                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2310            ))
2311            .unwrap();
2312        store
2313            .ingest(create_test_event_with_payload(
2314                "idx-3",
2315                "index.created",
2316                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2317            ))
2318            .unwrap();
2319        store
2320            .ingest(create_test_event_with_payload(
2321                "idx-4",
2322                "index.created",
2323                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2324            ))
2325            .unwrap();
2326        store
2327            .ingest(create_test_event_with_payload(
2328                "idx-5",
2329                "index.created",
2330                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2331            ))
2332            .unwrap();
2333
2334        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2335        let query_req = QueryEventsRequest {
2336            entity_id: None,
2337            event_type: None,
2338            tenant_id: None,
2339            as_of: None,
2340            since: None,
2341            until: None,
2342            limit: None,
2343            event_type_prefix: Some("index.".to_string()),
2344            payload_filter: None,
2345        };
2346        let events = store.query(query_req).unwrap();
2347
2348        // Manually replicate the handler logic for testing
2349        let group_by_fields = vec!["name"];
2350        let mut entity_latest: std::collections::HashMap<String, &Event> =
2351            std::collections::HashMap::new();
2352        for event in &events {
2353            let eid = event.entity_id().to_string();
2354            entity_latest
2355                .entry(eid)
2356                .and_modify(|existing| {
2357                    if event.timestamp() > existing.timestamp() {
2358                        *existing = event;
2359                    }
2360                })
2361                .or_insert(event);
2362        }
2363
2364        let mut groups: std::collections::HashMap<String, Vec<String>> =
2365            std::collections::HashMap::new();
2366        for (entity_id, event) in &entity_latest {
2367            let payload = event.payload();
2368            let mut key_parts = serde_json::Map::new();
2369            for field in &group_by_fields {
2370                let value = payload
2371                    .get(*field)
2372                    .cloned()
2373                    .unwrap_or(serde_json::Value::Null);
2374                key_parts.insert(field.to_string(), value);
2375            }
2376            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2377            groups.entry(key_str).or_default().push(entity_id.clone());
2378        }
2379
2380        let duplicate_groups: Vec<_> = groups
2381            .into_iter()
2382            .filter(|(_, ids)| ids.len() > 1)
2383            .collect();
2384
2385        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2386        for (_, ids) in &duplicate_groups {
2387            assert_eq!(ids.len(), 2);
2388        }
2389    }
2390
2391    #[tokio::test]
2392    async fn test_detect_duplicates_no_duplicates() {
2393        let store = create_test_store();
2394
2395        // All unique names
2396        store
2397            .ingest(create_test_event_with_payload(
2398                "idx-1",
2399                "index.created",
2400                serde_json::json!({"name": "A"}),
2401            ))
2402            .unwrap();
2403        store
2404            .ingest(create_test_event_with_payload(
2405                "idx-2",
2406                "index.created",
2407                serde_json::json!({"name": "B"}),
2408            ))
2409            .unwrap();
2410
2411        let query_req = QueryEventsRequest {
2412            entity_id: None,
2413            event_type: None,
2414            tenant_id: None,
2415            as_of: None,
2416            since: None,
2417            until: None,
2418            limit: None,
2419            event_type_prefix: Some("index.".to_string()),
2420            payload_filter: None,
2421        };
2422        let events = store.query(query_req).unwrap();
2423
2424        let mut entity_latest: std::collections::HashMap<String, &Event> =
2425            std::collections::HashMap::new();
2426        for event in &events {
2427            entity_latest
2428                .entry(event.entity_id().to_string())
2429                .or_insert(event);
2430        }
2431
2432        let mut groups: std::collections::HashMap<String, Vec<String>> =
2433            std::collections::HashMap::new();
2434        for (entity_id, event) in &entity_latest {
2435            let key_str =
2436                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2437                    .unwrap();
2438            groups.entry(key_str).or_default().push(entity_id.clone());
2439        }
2440
2441        let duplicate_groups: Vec<_> = groups
2442            .into_iter()
2443            .filter(|(_, ids)| ids.len() > 1)
2444            .collect();
2445
2446        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2447    }
2448
2449    #[tokio::test]
2450    async fn test_detect_duplicates_multi_field_group_by() {
2451        let store = create_test_store();
2452
2453        // Two entities with same name AND user_id = true duplicate
2454        store
2455            .ingest(create_test_event_with_payload(
2456                "idx-1",
2457                "index.created",
2458                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2459            ))
2460            .unwrap();
2461        store
2462            .ingest(create_test_event_with_payload(
2463                "idx-2",
2464                "index.created",
2465                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2466            ))
2467            .unwrap();
2468        // Same name but different user_id = NOT a duplicate in multi-field group
2469        store
2470            .ingest(create_test_event_with_payload(
2471                "idx-3",
2472                "index.created",
2473                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2474            ))
2475            .unwrap();
2476
2477        let query_req = QueryEventsRequest {
2478            entity_id: None,
2479            event_type: None,
2480            tenant_id: None,
2481            as_of: None,
2482            since: None,
2483            until: None,
2484            limit: None,
2485            event_type_prefix: Some("index.".to_string()),
2486            payload_filter: None,
2487        };
2488        let events = store.query(query_req).unwrap();
2489
2490        let group_by_fields = vec!["name", "user_id"];
2491        let mut entity_latest: std::collections::HashMap<String, &Event> =
2492            std::collections::HashMap::new();
2493        for event in &events {
2494            entity_latest
2495                .entry(event.entity_id().to_string())
2496                .and_modify(|existing| {
2497                    if event.timestamp() > existing.timestamp() {
2498                        *existing = event;
2499                    }
2500                })
2501                .or_insert(event);
2502        }
2503
2504        let mut groups: std::collections::HashMap<String, Vec<String>> =
2505            std::collections::HashMap::new();
2506        for (entity_id, event) in &entity_latest {
2507            let payload = event.payload();
2508            let mut key_parts = serde_json::Map::new();
2509            for field in &group_by_fields {
2510                let value = payload
2511                    .get(*field)
2512                    .cloned()
2513                    .unwrap_or(serde_json::Value::Null);
2514                key_parts.insert(field.to_string(), value);
2515            }
2516            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2517            groups.entry(key_str).or_default().push(entity_id.clone());
2518        }
2519
2520        let duplicate_groups: Vec<_> = groups
2521            .into_iter()
2522            .filter(|(_, ids)| ids.len() > 1)
2523            .collect();
2524
2525        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2526        assert_eq!(duplicate_groups.len(), 1);
2527        let (_, ref ids) = duplicate_groups[0];
2528        assert_eq!(ids.len(), 2);
2529        let mut sorted_ids = ids.clone();
2530        sorted_ids.sort();
2531        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2532    }
2533
2534    #[tokio::test]
2535    async fn test_projection_state_cache() {
2536        let store = create_test_store();
2537
2538        // Test cache insertion
2539        let cache = store.projection_state_cache();
2540        cache.insert(
2541            "entity_snapshots:user-123".to_string(),
2542            serde_json::json!({"name": "Test User", "age": 30}),
2543        );
2544
2545        // Test cache retrieval
2546        let state = cache.get("entity_snapshots:user-123");
2547        assert!(state.is_some());
2548        let state = state.unwrap();
2549        assert_eq!(state["name"], "Test User");
2550        assert_eq!(state["age"], 30);
2551    }
2552
2553    #[tokio::test]
2554    async fn test_projection_manager_list_projections() {
2555        let store = create_test_store();
2556
2557        // List projections (built-in projections should be available)
2558        let projection_manager = store.projection_manager();
2559        let projections = projection_manager.list_projections();
2560
2561        // Should have entity_snapshots and event_counters
2562        assert!(projections.len() >= 2);
2563
2564        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2565        assert!(names.contains(&"entity_snapshots"));
2566        assert!(names.contains(&"event_counters"));
2567    }
2568
2569    #[tokio::test]
2570    async fn test_projection_state_after_event_ingestion() {
2571        let store = create_test_store();
2572
2573        // Ingest an event
2574        let event = create_test_event("user-456", "user.created");
2575        store.ingest(event).unwrap();
2576
2577        // Get projection state
2578        let projection_manager = store.projection_manager();
2579        let snapshot_projection = projection_manager
2580            .get_projection("entity_snapshots")
2581            .unwrap();
2582
2583        let state = snapshot_projection.get_state("user-456");
2584        assert!(state.is_some());
2585        let state = state.unwrap();
2586        assert_eq!(state["name"], "Test");
2587        assert_eq!(state["value"], 42);
2588    }
2589
2590    #[tokio::test]
2591    async fn test_projection_state_cache_multiple_entities() {
2592        let store = create_test_store();
2593        let cache = store.projection_state_cache();
2594
2595        // Insert multiple entities
2596        for i in 0..10 {
2597            cache.insert(
2598                format!("entity_snapshots:entity-{}", i),
2599                serde_json::json!({"id": i, "status": "active"}),
2600            );
2601        }
2602
2603        // Verify all insertions
2604        assert_eq!(cache.len(), 10);
2605
2606        // Verify each entity
2607        for i in 0..10 {
2608            let key = format!("entity_snapshots:entity-{}", i);
2609            let state = cache.get(&key);
2610            assert!(state.is_some());
2611            assert_eq!(state.unwrap()["id"], i);
2612        }
2613    }
2614
2615    #[tokio::test]
2616    async fn test_projection_state_update() {
2617        let store = create_test_store();
2618        let cache = store.projection_state_cache();
2619
2620        // Initial state
2621        cache.insert(
2622            "entity_snapshots:user-789".to_string(),
2623            serde_json::json!({"balance": 100}),
2624        );
2625
2626        // Update state
2627        cache.insert(
2628            "entity_snapshots:user-789".to_string(),
2629            serde_json::json!({"balance": 150}),
2630        );
2631
2632        // Verify update
2633        let state = cache.get("entity_snapshots:user-789").unwrap();
2634        assert_eq!(state["balance"], 150);
2635    }
2636
2637    #[tokio::test]
2638    async fn test_event_counter_projection() {
2639        let store = create_test_store();
2640
2641        // Ingest events of different types
2642        store
2643            .ingest(create_test_event("user-1", "user.created"))
2644            .unwrap();
2645        store
2646            .ingest(create_test_event("user-2", "user.created"))
2647            .unwrap();
2648        store
2649            .ingest(create_test_event("user-1", "user.updated"))
2650            .unwrap();
2651
2652        // Get event counter projection
2653        let projection_manager = store.projection_manager();
2654        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2655
2656        // Check counts
2657        let created_state = counter_projection.get_state("user.created");
2658        assert!(created_state.is_some());
2659        assert_eq!(created_state.unwrap()["count"], 2);
2660
2661        let updated_state = counter_projection.get_state("user.updated");
2662        assert!(updated_state.is_some());
2663        assert_eq!(updated_state.unwrap()["count"], 1);
2664    }
2665
2666    #[tokio::test]
2667    async fn test_projection_state_cache_key_format() {
2668        let store = create_test_store();
2669        let cache = store.projection_state_cache();
2670
2671        // Test standard key format: {projection_name}:{entity_id}
2672        let key = "orders:order-12345".to_string();
2673        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2674
2675        let state = cache.get(&key).unwrap();
2676        assert_eq!(state["total"], 99.99);
2677    }
2678
2679    #[tokio::test]
2680    async fn test_projection_state_cache_removal() {
2681        let store = create_test_store();
2682        let cache = store.projection_state_cache();
2683
2684        // Insert and then remove
2685        cache.insert(
2686            "test:entity-1".to_string(),
2687            serde_json::json!({"data": "value"}),
2688        );
2689        assert_eq!(cache.len(), 1);
2690
2691        cache.remove("test:entity-1");
2692        assert_eq!(cache.len(), 0);
2693        assert!(cache.get("test:entity-1").is_none());
2694    }
2695
2696    #[tokio::test]
2697    async fn test_get_nonexistent_projection() {
2698        let store = create_test_store();
2699        let projection_manager = store.projection_manager();
2700
2701        // Requesting a non-existent projection should return None
2702        let projection = projection_manager.get_projection("nonexistent_projection");
2703        assert!(projection.is_none());
2704    }
2705
2706    #[tokio::test]
2707    async fn test_get_nonexistent_entity_state() {
2708        let store = create_test_store();
2709        let projection_manager = store.projection_manager();
2710
2711        // Get state for non-existent entity
2712        let snapshot_projection = projection_manager
2713            .get_projection("entity_snapshots")
2714            .unwrap();
2715        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2716        assert!(state.is_none());
2717    }
2718
2719    #[tokio::test]
2720    async fn test_projection_state_cache_concurrent_access() {
2721        let store = create_test_store();
2722        let cache = store.projection_state_cache();
2723
2724        // Simulate concurrent writes
2725        let handles: Vec<_> = (0..10)
2726            .map(|i| {
2727                let cache_clone = cache.clone();
2728                tokio::spawn(async move {
2729                    cache_clone.insert(
2730                        format!("concurrent:entity-{}", i),
2731                        serde_json::json!({"thread": i}),
2732                    );
2733                })
2734            })
2735            .collect();
2736
2737        for handle in handles {
2738            handle.await.unwrap();
2739        }
2740
2741        // All 10 entries should be present
2742        assert_eq!(cache.len(), 10);
2743    }
2744
2745    #[tokio::test]
2746    async fn test_projection_state_large_payload() {
2747        let store = create_test_store();
2748        let cache = store.projection_state_cache();
2749
2750        // Create a large JSON payload (~10KB)
2751        let large_array: Vec<serde_json::Value> = (0..1000)
2752            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2753            .collect();
2754
2755        cache.insert(
2756            "large:entity-1".to_string(),
2757            serde_json::json!({"items": large_array}),
2758        );
2759
2760        let state = cache.get("large:entity-1").unwrap();
2761        let items = state["items"].as_array().unwrap();
2762        assert_eq!(items.len(), 1000);
2763    }
2764
2765    #[tokio::test]
2766    async fn test_projection_state_complex_json() {
2767        let store = create_test_store();
2768        let cache = store.projection_state_cache();
2769
2770        // Complex nested JSON structure
2771        let complex_state = serde_json::json!({
2772            "user": {
2773                "id": "user-123",
2774                "profile": {
2775                    "name": "John Doe",
2776                    "email": "john@example.com",
2777                    "settings": {
2778                        "theme": "dark",
2779                        "notifications": true
2780                    }
2781                },
2782                "roles": ["admin", "user"],
2783                "metadata": {
2784                    "created_at": "2025-01-01T00:00:00Z",
2785                    "last_login": null
2786                }
2787            }
2788        });
2789
2790        cache.insert("complex:user-123".to_string(), complex_state);
2791
2792        let state = cache.get("complex:user-123").unwrap();
2793        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2794        assert_eq!(state["user"]["roles"][0], "admin");
2795        assert!(state["user"]["metadata"]["last_login"].is_null());
2796    }
2797
2798    #[tokio::test]
2799    async fn test_projection_state_cache_iteration() {
2800        let store = create_test_store();
2801        let cache = store.projection_state_cache();
2802
2803        // Insert entries
2804        for i in 0..5 {
2805            cache.insert(
2806                format!("iter:entity-{}", i),
2807                serde_json::json!({"index": i}),
2808            );
2809        }
2810
2811        // Iterate over all entries
2812        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2813        assert_eq!(entries.len(), 5);
2814    }
2815
2816    #[tokio::test]
2817    async fn test_projection_manager_get_entity_snapshots() {
2818        let store = create_test_store();
2819        let projection_manager = store.projection_manager();
2820
2821        // Get entity_snapshots projection specifically
2822        let projection = projection_manager.get_projection("entity_snapshots");
2823        assert!(projection.is_some());
2824        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2825    }
2826
2827    #[tokio::test]
2828    async fn test_projection_manager_get_event_counters() {
2829        let store = create_test_store();
2830        let projection_manager = store.projection_manager();
2831
2832        // Get event_counters projection specifically
2833        let projection = projection_manager.get_projection("event_counters");
2834        assert!(projection.is_some());
2835        assert_eq!(projection.unwrap().name(), "event_counters");
2836    }
2837
2838    #[tokio::test]
2839    async fn test_projection_state_cache_overwrite() {
2840        let store = create_test_store();
2841        let cache = store.projection_state_cache();
2842
2843        // Initial value
2844        cache.insert(
2845            "overwrite:entity-1".to_string(),
2846            serde_json::json!({"version": 1}),
2847        );
2848
2849        // Overwrite with new value
2850        cache.insert(
2851            "overwrite:entity-1".to_string(),
2852            serde_json::json!({"version": 2}),
2853        );
2854
2855        // Overwrite again
2856        cache.insert(
2857            "overwrite:entity-1".to_string(),
2858            serde_json::json!({"version": 3}),
2859        );
2860
2861        let state = cache.get("overwrite:entity-1").unwrap();
2862        assert_eq!(state["version"], 3);
2863
2864        // Should still be only 1 entry
2865        assert_eq!(cache.len(), 1);
2866    }
2867
2868    #[tokio::test]
2869    async fn test_projection_state_multiple_projections() {
2870        let store = create_test_store();
2871        let cache = store.projection_state_cache();
2872
2873        // Store states for different projections
2874        cache.insert(
2875            "entity_snapshots:user-1".to_string(),
2876            serde_json::json!({"name": "Alice"}),
2877        );
2878        cache.insert(
2879            "event_counters:user.created".to_string(),
2880            serde_json::json!({"count": 5}),
2881        );
2882        cache.insert(
2883            "custom_projection:order-1".to_string(),
2884            serde_json::json!({"total": 150.0}),
2885        );
2886
2887        // Verify each projection's state
2888        assert_eq!(
2889            cache.get("entity_snapshots:user-1").unwrap()["name"],
2890            "Alice"
2891        );
2892        assert_eq!(
2893            cache.get("event_counters:user.created").unwrap()["count"],
2894            5
2895        );
2896        assert_eq!(
2897            cache.get("custom_projection:order-1").unwrap()["total"],
2898            150.0
2899        );
2900    }
2901
2902    #[tokio::test]
2903    async fn test_bulk_projection_state_access() {
2904        let store = create_test_store();
2905
2906        // Ingest multiple events for different entities
2907        for i in 0..5 {
2908            let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
2909            store.ingest(event).unwrap();
2910        }
2911
2912        // Get projection and verify bulk access
2913        let projection_manager = store.projection_manager();
2914        let snapshot_projection = projection_manager
2915            .get_projection("entity_snapshots")
2916            .unwrap();
2917
2918        // Verify we can access all entities
2919        for i in 0..5 {
2920            let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
2921            assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
2922        }
2923    }
2924
2925    #[tokio::test]
2926    async fn test_bulk_save_projection_states() {
2927        let store = create_test_store();
2928        let cache = store.projection_state_cache();
2929
2930        // Simulate bulk save request
2931        let states = vec![
2932            BulkSaveStateItem {
2933                entity_id: "bulk-entity-1".to_string(),
2934                state: serde_json::json!({"name": "Entity 1", "value": 100}),
2935            },
2936            BulkSaveStateItem {
2937                entity_id: "bulk-entity-2".to_string(),
2938                state: serde_json::json!({"name": "Entity 2", "value": 200}),
2939            },
2940            BulkSaveStateItem {
2941                entity_id: "bulk-entity-3".to_string(),
2942                state: serde_json::json!({"name": "Entity 3", "value": 300}),
2943            },
2944        ];
2945
2946        let projection_name = "test_projection";
2947
2948        // Save states to cache (simulating bulk_save_projection_states handler)
2949        for item in &states {
2950            cache.insert(
2951                format!("{projection_name}:{}", item.entity_id),
2952                item.state.clone(),
2953            );
2954        }
2955
2956        // Verify all states were saved
2957        assert_eq!(cache.len(), 3);
2958
2959        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
2960        assert_eq!(state1["name"], "Entity 1");
2961        assert_eq!(state1["value"], 100);
2962
2963        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
2964        assert_eq!(state2["name"], "Entity 2");
2965        assert_eq!(state2["value"], 200);
2966
2967        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
2968        assert_eq!(state3["name"], "Entity 3");
2969        assert_eq!(state3["value"], 300);
2970    }
2971
2972    #[tokio::test]
2973    async fn test_bulk_save_empty_states() {
2974        let store = create_test_store();
2975        let cache = store.projection_state_cache();
2976
2977        // Clear cache
2978        cache.clear();
2979
2980        // Empty states should work fine
2981        let states: Vec<BulkSaveStateItem> = vec![];
2982        assert_eq!(states.len(), 0);
2983
2984        // Cache should remain empty
2985        assert_eq!(cache.len(), 0);
2986    }
2987
2988    #[tokio::test]
2989    async fn test_bulk_save_overwrites_existing() {
2990        let store = create_test_store();
2991        let cache = store.projection_state_cache();
2992
2993        // Insert initial state
2994        cache.insert(
2995            "test:entity-1".to_string(),
2996            serde_json::json!({"version": 1, "data": "initial"}),
2997        );
2998
2999        // Bulk save with updated state
3000        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3001        cache.insert("test:entity-1".to_string(), new_state);
3002
3003        // Verify overwrite
3004        let state = cache.get("test:entity-1").unwrap();
3005        assert_eq!(state["version"], 2);
3006        assert_eq!(state["data"], "updated");
3007    }
3008
3009    #[tokio::test]
3010    async fn test_bulk_save_high_volume() {
3011        let store = create_test_store();
3012        let cache = store.projection_state_cache();
3013
3014        // Simulate high volume save (1000 entities)
3015        for i in 0..1000 {
3016            cache.insert(
3017                format!("volume_test:entity-{}", i),
3018                serde_json::json!({"index": i, "status": "active"}),
3019            );
3020        }
3021
3022        // Verify count
3023        assert_eq!(cache.len(), 1000);
3024
3025        // Spot check some entries
3026        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3027        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3028        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3029    }
3030
3031    #[tokio::test]
3032    async fn test_bulk_save_different_projections() {
3033        let store = create_test_store();
3034        let cache = store.projection_state_cache();
3035
3036        // Save to multiple projections in bulk
3037        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3038
3039        for proj in projections.iter() {
3040            for i in 0..5 {
3041                cache.insert(
3042                    format!("{proj}:entity-{i}"),
3043                    serde_json::json!({"projection": proj, "id": i}),
3044                );
3045            }
3046        }
3047
3048        // Verify total count (3 projections * 5 entities)
3049        assert_eq!(cache.len(), 15);
3050
3051        // Verify each projection
3052        for proj in projections.iter() {
3053            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3054            assert_eq!(state["projection"], *proj);
3055        }
3056    }
3057}