Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::{
2    application::{
3        dto::{
4            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
5            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
6            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
7            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
8            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
9        },
10        services::{
11            analytics::{
12                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
13                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
14            },
15            pipeline::{PipelineConfig, PipelineStats},
16            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
17            schema::{
18                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
19                ValidateEventRequest, ValidateEventResponse,
20            },
21            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
22        },
23    },
24    domain::entities::Event,
25    error::Result,
26    infrastructure::{
27        persistence::{
28            compaction::CompactionResult,
29            snapshot::{
30                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
31                ListSnapshotsResponse, SnapshotInfo,
32            },
33        },
34        query::{
35            eventql::EventQLRequest,
36            geospatial::GeoQueryRequest,
37            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
38        },
39        replication::ReplicationMode,
40        web::api_v1::AppState,
41    },
42    store::{EventStore, EventTypeInfo, StreamInfo},
43};
44use axum::{
45    Json, Router,
46    extract::{Path, Query, State, WebSocketUpgrade},
47    response::{IntoResponse, Response},
48    routing::{get, post, put},
49};
50use serde::Deserialize;
51use std::sync::Arc;
52use tower_http::{
53    cors::{Any, CorsLayer},
54    trace::TraceLayer,
55};
56
57type SharedStore = Arc<EventStore>;
58
59/// Wait for follower ACK(s) in semi-sync/sync replication modes.
60///
61/// In async mode (default), returns immediately. In semi-sync mode, waits for
62/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
63/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
64async fn await_replication_ack(state: &AppState) {
65    let shipper_guard = state.wal_shipper.read().await;
66    if let Some(ref shipper) = *shipper_guard {
67        let mode = shipper.replication_mode();
68        if mode == ReplicationMode::Async {
69            return;
70        }
71
72        let target_offset = shipper.current_leader_offset();
73        if target_offset == 0 {
74            return;
75        }
76
77        let shipper = Arc::clone(shipper);
78        // Drop the read guard before the async wait to avoid holding it across await
79        drop(shipper_guard);
80
81        let timer = state
82            .store
83            .metrics()
84            .replication_ack_wait_seconds
85            .start_timer();
86        let acked = shipper.wait_for_ack(target_offset).await;
87        timer.observe_duration();
88
89        if !acked {
90            tracing::warn!(
91                "Replication ACK timeout in {} mode (offset {}). \
92                 Write succeeded locally but follower confirmation pending.",
93                mode,
94                target_offset,
95            );
96        }
97    }
98}
99
100pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
101    let app = Router::new()
102        .route("/health", get(health))
103        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
104        .route("/api/v1/events", post(ingest_event))
105        .route("/api/v1/events/batch", post(ingest_events_batch))
106        .route("/api/v1/events/query", get(query_events))
107        .route("/api/v1/events/{event_id}", get(get_event_by_id))
108        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
109        // v0.10: Stream and event type discovery endpoints
110        .route("/api/v1/streams", get(list_streams))
111        .route("/api/v1/event-types", get(list_event_types))
112        .route("/api/v1/entities/duplicates", get(detect_duplicates))
113        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
114        .route(
115            "/api/v1/entities/{entity_id}/snapshot",
116            get(get_entity_snapshot),
117        )
118        .route("/api/v1/stats", get(get_stats))
119        // v0.2: Advanced analytics endpoints
120        .route("/api/v1/analytics/frequency", get(analytics_frequency))
121        .route("/api/v1/analytics/summary", get(analytics_summary))
122        .route("/api/v1/analytics/correlation", get(analytics_correlation))
123        // v0.2: Snapshot management endpoints
124        .route("/api/v1/snapshots", post(create_snapshot))
125        .route("/api/v1/snapshots", get(list_snapshots))
126        .route(
127            "/api/v1/snapshots/{entity_id}/latest",
128            get(get_latest_snapshot),
129        )
130        // v0.2: Compaction endpoints
131        .route("/api/v1/compaction/trigger", post(trigger_compaction))
132        .route("/api/v1/compaction/stats", get(compaction_stats))
133        // v0.5: Schema registry endpoints
134        .route("/api/v1/schemas", post(register_schema))
135        .route("/api/v1/schemas", get(list_subjects))
136        .route("/api/v1/schemas/{subject}", get(get_schema))
137        .route(
138            "/api/v1/schemas/{subject}/versions",
139            get(list_schema_versions),
140        )
141        .route("/api/v1/schemas/validate", post(validate_event_schema))
142        .route(
143            "/api/v1/schemas/{subject}/compatibility",
144            put(set_compatibility_mode),
145        )
146        // v0.5: Replay and projection rebuild endpoints
147        .route("/api/v1/replay", post(start_replay))
148        .route("/api/v1/replay", get(list_replays))
149        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
150        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
151        .route(
152            "/api/v1/replay/{replay_id}",
153            axum::routing::delete(delete_replay),
154        )
155        // v0.5: Stream processing pipeline endpoints
156        .route("/api/v1/pipelines", post(register_pipeline))
157        .route("/api/v1/pipelines", get(list_pipelines))
158        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
159        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
160        .route(
161            "/api/v1/pipelines/{pipeline_id}",
162            axum::routing::delete(remove_pipeline),
163        )
164        .route(
165            "/api/v1/pipelines/{pipeline_id}/stats",
166            get(get_pipeline_stats),
167        )
168        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
169        // v0.7: Projection State API for Query Service integration
170        .route("/api/v1/projections", get(list_projections))
171        .route("/api/v1/projections/{name}", get(get_projection))
172        .route(
173            "/api/v1/projections/{name}",
174            axum::routing::delete(delete_projection),
175        )
176        .route(
177            "/api/v1/projections/{name}/state",
178            get(get_projection_state_summary),
179        )
180        .route("/api/v1/projections/{name}/reset", post(reset_projection))
181        .route(
182            "/api/v1/projections/{name}/{entity_id}/state",
183            get(get_projection_state),
184        )
185        .route(
186            "/api/v1/projections/{name}/{entity_id}/state",
187            post(save_projection_state),
188        )
189        .route(
190            "/api/v1/projections/{name}/{entity_id}/state",
191            put(save_projection_state),
192        )
193        .route(
194            "/api/v1/projections/{name}/bulk",
195            post(bulk_get_projection_states),
196        )
197        .route(
198            "/api/v1/projections/{name}/bulk/save",
199            post(bulk_save_projection_states),
200        )
201        // v0.11: Webhook management endpoints
202        .route("/api/v1/webhooks", post(register_webhook))
203        .route("/api/v1/webhooks", get(list_webhooks))
204        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
205        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
206        .route(
207            "/api/v1/webhooks/{webhook_id}",
208            axum::routing::delete(delete_webhook),
209        )
210        .route(
211            "/api/v1/webhooks/{webhook_id}/deliveries",
212            get(list_webhook_deliveries),
213        )
214        // v2.0: Advanced query features
215        .route("/api/v1/eventql", post(eventql_query))
216        .route("/api/v1/graphql", post(graphql_query))
217        .route("/api/v1/geospatial/query", post(geo_query))
218        .route("/api/v1/geospatial/stats", get(geo_stats))
219        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
220        .route(
221            "/api/v1/schema-evolution/history/{event_type}",
222            get(schema_evolution_history),
223        )
224        .route(
225            "/api/v1/schema-evolution/schema/{event_type}",
226            get(schema_evolution_schema),
227        )
228        .route(
229            "/api/v1/schema-evolution/stats",
230            get(schema_evolution_stats),
231        )
232        .layer(
233            CorsLayer::new()
234                .allow_origin(Any)
235                .allow_methods(Any)
236                .allow_headers(Any),
237        )
238        .layer(TraceLayer::new_for_http())
239        .with_state(store);
240
241    let listener = tokio::net::TcpListener::bind(addr).await?;
242    axum::serve(listener, app).await?;
243
244    Ok(())
245}
246
247pub async fn health() -> impl IntoResponse {
248    Json(serde_json::json!({
249        "status": "healthy",
250        "service": "allsource-core",
251        "version": env!("CARGO_PKG_VERSION")
252    }))
253}
254
255// v0.6: Prometheus metrics endpoint
256pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
257    let metrics = store.metrics();
258
259    match metrics.encode() {
260        Ok(encoded) => Response::builder()
261            .status(200)
262            .header("Content-Type", "text/plain; version=0.0.4")
263            .body(encoded)
264            .unwrap()
265            .into_response(),
266        Err(e) => Response::builder()
267            .status(500)
268            .body(format!("Error encoding metrics: {e}"))
269            .unwrap()
270            .into_response(),
271    }
272}
273
274pub async fn ingest_event(
275    State(store): State<SharedStore>,
276    Json(req): Json<IngestEventRequest>,
277) -> Result<Json<IngestEventResponse>> {
278    let expected_version = req.expected_version;
279
280    // Create event using from_strings with default tenant
281    let event = Event::from_strings(
282        req.event_type,
283        req.entity_id,
284        "default".to_string(),
285        req.payload,
286        req.metadata,
287    )?;
288
289    let event_id = event.id;
290    let timestamp = event.timestamp;
291
292    let new_version = store.ingest_with_expected_version(event, expected_version)?;
293
294    tracing::info!("Event ingested: {}", event_id);
295
296    Ok(Json(IngestEventResponse {
297        event_id,
298        timestamp,
299        version: Some(new_version),
300    }))
301}
302
303/// Ingest a single event with semi-sync/sync replication ACK waiting.
304///
305/// Used by the v1 API (with auth and replication support).
306pub async fn ingest_event_v1(
307    State(state): State<AppState>,
308    Json(req): Json<IngestEventRequest>,
309) -> Result<Json<IngestEventResponse>> {
310    let expected_version = req.expected_version;
311
312    let event = Event::from_strings(
313        req.event_type,
314        req.entity_id,
315        "default".to_string(),
316        req.payload,
317        req.metadata,
318    )?;
319
320    let event_id = event.id;
321    let timestamp = event.timestamp;
322
323    let new_version = state
324        .store
325        .ingest_with_expected_version(event, expected_version)?;
326
327    // Semi-sync/sync: wait for follower ACK(s) before returning
328    await_replication_ack(&state).await;
329
330    tracing::info!("Event ingested: {}", event_id);
331
332    Ok(Json(IngestEventResponse {
333        event_id,
334        timestamp,
335        version: Some(new_version),
336    }))
337}
338
339/// Batch ingest multiple events in a single request
340///
341/// This endpoint allows ingesting multiple events atomically, which is more
342/// efficient than making individual requests for each event.
343pub async fn ingest_events_batch(
344    State(store): State<SharedStore>,
345    Json(req): Json<IngestEventsBatchRequest>,
346) -> Result<Json<IngestEventsBatchResponse>> {
347    let total = req.events.len();
348    let mut ingested_events = Vec::with_capacity(total);
349
350    for event_req in req.events {
351        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
352        let expected_version = event_req.expected_version;
353
354        let event = Event::from_strings(
355            event_req.event_type,
356            event_req.entity_id,
357            tenant_id,
358            event_req.payload,
359            event_req.metadata,
360        )?;
361
362        let event_id = event.id;
363        let timestamp = event.timestamp;
364
365        let new_version = store.ingest_with_expected_version(event, expected_version)?;
366
367        ingested_events.push(IngestEventResponse {
368            event_id,
369            timestamp,
370            version: Some(new_version),
371        });
372    }
373
374    let ingested = ingested_events.len();
375    tracing::info!("Batch ingested {} events", ingested);
376
377    Ok(Json(IngestEventsBatchResponse {
378        total,
379        ingested,
380        events: ingested_events,
381    }))
382}
383
384/// Batch ingest with semi-sync/sync replication ACK waiting.
385///
386/// Used by the v1 API (with auth and replication support).
387pub async fn ingest_events_batch_v1(
388    State(state): State<AppState>,
389    Json(req): Json<IngestEventsBatchRequest>,
390) -> Result<Json<IngestEventsBatchResponse>> {
391    let total = req.events.len();
392    let mut ingested_events = Vec::with_capacity(total);
393
394    for event_req in req.events {
395        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
396        let expected_version = event_req.expected_version;
397
398        let event = Event::from_strings(
399            event_req.event_type,
400            event_req.entity_id,
401            tenant_id,
402            event_req.payload,
403            event_req.metadata,
404        )?;
405
406        let event_id = event.id;
407        let timestamp = event.timestamp;
408
409        let new_version = state
410            .store
411            .ingest_with_expected_version(event, expected_version)?;
412
413        ingested_events.push(IngestEventResponse {
414            event_id,
415            timestamp,
416            version: Some(new_version),
417        });
418    }
419
420    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
421    await_replication_ack(&state).await;
422
423    let ingested = ingested_events.len();
424    tracing::info!("Batch ingested {} events", ingested);
425
426    Ok(Json(IngestEventsBatchResponse {
427        total,
428        ingested,
429        events: ingested_events,
430    }))
431}
432
433pub async fn query_events(
434    State(store): State<SharedStore>,
435    Query(req): Query<QueryEventsRequest>,
436) -> Result<Json<QueryEventsResponse>> {
437    let requested_limit = req.limit;
438    let queried_entity_id = req.entity_id.clone();
439
440    // Query without limit to get total count
441    let unlimited_req = QueryEventsRequest {
442        entity_id: req.entity_id,
443        event_type: req.event_type,
444        tenant_id: req.tenant_id,
445        as_of: req.as_of,
446        since: req.since,
447        until: req.until,
448        limit: None,
449        event_type_prefix: req.event_type_prefix,
450        payload_filter: req.payload_filter,
451    };
452    let all_events = store.query(unlimited_req)?;
453    let total_count = all_events.len();
454
455    // Apply limit
456    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
457        all_events.into_iter().take(limit).collect()
458    } else {
459        all_events
460    };
461
462    let count = limited_events.len();
463    let has_more = count < total_count;
464    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
465
466    // Include entity_version only when filtering by a single entity_id
467    let entity_version = queried_entity_id
468        .as_deref()
469        .map(|eid| store.get_entity_version(eid));
470
471    tracing::debug!("Query returned {} events (total: {})", count, total_count);
472
473    Ok(Json(QueryEventsResponse {
474        events,
475        count,
476        total_count,
477        has_more,
478        entity_version,
479    }))
480}
481
482pub async fn list_entities(
483    State(store): State<SharedStore>,
484    Query(req): Query<ListEntitiesRequest>,
485) -> Result<Json<ListEntitiesResponse>> {
486    use std::collections::HashMap;
487
488    // Get all events matching the filters
489    let query_req = QueryEventsRequest {
490        entity_id: None,
491        event_type: None,
492        tenant_id: None,
493        as_of: None,
494        since: None,
495        until: None,
496        limit: None,
497        event_type_prefix: req.event_type_prefix,
498        payload_filter: req.payload_filter,
499    };
500    let events = store.query(query_req)?;
501
502    // Group by entity_id
503    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
504    for event in &events {
505        entity_map
506            .entry(event.entity_id().to_string())
507            .or_default()
508            .push(event);
509    }
510
511    // Build entity summaries sorted by last event time (descending)
512    let mut summaries: Vec<EntitySummary> = entity_map
513        .into_iter()
514        .map(|(entity_id, events)| {
515            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
516            EntitySummary {
517                entity_id,
518                event_count: events.len(),
519                last_event_type: last.event_type_str().to_string(),
520                last_event_at: last.timestamp(),
521            }
522        })
523        .collect();
524    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
525
526    let total = summaries.len();
527
528    // Apply offset and limit
529    let offset = req.offset.unwrap_or(0);
530    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
531    let summaries = if let Some(limit) = req.limit {
532        let has_more = summaries.len() > limit;
533        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
534        return Ok(Json(ListEntitiesResponse {
535            entities: truncated,
536            total,
537            has_more,
538        }));
539    } else {
540        summaries
541    };
542
543    Ok(Json(ListEntitiesResponse {
544        entities: summaries,
545        total,
546        has_more: false,
547    }))
548}
549
550pub async fn detect_duplicates(
551    State(store): State<SharedStore>,
552    Query(req): Query<DetectDuplicatesRequest>,
553) -> Result<Json<DetectDuplicatesResponse>> {
554    use std::collections::HashMap;
555
556    let group_by_fields: Vec<&str> = req.group_by.split(',').map(|s| s.trim()).collect();
557
558    // Query events scoped by the required prefix
559    let query_req = QueryEventsRequest {
560        entity_id: None,
561        event_type: None,
562        tenant_id: None,
563        as_of: None,
564        since: None,
565        until: None,
566        limit: None,
567        event_type_prefix: Some(req.event_type_prefix),
568        payload_filter: None,
569    };
570    let events = store.query(query_req)?;
571
572    // For each entity, extract the latest event's payload fields specified by group_by
573    // Then group entities by those field values
574    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
575    for event in &events {
576        let eid = event.entity_id().to_string();
577        entity_latest
578            .entry(eid)
579            .and_modify(|existing| {
580                if event.timestamp() > existing.timestamp() {
581                    *existing = event;
582                }
583            })
584            .or_insert(event);
585    }
586
587    // Group entities by their payload field values
588    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
589    for (entity_id, event) in &entity_latest {
590        let payload = event.payload();
591        let mut key_parts = serde_json::Map::new();
592        for field in &group_by_fields {
593            let value = payload
594                .get(*field)
595                .cloned()
596                .unwrap_or(serde_json::Value::Null);
597            key_parts.insert(field.to_string(), value);
598        }
599        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
600        groups.entry(key_str).or_default().push(entity_id.clone());
601    }
602
603    // Filter to groups with count > 1 (actual duplicates)
604    let mut duplicate_groups: Vec<DuplicateGroup> = groups
605        .into_iter()
606        .filter(|(_, ids)| ids.len() > 1)
607        .map(|(key_str, mut ids)| {
608            ids.sort();
609            let key: serde_json::Value =
610                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
611            let count = ids.len();
612            DuplicateGroup {
613                key,
614                entity_ids: ids,
615                count,
616            }
617        })
618        .collect();
619
620    // Sort by count descending for consistent output
621    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
622
623    let total = duplicate_groups.len();
624
625    // Apply offset and limit
626    let offset = req.offset.unwrap_or(0);
627    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
628
629    if let Some(limit) = req.limit {
630        let has_more = duplicate_groups.len() > limit;
631        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
632        return Ok(Json(DetectDuplicatesResponse {
633            duplicates: truncated,
634            total,
635            has_more,
636        }));
637    }
638
639    Ok(Json(DetectDuplicatesResponse {
640        duplicates: duplicate_groups,
641        total,
642        has_more: false,
643    }))
644}
645
646#[derive(Deserialize)]
647pub struct EntityStateParams {
648    as_of: Option<chrono::DateTime<chrono::Utc>>,
649}
650
651pub async fn get_entity_state(
652    State(store): State<SharedStore>,
653    Path(entity_id): Path<String>,
654    Query(params): Query<EntityStateParams>,
655) -> Result<Json<serde_json::Value>> {
656    let state = store.reconstruct_state(&entity_id, params.as_of)?;
657
658    tracing::info!("State reconstructed for entity: {}", entity_id);
659
660    Ok(Json(state))
661}
662
663pub async fn get_entity_snapshot(
664    State(store): State<SharedStore>,
665    Path(entity_id): Path<String>,
666) -> Result<Json<serde_json::Value>> {
667    let snapshot = store.get_snapshot(&entity_id)?;
668
669    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
670
671    Ok(Json(snapshot))
672}
673
674pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
675    let stats = store.stats();
676    Json(stats)
677}
678
679// v0.10: List all streams (entity_ids) in the event store
680/// Query parameters for listing streams
681#[derive(Debug, Deserialize)]
682pub struct ListStreamsParams {
683    /// Optional limit on number of streams to return
684    pub limit: Option<usize>,
685    /// Optional offset for pagination
686    pub offset: Option<usize>,
687}
688
689/// Response for listing streams
690#[derive(Debug, serde::Serialize)]
691pub struct ListStreamsResponse {
692    pub streams: Vec<StreamInfo>,
693    pub total: usize,
694}
695
696pub async fn list_streams(
697    State(store): State<SharedStore>,
698    Query(params): Query<ListStreamsParams>,
699) -> Json<ListStreamsResponse> {
700    let mut streams = store.list_streams();
701    let total = streams.len();
702
703    // Sort by last_event_at descending (most recent first)
704    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
705
706    // Apply pagination
707    if let Some(offset) = params.offset {
708        if offset < streams.len() {
709            streams = streams[offset..].to_vec();
710        } else {
711            streams = vec![];
712        }
713    }
714
715    if let Some(limit) = params.limit {
716        streams.truncate(limit);
717    }
718
719    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
720
721    Json(ListStreamsResponse { streams, total })
722}
723
724// v0.10: List all event types in the event store
725/// Query parameters for listing event types
726#[derive(Debug, Deserialize)]
727pub struct ListEventTypesParams {
728    /// Optional limit on number of event types to return
729    pub limit: Option<usize>,
730    /// Optional offset for pagination
731    pub offset: Option<usize>,
732}
733
734/// Response for listing event types
735#[derive(Debug, serde::Serialize)]
736pub struct ListEventTypesResponse {
737    pub event_types: Vec<EventTypeInfo>,
738    pub total: usize,
739}
740
741pub async fn list_event_types(
742    State(store): State<SharedStore>,
743    Query(params): Query<ListEventTypesParams>,
744) -> Json<ListEventTypesResponse> {
745    let mut event_types = store.list_event_types();
746    let total = event_types.len();
747
748    // Sort by event_count descending (most used first)
749    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
750
751    // Apply pagination
752    if let Some(offset) = params.offset {
753        if offset < event_types.len() {
754            event_types = event_types[offset..].to_vec();
755        } else {
756            event_types = vec![];
757        }
758    }
759
760    if let Some(limit) = params.limit {
761        event_types.truncate(limit);
762    }
763
764    tracing::debug!(
765        "Listed {} event types (total: {})",
766        event_types.len(),
767        total
768    );
769
770    Json(ListEventTypesResponse { event_types, total })
771}
772
773// v0.2: WebSocket endpoint for real-time event streaming
774#[derive(Debug, Deserialize)]
775pub struct WebSocketParams {
776    pub consumer_id: Option<String>,
777}
778
779pub async fn events_websocket(
780    ws: WebSocketUpgrade,
781    State(store): State<SharedStore>,
782    Query(params): Query<WebSocketParams>,
783) -> Response {
784    let websocket_manager = store.websocket_manager();
785
786    ws.on_upgrade(move |socket| async move {
787        if let Some(consumer_id) = params.consumer_id {
788            websocket_manager
789                .handle_socket_with_consumer(socket, consumer_id, store)
790                .await;
791        } else {
792            websocket_manager.handle_socket(socket).await;
793        }
794    })
795}
796
797// v0.2: Event frequency analytics endpoint
798pub async fn analytics_frequency(
799    State(store): State<SharedStore>,
800    Query(req): Query<EventFrequencyRequest>,
801) -> Result<Json<EventFrequencyResponse>> {
802    let response = AnalyticsEngine::event_frequency(&store, req)?;
803
804    tracing::debug!(
805        "Frequency analysis returned {} buckets",
806        response.buckets.len()
807    );
808
809    Ok(Json(response))
810}
811
812// v0.2: Statistical summary endpoint
813pub async fn analytics_summary(
814    State(store): State<SharedStore>,
815    Query(req): Query<StatsSummaryRequest>,
816) -> Result<Json<StatsSummaryResponse>> {
817    let response = AnalyticsEngine::stats_summary(&store, req)?;
818
819    tracing::debug!(
820        "Stats summary: {} events across {} entities",
821        response.total_events,
822        response.unique_entities
823    );
824
825    Ok(Json(response))
826}
827
828// v0.2: Event correlation analysis endpoint
829pub async fn analytics_correlation(
830    State(store): State<SharedStore>,
831    Query(req): Query<CorrelationRequest>,
832) -> Result<Json<CorrelationResponse>> {
833    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
834
835    tracing::debug!(
836        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
837        response.correlated_pairs,
838        response.total_a,
839        response.correlation_percentage
840    );
841
842    Ok(Json(response))
843}
844
845// v0.2: Create a snapshot for an entity
846pub async fn create_snapshot(
847    State(store): State<SharedStore>,
848    Json(req): Json<CreateSnapshotRequest>,
849) -> Result<Json<CreateSnapshotResponse>> {
850    store.create_snapshot(&req.entity_id)?;
851
852    let snapshot_manager = store.snapshot_manager();
853    let snapshot = snapshot_manager
854        .get_latest_snapshot(&req.entity_id)
855        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
856
857    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
858
859    Ok(Json(CreateSnapshotResponse {
860        snapshot_id: snapshot.id,
861        entity_id: snapshot.entity_id,
862        created_at: snapshot.created_at,
863        event_count: snapshot.event_count,
864        size_bytes: snapshot.metadata.size_bytes,
865    }))
866}
867
868// v0.2: List snapshots
869pub async fn list_snapshots(
870    State(store): State<SharedStore>,
871    Query(req): Query<ListSnapshotsRequest>,
872) -> Result<Json<ListSnapshotsResponse>> {
873    let snapshot_manager = store.snapshot_manager();
874
875    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
876        snapshot_manager
877            .get_all_snapshots(&entity_id)
878            .into_iter()
879            .map(SnapshotInfo::from)
880            .collect()
881    } else {
882        // List all entities with snapshots
883        let entities = snapshot_manager.list_entities();
884        entities
885            .iter()
886            .flat_map(|entity_id| {
887                snapshot_manager
888                    .get_all_snapshots(entity_id)
889                    .into_iter()
890                    .map(SnapshotInfo::from)
891            })
892            .collect()
893    };
894
895    let total = snapshots.len();
896
897    tracing::debug!("Listed {} snapshots", total);
898
899    Ok(Json(ListSnapshotsResponse { snapshots, total }))
900}
901
902// v0.2: Get latest snapshot for an entity
903pub async fn get_latest_snapshot(
904    State(store): State<SharedStore>,
905    Path(entity_id): Path<String>,
906) -> Result<Json<serde_json::Value>> {
907    let snapshot_manager = store.snapshot_manager();
908
909    let snapshot = snapshot_manager
910        .get_latest_snapshot(&entity_id)
911        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
912
913    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
914
915    Ok(Json(serde_json::json!({
916        "snapshot_id": snapshot.id,
917        "entity_id": snapshot.entity_id,
918        "created_at": snapshot.created_at,
919        "as_of": snapshot.as_of,
920        "event_count": snapshot.event_count,
921        "size_bytes": snapshot.metadata.size_bytes,
922        "snapshot_type": snapshot.metadata.snapshot_type,
923        "state": snapshot.state
924    })))
925}
926
927// v0.2: Trigger manual compaction
928pub async fn trigger_compaction(
929    State(store): State<SharedStore>,
930) -> Result<Json<CompactionResult>> {
931    let compaction_manager = store.compaction_manager().ok_or_else(|| {
932        crate::error::AllSourceError::InternalError(
933            "Compaction not enabled (no Parquet storage)".to_string(),
934        )
935    })?;
936
937    tracing::info!("📦 Manual compaction triggered via API");
938
939    let result = compaction_manager.compact_now()?;
940
941    Ok(Json(result))
942}
943
944// v0.2: Get compaction statistics
945pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
946    let compaction_manager = store.compaction_manager().ok_or_else(|| {
947        crate::error::AllSourceError::InternalError(
948            "Compaction not enabled (no Parquet storage)".to_string(),
949        )
950    })?;
951
952    let stats = compaction_manager.stats();
953    let config = compaction_manager.config();
954
955    Ok(Json(serde_json::json!({
956        "stats": stats,
957        "config": {
958            "min_files_to_compact": config.min_files_to_compact,
959            "target_file_size": config.target_file_size,
960            "max_file_size": config.max_file_size,
961            "small_file_threshold": config.small_file_threshold,
962            "compaction_interval_seconds": config.compaction_interval_seconds,
963            "auto_compact": config.auto_compact,
964            "strategy": config.strategy
965        }
966    })))
967}
968
969// v0.5: Register a new schema
970pub async fn register_schema(
971    State(store): State<SharedStore>,
972    Json(req): Json<RegisterSchemaRequest>,
973) -> Result<Json<RegisterSchemaResponse>> {
974    let schema_registry = store.schema_registry();
975
976    let response =
977        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
978
979    tracing::info!(
980        "📋 Schema registered: v{} for '{}'",
981        response.version,
982        response.subject
983    );
984
985    Ok(Json(response))
986}
987
988// v0.5: Get a schema by subject and optional version
989#[derive(Deserialize)]
990pub struct GetSchemaParams {
991    version: Option<u32>,
992}
993
994pub async fn get_schema(
995    State(store): State<SharedStore>,
996    Path(subject): Path<String>,
997    Query(params): Query<GetSchemaParams>,
998) -> Result<Json<serde_json::Value>> {
999    let schema_registry = store.schema_registry();
1000
1001    let schema = schema_registry.get_schema(&subject, params.version)?;
1002
1003    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1004
1005    Ok(Json(serde_json::json!({
1006        "id": schema.id,
1007        "subject": schema.subject,
1008        "version": schema.version,
1009        "schema": schema.schema,
1010        "created_at": schema.created_at,
1011        "description": schema.description,
1012        "tags": schema.tags
1013    })))
1014}
1015
1016// v0.5: List all versions of a schema subject
1017pub async fn list_schema_versions(
1018    State(store): State<SharedStore>,
1019    Path(subject): Path<String>,
1020) -> Result<Json<serde_json::Value>> {
1021    let schema_registry = store.schema_registry();
1022
1023    let versions = schema_registry.list_versions(&subject)?;
1024
1025    Ok(Json(serde_json::json!({
1026        "subject": subject,
1027        "versions": versions
1028    })))
1029}
1030
1031// v0.5: List all schema subjects
1032pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1033    let schema_registry = store.schema_registry();
1034
1035    let subjects = schema_registry.list_subjects();
1036
1037    Json(serde_json::json!({
1038        "subjects": subjects,
1039        "total": subjects.len()
1040    }))
1041}
1042
1043// v0.5: Validate an event against a schema
1044pub async fn validate_event_schema(
1045    State(store): State<SharedStore>,
1046    Json(req): Json<ValidateEventRequest>,
1047) -> Result<Json<ValidateEventResponse>> {
1048    let schema_registry = store.schema_registry();
1049
1050    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1051
1052    if response.valid {
1053        tracing::debug!(
1054            "✅ Event validated against schema '{}' v{}",
1055            req.subject,
1056            response.schema_version
1057        );
1058    } else {
1059        tracing::warn!(
1060            "❌ Event validation failed for '{}': {:?}",
1061            req.subject,
1062            response.errors
1063        );
1064    }
1065
1066    Ok(Json(response))
1067}
1068
1069// v0.5: Set compatibility mode for a subject
1070#[derive(Deserialize)]
1071pub struct SetCompatibilityRequest {
1072    compatibility: CompatibilityMode,
1073}
1074
1075pub async fn set_compatibility_mode(
1076    State(store): State<SharedStore>,
1077    Path(subject): Path<String>,
1078    Json(req): Json<SetCompatibilityRequest>,
1079) -> Json<serde_json::Value> {
1080    let schema_registry = store.schema_registry();
1081
1082    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1083
1084    tracing::info!(
1085        "🔧 Set compatibility mode for '{}' to {:?}",
1086        subject,
1087        req.compatibility
1088    );
1089
1090    Json(serde_json::json!({
1091        "subject": subject,
1092        "compatibility": req.compatibility
1093    }))
1094}
1095
1096// v0.5: Start a replay operation
1097pub async fn start_replay(
1098    State(store): State<SharedStore>,
1099    Json(req): Json<StartReplayRequest>,
1100) -> Result<Json<StartReplayResponse>> {
1101    let replay_manager = store.replay_manager();
1102
1103    let response = replay_manager.start_replay(store, req)?;
1104
1105    tracing::info!(
1106        "🔄 Started replay {} with {} events",
1107        response.replay_id,
1108        response.total_events
1109    );
1110
1111    Ok(Json(response))
1112}
1113
1114// v0.5: Get replay progress
1115pub async fn get_replay_progress(
1116    State(store): State<SharedStore>,
1117    Path(replay_id): Path<uuid::Uuid>,
1118) -> Result<Json<ReplayProgress>> {
1119    let replay_manager = store.replay_manager();
1120
1121    let progress = replay_manager.get_progress(replay_id)?;
1122
1123    Ok(Json(progress))
1124}
1125
1126// v0.5: List all replay operations
1127pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1128    let replay_manager = store.replay_manager();
1129
1130    let replays = replay_manager.list_replays();
1131
1132    Json(serde_json::json!({
1133        "replays": replays,
1134        "total": replays.len()
1135    }))
1136}
1137
1138// v0.5: Cancel a running replay
1139pub async fn cancel_replay(
1140    State(store): State<SharedStore>,
1141    Path(replay_id): Path<uuid::Uuid>,
1142) -> Result<Json<serde_json::Value>> {
1143    let replay_manager = store.replay_manager();
1144
1145    replay_manager.cancel_replay(replay_id)?;
1146
1147    tracing::info!("🛑 Cancelled replay {}", replay_id);
1148
1149    Ok(Json(serde_json::json!({
1150        "replay_id": replay_id,
1151        "status": "cancelled"
1152    })))
1153}
1154
1155// v0.5: Delete a completed replay
1156pub async fn delete_replay(
1157    State(store): State<SharedStore>,
1158    Path(replay_id): Path<uuid::Uuid>,
1159) -> Result<Json<serde_json::Value>> {
1160    let replay_manager = store.replay_manager();
1161
1162    let deleted = replay_manager.delete_replay(replay_id)?;
1163
1164    if deleted {
1165        tracing::info!("🗑️  Deleted replay {}", replay_id);
1166    }
1167
1168    Ok(Json(serde_json::json!({
1169        "replay_id": replay_id,
1170        "deleted": deleted
1171    })))
1172}
1173
1174// v0.5: Register a new pipeline
1175pub async fn register_pipeline(
1176    State(store): State<SharedStore>,
1177    Json(config): Json<PipelineConfig>,
1178) -> Result<Json<serde_json::Value>> {
1179    let pipeline_manager = store.pipeline_manager();
1180
1181    let pipeline_id = pipeline_manager.register(config.clone());
1182
1183    tracing::info!(
1184        "🔀 Pipeline registered: {} (name: {})",
1185        pipeline_id,
1186        config.name
1187    );
1188
1189    Ok(Json(serde_json::json!({
1190        "pipeline_id": pipeline_id,
1191        "name": config.name,
1192        "enabled": config.enabled
1193    })))
1194}
1195
1196// v0.5: List all pipelines
1197pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1198    let pipeline_manager = store.pipeline_manager();
1199
1200    let pipelines = pipeline_manager.list();
1201
1202    tracing::debug!("Listed {} pipelines", pipelines.len());
1203
1204    Json(serde_json::json!({
1205        "pipelines": pipelines,
1206        "total": pipelines.len()
1207    }))
1208}
1209
1210// v0.5: Get a specific pipeline
1211pub async fn get_pipeline(
1212    State(store): State<SharedStore>,
1213    Path(pipeline_id): Path<uuid::Uuid>,
1214) -> Result<Json<PipelineConfig>> {
1215    let pipeline_manager = store.pipeline_manager();
1216
1217    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1218        crate::error::AllSourceError::ValidationError(format!(
1219            "Pipeline not found: {}",
1220            pipeline_id
1221        ))
1222    })?;
1223
1224    Ok(Json(pipeline.config().clone()))
1225}
1226
1227// v0.5: Remove a pipeline
1228pub async fn remove_pipeline(
1229    State(store): State<SharedStore>,
1230    Path(pipeline_id): Path<uuid::Uuid>,
1231) -> Result<Json<serde_json::Value>> {
1232    let pipeline_manager = store.pipeline_manager();
1233
1234    let removed = pipeline_manager.remove(pipeline_id);
1235
1236    if removed {
1237        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1238    }
1239
1240    Ok(Json(serde_json::json!({
1241        "pipeline_id": pipeline_id,
1242        "removed": removed
1243    })))
1244}
1245
1246// v0.5: Get statistics for all pipelines
1247pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1248    let pipeline_manager = store.pipeline_manager();
1249
1250    let stats = pipeline_manager.all_stats();
1251
1252    Json(serde_json::json!({
1253        "stats": stats,
1254        "total": stats.len()
1255    }))
1256}
1257
1258// v0.5: Get statistics for a specific pipeline
1259pub async fn get_pipeline_stats(
1260    State(store): State<SharedStore>,
1261    Path(pipeline_id): Path<uuid::Uuid>,
1262) -> Result<Json<PipelineStats>> {
1263    let pipeline_manager = store.pipeline_manager();
1264
1265    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1266        crate::error::AllSourceError::ValidationError(format!(
1267            "Pipeline not found: {}",
1268            pipeline_id
1269        ))
1270    })?;
1271
1272    Ok(Json(pipeline.stats()))
1273}
1274
1275// v0.5: Reset a pipeline's state
1276pub async fn reset_pipeline(
1277    State(store): State<SharedStore>,
1278    Path(pipeline_id): Path<uuid::Uuid>,
1279) -> Result<Json<serde_json::Value>> {
1280    let pipeline_manager = store.pipeline_manager();
1281
1282    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1283        crate::error::AllSourceError::ValidationError(format!(
1284            "Pipeline not found: {}",
1285            pipeline_id
1286        ))
1287    })?;
1288
1289    pipeline.reset();
1290
1291    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1292
1293    Ok(Json(serde_json::json!({
1294        "pipeline_id": pipeline_id,
1295        "reset": true
1296    })))
1297}
1298
1299// =============================================================================
1300// v0.11: Single Event Lookup by ID
1301// =============================================================================
1302
1303/// Get a single event by UUID
1304pub async fn get_event_by_id(
1305    State(store): State<SharedStore>,
1306    Path(event_id): Path<uuid::Uuid>,
1307) -> Result<Json<serde_json::Value>> {
1308    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1309        crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1310    })?;
1311
1312    let dto = EventDto::from(&event);
1313
1314    tracing::debug!("Event retrieved by ID: {}", event_id);
1315
1316    Ok(Json(serde_json::json!({
1317        "event": dto,
1318        "found": true
1319    })))
1320}
1321
1322// =============================================================================
1323// v0.7: Projection State API for Query Service Integration
1324// =============================================================================
1325
1326/// List all registered projections
1327pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1328    let projection_manager = store.projection_manager();
1329    let status_map = store.projection_status();
1330
1331    let projections: Vec<serde_json::Value> = projection_manager
1332        .list_projections()
1333        .iter()
1334        .map(|(name, projection)| {
1335            let status = status_map
1336                .get(name)
1337                .map(|s| s.value().clone())
1338                .unwrap_or_else(|| "running".to_string());
1339            serde_json::json!({
1340                "name": name,
1341                "type": format!("{:?}", projection.name()),
1342                "status": status,
1343            })
1344        })
1345        .collect();
1346
1347    tracing::debug!("Listed {} projections", projections.len());
1348
1349    Json(serde_json::json!({
1350        "projections": projections,
1351        "total": projections.len()
1352    }))
1353}
1354
1355/// Get projection metadata by name
1356pub async fn get_projection(
1357    State(store): State<SharedStore>,
1358    Path(name): Path<String>,
1359) -> Result<Json<serde_json::Value>> {
1360    let projection_manager = store.projection_manager();
1361
1362    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1363        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1364    })?;
1365
1366    Ok(Json(serde_json::json!({
1367        "name": projection.name(),
1368        "found": true
1369    })))
1370}
1371
1372/// Get projection state for a specific entity
1373///
1374/// This endpoint allows the Elixir Query Service to fetch projection state
1375/// from the Rust Core for synchronization.
1376pub async fn get_projection_state(
1377    State(store): State<SharedStore>,
1378    Path((name, entity_id)): Path<(String, String)>,
1379) -> Result<Json<serde_json::Value>> {
1380    let projection_manager = store.projection_manager();
1381
1382    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1383        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1384    })?;
1385
1386    let state = projection.get_state(&entity_id);
1387
1388    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1389
1390    Ok(Json(serde_json::json!({
1391        "projection": name,
1392        "entity_id": entity_id,
1393        "state": state,
1394        "found": state.is_some()
1395    })))
1396}
1397
1398/// Delete (clear) a projection by name
1399///
1400/// Removes all state from the projection. The projection definition remains
1401/// registered but its accumulated state is cleared.
1402pub async fn delete_projection(
1403    State(store): State<SharedStore>,
1404    Path(name): Path<String>,
1405) -> Result<Json<serde_json::Value>> {
1406    let projection_manager = store.projection_manager();
1407
1408    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1409        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1410    })?;
1411
1412    projection.clear();
1413
1414    // Also clear any cached state for this projection
1415    let cache = store.projection_state_cache();
1416    let prefix = format!("{name}:");
1417    let keys_to_remove: Vec<String> = cache
1418        .iter()
1419        .filter(|entry| entry.key().starts_with(&prefix))
1420        .map(|entry| entry.key().clone())
1421        .collect();
1422    for key in keys_to_remove {
1423        cache.remove(&key);
1424    }
1425
1426    tracing::info!("Projection deleted (cleared): {}", name);
1427
1428    Ok(Json(serde_json::json!({
1429        "projection": name,
1430        "deleted": true
1431    })))
1432}
1433
1434/// Get aggregate projection state (all entities)
1435///
1436/// Returns summary information about a projection's state across all entities.
1437pub async fn get_projection_state_summary(
1438    State(store): State<SharedStore>,
1439    Path(name): Path<String>,
1440) -> Result<Json<serde_json::Value>> {
1441    let projection_manager = store.projection_manager();
1442
1443    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1444        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1445    })?;
1446
1447    // Collect cached states for this projection
1448    let cache = store.projection_state_cache();
1449    let prefix = format!("{name}:");
1450    let states: Vec<serde_json::Value> = cache
1451        .iter()
1452        .filter(|entry| entry.key().starts_with(&prefix))
1453        .map(|entry| {
1454            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1455            serde_json::json!({
1456                "entity_id": entity_id,
1457                "state": entry.value().clone()
1458            })
1459        })
1460        .collect();
1461
1462    let total = states.len();
1463
1464    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1465
1466    Ok(Json(serde_json::json!({
1467        "projection": name,
1468        "states": states,
1469        "total": total
1470    })))
1471}
1472
1473/// Reset a projection to its initial state
1474///
1475/// Clears all accumulated state and reprocesses events from the beginning.
1476pub async fn reset_projection(
1477    State(store): State<SharedStore>,
1478    Path(name): Path<String>,
1479) -> Result<Json<serde_json::Value>> {
1480    let reprocessed = store.reset_projection(&name)?;
1481
1482    tracing::info!(
1483        "Projection reset: {} ({} events reprocessed)",
1484        name,
1485        reprocessed
1486    );
1487
1488    Ok(Json(serde_json::json!({
1489        "projection": name,
1490        "reset": true,
1491        "events_reprocessed": reprocessed
1492    })))
1493}
1494
1495/// Pause a projection
1496///
1497/// Sets the projection status to "paused" so it stops processing new events.
1498pub async fn pause_projection(
1499    State(store): State<SharedStore>,
1500    Path(name): Path<String>,
1501) -> Result<Json<serde_json::Value>> {
1502    let projection_manager = store.projection_manager();
1503
1504    // Verify projection exists
1505    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1506        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1507    })?;
1508
1509    store
1510        .projection_status()
1511        .insert(name.clone(), "paused".to_string());
1512
1513    tracing::info!("Projection paused: {}", name);
1514
1515    Ok(Json(serde_json::json!({
1516        "projection": name,
1517        "status": "paused"
1518    })))
1519}
1520
1521/// Start (resume) a projection
1522///
1523/// Sets the projection status to "running" so it resumes processing events.
1524pub async fn start_projection(
1525    State(store): State<SharedStore>,
1526    Path(name): Path<String>,
1527) -> Result<Json<serde_json::Value>> {
1528    let projection_manager = store.projection_manager();
1529
1530    // Verify projection exists
1531    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1532        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1533    })?;
1534
1535    store
1536        .projection_status()
1537        .insert(name.clone(), "running".to_string());
1538
1539    tracing::info!("Projection started: {}", name);
1540
1541    Ok(Json(serde_json::json!({
1542        "projection": name,
1543        "status": "running"
1544    })))
1545}
1546
1547/// Request body for saving projection state
1548#[derive(Debug, Deserialize)]
1549pub struct SaveProjectionStateRequest {
1550    pub state: serde_json::Value,
1551}
1552
1553/// Save/update projection state for an entity
1554///
1555/// This endpoint allows external services (like Elixir Query Service) to
1556/// store computed projection state back to the Core for persistence.
1557pub async fn save_projection_state(
1558    State(store): State<SharedStore>,
1559    Path((name, entity_id)): Path<(String, String)>,
1560    Json(req): Json<SaveProjectionStateRequest>,
1561) -> Result<Json<serde_json::Value>> {
1562    let projection_cache = store.projection_state_cache();
1563
1564    // Store in the projection state cache
1565    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1566
1567    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1568
1569    Ok(Json(serde_json::json!({
1570        "projection": name,
1571        "entity_id": entity_id,
1572        "saved": true
1573    })))
1574}
1575
1576/// Bulk get projection states for multiple entities
1577///
1578/// Efficient endpoint for fetching multiple entity states in a single request.
1579#[derive(Debug, Deserialize)]
1580pub struct BulkGetStateRequest {
1581    pub entity_ids: Vec<String>,
1582}
1583
1584/// Bulk save projection states for multiple entities
1585///
1586/// Efficient endpoint for saving multiple entity states in a single request.
1587#[derive(Debug, Deserialize)]
1588pub struct BulkSaveStateRequest {
1589    pub states: Vec<BulkSaveStateItem>,
1590}
1591
1592#[derive(Debug, Deserialize)]
1593pub struct BulkSaveStateItem {
1594    pub entity_id: String,
1595    pub state: serde_json::Value,
1596}
1597
1598pub async fn bulk_get_projection_states(
1599    State(store): State<SharedStore>,
1600    Path(name): Path<String>,
1601    Json(req): Json<BulkGetStateRequest>,
1602) -> Result<Json<serde_json::Value>> {
1603    let projection_manager = store.projection_manager();
1604
1605    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1606        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1607    })?;
1608
1609    let states: Vec<serde_json::Value> = req
1610        .entity_ids
1611        .iter()
1612        .map(|entity_id| {
1613            let state = projection.get_state(entity_id);
1614            serde_json::json!({
1615                "entity_id": entity_id,
1616                "state": state,
1617                "found": state.is_some()
1618            })
1619        })
1620        .collect();
1621
1622    tracing::debug!(
1623        "Bulk projection state retrieved: {} entities from {}",
1624        states.len(),
1625        name
1626    );
1627
1628    Ok(Json(serde_json::json!({
1629        "projection": name,
1630        "states": states,
1631        "total": states.len()
1632    })))
1633}
1634
1635/// Bulk save projection states for multiple entities
1636///
1637/// This endpoint allows efficient batch saving of projection states,
1638/// critical for high-throughput event processing pipelines.
1639pub async fn bulk_save_projection_states(
1640    State(store): State<SharedStore>,
1641    Path(name): Path<String>,
1642    Json(req): Json<BulkSaveStateRequest>,
1643) -> Result<Json<serde_json::Value>> {
1644    let projection_cache = store.projection_state_cache();
1645
1646    let mut saved_count = 0;
1647    for item in &req.states {
1648        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1649        saved_count += 1;
1650    }
1651
1652    tracing::info!(
1653        "Bulk projection state saved: {} entities for {}",
1654        saved_count,
1655        name
1656    );
1657
1658    Ok(Json(serde_json::json!({
1659        "projection": name,
1660        "saved": saved_count,
1661        "total": req.states.len()
1662    })))
1663}
1664
1665// =============================================================================
1666// v0.11: Webhook Management API
1667// =============================================================================
1668
1669/// Query parameters for listing webhooks
1670#[derive(Debug, Deserialize)]
1671pub struct ListWebhooksParams {
1672    pub tenant_id: Option<String>,
1673}
1674
1675/// Register a new webhook subscription
1676pub async fn register_webhook(
1677    State(store): State<SharedStore>,
1678    Json(req): Json<RegisterWebhookRequest>,
1679) -> Json<serde_json::Value> {
1680    let registry = store.webhook_registry();
1681    let webhook = registry.register(req);
1682
1683    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1684
1685    Json(serde_json::json!({
1686        "webhook": webhook,
1687        "created": true
1688    }))
1689}
1690
1691/// List webhooks, optionally filtered by tenant_id
1692pub async fn list_webhooks(
1693    State(store): State<SharedStore>,
1694    Query(params): Query<ListWebhooksParams>,
1695) -> Json<serde_json::Value> {
1696    let registry = store.webhook_registry();
1697
1698    let webhooks = if let Some(tenant_id) = params.tenant_id {
1699        registry.list_by_tenant(&tenant_id)
1700    } else {
1701        // Without tenant filter, return empty (tenants should always filter)
1702        vec![]
1703    };
1704
1705    let total = webhooks.len();
1706
1707    Json(serde_json::json!({
1708        "webhooks": webhooks,
1709        "total": total
1710    }))
1711}
1712
1713/// Get a specific webhook by ID
1714pub async fn get_webhook(
1715    State(store): State<SharedStore>,
1716    Path(webhook_id): Path<uuid::Uuid>,
1717) -> Result<Json<serde_json::Value>> {
1718    let registry = store.webhook_registry();
1719
1720    let webhook = registry.get(webhook_id).ok_or_else(|| {
1721        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1722    })?;
1723
1724    Ok(Json(serde_json::json!({
1725        "webhook": webhook,
1726        "found": true
1727    })))
1728}
1729
1730/// Update a webhook subscription
1731pub async fn update_webhook(
1732    State(store): State<SharedStore>,
1733    Path(webhook_id): Path<uuid::Uuid>,
1734    Json(req): Json<UpdateWebhookRequest>,
1735) -> Result<Json<serde_json::Value>> {
1736    let registry = store.webhook_registry();
1737
1738    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1739        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1740    })?;
1741
1742    tracing::info!("Webhook updated: {}", webhook_id);
1743
1744    Ok(Json(serde_json::json!({
1745        "webhook": webhook,
1746        "updated": true
1747    })))
1748}
1749
1750/// Delete a webhook subscription
1751pub async fn delete_webhook(
1752    State(store): State<SharedStore>,
1753    Path(webhook_id): Path<uuid::Uuid>,
1754) -> Result<Json<serde_json::Value>> {
1755    let registry = store.webhook_registry();
1756
1757    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1758        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1759    })?;
1760
1761    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1762
1763    Ok(Json(serde_json::json!({
1764        "webhook_id": webhook_id,
1765        "deleted": true
1766    })))
1767}
1768
1769/// Query parameters for listing webhook deliveries
1770#[derive(Debug, Deserialize)]
1771pub struct ListDeliveriesParams {
1772    pub limit: Option<usize>,
1773}
1774
1775/// List delivery history for a webhook
1776pub async fn list_webhook_deliveries(
1777    State(store): State<SharedStore>,
1778    Path(webhook_id): Path<uuid::Uuid>,
1779    Query(params): Query<ListDeliveriesParams>,
1780) -> Result<Json<serde_json::Value>> {
1781    let registry = store.webhook_registry();
1782
1783    // Verify webhook exists
1784    registry.get(webhook_id).ok_or_else(|| {
1785        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1786    })?;
1787
1788    let limit = params.limit.unwrap_or(50);
1789    let deliveries = registry.get_deliveries(webhook_id, limit);
1790    let total = deliveries.len();
1791
1792    Ok(Json(serde_json::json!({
1793        "webhook_id": webhook_id,
1794        "deliveries": deliveries,
1795        "total": total
1796    })))
1797}
1798
1799// =============================================================================
1800// v2.0: Advanced Query Features
1801// =============================================================================
1802
1803/// EventQL: Execute SQL queries over events using DataFusion
1804pub async fn eventql_query(
1805    State(store): State<SharedStore>,
1806    Json(req): Json<EventQLRequest>,
1807) -> Result<Json<serde_json::Value>> {
1808    let events = store.snapshot_events();
1809    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1810        Ok(response) => Ok(Json(serde_json::json!({
1811            "columns": response.columns,
1812            "rows": response.rows,
1813            "row_count": response.row_count,
1814        }))),
1815        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1816    }
1817}
1818
1819/// GraphQL: Execute GraphQL queries
1820pub async fn graphql_query(
1821    State(store): State<SharedStore>,
1822    Json(req): Json<GraphQLRequest>,
1823) -> Json<serde_json::Value> {
1824    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1825        Ok(f) => f,
1826        Err(e) => {
1827            return Json(
1828                serde_json::to_value(GraphQLResponse {
1829                    data: None,
1830                    errors: vec![GraphQLError { message: e }],
1831                })
1832                .unwrap(),
1833            );
1834        }
1835    };
1836
1837    let mut data = serde_json::Map::new();
1838    let mut errors = Vec::new();
1839
1840    for field in &fields {
1841        match field.name.as_str() {
1842            "events" => {
1843                let request = crate::application::dto::QueryEventsRequest {
1844                    entity_id: field.arguments.get("entity_id").cloned(),
1845                    event_type: field.arguments.get("event_type").cloned(),
1846                    tenant_id: field.arguments.get("tenant_id").cloned(),
1847                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1848                    as_of: None,
1849                    since: None,
1850                    until: None,
1851                    event_type_prefix: None,
1852                    payload_filter: None,
1853                };
1854                match store.query(request) {
1855                    Ok(events) => {
1856                        let json_events: Vec<serde_json::Value> = events
1857                            .iter()
1858                            .map(|e| {
1859                                crate::infrastructure::query::graphql::event_to_json(
1860                                    e,
1861                                    &field.fields,
1862                                )
1863                            })
1864                            .collect();
1865                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1866                    }
1867                    Err(e) => errors.push(GraphQLError {
1868                        message: format!("events query failed: {e}"),
1869                    }),
1870                }
1871            }
1872            "event" => {
1873                if let Some(id_str) = field.arguments.get("id") {
1874                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1875                        match store.get_event_by_id(&id) {
1876                            Ok(Some(event)) => {
1877                                data.insert(
1878                                    "event".to_string(),
1879                                    crate::infrastructure::query::graphql::event_to_json(
1880                                        &event,
1881                                        &field.fields,
1882                                    ),
1883                                );
1884                            }
1885                            Ok(None) => {
1886                                data.insert("event".to_string(), serde_json::Value::Null);
1887                            }
1888                            Err(e) => errors.push(GraphQLError {
1889                                message: format!("event lookup failed: {e}"),
1890                            }),
1891                        }
1892                    } else {
1893                        errors.push(GraphQLError {
1894                            message: format!("Invalid UUID: {id_str}"),
1895                        });
1896                    }
1897                } else {
1898                    errors.push(GraphQLError {
1899                        message: "event query requires 'id' argument".to_string(),
1900                    });
1901                }
1902            }
1903            "projections" => {
1904                let pm = store.projection_manager();
1905                let names: Vec<serde_json::Value> = pm
1906                    .list_projections()
1907                    .iter()
1908                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1909                    .collect();
1910                data.insert("projections".to_string(), serde_json::Value::Array(names));
1911            }
1912            "stats" => {
1913                let stats = store.stats();
1914                data.insert(
1915                    "stats".to_string(),
1916                    serde_json::json!({
1917                        "total_events": stats.total_events,
1918                        "total_entities": stats.total_entities,
1919                        "total_event_types": stats.total_event_types,
1920                    }),
1921                );
1922            }
1923            "__schema" => {
1924                data.insert(
1925                    "__schema".to_string(),
1926                    crate::infrastructure::query::graphql::introspection_schema(),
1927                );
1928            }
1929            other => {
1930                errors.push(GraphQLError {
1931                    message: format!("Unknown field: {other}"),
1932                });
1933            }
1934        }
1935    }
1936
1937    Json(
1938        serde_json::to_value(GraphQLResponse {
1939            data: Some(serde_json::Value::Object(data)),
1940            errors,
1941        })
1942        .unwrap(),
1943    )
1944}
1945
1946/// Geospatial: Query events by location
1947pub async fn geo_query(
1948    State(store): State<SharedStore>,
1949    Json(req): Json<GeoQueryRequest>,
1950) -> Json<serde_json::Value> {
1951    let events = store.snapshot_events();
1952    let geo_index = store.geo_index();
1953    let results =
1954        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1955    let total = results.len();
1956    Json(serde_json::json!({
1957        "results": results,
1958        "total": total,
1959    }))
1960}
1961
1962/// Geospatial index stats
1963pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1964    let stats = store.geo_index().stats();
1965    Json(serde_json::json!(stats))
1966}
1967
1968/// Exactly-once processing stats
1969pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1970    let stats = store.exactly_once().stats();
1971    Json(serde_json::json!(stats))
1972}
1973
1974/// Schema evolution history for an event type
1975pub async fn schema_evolution_history(
1976    State(store): State<SharedStore>,
1977    Path(event_type): Path<String>,
1978) -> Json<serde_json::Value> {
1979    let mgr = store.schema_evolution();
1980    let history = mgr.get_history(&event_type);
1981    let version = mgr.get_version(&event_type);
1982    Json(serde_json::json!({
1983        "event_type": event_type,
1984        "current_version": version,
1985        "history": history,
1986    }))
1987}
1988
1989/// Current inferred schema for an event type
1990pub async fn schema_evolution_schema(
1991    State(store): State<SharedStore>,
1992    Path(event_type): Path<String>,
1993) -> Json<serde_json::Value> {
1994    let mgr = store.schema_evolution();
1995    if let Some(schema) = mgr.get_schema(&event_type) {
1996        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1997        Json(serde_json::json!({
1998            "event_type": event_type,
1999            "version": mgr.get_version(&event_type),
2000            "inferred_schema": schema,
2001            "json_schema": json_schema,
2002        }))
2003    } else {
2004        Json(serde_json::json!({
2005            "event_type": event_type,
2006            "error": "No schema inferred for this event type"
2007        }))
2008    }
2009}
2010
2011/// Schema evolution stats
2012pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2013    let stats = store.schema_evolution().stats();
2014    let event_types = store.schema_evolution().list_event_types();
2015    Json(serde_json::json!({
2016        "stats": stats,
2017        "tracked_event_types": event_types,
2018    }))
2019}
2020
2021// =============================================================================
2022// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2023// =============================================================================
2024
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The lower bound for the query is the smallest `physical_ms` found in the
/// client's version vector; an empty vector means no lower bound. Every
/// matching event is converted to a `ReplicatedEvent` whose HLC timestamp uses
/// the event's millisecond timestamp, with a logical counter that increments
/// for consecutive events sharing the same millisecond.
///
/// NOTE(review): the `version_vector` in the response is always empty.
///
/// # Errors
///
/// Propagates any error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::{
        embedded::sync_types::SyncPullResponse,
        infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp},
    };

    // Oldest point the client has seen across all entries of its vector.
    let oldest_seen_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since =
        oldest_seen_ms.and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let pending = state.store.query(QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Convert domain events to the ReplicatedEvent wire format.
    let mut outgoing = Vec::with_capacity(pending.len());
    let mut previous_ms = 0u64;
    let mut counter = 0u32;

    for event in &pending {
        let physical_ms = event.timestamp().timestamp_millis() as u64;
        // Same millisecond as the previous event: bump the logical counter;
        // otherwise start counting again from zero.
        counter = if physical_ms == previous_ms {
            counter + 1
        } else {
            0
        };
        previous_ms = physical_ms;

        let event_data = serde_json::json!({
            "event_type": event.event_type_str(),
            "entity_id": event.entity_id_str(),
            "tenant_id": event.tenant_id_str(),
            "payload": event.payload,
            "metadata": event.metadata,
        });

        outgoing.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(physical_ms, counter, 0),
            origin_region: "server".to_string(),
            event_data,
        });
    }

    let response = SyncPullResponse {
        events: outgoing,
        version_vector: std::collections::BTreeMap::new(),
    };
    Ok(Json(response))
}
2089
/// POST /api/v1/sync/push — Client pushes events, server ingests them.
///
/// Each pushed event's `event_data` is rebuilt into a domain `Event` and
/// ingested into the store. Missing or non-string `event_type` / `entity_id`
/// fall back to `"unknown"`, a missing `tenant_id` to `"default"`, and a
/// missing `payload` to an empty object. Events that cannot be rebuilt are
/// counted as `skipped` rather than failing the request.
///
/// NOTE(review): this handler does not read the pushed `event_id` or HLC
/// timestamp, so no conflict resolution or de-duplication is visible here —
/// confirm whether that happens inside `ingest`.
///
/// # Errors
///
/// Returns the first ingest error immediately; events ingested earlier in the
/// same request stay ingested.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    /// Reads `key` from `data` as a string, or returns `fallback` when it is absent or not a string.
    fn text_or(data: &serde_json::Value, key: &str, fallback: &str) -> String {
        data.get(key)
            .and_then(|value| value.as_str())
            .unwrap_or(fallback)
            .to_string()
    }

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for incoming in &request.events {
        let data = &incoming.event_data;
        let rebuilt = Event::from_strings(
            text_or(data, "event_type", "unknown"),
            text_or(data, "entity_id", "unknown"),
            text_or(data, "tenant_id", "default"),
            data.get("payload")
                .cloned()
                .unwrap_or_else(|| serde_json::json!({})),
            data.get("metadata").cloned(),
        );

        if let Ok(domain_event) = rebuilt {
            state.store.ingest(domain_event)?;
            accepted += 1;
        } else {
            skipped += 1;
        }
    }

    let response = crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    };
    Ok(Json(response))
}
2141
2142// =============================================================================
2143// Consumer endpoints for durable subscriptions (v0.14)
2144// =============================================================================
2145
2146/// POST /api/v1/consumers — Register a durable consumer
2147pub async fn register_consumer(
2148    State(store): State<SharedStore>,
2149    Json(req): Json<RegisterConsumerRequest>,
2150) -> Result<Json<ConsumerResponse>> {
2151    let consumer = store
2152        .consumer_registry()
2153        .register(req.consumer_id, req.event_type_filters);
2154
2155    Ok(Json(ConsumerResponse {
2156        consumer_id: consumer.consumer_id,
2157        event_type_filters: consumer.event_type_filters,
2158        cursor_position: consumer.cursor_position,
2159    }))
2160}
2161
2162/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2163pub async fn get_consumer(
2164    State(store): State<SharedStore>,
2165    Path(consumer_id): Path<String>,
2166) -> Result<Json<ConsumerResponse>> {
2167    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2168
2169    Ok(Json(ConsumerResponse {
2170        consumer_id: consumer.consumer_id,
2171        event_type_filters: consumer.event_type_filters,
2172        cursor_position: consumer.cursor_position,
2173    }))
2174}
2175
/// Query-string parameters for `GET /api/v1/consumers/{consumer_id}/events`
/// (handled by `poll_consumer_events`).
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    /// Maximum number of events to return; the handler uses 100 when this is absent.
    pub limit: Option<usize>,
}
2181
2182pub async fn poll_consumer_events(
2183    State(store): State<SharedStore>,
2184    Path(consumer_id): Path<String>,
2185    Query(query): Query<ConsumerPollQuery>,
2186) -> Result<Json<ConsumerEventsResponse>> {
2187    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2188    let offset = consumer.cursor_position.unwrap_or(0);
2189    let limit = query.limit.unwrap_or(100);
2190
2191    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2192    let count = events.len();
2193
2194    let consumer_events: Vec<ConsumerEventDto> = events
2195        .into_iter()
2196        .map(|(position, event)| ConsumerEventDto {
2197            position,
2198            event: EventDto::from(&event),
2199        })
2200        .collect();
2201
2202    Ok(Json(ConsumerEventsResponse {
2203        events: consumer_events,
2204        count,
2205    }))
2206}
2207
2208/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2209pub async fn ack_consumer(
2210    State(store): State<SharedStore>,
2211    Path(consumer_id): Path<String>,
2212    Json(req): Json<AckRequest>,
2213) -> Result<Json<serde_json::Value>> {
2214    let max_offset = store.total_events() as u64;
2215
2216    store
2217        .consumer_registry()
2218        .ack(&consumer_id, req.position, max_offset)
2219        .map_err(crate::error::AllSourceError::InvalidInput)?;
2220
2221    Ok(Json(serde_json::json!({
2222        "status": "ok",
2223        "consumer_id": consumer_id,
2224        "position": req.position,
2225    })))
2226}
2227
2228#[cfg(test)]
2229mod tests {
2230    use super::*;
2231    use crate::{domain::entities::Event, store::EventStore};
2232
2233    fn create_test_store() -> Arc<EventStore> {
2234        Arc::new(EventStore::new())
2235    }
2236
2237    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2238        Event::from_strings(
2239            event_type.to_string(),
2240            entity_id.to_string(),
2241            "test-stream".to_string(),
2242            serde_json::json!({
2243                "name": "Test",
2244                "value": 42
2245            }),
2246            None,
2247        )
2248        .unwrap()
2249    }
2250
2251    #[tokio::test]
2252    async fn test_query_events_has_more_and_total_count() {
2253        let store = create_test_store();
2254
2255        // Ingest 50 events
2256        for i in 0..50 {
2257            store
2258                .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2259                .unwrap();
2260        }
2261
2262        // Query with limit=10 — should get has_more=true, total_count=50
2263        let req = QueryEventsRequest {
2264            entity_id: None,
2265            event_type: None,
2266            tenant_id: None,
2267            as_of: None,
2268            since: None,
2269            until: None,
2270            limit: Some(10),
2271            event_type_prefix: None,
2272            payload_filter: None,
2273        };
2274
2275        let requested_limit = req.limit;
2276        let unlimited_req = QueryEventsRequest {
2277            limit: None,
2278            ..QueryEventsRequest {
2279                entity_id: req.entity_id,
2280                event_type: req.event_type,
2281                tenant_id: req.tenant_id,
2282                as_of: req.as_of,
2283                since: req.since,
2284                until: req.until,
2285                limit: None,
2286                event_type_prefix: req.event_type_prefix,
2287                payload_filter: req.payload_filter,
2288            }
2289        };
2290        let all_events = store.query(unlimited_req).unwrap();
2291        let total_count = all_events.len();
2292        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2293            all_events.into_iter().take(limit).collect()
2294        } else {
2295            all_events
2296        };
2297        let count = limited_events.len();
2298        let has_more = count < total_count;
2299
2300        assert_eq!(count, 10);
2301        assert_eq!(total_count, 50);
2302        assert!(has_more);
2303    }
2304
2305    #[tokio::test]
2306    async fn test_query_events_no_more_results() {
2307        let store = create_test_store();
2308
2309        // Ingest 5 events
2310        for i in 0..5 {
2311            store
2312                .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2313                .unwrap();
2314        }
2315
2316        // Query with limit=100 — should get has_more=false, total_count=5
2317        let all_events = store
2318            .query(QueryEventsRequest {
2319                entity_id: None,
2320                event_type: None,
2321                tenant_id: None,
2322                as_of: None,
2323                since: None,
2324                until: None,
2325                limit: None,
2326                event_type_prefix: None,
2327                payload_filter: None,
2328            })
2329            .unwrap();
2330        let total_count = all_events.len();
2331        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2332        let count = limited_events.len();
2333        let has_more = count < total_count;
2334
2335        assert_eq!(count, 5);
2336        assert_eq!(total_count, 5);
2337        assert!(!has_more);
2338    }
2339
2340    #[tokio::test]
2341    async fn test_list_entities_by_type_prefix() {
2342        let store = create_test_store();
2343
2344        // 3 index entities
2345        store
2346            .ingest(create_test_event("idx-1", "index.created"))
2347            .unwrap();
2348        store
2349            .ingest(create_test_event("idx-1", "index.updated"))
2350            .unwrap();
2351        store
2352            .ingest(create_test_event("idx-2", "index.created"))
2353            .unwrap();
2354        store
2355            .ingest(create_test_event("idx-3", "index.created"))
2356            .unwrap();
2357        // 2 trade entities
2358        store
2359            .ingest(create_test_event("trade-1", "trade.created"))
2360            .unwrap();
2361        store
2362            .ingest(create_test_event("trade-2", "trade.created"))
2363            .unwrap();
2364
2365        // List entities for index.*
2366        let req = ListEntitiesRequest {
2367            event_type_prefix: Some("index.".to_string()),
2368            payload_filter: None,
2369            limit: None,
2370            offset: None,
2371        };
2372        let query_req = QueryEventsRequest {
2373            entity_id: None,
2374            event_type: None,
2375            tenant_id: None,
2376            as_of: None,
2377            since: None,
2378            until: None,
2379            limit: None,
2380            event_type_prefix: req.event_type_prefix,
2381            payload_filter: req.payload_filter,
2382        };
2383        let events = store.query(query_req).unwrap();
2384
2385        // Group and verify
2386        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2387            std::collections::HashMap::new();
2388        for event in &events {
2389            entity_map
2390                .entry(event.entity_id().to_string())
2391                .or_default()
2392                .push(event);
2393        }
2394
2395        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2396        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2397        assert_eq!(entity_map["idx-2"].len(), 1);
2398        assert_eq!(entity_map["idx-3"].len(), 1);
2399    }
2400
2401    fn create_test_event_with_payload(
2402        entity_id: &str,
2403        event_type: &str,
2404        payload: serde_json::Value,
2405    ) -> Event {
2406        Event::from_strings(
2407            event_type.to_string(),
2408            entity_id.to_string(),
2409            "test-stream".to_string(),
2410            payload,
2411            None,
2412        )
2413        .unwrap()
2414    }
2415
2416    #[tokio::test]
2417    async fn test_detect_duplicates_by_payload_fields() {
2418        let store = create_test_store();
2419
2420        // Create entities with duplicate "name" field values
2421        store
2422            .ingest(create_test_event_with_payload(
2423                "idx-1",
2424                "index.created",
2425                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2426            ))
2427            .unwrap();
2428        store
2429            .ingest(create_test_event_with_payload(
2430                "idx-2",
2431                "index.created",
2432                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2433            ))
2434            .unwrap();
2435        store
2436            .ingest(create_test_event_with_payload(
2437                "idx-3",
2438                "index.created",
2439                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2440            ))
2441            .unwrap();
2442        store
2443            .ingest(create_test_event_with_payload(
2444                "idx-4",
2445                "index.created",
2446                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2447            ))
2448            .unwrap();
2449        store
2450            .ingest(create_test_event_with_payload(
2451                "idx-5",
2452                "index.created",
2453                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2454            ))
2455            .unwrap();
2456
2457        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2458        let query_req = QueryEventsRequest {
2459            entity_id: None,
2460            event_type: None,
2461            tenant_id: None,
2462            as_of: None,
2463            since: None,
2464            until: None,
2465            limit: None,
2466            event_type_prefix: Some("index.".to_string()),
2467            payload_filter: None,
2468        };
2469        let events = store.query(query_req).unwrap();
2470
2471        // Manually replicate the handler logic for testing
2472        let group_by_fields = vec!["name"];
2473        let mut entity_latest: std::collections::HashMap<String, &Event> =
2474            std::collections::HashMap::new();
2475        for event in &events {
2476            let eid = event.entity_id().to_string();
2477            entity_latest
2478                .entry(eid)
2479                .and_modify(|existing| {
2480                    if event.timestamp() > existing.timestamp() {
2481                        *existing = event;
2482                    }
2483                })
2484                .or_insert(event);
2485        }
2486
2487        let mut groups: std::collections::HashMap<String, Vec<String>> =
2488            std::collections::HashMap::new();
2489        for (entity_id, event) in &entity_latest {
2490            let payload = event.payload();
2491            let mut key_parts = serde_json::Map::new();
2492            for field in &group_by_fields {
2493                let value = payload
2494                    .get(*field)
2495                    .cloned()
2496                    .unwrap_or(serde_json::Value::Null);
2497                key_parts.insert(field.to_string(), value);
2498            }
2499            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2500            groups.entry(key_str).or_default().push(entity_id.clone());
2501        }
2502
2503        let duplicate_groups: Vec<_> = groups
2504            .into_iter()
2505            .filter(|(_, ids)| ids.len() > 1)
2506            .collect();
2507
2508        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2509        for (_, ids) in &duplicate_groups {
2510            assert_eq!(ids.len(), 2);
2511        }
2512    }
2513
2514    #[tokio::test]
2515    async fn test_detect_duplicates_no_duplicates() {
2516        let store = create_test_store();
2517
2518        // All unique names
2519        store
2520            .ingest(create_test_event_with_payload(
2521                "idx-1",
2522                "index.created",
2523                serde_json::json!({"name": "A"}),
2524            ))
2525            .unwrap();
2526        store
2527            .ingest(create_test_event_with_payload(
2528                "idx-2",
2529                "index.created",
2530                serde_json::json!({"name": "B"}),
2531            ))
2532            .unwrap();
2533
2534        let query_req = QueryEventsRequest {
2535            entity_id: None,
2536            event_type: None,
2537            tenant_id: None,
2538            as_of: None,
2539            since: None,
2540            until: None,
2541            limit: None,
2542            event_type_prefix: Some("index.".to_string()),
2543            payload_filter: None,
2544        };
2545        let events = store.query(query_req).unwrap();
2546
2547        let mut entity_latest: std::collections::HashMap<String, &Event> =
2548            std::collections::HashMap::new();
2549        for event in &events {
2550            entity_latest
2551                .entry(event.entity_id().to_string())
2552                .or_insert(event);
2553        }
2554
2555        let mut groups: std::collections::HashMap<String, Vec<String>> =
2556            std::collections::HashMap::new();
2557        for (entity_id, event) in &entity_latest {
2558            let key_str =
2559                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2560                    .unwrap();
2561            groups.entry(key_str).or_default().push(entity_id.clone());
2562        }
2563
2564        let duplicate_groups: Vec<_> = groups
2565            .into_iter()
2566            .filter(|(_, ids)| ids.len() > 1)
2567            .collect();
2568
2569        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2570    }
2571
2572    #[tokio::test]
2573    async fn test_detect_duplicates_multi_field_group_by() {
2574        let store = create_test_store();
2575
2576        // Two entities with same name AND user_id = true duplicate
2577        store
2578            .ingest(create_test_event_with_payload(
2579                "idx-1",
2580                "index.created",
2581                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2582            ))
2583            .unwrap();
2584        store
2585            .ingest(create_test_event_with_payload(
2586                "idx-2",
2587                "index.created",
2588                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2589            ))
2590            .unwrap();
2591        // Same name but different user_id = NOT a duplicate in multi-field group
2592        store
2593            .ingest(create_test_event_with_payload(
2594                "idx-3",
2595                "index.created",
2596                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2597            ))
2598            .unwrap();
2599
2600        let query_req = QueryEventsRequest {
2601            entity_id: None,
2602            event_type: None,
2603            tenant_id: None,
2604            as_of: None,
2605            since: None,
2606            until: None,
2607            limit: None,
2608            event_type_prefix: Some("index.".to_string()),
2609            payload_filter: None,
2610        };
2611        let events = store.query(query_req).unwrap();
2612
2613        let group_by_fields = vec!["name", "user_id"];
2614        let mut entity_latest: std::collections::HashMap<String, &Event> =
2615            std::collections::HashMap::new();
2616        for event in &events {
2617            entity_latest
2618                .entry(event.entity_id().to_string())
2619                .and_modify(|existing| {
2620                    if event.timestamp() > existing.timestamp() {
2621                        *existing = event;
2622                    }
2623                })
2624                .or_insert(event);
2625        }
2626
2627        let mut groups: std::collections::HashMap<String, Vec<String>> =
2628            std::collections::HashMap::new();
2629        for (entity_id, event) in &entity_latest {
2630            let payload = event.payload();
2631            let mut key_parts = serde_json::Map::new();
2632            for field in &group_by_fields {
2633                let value = payload
2634                    .get(*field)
2635                    .cloned()
2636                    .unwrap_or(serde_json::Value::Null);
2637                key_parts.insert(field.to_string(), value);
2638            }
2639            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2640            groups.entry(key_str).or_default().push(entity_id.clone());
2641        }
2642
2643        let duplicate_groups: Vec<_> = groups
2644            .into_iter()
2645            .filter(|(_, ids)| ids.len() > 1)
2646            .collect();
2647
2648        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2649        assert_eq!(duplicate_groups.len(), 1);
2650        let (_, ref ids) = duplicate_groups[0];
2651        assert_eq!(ids.len(), 2);
2652        let mut sorted_ids = ids.clone();
2653        sorted_ids.sort();
2654        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2655    }
2656
2657    #[tokio::test]
2658    async fn test_projection_state_cache() {
2659        let store = create_test_store();
2660
2661        // Test cache insertion
2662        let cache = store.projection_state_cache();
2663        cache.insert(
2664            "entity_snapshots:user-123".to_string(),
2665            serde_json::json!({"name": "Test User", "age": 30}),
2666        );
2667
2668        // Test cache retrieval
2669        let state = cache.get("entity_snapshots:user-123");
2670        assert!(state.is_some());
2671        let state = state.unwrap();
2672        assert_eq!(state["name"], "Test User");
2673        assert_eq!(state["age"], 30);
2674    }
2675
2676    #[tokio::test]
2677    async fn test_projection_manager_list_projections() {
2678        let store = create_test_store();
2679
2680        // List projections (built-in projections should be available)
2681        let projection_manager = store.projection_manager();
2682        let projections = projection_manager.list_projections();
2683
2684        // Should have entity_snapshots and event_counters
2685        assert!(projections.len() >= 2);
2686
2687        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2688        assert!(names.contains(&"entity_snapshots"));
2689        assert!(names.contains(&"event_counters"));
2690    }
2691
2692    #[tokio::test]
2693    async fn test_projection_state_after_event_ingestion() {
2694        let store = create_test_store();
2695
2696        // Ingest an event
2697        let event = create_test_event("user-456", "user.created");
2698        store.ingest(event).unwrap();
2699
2700        // Get projection state
2701        let projection_manager = store.projection_manager();
2702        let snapshot_projection = projection_manager
2703            .get_projection("entity_snapshots")
2704            .unwrap();
2705
2706        let state = snapshot_projection.get_state("user-456");
2707        assert!(state.is_some());
2708        let state = state.unwrap();
2709        assert_eq!(state["name"], "Test");
2710        assert_eq!(state["value"], 42);
2711    }
2712
2713    #[tokio::test]
2714    async fn test_projection_state_cache_multiple_entities() {
2715        let store = create_test_store();
2716        let cache = store.projection_state_cache();
2717
2718        // Insert multiple entities
2719        for i in 0..10 {
2720            cache.insert(
2721                format!("entity_snapshots:entity-{}", i),
2722                serde_json::json!({"id": i, "status": "active"}),
2723            );
2724        }
2725
2726        // Verify all insertions
2727        assert_eq!(cache.len(), 10);
2728
2729        // Verify each entity
2730        for i in 0..10 {
2731            let key = format!("entity_snapshots:entity-{}", i);
2732            let state = cache.get(&key);
2733            assert!(state.is_some());
2734            assert_eq!(state.unwrap()["id"], i);
2735        }
2736    }
2737
2738    #[tokio::test]
2739    async fn test_projection_state_update() {
2740        let store = create_test_store();
2741        let cache = store.projection_state_cache();
2742
2743        // Initial state
2744        cache.insert(
2745            "entity_snapshots:user-789".to_string(),
2746            serde_json::json!({"balance": 100}),
2747        );
2748
2749        // Update state
2750        cache.insert(
2751            "entity_snapshots:user-789".to_string(),
2752            serde_json::json!({"balance": 150}),
2753        );
2754
2755        // Verify update
2756        let state = cache.get("entity_snapshots:user-789").unwrap();
2757        assert_eq!(state["balance"], 150);
2758    }
2759
2760    #[tokio::test]
2761    async fn test_event_counter_projection() {
2762        let store = create_test_store();
2763
2764        // Ingest events of different types
2765        store
2766            .ingest(create_test_event("user-1", "user.created"))
2767            .unwrap();
2768        store
2769            .ingest(create_test_event("user-2", "user.created"))
2770            .unwrap();
2771        store
2772            .ingest(create_test_event("user-1", "user.updated"))
2773            .unwrap();
2774
2775        // Get event counter projection
2776        let projection_manager = store.projection_manager();
2777        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2778
2779        // Check counts
2780        let created_state = counter_projection.get_state("user.created");
2781        assert!(created_state.is_some());
2782        assert_eq!(created_state.unwrap()["count"], 2);
2783
2784        let updated_state = counter_projection.get_state("user.updated");
2785        assert!(updated_state.is_some());
2786        assert_eq!(updated_state.unwrap()["count"], 1);
2787    }
2788
2789    #[tokio::test]
2790    async fn test_projection_state_cache_key_format() {
2791        let store = create_test_store();
2792        let cache = store.projection_state_cache();
2793
2794        // Test standard key format: {projection_name}:{entity_id}
2795        let key = "orders:order-12345".to_string();
2796        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2797
2798        let state = cache.get(&key).unwrap();
2799        assert_eq!(state["total"], 99.99);
2800    }
2801
2802    #[tokio::test]
2803    async fn test_projection_state_cache_removal() {
2804        let store = create_test_store();
2805        let cache = store.projection_state_cache();
2806
2807        // Insert and then remove
2808        cache.insert(
2809            "test:entity-1".to_string(),
2810            serde_json::json!({"data": "value"}),
2811        );
2812        assert_eq!(cache.len(), 1);
2813
2814        cache.remove("test:entity-1");
2815        assert_eq!(cache.len(), 0);
2816        assert!(cache.get("test:entity-1").is_none());
2817    }
2818
2819    #[tokio::test]
2820    async fn test_get_nonexistent_projection() {
2821        let store = create_test_store();
2822        let projection_manager = store.projection_manager();
2823
2824        // Requesting a non-existent projection should return None
2825        let projection = projection_manager.get_projection("nonexistent_projection");
2826        assert!(projection.is_none());
2827    }
2828
2829    #[tokio::test]
2830    async fn test_get_nonexistent_entity_state() {
2831        let store = create_test_store();
2832        let projection_manager = store.projection_manager();
2833
2834        // Get state for non-existent entity
2835        let snapshot_projection = projection_manager
2836            .get_projection("entity_snapshots")
2837            .unwrap();
2838        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2839        assert!(state.is_none());
2840    }
2841
2842    #[tokio::test]
2843    async fn test_projection_state_cache_concurrent_access() {
2844        let store = create_test_store();
2845        let cache = store.projection_state_cache();
2846
2847        // Simulate concurrent writes
2848        let handles: Vec<_> = (0..10)
2849            .map(|i| {
2850                let cache_clone = cache.clone();
2851                tokio::spawn(async move {
2852                    cache_clone.insert(
2853                        format!("concurrent:entity-{}", i),
2854                        serde_json::json!({"thread": i}),
2855                    );
2856                })
2857            })
2858            .collect();
2859
2860        for handle in handles {
2861            handle.await.unwrap();
2862        }
2863
2864        // All 10 entries should be present
2865        assert_eq!(cache.len(), 10);
2866    }
2867
2868    #[tokio::test]
2869    async fn test_projection_state_large_payload() {
2870        let store = create_test_store();
2871        let cache = store.projection_state_cache();
2872
2873        // Create a large JSON payload (~10KB)
2874        let large_array: Vec<serde_json::Value> = (0..1000)
2875            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2876            .collect();
2877
2878        cache.insert(
2879            "large:entity-1".to_string(),
2880            serde_json::json!({"items": large_array}),
2881        );
2882
2883        let state = cache.get("large:entity-1").unwrap();
2884        let items = state["items"].as_array().unwrap();
2885        assert_eq!(items.len(), 1000);
2886    }
2887
2888    #[tokio::test]
2889    async fn test_projection_state_complex_json() {
2890        let store = create_test_store();
2891        let cache = store.projection_state_cache();
2892
2893        // Complex nested JSON structure
2894        let complex_state = serde_json::json!({
2895            "user": {
2896                "id": "user-123",
2897                "profile": {
2898                    "name": "John Doe",
2899                    "email": "john@example.com",
2900                    "settings": {
2901                        "theme": "dark",
2902                        "notifications": true
2903                    }
2904                },
2905                "roles": ["admin", "user"],
2906                "metadata": {
2907                    "created_at": "2025-01-01T00:00:00Z",
2908                    "last_login": null
2909                }
2910            }
2911        });
2912
2913        cache.insert("complex:user-123".to_string(), complex_state);
2914
2915        let state = cache.get("complex:user-123").unwrap();
2916        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2917        assert_eq!(state["user"]["roles"][0], "admin");
2918        assert!(state["user"]["metadata"]["last_login"].is_null());
2919    }
2920
2921    #[tokio::test]
2922    async fn test_projection_state_cache_iteration() {
2923        let store = create_test_store();
2924        let cache = store.projection_state_cache();
2925
2926        // Insert entries
2927        for i in 0..5 {
2928            cache.insert(
2929                format!("iter:entity-{}", i),
2930                serde_json::json!({"index": i}),
2931            );
2932        }
2933
2934        // Iterate over all entries
2935        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2936        assert_eq!(entries.len(), 5);
2937    }
2938
2939    #[tokio::test]
2940    async fn test_projection_manager_get_entity_snapshots() {
2941        let store = create_test_store();
2942        let projection_manager = store.projection_manager();
2943
2944        // Get entity_snapshots projection specifically
2945        let projection = projection_manager.get_projection("entity_snapshots");
2946        assert!(projection.is_some());
2947        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2948    }
2949
2950    #[tokio::test]
2951    async fn test_projection_manager_get_event_counters() {
2952        let store = create_test_store();
2953        let projection_manager = store.projection_manager();
2954
2955        // Get event_counters projection specifically
2956        let projection = projection_manager.get_projection("event_counters");
2957        assert!(projection.is_some());
2958        assert_eq!(projection.unwrap().name(), "event_counters");
2959    }
2960
2961    #[tokio::test]
2962    async fn test_projection_state_cache_overwrite() {
2963        let store = create_test_store();
2964        let cache = store.projection_state_cache();
2965
2966        // Initial value
2967        cache.insert(
2968            "overwrite:entity-1".to_string(),
2969            serde_json::json!({"version": 1}),
2970        );
2971
2972        // Overwrite with new value
2973        cache.insert(
2974            "overwrite:entity-1".to_string(),
2975            serde_json::json!({"version": 2}),
2976        );
2977
2978        // Overwrite again
2979        cache.insert(
2980            "overwrite:entity-1".to_string(),
2981            serde_json::json!({"version": 3}),
2982        );
2983
2984        let state = cache.get("overwrite:entity-1").unwrap();
2985        assert_eq!(state["version"], 3);
2986
2987        // Should still be only 1 entry
2988        assert_eq!(cache.len(), 1);
2989    }
2990
2991    #[tokio::test]
2992    async fn test_projection_state_multiple_projections() {
2993        let store = create_test_store();
2994        let cache = store.projection_state_cache();
2995
2996        // Store states for different projections
2997        cache.insert(
2998            "entity_snapshots:user-1".to_string(),
2999            serde_json::json!({"name": "Alice"}),
3000        );
3001        cache.insert(
3002            "event_counters:user.created".to_string(),
3003            serde_json::json!({"count": 5}),
3004        );
3005        cache.insert(
3006            "custom_projection:order-1".to_string(),
3007            serde_json::json!({"total": 150.0}),
3008        );
3009
3010        // Verify each projection's state
3011        assert_eq!(
3012            cache.get("entity_snapshots:user-1").unwrap()["name"],
3013            "Alice"
3014        );
3015        assert_eq!(
3016            cache.get("event_counters:user.created").unwrap()["count"],
3017            5
3018        );
3019        assert_eq!(
3020            cache.get("custom_projection:order-1").unwrap()["total"],
3021            150.0
3022        );
3023    }
3024
3025    #[tokio::test]
3026    async fn test_bulk_projection_state_access() {
3027        let store = create_test_store();
3028
3029        // Ingest multiple events for different entities
3030        for i in 0..5 {
3031            let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
3032            store.ingest(event).unwrap();
3033        }
3034
3035        // Get projection and verify bulk access
3036        let projection_manager = store.projection_manager();
3037        let snapshot_projection = projection_manager
3038            .get_projection("entity_snapshots")
3039            .unwrap();
3040
3041        // Verify we can access all entities
3042        for i in 0..5 {
3043            let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
3044            assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
3045        }
3046    }
3047
3048    #[tokio::test]
3049    async fn test_bulk_save_projection_states() {
3050        let store = create_test_store();
3051        let cache = store.projection_state_cache();
3052
3053        // Simulate bulk save request
3054        let states = vec![
3055            BulkSaveStateItem {
3056                entity_id: "bulk-entity-1".to_string(),
3057                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3058            },
3059            BulkSaveStateItem {
3060                entity_id: "bulk-entity-2".to_string(),
3061                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3062            },
3063            BulkSaveStateItem {
3064                entity_id: "bulk-entity-3".to_string(),
3065                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3066            },
3067        ];
3068
3069        let projection_name = "test_projection";
3070
3071        // Save states to cache (simulating bulk_save_projection_states handler)
3072        for item in &states {
3073            cache.insert(
3074                format!("{projection_name}:{}", item.entity_id),
3075                item.state.clone(),
3076            );
3077        }
3078
3079        // Verify all states were saved
3080        assert_eq!(cache.len(), 3);
3081
3082        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3083        assert_eq!(state1["name"], "Entity 1");
3084        assert_eq!(state1["value"], 100);
3085
3086        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3087        assert_eq!(state2["name"], "Entity 2");
3088        assert_eq!(state2["value"], 200);
3089
3090        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3091        assert_eq!(state3["name"], "Entity 3");
3092        assert_eq!(state3["value"], 300);
3093    }
3094
3095    #[tokio::test]
3096    async fn test_bulk_save_empty_states() {
3097        let store = create_test_store();
3098        let cache = store.projection_state_cache();
3099
3100        // Clear cache
3101        cache.clear();
3102
3103        // Empty states should work fine
3104        let states: Vec<BulkSaveStateItem> = vec![];
3105        assert_eq!(states.len(), 0);
3106
3107        // Cache should remain empty
3108        assert_eq!(cache.len(), 0);
3109    }
3110
3111    #[tokio::test]
3112    async fn test_bulk_save_overwrites_existing() {
3113        let store = create_test_store();
3114        let cache = store.projection_state_cache();
3115
3116        // Insert initial state
3117        cache.insert(
3118            "test:entity-1".to_string(),
3119            serde_json::json!({"version": 1, "data": "initial"}),
3120        );
3121
3122        // Bulk save with updated state
3123        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3124        cache.insert("test:entity-1".to_string(), new_state);
3125
3126        // Verify overwrite
3127        let state = cache.get("test:entity-1").unwrap();
3128        assert_eq!(state["version"], 2);
3129        assert_eq!(state["data"], "updated");
3130    }
3131
3132    #[tokio::test]
3133    async fn test_bulk_save_high_volume() {
3134        let store = create_test_store();
3135        let cache = store.projection_state_cache();
3136
3137        // Simulate high volume save (1000 entities)
3138        for i in 0..1000 {
3139            cache.insert(
3140                format!("volume_test:entity-{}", i),
3141                serde_json::json!({"index": i, "status": "active"}),
3142            );
3143        }
3144
3145        // Verify count
3146        assert_eq!(cache.len(), 1000);
3147
3148        // Spot check some entries
3149        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3150        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3151        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3152    }
3153
3154    #[tokio::test]
3155    async fn test_bulk_save_different_projections() {
3156        let store = create_test_store();
3157        let cache = store.projection_state_cache();
3158
3159        // Save to multiple projections in bulk
3160        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3161
3162        for proj in projections.iter() {
3163            for i in 0..5 {
3164                cache.insert(
3165                    format!("{proj}:entity-{i}"),
3166                    serde_json::json!({"projection": proj, "id": i}),
3167                );
3168            }
3169        }
3170
3171        // Verify total count (3 projections * 5 entities)
3172        assert_eq!(cache.len(), 15);
3173
3174        // Verify each projection
3175        for proj in projections.iter() {
3176            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3177            assert_eq!(state["projection"], *proj);
3178        }
3179    }
3180}