Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::{
2    application::{
3        dto::{
4            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
5            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
6            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
7            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
8            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
9        },
10        services::{
11            analytics::{
12                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
13                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
14            },
15            pipeline::{PipelineConfig, PipelineStats},
16            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
17            schema::{
18                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
19                ValidateEventRequest, ValidateEventResponse,
20            },
21            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
22        },
23    },
24    domain::entities::Event,
25    error::Result,
26    infrastructure::{
27        persistence::{
28            compaction::CompactionResult,
29            snapshot::{
30                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
31                ListSnapshotsResponse, SnapshotInfo,
32            },
33        },
34        query::{
35            geospatial::GeoQueryRequest,
36            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
37        },
38        replication::ReplicationMode,
39        web::api_v1::AppState,
40    },
41    store::{EventStore, EventTypeInfo, StreamInfo},
42};
43use axum::{
44    Json, Router,
45    extract::{Path, Query, State, WebSocketUpgrade},
46    response::{IntoResponse, Response},
47    routing::{get, post, put},
48};
49use serde::Deserialize;
50use std::sync::Arc;
51use tower_http::{
52    cors::{Any, CorsLayer},
53    trace::TraceLayer,
54};
55
56type SharedStore = Arc<EventStore>;
57
58/// Wait for follower ACK(s) in semi-sync/sync replication modes.
59///
60/// In async mode (default), returns immediately. In semi-sync mode, waits for
61/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
62/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
63async fn await_replication_ack(state: &AppState) {
64    let shipper_guard = state.wal_shipper.read().await;
65    if let Some(ref shipper) = *shipper_guard {
66        let mode = shipper.replication_mode();
67        if mode == ReplicationMode::Async {
68            return;
69        }
70
71        let target_offset = shipper.current_leader_offset();
72        if target_offset == 0 {
73            return;
74        }
75
76        let shipper = Arc::clone(shipper);
77        // Drop the read guard before the async wait to avoid holding it across await
78        drop(shipper_guard);
79
80        let timer = state
81            .store
82            .metrics()
83            .replication_ack_wait_seconds
84            .start_timer();
85        let acked = shipper.wait_for_ack(target_offset).await;
86        timer.observe_duration();
87
88        if !acked {
89            tracing::warn!(
90                "Replication ACK timeout in {} mode (offset {}). \
91                 Write succeeded locally but follower confirmation pending.",
92                mode,
93                target_offset,
94            );
95        }
96    }
97}
98
99pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
100    let app = Router::new()
101        .route("/health", get(health))
102        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
103        .route("/api/v1/events", post(ingest_event))
104        .route("/api/v1/events/batch", post(ingest_events_batch))
105        .route("/api/v1/events/query", get(query_events))
106        .route("/api/v1/events/{event_id}", get(get_event_by_id))
107        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
108        // v0.10: Stream and event type discovery endpoints
109        .route("/api/v1/streams", get(list_streams))
110        .route("/api/v1/event-types", get(list_event_types))
111        .route("/api/v1/entities/duplicates", get(detect_duplicates))
112        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
113        .route(
114            "/api/v1/entities/{entity_id}/snapshot",
115            get(get_entity_snapshot),
116        )
117        .route("/api/v1/stats", get(get_stats))
118        // v0.2: Advanced analytics endpoints
119        .route("/api/v1/analytics/frequency", get(analytics_frequency))
120        .route("/api/v1/analytics/summary", get(analytics_summary))
121        .route("/api/v1/analytics/correlation", get(analytics_correlation))
122        // v0.2: Snapshot management endpoints
123        .route("/api/v1/snapshots", post(create_snapshot))
124        .route("/api/v1/snapshots", get(list_snapshots))
125        .route(
126            "/api/v1/snapshots/{entity_id}/latest",
127            get(get_latest_snapshot),
128        )
129        // v0.2: Compaction endpoints
130        .route("/api/v1/compaction/trigger", post(trigger_compaction))
131        .route("/api/v1/compaction/stats", get(compaction_stats))
132        // v0.5: Schema registry endpoints
133        .route("/api/v1/schemas", post(register_schema))
134        .route("/api/v1/schemas", get(list_subjects))
135        .route("/api/v1/schemas/{subject}", get(get_schema))
136        .route(
137            "/api/v1/schemas/{subject}/versions",
138            get(list_schema_versions),
139        )
140        .route("/api/v1/schemas/validate", post(validate_event_schema))
141        .route(
142            "/api/v1/schemas/{subject}/compatibility",
143            put(set_compatibility_mode),
144        )
145        // v0.5: Replay and projection rebuild endpoints
146        .route("/api/v1/replay", post(start_replay))
147        .route("/api/v1/replay", get(list_replays))
148        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
149        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
150        .route(
151            "/api/v1/replay/{replay_id}",
152            axum::routing::delete(delete_replay),
153        )
154        // v0.5: Stream processing pipeline endpoints
155        .route("/api/v1/pipelines", post(register_pipeline))
156        .route("/api/v1/pipelines", get(list_pipelines))
157        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
158        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
159        .route(
160            "/api/v1/pipelines/{pipeline_id}",
161            axum::routing::delete(remove_pipeline),
162        )
163        .route(
164            "/api/v1/pipelines/{pipeline_id}/stats",
165            get(get_pipeline_stats),
166        )
167        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
168        // v0.7: Projection State API for Query Service integration
169        .route("/api/v1/projections", get(list_projections))
170        .route("/api/v1/projections/{name}", get(get_projection))
171        .route(
172            "/api/v1/projections/{name}",
173            axum::routing::delete(delete_projection),
174        )
175        .route(
176            "/api/v1/projections/{name}/state",
177            get(get_projection_state_summary),
178        )
179        .route("/api/v1/projections/{name}/reset", post(reset_projection))
180        .route(
181            "/api/v1/projections/{name}/{entity_id}/state",
182            get(get_projection_state),
183        )
184        .route(
185            "/api/v1/projections/{name}/{entity_id}/state",
186            post(save_projection_state),
187        )
188        .route(
189            "/api/v1/projections/{name}/{entity_id}/state",
190            put(save_projection_state),
191        )
192        .route(
193            "/api/v1/projections/{name}/bulk",
194            post(bulk_get_projection_states),
195        )
196        .route(
197            "/api/v1/projections/{name}/bulk/save",
198            post(bulk_save_projection_states),
199        )
200        // v0.11: Webhook management endpoints
201        .route("/api/v1/webhooks", post(register_webhook))
202        .route("/api/v1/webhooks", get(list_webhooks))
203        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
204        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
205        .route(
206            "/api/v1/webhooks/{webhook_id}",
207            axum::routing::delete(delete_webhook),
208        )
209        .route(
210            "/api/v1/webhooks/{webhook_id}/deliveries",
211            get(list_webhook_deliveries),
212        )
213        // v2.0: Advanced query features
214        .route("/api/v1/graphql", post(graphql_query))
215        .route("/api/v1/geospatial/query", post(geo_query))
216        .route("/api/v1/geospatial/stats", get(geo_stats))
217        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
218        .route(
219            "/api/v1/schema-evolution/history/{event_type}",
220            get(schema_evolution_history),
221        )
222        .route(
223            "/api/v1/schema-evolution/schema/{event_type}",
224            get(schema_evolution_schema),
225        )
226        .route(
227            "/api/v1/schema-evolution/stats",
228            get(schema_evolution_stats),
229        )
230        .layer(
231            CorsLayer::new()
232                .allow_origin(Any)
233                .allow_methods(Any)
234                .allow_headers(Any),
235        )
236        .layer(TraceLayer::new_for_http())
237        .with_state(store);
238
239    let listener = tokio::net::TcpListener::bind(addr).await?;
240    axum::serve(listener, app).await?;
241
242    Ok(())
243}
244
245pub async fn health() -> impl IntoResponse {
246    Json(serde_json::json!({
247        "status": "healthy",
248        "service": "allsource-core",
249        "version": env!("CARGO_PKG_VERSION")
250    }))
251}
252
253// v0.6: Prometheus metrics endpoint
254pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
255    let metrics = store.metrics();
256
257    match metrics.encode() {
258        Ok(encoded) => Response::builder()
259            .status(200)
260            .header("Content-Type", "text/plain; version=0.0.4")
261            .body(encoded)
262            .unwrap()
263            .into_response(),
264        Err(e) => Response::builder()
265            .status(500)
266            .body(format!("Error encoding metrics: {e}"))
267            .unwrap()
268            .into_response(),
269    }
270}
271
272pub async fn ingest_event(
273    State(store): State<SharedStore>,
274    Json(req): Json<IngestEventRequest>,
275) -> Result<Json<IngestEventResponse>> {
276    let expected_version = req.expected_version;
277
278    // Create event using from_strings with default tenant
279    let event = Event::from_strings(
280        req.event_type,
281        req.entity_id,
282        "default".to_string(),
283        req.payload,
284        req.metadata,
285    )?;
286
287    let event_id = event.id;
288    let timestamp = event.timestamp;
289
290    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
291
292    tracing::info!("Event ingested: {}", event_id);
293
294    Ok(Json(IngestEventResponse {
295        event_id,
296        timestamp,
297        version: Some(new_version),
298    }))
299}
300
301/// Ingest a single event with semi-sync/sync replication ACK waiting.
302///
303/// Used by the v1 API (with auth and replication support).
304pub async fn ingest_event_v1(
305    State(state): State<AppState>,
306    Json(req): Json<IngestEventRequest>,
307) -> Result<Json<IngestEventResponse>> {
308    let expected_version = req.expected_version;
309
310    let event = Event::from_strings(
311        req.event_type,
312        req.entity_id,
313        "default".to_string(),
314        req.payload,
315        req.metadata,
316    )?;
317
318    let event_id = event.id;
319    let timestamp = event.timestamp;
320
321    let new_version = state
322        .store
323        .ingest_with_expected_version(&event, expected_version)?;
324
325    // Semi-sync/sync: wait for follower ACK(s) before returning
326    await_replication_ack(&state).await;
327
328    tracing::info!("Event ingested: {}", event_id);
329
330    Ok(Json(IngestEventResponse {
331        event_id,
332        timestamp,
333        version: Some(new_version),
334    }))
335}
336
337/// Batch ingest multiple events in a single request
338///
339/// This endpoint allows ingesting multiple events atomically, which is more
340/// efficient than making individual requests for each event.
341pub async fn ingest_events_batch(
342    State(store): State<SharedStore>,
343    Json(req): Json<IngestEventsBatchRequest>,
344) -> Result<Json<IngestEventsBatchResponse>> {
345    let total = req.events.len();
346    let mut ingested_events = Vec::with_capacity(total);
347
348    for event_req in req.events {
349        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
350        let expected_version = event_req.expected_version;
351
352        let event = Event::from_strings(
353            event_req.event_type,
354            event_req.entity_id,
355            tenant_id,
356            event_req.payload,
357            event_req.metadata,
358        )?;
359
360        let event_id = event.id;
361        let timestamp = event.timestamp;
362
363        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
364
365        ingested_events.push(IngestEventResponse {
366            event_id,
367            timestamp,
368            version: Some(new_version),
369        });
370    }
371
372    let ingested = ingested_events.len();
373    tracing::info!("Batch ingested {} events", ingested);
374
375    Ok(Json(IngestEventsBatchResponse {
376        total,
377        ingested,
378        events: ingested_events,
379    }))
380}
381
382/// Batch ingest with semi-sync/sync replication ACK waiting.
383///
384/// Used by the v1 API (with auth and replication support).
385pub async fn ingest_events_batch_v1(
386    State(state): State<AppState>,
387    Json(req): Json<IngestEventsBatchRequest>,
388) -> Result<Json<IngestEventsBatchResponse>> {
389    let total = req.events.len();
390    let mut ingested_events = Vec::with_capacity(total);
391
392    for event_req in req.events {
393        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
394        let expected_version = event_req.expected_version;
395
396        let event = Event::from_strings(
397            event_req.event_type,
398            event_req.entity_id,
399            tenant_id,
400            event_req.payload,
401            event_req.metadata,
402        )?;
403
404        let event_id = event.id;
405        let timestamp = event.timestamp;
406
407        let new_version = state
408            .store
409            .ingest_with_expected_version(&event, expected_version)?;
410
411        ingested_events.push(IngestEventResponse {
412            event_id,
413            timestamp,
414            version: Some(new_version),
415        });
416    }
417
418    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
419    await_replication_ack(&state).await;
420
421    let ingested = ingested_events.len();
422    tracing::info!("Batch ingested {} events", ingested);
423
424    Ok(Json(IngestEventsBatchResponse {
425        total,
426        ingested,
427        events: ingested_events,
428    }))
429}
430
431pub async fn query_events(
432    State(store): State<SharedStore>,
433    Query(req): Query<QueryEventsRequest>,
434) -> Result<Json<QueryEventsResponse>> {
435    let requested_limit = req.limit;
436    let queried_entity_id = req.entity_id.clone();
437
438    // Query without limit to get total count
439    let unlimited_req = QueryEventsRequest {
440        entity_id: req.entity_id,
441        event_type: req.event_type,
442        tenant_id: req.tenant_id,
443        as_of: req.as_of,
444        since: req.since,
445        until: req.until,
446        limit: None,
447        event_type_prefix: req.event_type_prefix,
448        payload_filter: req.payload_filter,
449    };
450    let all_events = store.query(&unlimited_req)?;
451    let total_count = all_events.len();
452
453    // Apply limit
454    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
455        all_events.into_iter().take(limit).collect()
456    } else {
457        all_events
458    };
459
460    let count = limited_events.len();
461    let has_more = count < total_count;
462    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
463
464    // Include entity_version only when filtering by a single entity_id
465    let entity_version = queried_entity_id
466        .as_deref()
467        .map(|eid| store.get_entity_version(eid));
468
469    tracing::debug!("Query returned {} events (total: {})", count, total_count);
470
471    Ok(Json(QueryEventsResponse {
472        events,
473        count,
474        total_count,
475        has_more,
476        entity_version,
477    }))
478}
479
480pub async fn list_entities(
481    State(store): State<SharedStore>,
482    Query(req): Query<ListEntitiesRequest>,
483) -> Result<Json<ListEntitiesResponse>> {
484    use std::collections::HashMap;
485
486    // Get all events matching the filters
487    let query_req = QueryEventsRequest {
488        entity_id: None,
489        event_type: None,
490        tenant_id: None,
491        as_of: None,
492        since: None,
493        until: None,
494        limit: None,
495        event_type_prefix: req.event_type_prefix,
496        payload_filter: req.payload_filter,
497    };
498    let events = store.query(&query_req)?;
499
500    // Group by entity_id
501    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
502    for event in &events {
503        entity_map
504            .entry(event.entity_id().to_string())
505            .or_default()
506            .push(event);
507    }
508
509    // Build entity summaries sorted by last event time (descending)
510    let mut summaries: Vec<EntitySummary> = entity_map
511        .into_iter()
512        .map(|(entity_id, events)| {
513            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
514            EntitySummary {
515                entity_id,
516                event_count: events.len(),
517                last_event_type: last.event_type_str().to_string(),
518                last_event_at: last.timestamp(),
519            }
520        })
521        .collect();
522    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
523
524    let total = summaries.len();
525
526    // Apply offset and limit
527    let offset = req.offset.unwrap_or(0);
528    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
529    let summaries = if let Some(limit) = req.limit {
530        let has_more = summaries.len() > limit;
531        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
532        return Ok(Json(ListEntitiesResponse {
533            entities: truncated,
534            total,
535            has_more,
536        }));
537    } else {
538        summaries
539    };
540
541    Ok(Json(ListEntitiesResponse {
542        entities: summaries,
543        total,
544        has_more: false,
545    }))
546}
547
548pub async fn detect_duplicates(
549    State(store): State<SharedStore>,
550    Query(req): Query<DetectDuplicatesRequest>,
551) -> Result<Json<DetectDuplicatesResponse>> {
552    use std::collections::HashMap;
553
554    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
555
556    // Query events scoped by the required prefix
557    let query_req = QueryEventsRequest {
558        entity_id: None,
559        event_type: None,
560        tenant_id: None,
561        as_of: None,
562        since: None,
563        until: None,
564        limit: None,
565        event_type_prefix: Some(req.event_type_prefix),
566        payload_filter: None,
567    };
568    let events = store.query(&query_req)?;
569
570    // For each entity, extract the latest event's payload fields specified by group_by
571    // Then group entities by those field values
572    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
573    for event in &events {
574        let eid = event.entity_id().to_string();
575        entity_latest
576            .entry(eid)
577            .and_modify(|existing| {
578                if event.timestamp() > existing.timestamp() {
579                    *existing = event;
580                }
581            })
582            .or_insert(event);
583    }
584
585    // Group entities by their payload field values
586    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
587    for (entity_id, event) in &entity_latest {
588        let payload = event.payload();
589        let mut key_parts = serde_json::Map::new();
590        for field in &group_by_fields {
591            let value = payload
592                .get(*field)
593                .cloned()
594                .unwrap_or(serde_json::Value::Null);
595            key_parts.insert((*field).to_string(), value);
596        }
597        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
598        groups.entry(key_str).or_default().push(entity_id.clone());
599    }
600
601    // Filter to groups with count > 1 (actual duplicates)
602    let mut duplicate_groups: Vec<DuplicateGroup> = groups
603        .into_iter()
604        .filter(|(_, ids)| ids.len() > 1)
605        .map(|(key_str, mut ids)| {
606            ids.sort();
607            let key: serde_json::Value =
608                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
609            let count = ids.len();
610            DuplicateGroup {
611                key,
612                entity_ids: ids,
613                count,
614            }
615        })
616        .collect();
617
618    // Sort by count descending for consistent output
619    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
620
621    let total = duplicate_groups.len();
622
623    // Apply offset and limit
624    let offset = req.offset.unwrap_or(0);
625    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
626
627    if let Some(limit) = req.limit {
628        let has_more = duplicate_groups.len() > limit;
629        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
630        return Ok(Json(DetectDuplicatesResponse {
631            duplicates: truncated,
632            total,
633            has_more,
634        }));
635    }
636
637    Ok(Json(DetectDuplicatesResponse {
638        duplicates: duplicate_groups,
639        total,
640        has_more: false,
641    }))
642}
643
644#[derive(Deserialize)]
645pub struct EntityStateParams {
646    as_of: Option<chrono::DateTime<chrono::Utc>>,
647}
648
649pub async fn get_entity_state(
650    State(store): State<SharedStore>,
651    Path(entity_id): Path<String>,
652    Query(params): Query<EntityStateParams>,
653) -> Result<Json<serde_json::Value>> {
654    let state = store.reconstruct_state(&entity_id, params.as_of)?;
655
656    tracing::info!("State reconstructed for entity: {}", entity_id);
657
658    Ok(Json(state))
659}
660
661pub async fn get_entity_snapshot(
662    State(store): State<SharedStore>,
663    Path(entity_id): Path<String>,
664) -> Result<Json<serde_json::Value>> {
665    let snapshot = store.get_snapshot(&entity_id)?;
666
667    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
668
669    Ok(Json(snapshot))
670}
671
672pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
673    let stats = store.stats();
674    Json(stats)
675}
676
677// v0.10: List all streams (entity_ids) in the event store
678/// Query parameters for listing streams
679#[derive(Debug, Deserialize)]
680pub struct ListStreamsParams {
681    /// Optional limit on number of streams to return
682    pub limit: Option<usize>,
683    /// Optional offset for pagination
684    pub offset: Option<usize>,
685}
686
687/// Response for listing streams
688#[derive(Debug, serde::Serialize)]
689pub struct ListStreamsResponse {
690    pub streams: Vec<StreamInfo>,
691    pub total: usize,
692}
693
694pub async fn list_streams(
695    State(store): State<SharedStore>,
696    Query(params): Query<ListStreamsParams>,
697) -> Json<ListStreamsResponse> {
698    let mut streams = store.list_streams();
699    let total = streams.len();
700
701    // Sort by last_event_at descending (most recent first)
702    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
703
704    // Apply pagination
705    if let Some(offset) = params.offset {
706        if offset < streams.len() {
707            streams = streams[offset..].to_vec();
708        } else {
709            streams = vec![];
710        }
711    }
712
713    if let Some(limit) = params.limit {
714        streams.truncate(limit);
715    }
716
717    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
718
719    Json(ListStreamsResponse { streams, total })
720}
721
722// v0.10: List all event types in the event store
723/// Query parameters for listing event types
724#[derive(Debug, Deserialize)]
725pub struct ListEventTypesParams {
726    /// Optional limit on number of event types to return
727    pub limit: Option<usize>,
728    /// Optional offset for pagination
729    pub offset: Option<usize>,
730}
731
732/// Response for listing event types
733#[derive(Debug, serde::Serialize)]
734pub struct ListEventTypesResponse {
735    pub event_types: Vec<EventTypeInfo>,
736    pub total: usize,
737}
738
739pub async fn list_event_types(
740    State(store): State<SharedStore>,
741    Query(params): Query<ListEventTypesParams>,
742) -> Json<ListEventTypesResponse> {
743    let mut event_types = store.list_event_types();
744    let total = event_types.len();
745
746    // Sort by event_count descending (most used first)
747    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
748
749    // Apply pagination
750    if let Some(offset) = params.offset {
751        if offset < event_types.len() {
752            event_types = event_types[offset..].to_vec();
753        } else {
754            event_types = vec![];
755        }
756    }
757
758    if let Some(limit) = params.limit {
759        event_types.truncate(limit);
760    }
761
762    tracing::debug!(
763        "Listed {} event types (total: {})",
764        event_types.len(),
765        total
766    );
767
768    Json(ListEventTypesResponse { event_types, total })
769}
770
771// v0.2: WebSocket endpoint for real-time event streaming
772#[derive(Debug, Deserialize)]
773pub struct WebSocketParams {
774    pub consumer_id: Option<String>,
775}
776
777pub async fn events_websocket(
778    ws: WebSocketUpgrade,
779    State(store): State<SharedStore>,
780    Query(params): Query<WebSocketParams>,
781) -> Response {
782    let websocket_manager = store.websocket_manager();
783
784    ws.on_upgrade(move |socket| async move {
785        if let Some(consumer_id) = params.consumer_id {
786            websocket_manager
787                .handle_socket_with_consumer(socket, consumer_id, store)
788                .await;
789        } else {
790            websocket_manager.handle_socket(socket).await;
791        }
792    })
793}
794
795// v0.2: Event frequency analytics endpoint
796pub async fn analytics_frequency(
797    State(store): State<SharedStore>,
798    Query(req): Query<EventFrequencyRequest>,
799) -> Result<Json<EventFrequencyResponse>> {
800    let response = AnalyticsEngine::event_frequency(&store, &req)?;
801
802    tracing::debug!(
803        "Frequency analysis returned {} buckets",
804        response.buckets.len()
805    );
806
807    Ok(Json(response))
808}
809
810// v0.2: Statistical summary endpoint
811pub async fn analytics_summary(
812    State(store): State<SharedStore>,
813    Query(req): Query<StatsSummaryRequest>,
814) -> Result<Json<StatsSummaryResponse>> {
815    let response = AnalyticsEngine::stats_summary(&store, &req)?;
816
817    tracing::debug!(
818        "Stats summary: {} events across {} entities",
819        response.total_events,
820        response.unique_entities
821    );
822
823    Ok(Json(response))
824}
825
826// v0.2: Event correlation analysis endpoint
827pub async fn analytics_correlation(
828    State(store): State<SharedStore>,
829    Query(req): Query<CorrelationRequest>,
830) -> Result<Json<CorrelationResponse>> {
831    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
832
833    tracing::debug!(
834        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
835        response.correlated_pairs,
836        response.total_a,
837        response.correlation_percentage
838    );
839
840    Ok(Json(response))
841}
842
843// v0.2: Create a snapshot for an entity
844pub async fn create_snapshot(
845    State(store): State<SharedStore>,
846    Json(req): Json<CreateSnapshotRequest>,
847) -> Result<Json<CreateSnapshotResponse>> {
848    store.create_snapshot(&req.entity_id)?;
849
850    let snapshot_manager = store.snapshot_manager();
851    let snapshot = snapshot_manager
852        .get_latest_snapshot(&req.entity_id)
853        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
854
855    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
856
857    Ok(Json(CreateSnapshotResponse {
858        snapshot_id: snapshot.id,
859        entity_id: snapshot.entity_id,
860        created_at: snapshot.created_at,
861        event_count: snapshot.event_count,
862        size_bytes: snapshot.metadata.size_bytes,
863    }))
864}
865
866// v0.2: List snapshots
867pub async fn list_snapshots(
868    State(store): State<SharedStore>,
869    Query(req): Query<ListSnapshotsRequest>,
870) -> Result<Json<ListSnapshotsResponse>> {
871    let snapshot_manager = store.snapshot_manager();
872
873    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
874        snapshot_manager
875            .get_all_snapshots(&entity_id)
876            .into_iter()
877            .map(SnapshotInfo::from)
878            .collect()
879    } else {
880        // List all entities with snapshots
881        let entities = snapshot_manager.list_entities();
882        entities
883            .iter()
884            .flat_map(|entity_id| {
885                snapshot_manager
886                    .get_all_snapshots(entity_id)
887                    .into_iter()
888                    .map(SnapshotInfo::from)
889            })
890            .collect()
891    };
892
893    let total = snapshots.len();
894
895    tracing::debug!("Listed {} snapshots", total);
896
897    Ok(Json(ListSnapshotsResponse { snapshots, total }))
898}
899
900// v0.2: Get latest snapshot for an entity
901pub async fn get_latest_snapshot(
902    State(store): State<SharedStore>,
903    Path(entity_id): Path<String>,
904) -> Result<Json<serde_json::Value>> {
905    let snapshot_manager = store.snapshot_manager();
906
907    let snapshot = snapshot_manager
908        .get_latest_snapshot(&entity_id)
909        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
910
911    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
912
913    Ok(Json(serde_json::json!({
914        "snapshot_id": snapshot.id,
915        "entity_id": snapshot.entity_id,
916        "created_at": snapshot.created_at,
917        "as_of": snapshot.as_of,
918        "event_count": snapshot.event_count,
919        "size_bytes": snapshot.metadata.size_bytes,
920        "snapshot_type": snapshot.metadata.snapshot_type,
921        "state": snapshot.state
922    })))
923}
924
925// v0.2: Trigger manual compaction
926pub async fn trigger_compaction(
927    State(store): State<SharedStore>,
928) -> Result<Json<CompactionResult>> {
929    let compaction_manager = store.compaction_manager().ok_or_else(|| {
930        crate::error::AllSourceError::InternalError(
931            "Compaction not enabled (no Parquet storage)".to_string(),
932        )
933    })?;
934
935    tracing::info!("📦 Manual compaction triggered via API");
936
937    let result = compaction_manager.compact_now()?;
938
939    Ok(Json(result))
940}
941
942// v0.2: Get compaction statistics
943pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
944    let compaction_manager = store.compaction_manager().ok_or_else(|| {
945        crate::error::AllSourceError::InternalError(
946            "Compaction not enabled (no Parquet storage)".to_string(),
947        )
948    })?;
949
950    let stats = compaction_manager.stats();
951    let config = compaction_manager.config();
952
953    Ok(Json(serde_json::json!({
954        "stats": stats,
955        "config": {
956            "min_files_to_compact": config.min_files_to_compact,
957            "target_file_size": config.target_file_size,
958            "max_file_size": config.max_file_size,
959            "small_file_threshold": config.small_file_threshold,
960            "compaction_interval_seconds": config.compaction_interval_seconds,
961            "auto_compact": config.auto_compact,
962            "strategy": config.strategy
963        }
964    })))
965}
966
967// v0.5: Register a new schema
968pub async fn register_schema(
969    State(store): State<SharedStore>,
970    Json(req): Json<RegisterSchemaRequest>,
971) -> Result<Json<RegisterSchemaResponse>> {
972    let schema_registry = store.schema_registry();
973
974    let response =
975        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
976
977    tracing::info!(
978        "📋 Schema registered: v{} for '{}'",
979        response.version,
980        response.subject
981    );
982
983    Ok(Json(response))
984}
985
986// v0.5: Get a schema by subject and optional version
987#[derive(Deserialize)]
988pub struct GetSchemaParams {
989    version: Option<u32>,
990}
991
992pub async fn get_schema(
993    State(store): State<SharedStore>,
994    Path(subject): Path<String>,
995    Query(params): Query<GetSchemaParams>,
996) -> Result<Json<serde_json::Value>> {
997    let schema_registry = store.schema_registry();
998
999    let schema = schema_registry.get_schema(&subject, params.version)?;
1000
1001    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1002
1003    Ok(Json(serde_json::json!({
1004        "id": schema.id,
1005        "subject": schema.subject,
1006        "version": schema.version,
1007        "schema": schema.schema,
1008        "created_at": schema.created_at,
1009        "description": schema.description,
1010        "tags": schema.tags
1011    })))
1012}
1013
1014// v0.5: List all versions of a schema subject
1015pub async fn list_schema_versions(
1016    State(store): State<SharedStore>,
1017    Path(subject): Path<String>,
1018) -> Result<Json<serde_json::Value>> {
1019    let schema_registry = store.schema_registry();
1020
1021    let versions = schema_registry.list_versions(&subject)?;
1022
1023    Ok(Json(serde_json::json!({
1024        "subject": subject,
1025        "versions": versions
1026    })))
1027}
1028
1029// v0.5: List all schema subjects
1030pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1031    let schema_registry = store.schema_registry();
1032
1033    let subjects = schema_registry.list_subjects();
1034
1035    Json(serde_json::json!({
1036        "subjects": subjects,
1037        "total": subjects.len()
1038    }))
1039}
1040
1041// v0.5: Validate an event against a schema
1042pub async fn validate_event_schema(
1043    State(store): State<SharedStore>,
1044    Json(req): Json<ValidateEventRequest>,
1045) -> Result<Json<ValidateEventResponse>> {
1046    let schema_registry = store.schema_registry();
1047
1048    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1049
1050    if response.valid {
1051        tracing::debug!(
1052            "✅ Event validated against schema '{}' v{}",
1053            req.subject,
1054            response.schema_version
1055        );
1056    } else {
1057        tracing::warn!(
1058            "❌ Event validation failed for '{}': {:?}",
1059            req.subject,
1060            response.errors
1061        );
1062    }
1063
1064    Ok(Json(response))
1065}
1066
1067// v0.5: Set compatibility mode for a subject
1068#[derive(Deserialize)]
1069pub struct SetCompatibilityRequest {
1070    compatibility: CompatibilityMode,
1071}
1072
1073pub async fn set_compatibility_mode(
1074    State(store): State<SharedStore>,
1075    Path(subject): Path<String>,
1076    Json(req): Json<SetCompatibilityRequest>,
1077) -> Json<serde_json::Value> {
1078    let schema_registry = store.schema_registry();
1079
1080    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1081
1082    tracing::info!(
1083        "🔧 Set compatibility mode for '{}' to {:?}",
1084        subject,
1085        req.compatibility
1086    );
1087
1088    Json(serde_json::json!({
1089        "subject": subject,
1090        "compatibility": req.compatibility
1091    }))
1092}
1093
1094// v0.5: Start a replay operation
1095pub async fn start_replay(
1096    State(store): State<SharedStore>,
1097    Json(req): Json<StartReplayRequest>,
1098) -> Result<Json<StartReplayResponse>> {
1099    let replay_manager = store.replay_manager();
1100
1101    let response = replay_manager.start_replay(store, req)?;
1102
1103    tracing::info!(
1104        "🔄 Started replay {} with {} events",
1105        response.replay_id,
1106        response.total_events
1107    );
1108
1109    Ok(Json(response))
1110}
1111
1112// v0.5: Get replay progress
1113pub async fn get_replay_progress(
1114    State(store): State<SharedStore>,
1115    Path(replay_id): Path<uuid::Uuid>,
1116) -> Result<Json<ReplayProgress>> {
1117    let replay_manager = store.replay_manager();
1118
1119    let progress = replay_manager.get_progress(replay_id)?;
1120
1121    Ok(Json(progress))
1122}
1123
1124// v0.5: List all replay operations
1125pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1126    let replay_manager = store.replay_manager();
1127
1128    let replays = replay_manager.list_replays();
1129
1130    Json(serde_json::json!({
1131        "replays": replays,
1132        "total": replays.len()
1133    }))
1134}
1135
1136// v0.5: Cancel a running replay
1137pub async fn cancel_replay(
1138    State(store): State<SharedStore>,
1139    Path(replay_id): Path<uuid::Uuid>,
1140) -> Result<Json<serde_json::Value>> {
1141    let replay_manager = store.replay_manager();
1142
1143    replay_manager.cancel_replay(replay_id)?;
1144
1145    tracing::info!("🛑 Cancelled replay {}", replay_id);
1146
1147    Ok(Json(serde_json::json!({
1148        "replay_id": replay_id,
1149        "status": "cancelled"
1150    })))
1151}
1152
1153// v0.5: Delete a completed replay
1154pub async fn delete_replay(
1155    State(store): State<SharedStore>,
1156    Path(replay_id): Path<uuid::Uuid>,
1157) -> Result<Json<serde_json::Value>> {
1158    let replay_manager = store.replay_manager();
1159
1160    let deleted = replay_manager.delete_replay(replay_id)?;
1161
1162    if deleted {
1163        tracing::info!("🗑️  Deleted replay {}", replay_id);
1164    }
1165
1166    Ok(Json(serde_json::json!({
1167        "replay_id": replay_id,
1168        "deleted": deleted
1169    })))
1170}
1171
1172// v0.5: Register a new pipeline
1173pub async fn register_pipeline(
1174    State(store): State<SharedStore>,
1175    Json(config): Json<PipelineConfig>,
1176) -> Result<Json<serde_json::Value>> {
1177    let pipeline_manager = store.pipeline_manager();
1178
1179    let pipeline_id = pipeline_manager.register(config.clone());
1180
1181    tracing::info!(
1182        "🔀 Pipeline registered: {} (name: {})",
1183        pipeline_id,
1184        config.name
1185    );
1186
1187    Ok(Json(serde_json::json!({
1188        "pipeline_id": pipeline_id,
1189        "name": config.name,
1190        "enabled": config.enabled
1191    })))
1192}
1193
1194// v0.5: List all pipelines
1195pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1196    let pipeline_manager = store.pipeline_manager();
1197
1198    let pipelines = pipeline_manager.list();
1199
1200    tracing::debug!("Listed {} pipelines", pipelines.len());
1201
1202    Json(serde_json::json!({
1203        "pipelines": pipelines,
1204        "total": pipelines.len()
1205    }))
1206}
1207
1208// v0.5: Get a specific pipeline
1209pub async fn get_pipeline(
1210    State(store): State<SharedStore>,
1211    Path(pipeline_id): Path<uuid::Uuid>,
1212) -> Result<Json<PipelineConfig>> {
1213    let pipeline_manager = store.pipeline_manager();
1214
1215    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1216        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1217    })?;
1218
1219    Ok(Json(pipeline.config().clone()))
1220}
1221
1222// v0.5: Remove a pipeline
1223pub async fn remove_pipeline(
1224    State(store): State<SharedStore>,
1225    Path(pipeline_id): Path<uuid::Uuid>,
1226) -> Result<Json<serde_json::Value>> {
1227    let pipeline_manager = store.pipeline_manager();
1228
1229    let removed = pipeline_manager.remove(pipeline_id);
1230
1231    if removed {
1232        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1233    }
1234
1235    Ok(Json(serde_json::json!({
1236        "pipeline_id": pipeline_id,
1237        "removed": removed
1238    })))
1239}
1240
1241// v0.5: Get statistics for all pipelines
1242pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1243    let pipeline_manager = store.pipeline_manager();
1244
1245    let stats = pipeline_manager.all_stats();
1246
1247    Json(serde_json::json!({
1248        "stats": stats,
1249        "total": stats.len()
1250    }))
1251}
1252
1253// v0.5: Get statistics for a specific pipeline
1254pub async fn get_pipeline_stats(
1255    State(store): State<SharedStore>,
1256    Path(pipeline_id): Path<uuid::Uuid>,
1257) -> Result<Json<PipelineStats>> {
1258    let pipeline_manager = store.pipeline_manager();
1259
1260    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1261        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1262    })?;
1263
1264    Ok(Json(pipeline.stats()))
1265}
1266
1267// v0.5: Reset a pipeline's state
1268pub async fn reset_pipeline(
1269    State(store): State<SharedStore>,
1270    Path(pipeline_id): Path<uuid::Uuid>,
1271) -> Result<Json<serde_json::Value>> {
1272    let pipeline_manager = store.pipeline_manager();
1273
1274    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1275        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1276    })?;
1277
1278    pipeline.reset();
1279
1280    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1281
1282    Ok(Json(serde_json::json!({
1283        "pipeline_id": pipeline_id,
1284        "reset": true
1285    })))
1286}
1287
1288// =============================================================================
1289// v0.11: Single Event Lookup by ID
1290// =============================================================================
1291
1292/// Get a single event by UUID
1293pub async fn get_event_by_id(
1294    State(store): State<SharedStore>,
1295    Path(event_id): Path<uuid::Uuid>,
1296) -> Result<Json<serde_json::Value>> {
1297    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1298        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1299    })?;
1300
1301    let dto = EventDto::from(&event);
1302
1303    tracing::debug!("Event retrieved by ID: {}", event_id);
1304
1305    Ok(Json(serde_json::json!({
1306        "event": dto,
1307        "found": true
1308    })))
1309}
1310
1311// =============================================================================
1312// v0.7: Projection State API for Query Service Integration
1313// =============================================================================
1314
1315/// List all registered projections
1316pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1317    let projection_manager = store.projection_manager();
1318    let status_map = store.projection_status();
1319
1320    let projections: Vec<serde_json::Value> = projection_manager
1321        .list_projections()
1322        .iter()
1323        .map(|(name, projection)| {
1324            let status = status_map
1325                .get(name)
1326                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1327            serde_json::json!({
1328                "name": name,
1329                "type": format!("{:?}", projection.name()),
1330                "status": status,
1331            })
1332        })
1333        .collect();
1334
1335    tracing::debug!("Listed {} projections", projections.len());
1336
1337    Json(serde_json::json!({
1338        "projections": projections,
1339        "total": projections.len()
1340    }))
1341}
1342
1343/// Get projection metadata by name
1344pub async fn get_projection(
1345    State(store): State<SharedStore>,
1346    Path(name): Path<String>,
1347) -> Result<Json<serde_json::Value>> {
1348    let projection_manager = store.projection_manager();
1349
1350    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1351        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1352    })?;
1353
1354    Ok(Json(serde_json::json!({
1355        "name": projection.name(),
1356        "found": true
1357    })))
1358}
1359
1360/// Get projection state for a specific entity
1361///
1362/// This endpoint allows the Elixir Query Service to fetch projection state
1363/// from the Rust Core for synchronization.
1364pub async fn get_projection_state(
1365    State(store): State<SharedStore>,
1366    Path((name, entity_id)): Path<(String, String)>,
1367) -> Result<Json<serde_json::Value>> {
1368    let projection_manager = store.projection_manager();
1369
1370    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1371        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1372    })?;
1373
1374    let state = projection.get_state(&entity_id);
1375
1376    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1377
1378    Ok(Json(serde_json::json!({
1379        "projection": name,
1380        "entity_id": entity_id,
1381        "state": state,
1382        "found": state.is_some()
1383    })))
1384}
1385
1386/// Delete (clear) a projection by name
1387///
1388/// Removes all state from the projection. The projection definition remains
1389/// registered but its accumulated state is cleared.
1390pub async fn delete_projection(
1391    State(store): State<SharedStore>,
1392    Path(name): Path<String>,
1393) -> Result<Json<serde_json::Value>> {
1394    let projection_manager = store.projection_manager();
1395
1396    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1397        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1398    })?;
1399
1400    projection.clear();
1401
1402    // Also clear any cached state for this projection
1403    let cache = store.projection_state_cache();
1404    let prefix = format!("{name}:");
1405    let keys_to_remove: Vec<String> = cache
1406        .iter()
1407        .filter(|entry| entry.key().starts_with(&prefix))
1408        .map(|entry| entry.key().clone())
1409        .collect();
1410    for key in keys_to_remove {
1411        cache.remove(&key);
1412    }
1413
1414    tracing::info!("Projection deleted (cleared): {}", name);
1415
1416    Ok(Json(serde_json::json!({
1417        "projection": name,
1418        "deleted": true
1419    })))
1420}
1421
1422/// Get aggregate projection state (all entities)
1423///
1424/// Returns summary information about a projection's state across all entities.
1425pub async fn get_projection_state_summary(
1426    State(store): State<SharedStore>,
1427    Path(name): Path<String>,
1428) -> Result<Json<serde_json::Value>> {
1429    let projection_manager = store.projection_manager();
1430
1431    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1432        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1433    })?;
1434
1435    // Collect cached states for this projection
1436    let cache = store.projection_state_cache();
1437    let prefix = format!("{name}:");
1438    let states: Vec<serde_json::Value> = cache
1439        .iter()
1440        .filter(|entry| entry.key().starts_with(&prefix))
1441        .map(|entry| {
1442            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1443            serde_json::json!({
1444                "entity_id": entity_id,
1445                "state": entry.value().clone()
1446            })
1447        })
1448        .collect();
1449
1450    let total = states.len();
1451
1452    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1453
1454    Ok(Json(serde_json::json!({
1455        "projection": name,
1456        "states": states,
1457        "total": total
1458    })))
1459}
1460
1461/// Reset a projection to its initial state
1462///
1463/// Clears all accumulated state and reprocesses events from the beginning.
1464pub async fn reset_projection(
1465    State(store): State<SharedStore>,
1466    Path(name): Path<String>,
1467) -> Result<Json<serde_json::Value>> {
1468    let reprocessed = store.reset_projection(&name)?;
1469
1470    tracing::info!(
1471        "Projection reset: {} ({} events reprocessed)",
1472        name,
1473        reprocessed
1474    );
1475
1476    Ok(Json(serde_json::json!({
1477        "projection": name,
1478        "reset": true,
1479        "events_reprocessed": reprocessed
1480    })))
1481}
1482
1483/// Pause a projection
1484///
1485/// Sets the projection status to "paused" so it stops processing new events.
1486pub async fn pause_projection(
1487    State(store): State<SharedStore>,
1488    Path(name): Path<String>,
1489) -> Result<Json<serde_json::Value>> {
1490    let projection_manager = store.projection_manager();
1491
1492    // Verify projection exists
1493    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1494        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1495    })?;
1496
1497    store
1498        .projection_status()
1499        .insert(name.clone(), "paused".to_string());
1500
1501    tracing::info!("Projection paused: {}", name);
1502
1503    Ok(Json(serde_json::json!({
1504        "projection": name,
1505        "status": "paused"
1506    })))
1507}
1508
1509/// Start (resume) a projection
1510///
1511/// Sets the projection status to "running" so it resumes processing events.
1512pub async fn start_projection(
1513    State(store): State<SharedStore>,
1514    Path(name): Path<String>,
1515) -> Result<Json<serde_json::Value>> {
1516    let projection_manager = store.projection_manager();
1517
1518    // Verify projection exists
1519    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1520        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1521    })?;
1522
1523    store
1524        .projection_status()
1525        .insert(name.clone(), "running".to_string());
1526
1527    tracing::info!("Projection started: {}", name);
1528
1529    Ok(Json(serde_json::json!({
1530        "projection": name,
1531        "status": "running"
1532    })))
1533}
1534
1535/// Request body for saving projection state
1536#[derive(Debug, Deserialize)]
1537pub struct SaveProjectionStateRequest {
1538    pub state: serde_json::Value,
1539}
1540
1541/// Save/update projection state for an entity
1542///
1543/// This endpoint allows external services (like Elixir Query Service) to
1544/// store computed projection state back to the Core for persistence.
1545pub async fn save_projection_state(
1546    State(store): State<SharedStore>,
1547    Path((name, entity_id)): Path<(String, String)>,
1548    Json(req): Json<SaveProjectionStateRequest>,
1549) -> Result<Json<serde_json::Value>> {
1550    let projection_cache = store.projection_state_cache();
1551
1552    // Store in the projection state cache
1553    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1554
1555    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1556
1557    Ok(Json(serde_json::json!({
1558        "projection": name,
1559        "entity_id": entity_id,
1560        "saved": true
1561    })))
1562}
1563
1564/// Bulk get projection states for multiple entities
1565///
1566/// Efficient endpoint for fetching multiple entity states in a single request.
1567#[derive(Debug, Deserialize)]
1568pub struct BulkGetStateRequest {
1569    pub entity_ids: Vec<String>,
1570}
1571
1572/// Bulk save projection states for multiple entities
1573///
1574/// Efficient endpoint for saving multiple entity states in a single request.
1575#[derive(Debug, Deserialize)]
1576pub struct BulkSaveStateRequest {
1577    pub states: Vec<BulkSaveStateItem>,
1578}
1579
1580#[derive(Debug, Deserialize)]
1581pub struct BulkSaveStateItem {
1582    pub entity_id: String,
1583    pub state: serde_json::Value,
1584}
1585
1586pub async fn bulk_get_projection_states(
1587    State(store): State<SharedStore>,
1588    Path(name): Path<String>,
1589    Json(req): Json<BulkGetStateRequest>,
1590) -> Result<Json<serde_json::Value>> {
1591    let projection_manager = store.projection_manager();
1592
1593    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1594        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1595    })?;
1596
1597    let states: Vec<serde_json::Value> = req
1598        .entity_ids
1599        .iter()
1600        .map(|entity_id| {
1601            let state = projection.get_state(entity_id);
1602            serde_json::json!({
1603                "entity_id": entity_id,
1604                "state": state,
1605                "found": state.is_some()
1606            })
1607        })
1608        .collect();
1609
1610    tracing::debug!(
1611        "Bulk projection state retrieved: {} entities from {}",
1612        states.len(),
1613        name
1614    );
1615
1616    Ok(Json(serde_json::json!({
1617        "projection": name,
1618        "states": states,
1619        "total": states.len()
1620    })))
1621}
1622
1623/// Bulk save projection states for multiple entities
1624///
1625/// This endpoint allows efficient batch saving of projection states,
1626/// critical for high-throughput event processing pipelines.
1627pub async fn bulk_save_projection_states(
1628    State(store): State<SharedStore>,
1629    Path(name): Path<String>,
1630    Json(req): Json<BulkSaveStateRequest>,
1631) -> Result<Json<serde_json::Value>> {
1632    let projection_cache = store.projection_state_cache();
1633
1634    let mut saved_count = 0;
1635    for item in &req.states {
1636        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1637        saved_count += 1;
1638    }
1639
1640    tracing::info!(
1641        "Bulk projection state saved: {} entities for {}",
1642        saved_count,
1643        name
1644    );
1645
1646    Ok(Json(serde_json::json!({
1647        "projection": name,
1648        "saved": saved_count,
1649        "total": req.states.len()
1650    })))
1651}
1652
1653// =============================================================================
1654// v0.11: Webhook Management API
1655// =============================================================================
1656
1657/// Query parameters for listing webhooks
1658#[derive(Debug, Deserialize)]
1659pub struct ListWebhooksParams {
1660    pub tenant_id: Option<String>,
1661}
1662
1663/// Register a new webhook subscription
1664pub async fn register_webhook(
1665    State(store): State<SharedStore>,
1666    Json(req): Json<RegisterWebhookRequest>,
1667) -> Json<serde_json::Value> {
1668    let registry = store.webhook_registry();
1669    let webhook = registry.register(req);
1670
1671    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1672
1673    Json(serde_json::json!({
1674        "webhook": webhook,
1675        "created": true
1676    }))
1677}
1678
1679/// List webhooks, optionally filtered by tenant_id
1680pub async fn list_webhooks(
1681    State(store): State<SharedStore>,
1682    Query(params): Query<ListWebhooksParams>,
1683) -> Json<serde_json::Value> {
1684    let registry = store.webhook_registry();
1685
1686    let webhooks = if let Some(tenant_id) = params.tenant_id {
1687        registry.list_by_tenant(&tenant_id)
1688    } else {
1689        // Without tenant filter, return empty (tenants should always filter)
1690        vec![]
1691    };
1692
1693    let total = webhooks.len();
1694
1695    Json(serde_json::json!({
1696        "webhooks": webhooks,
1697        "total": total
1698    }))
1699}
1700
1701/// Get a specific webhook by ID
1702pub async fn get_webhook(
1703    State(store): State<SharedStore>,
1704    Path(webhook_id): Path<uuid::Uuid>,
1705) -> Result<Json<serde_json::Value>> {
1706    let registry = store.webhook_registry();
1707
1708    let webhook = registry.get(webhook_id).ok_or_else(|| {
1709        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1710    })?;
1711
1712    Ok(Json(serde_json::json!({
1713        "webhook": webhook,
1714        "found": true
1715    })))
1716}
1717
1718/// Update a webhook subscription
1719pub async fn update_webhook(
1720    State(store): State<SharedStore>,
1721    Path(webhook_id): Path<uuid::Uuid>,
1722    Json(req): Json<UpdateWebhookRequest>,
1723) -> Result<Json<serde_json::Value>> {
1724    let registry = store.webhook_registry();
1725
1726    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1727        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1728    })?;
1729
1730    tracing::info!("Webhook updated: {}", webhook_id);
1731
1732    Ok(Json(serde_json::json!({
1733        "webhook": webhook,
1734        "updated": true
1735    })))
1736}
1737
1738/// Delete a webhook subscription
1739pub async fn delete_webhook(
1740    State(store): State<SharedStore>,
1741    Path(webhook_id): Path<uuid::Uuid>,
1742) -> Result<Json<serde_json::Value>> {
1743    let registry = store.webhook_registry();
1744
1745    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1746        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1747    })?;
1748
1749    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1750
1751    Ok(Json(serde_json::json!({
1752        "webhook_id": webhook_id,
1753        "deleted": true
1754    })))
1755}
1756
1757/// Query parameters for listing webhook deliveries
1758#[derive(Debug, Deserialize)]
1759pub struct ListDeliveriesParams {
1760    pub limit: Option<usize>,
1761}
1762
1763/// List delivery history for a webhook
1764pub async fn list_webhook_deliveries(
1765    State(store): State<SharedStore>,
1766    Path(webhook_id): Path<uuid::Uuid>,
1767    Query(params): Query<ListDeliveriesParams>,
1768) -> Result<Json<serde_json::Value>> {
1769    let registry = store.webhook_registry();
1770
1771    // Verify webhook exists
1772    registry.get(webhook_id).ok_or_else(|| {
1773        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1774    })?;
1775
1776    let limit = params.limit.unwrap_or(50);
1777    let deliveries = registry.get_deliveries(webhook_id, limit);
1778    let total = deliveries.len();
1779
1780    Ok(Json(serde_json::json!({
1781        "webhook_id": webhook_id,
1782        "deliveries": deliveries,
1783        "total": total
1784    })))
1785}
1786
1787// =============================================================================
1788// v2.0: Advanced Query Features
1789// =============================================================================
1790
1791/// EventQL: Execute SQL queries over events using DataFusion
1792#[cfg(feature = "analytics")]
1793pub async fn eventql_query(
1794    State(store): State<SharedStore>,
1795    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
1796) -> Result<Json<serde_json::Value>> {
1797    let events = store.snapshot_events();
1798    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1799        Ok(response) => Ok(Json(serde_json::json!({
1800            "columns": response.columns,
1801            "rows": response.rows,
1802            "row_count": response.row_count,
1803        }))),
1804        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1805    }
1806}
1807
1808/// GraphQL: Execute GraphQL queries
1809pub async fn graphql_query(
1810    State(store): State<SharedStore>,
1811    Json(req): Json<GraphQLRequest>,
1812) -> Json<serde_json::Value> {
1813    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1814        Ok(f) => f,
1815        Err(e) => {
1816            return Json(
1817                serde_json::to_value(GraphQLResponse {
1818                    data: None,
1819                    errors: vec![GraphQLError { message: e }],
1820                })
1821                .unwrap(),
1822            );
1823        }
1824    };
1825
1826    let mut data = serde_json::Map::new();
1827    let mut errors = Vec::new();
1828
1829    for field in &fields {
1830        match field.name.as_str() {
1831            "events" => {
1832                let request = crate::application::dto::QueryEventsRequest {
1833                    entity_id: field.arguments.get("entity_id").cloned(),
1834                    event_type: field.arguments.get("event_type").cloned(),
1835                    tenant_id: field.arguments.get("tenant_id").cloned(),
1836                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1837                    as_of: None,
1838                    since: None,
1839                    until: None,
1840                    event_type_prefix: None,
1841                    payload_filter: None,
1842                };
1843                match store.query(&request) {
1844                    Ok(events) => {
1845                        let json_events: Vec<serde_json::Value> = events
1846                            .iter()
1847                            .map(|e| {
1848                                crate::infrastructure::query::graphql::event_to_json(
1849                                    e,
1850                                    &field.fields,
1851                                )
1852                            })
1853                            .collect();
1854                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1855                    }
1856                    Err(e) => errors.push(GraphQLError {
1857                        message: format!("events query failed: {e}"),
1858                    }),
1859                }
1860            }
1861            "event" => {
1862                if let Some(id_str) = field.arguments.get("id") {
1863                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1864                        match store.get_event_by_id(&id) {
1865                            Ok(Some(event)) => {
1866                                data.insert(
1867                                    "event".to_string(),
1868                                    crate::infrastructure::query::graphql::event_to_json(
1869                                        &event,
1870                                        &field.fields,
1871                                    ),
1872                                );
1873                            }
1874                            Ok(None) => {
1875                                data.insert("event".to_string(), serde_json::Value::Null);
1876                            }
1877                            Err(e) => errors.push(GraphQLError {
1878                                message: format!("event lookup failed: {e}"),
1879                            }),
1880                        }
1881                    } else {
1882                        errors.push(GraphQLError {
1883                            message: format!("Invalid UUID: {id_str}"),
1884                        });
1885                    }
1886                } else {
1887                    errors.push(GraphQLError {
1888                        message: "event query requires 'id' argument".to_string(),
1889                    });
1890                }
1891            }
1892            "projections" => {
1893                let pm = store.projection_manager();
1894                let names: Vec<serde_json::Value> = pm
1895                    .list_projections()
1896                    .iter()
1897                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1898                    .collect();
1899                data.insert("projections".to_string(), serde_json::Value::Array(names));
1900            }
1901            "stats" => {
1902                let stats = store.stats();
1903                data.insert(
1904                    "stats".to_string(),
1905                    serde_json::json!({
1906                        "total_events": stats.total_events,
1907                        "total_entities": stats.total_entities,
1908                        "total_event_types": stats.total_event_types,
1909                    }),
1910                );
1911            }
1912            "__schema" => {
1913                data.insert(
1914                    "__schema".to_string(),
1915                    crate::infrastructure::query::graphql::introspection_schema(),
1916                );
1917            }
1918            other => {
1919                errors.push(GraphQLError {
1920                    message: format!("Unknown field: {other}"),
1921                });
1922            }
1923        }
1924    }
1925
1926    Json(
1927        serde_json::to_value(GraphQLResponse {
1928            data: Some(serde_json::Value::Object(data)),
1929            errors,
1930        })
1931        .unwrap(),
1932    )
1933}
1934
1935/// Geospatial: Query events by location
1936pub async fn geo_query(
1937    State(store): State<SharedStore>,
1938    Json(req): Json<GeoQueryRequest>,
1939) -> Json<serde_json::Value> {
1940    let events = store.snapshot_events();
1941    let geo_index = store.geo_index();
1942    let results =
1943        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1944    let total = results.len();
1945    Json(serde_json::json!({
1946        "results": results,
1947        "total": total,
1948    }))
1949}
1950
1951/// Geospatial index stats
1952pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1953    let stats = store.geo_index().stats();
1954    Json(serde_json::json!(stats))
1955}
1956
1957/// Exactly-once processing stats
1958pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1959    let stats = store.exactly_once().stats();
1960    Json(serde_json::json!(stats))
1961}
1962
1963/// Schema evolution history for an event type
1964pub async fn schema_evolution_history(
1965    State(store): State<SharedStore>,
1966    Path(event_type): Path<String>,
1967) -> Json<serde_json::Value> {
1968    let mgr = store.schema_evolution();
1969    let history = mgr.get_history(&event_type);
1970    let version = mgr.get_version(&event_type);
1971    Json(serde_json::json!({
1972        "event_type": event_type,
1973        "current_version": version,
1974        "history": history,
1975    }))
1976}
1977
1978/// Current inferred schema for an event type
1979pub async fn schema_evolution_schema(
1980    State(store): State<SharedStore>,
1981    Path(event_type): Path<String>,
1982) -> Json<serde_json::Value> {
1983    let mgr = store.schema_evolution();
1984    if let Some(schema) = mgr.get_schema(&event_type) {
1985        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1986        Json(serde_json::json!({
1987            "event_type": event_type,
1988            "version": mgr.get_version(&event_type),
1989            "inferred_schema": schema,
1990            "json_schema": json_schema,
1991        }))
1992    } else {
1993        Json(serde_json::json!({
1994            "event_type": event_type,
1995            "error": "No schema inferred for this event type"
1996        }))
1997    }
1998}
1999
2000/// Schema evolution stats
2001pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2002    let stats = store.schema_evolution().stats();
2003    let event_types = store.schema_evolution().list_event_types();
2004    Json(serde_json::json!({
2005        "stats": stats,
2006        "tracked_event_types": event_types,
2007    }))
2008}
2009
2010// =============================================================================
2011// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2012// =============================================================================
2013
2014/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
2015#[cfg(feature = "embedded-sync")]
2016pub async fn sync_pull_handler(
2017    State(state): State<AppState>,
2018    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
2019) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
2020    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};
2021
2022    let store = &state.store;
2023
2024    // Compute "since" threshold from the client's version vector
2025    // We return all events the client hasn't seen yet
2026    let since = request
2027        .version_vector
2028        .values()
2029        .map(|ts| ts.physical_ms)
2030        .min()
2031        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));
2032
2033    let events = store.query(&crate::application::dto::QueryEventsRequest {
2034        entity_id: None,
2035        event_type: None,
2036        tenant_id: None,
2037        as_of: None,
2038        since,
2039        until: None,
2040        limit: None,
2041        event_type_prefix: None,
2042        payload_filter: None,
2043    })?;
2044
2045    // Convert domain events to ReplicatedEvent wire format
2046    let mut replicated = Vec::with_capacity(events.len());
2047    let mut last_ms = 0u64;
2048    let mut logical = 0u32;
2049
2050    for event in &events {
2051        let event_ms = event.timestamp().timestamp_millis() as u64;
2052        if event_ms == last_ms {
2053            logical += 1;
2054        } else {
2055            last_ms = event_ms;
2056            logical = 0;
2057        }
2058
2059        replicated.push(ReplicatedEvent {
2060            event_id: event.id().to_string(),
2061            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
2062            origin_region: "server".to_string(),
2063            event_data: serde_json::json!({
2064                "event_type": event.event_type_str(),
2065                "entity_id": event.entity_id_str(),
2066                "tenant_id": event.tenant_id_str(),
2067                "payload": event.payload,
2068                "metadata": event.metadata,
2069            }),
2070        });
2071    }
2072
2073    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
2074        events: replicated,
2075        version_vector: std::collections::BTreeMap::new(),
2076    }))
2077}
2078
2079/// POST /api/v1/sync/push — Client pushes events, server applies CRDT resolution.
2080#[cfg(feature = "embedded-sync")]
2081pub async fn sync_push_handler(
2082    State(state): State<AppState>,
2083    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
2084) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
2085    let store = &state.store;
2086
2087    let mut accepted = 0usize;
2088    let mut skipped = 0usize;
2089
2090    for rep_event in &request.events {
2091        let event_data = &rep_event.event_data;
2092        let event_type = event_data
2093            .get("event_type")
2094            .and_then(|v| v.as_str())
2095            .unwrap_or("unknown")
2096            .to_string();
2097        let entity_id = event_data
2098            .get("entity_id")
2099            .and_then(|v| v.as_str())
2100            .unwrap_or("unknown")
2101            .to_string();
2102        let tenant_id = event_data
2103            .get("tenant_id")
2104            .and_then(|v| v.as_str())
2105            .unwrap_or("default")
2106            .to_string();
2107        let payload = event_data
2108            .get("payload")
2109            .cloned()
2110            .unwrap_or(serde_json::json!({}));
2111        let metadata = event_data.get("metadata").cloned();
2112
2113        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
2114            Ok(domain_event) => {
2115                store.ingest(&domain_event)?;
2116                accepted += 1;
2117            }
2118            Err(_) => {
2119                skipped += 1;
2120            }
2121        }
2122    }
2123
2124    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
2125        accepted,
2126        skipped,
2127        version_vector: std::collections::BTreeMap::new(),
2128    }))
2129}
2130
2131// =============================================================================
2132// Consumer endpoints for durable subscriptions (v0.14)
2133// =============================================================================
2134
2135/// POST /api/v1/consumers — Register a durable consumer
2136pub async fn register_consumer(
2137    State(store): State<SharedStore>,
2138    Json(req): Json<RegisterConsumerRequest>,
2139) -> Result<Json<ConsumerResponse>> {
2140    let consumer = store
2141        .consumer_registry()
2142        .register(&req.consumer_id, &req.event_type_filters);
2143
2144    Ok(Json(ConsumerResponse {
2145        consumer_id: consumer.consumer_id,
2146        event_type_filters: consumer.event_type_filters,
2147        cursor_position: consumer.cursor_position,
2148    }))
2149}
2150
2151/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2152pub async fn get_consumer(
2153    State(store): State<SharedStore>,
2154    Path(consumer_id): Path<String>,
2155) -> Result<Json<ConsumerResponse>> {
2156    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2157
2158    Ok(Json(ConsumerResponse {
2159        consumer_id: consumer.consumer_id,
2160        event_type_filters: consumer.event_type_filters,
2161        cursor_position: consumer.cursor_position,
2162    }))
2163}
2164
2165/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2166#[derive(Debug, Deserialize)]
2167pub struct ConsumerPollQuery {
2168    pub limit: Option<usize>,
2169}
2170
2171pub async fn poll_consumer_events(
2172    State(store): State<SharedStore>,
2173    Path(consumer_id): Path<String>,
2174    Query(query): Query<ConsumerPollQuery>,
2175) -> Result<Json<ConsumerEventsResponse>> {
2176    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2177    let offset = consumer.cursor_position.unwrap_or(0);
2178    let limit = query.limit.unwrap_or(100);
2179
2180    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2181    let count = events.len();
2182
2183    let consumer_events: Vec<ConsumerEventDto> = events
2184        .into_iter()
2185        .map(|(position, event)| ConsumerEventDto {
2186            position,
2187            event: EventDto::from(&event),
2188        })
2189        .collect();
2190
2191    Ok(Json(ConsumerEventsResponse {
2192        events: consumer_events,
2193        count,
2194    }))
2195}
2196
2197/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2198pub async fn ack_consumer(
2199    State(store): State<SharedStore>,
2200    Path(consumer_id): Path<String>,
2201    Json(req): Json<AckRequest>,
2202) -> Result<Json<serde_json::Value>> {
2203    let max_offset = store.total_events() as u64;
2204
2205    store
2206        .consumer_registry()
2207        .ack(&consumer_id, req.position, max_offset)
2208        .map_err(crate::error::AllSourceError::InvalidInput)?;
2209
2210    Ok(Json(serde_json::json!({
2211        "status": "ok",
2212        "consumer_id": consumer_id,
2213        "position": req.position,
2214    })))
2215}
2216
2217#[cfg(test)]
2218mod tests {
2219    use super::*;
2220    use crate::{domain::entities::Event, store::EventStore};
2221
2222    fn create_test_store() -> Arc<EventStore> {
2223        Arc::new(EventStore::new())
2224    }
2225
2226    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2227        Event::from_strings(
2228            event_type.to_string(),
2229            entity_id.to_string(),
2230            "test-stream".to_string(),
2231            serde_json::json!({
2232                "name": "Test",
2233                "value": 42
2234            }),
2235            None,
2236        )
2237        .unwrap()
2238    }
2239
2240    #[tokio::test]
2241    async fn test_query_events_has_more_and_total_count() {
2242        let store = create_test_store();
2243
2244        // Ingest 50 events
2245        for i in 0..50 {
2246            store
2247                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2248                .unwrap();
2249        }
2250
2251        // Query with limit=10 — should get has_more=true, total_count=50
2252        let req = QueryEventsRequest {
2253            entity_id: None,
2254            event_type: None,
2255            tenant_id: None,
2256            as_of: None,
2257            since: None,
2258            until: None,
2259            limit: Some(10),
2260            event_type_prefix: None,
2261            payload_filter: None,
2262        };
2263
2264        let requested_limit = req.limit;
2265        let unlimited_req = QueryEventsRequest {
2266            limit: None,
2267            ..QueryEventsRequest {
2268                entity_id: req.entity_id,
2269                event_type: req.event_type,
2270                tenant_id: req.tenant_id,
2271                as_of: req.as_of,
2272                since: req.since,
2273                until: req.until,
2274                limit: None,
2275                event_type_prefix: req.event_type_prefix,
2276                payload_filter: req.payload_filter,
2277            }
2278        };
2279        let all_events = store.query(&unlimited_req).unwrap();
2280        let total_count = all_events.len();
2281        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2282            all_events.into_iter().take(limit).collect()
2283        } else {
2284            all_events
2285        };
2286        let count = limited_events.len();
2287        let has_more = count < total_count;
2288
2289        assert_eq!(count, 10);
2290        assert_eq!(total_count, 50);
2291        assert!(has_more);
2292    }
2293
2294    #[tokio::test]
2295    async fn test_query_events_no_more_results() {
2296        let store = create_test_store();
2297
2298        // Ingest 5 events
2299        for i in 0..5 {
2300            store
2301                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2302                .unwrap();
2303        }
2304
2305        // Query with limit=100 — should get has_more=false, total_count=5
2306        let all_events = store
2307            .query(&QueryEventsRequest {
2308                entity_id: None,
2309                event_type: None,
2310                tenant_id: None,
2311                as_of: None,
2312                since: None,
2313                until: None,
2314                limit: None,
2315                event_type_prefix: None,
2316                payload_filter: None,
2317            })
2318            .unwrap();
2319        let total_count = all_events.len();
2320        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2321        let count = limited_events.len();
2322        let has_more = count < total_count;
2323
2324        assert_eq!(count, 5);
2325        assert_eq!(total_count, 5);
2326        assert!(!has_more);
2327    }
2328
2329    #[tokio::test]
2330    async fn test_list_entities_by_type_prefix() {
2331        let store = create_test_store();
2332
2333        // 3 index entities
2334        store
2335            .ingest(&create_test_event("idx-1", "index.created"))
2336            .unwrap();
2337        store
2338            .ingest(&create_test_event("idx-1", "index.updated"))
2339            .unwrap();
2340        store
2341            .ingest(&create_test_event("idx-2", "index.created"))
2342            .unwrap();
2343        store
2344            .ingest(&create_test_event("idx-3", "index.created"))
2345            .unwrap();
2346        // 2 trade entities
2347        store
2348            .ingest(&create_test_event("trade-1", "trade.created"))
2349            .unwrap();
2350        store
2351            .ingest(&create_test_event("trade-2", "trade.created"))
2352            .unwrap();
2353
2354        // List entities for index.*
2355        let req = ListEntitiesRequest {
2356            event_type_prefix: Some("index.".to_string()),
2357            payload_filter: None,
2358            limit: None,
2359            offset: None,
2360        };
2361        let query_req = QueryEventsRequest {
2362            entity_id: None,
2363            event_type: None,
2364            tenant_id: None,
2365            as_of: None,
2366            since: None,
2367            until: None,
2368            limit: None,
2369            event_type_prefix: req.event_type_prefix,
2370            payload_filter: req.payload_filter,
2371        };
2372        let events = store.query(&query_req).unwrap();
2373
2374        // Group and verify
2375        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2376            std::collections::HashMap::new();
2377        for event in &events {
2378            entity_map
2379                .entry(event.entity_id().to_string())
2380                .or_default()
2381                .push(event);
2382        }
2383
2384        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2385        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2386        assert_eq!(entity_map["idx-2"].len(), 1);
2387        assert_eq!(entity_map["idx-3"].len(), 1);
2388    }
2389
2390    fn create_test_event_with_payload(
2391        entity_id: &str,
2392        event_type: &str,
2393        payload: serde_json::Value,
2394    ) -> Event {
2395        Event::from_strings(
2396            event_type.to_string(),
2397            entity_id.to_string(),
2398            "test-stream".to_string(),
2399            payload,
2400            None,
2401        )
2402        .unwrap()
2403    }
2404
2405    #[tokio::test]
2406    async fn test_detect_duplicates_by_payload_fields() {
2407        let store = create_test_store();
2408
2409        // Create entities with duplicate "name" field values
2410        store
2411            .ingest(&create_test_event_with_payload(
2412                "idx-1",
2413                "index.created",
2414                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2415            ))
2416            .unwrap();
2417        store
2418            .ingest(&create_test_event_with_payload(
2419                "idx-2",
2420                "index.created",
2421                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2422            ))
2423            .unwrap();
2424        store
2425            .ingest(&create_test_event_with_payload(
2426                "idx-3",
2427                "index.created",
2428                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2429            ))
2430            .unwrap();
2431        store
2432            .ingest(&create_test_event_with_payload(
2433                "idx-4",
2434                "index.created",
2435                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2436            ))
2437            .unwrap();
2438        store
2439            .ingest(&create_test_event_with_payload(
2440                "idx-5",
2441                "index.created",
2442                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2443            ))
2444            .unwrap();
2445
2446        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2447        let query_req = QueryEventsRequest {
2448            entity_id: None,
2449            event_type: None,
2450            tenant_id: None,
2451            as_of: None,
2452            since: None,
2453            until: None,
2454            limit: None,
2455            event_type_prefix: Some("index.".to_string()),
2456            payload_filter: None,
2457        };
2458        let events = store.query(&query_req).unwrap();
2459
2460        // Manually replicate the handler logic for testing
2461        let group_by_fields = vec!["name"];
2462        let mut entity_latest: std::collections::HashMap<String, &Event> =
2463            std::collections::HashMap::new();
2464        for event in &events {
2465            let eid = event.entity_id().to_string();
2466            entity_latest
2467                .entry(eid)
2468                .and_modify(|existing| {
2469                    if event.timestamp() > existing.timestamp() {
2470                        *existing = event;
2471                    }
2472                })
2473                .or_insert(event);
2474        }
2475
2476        let mut groups: std::collections::HashMap<String, Vec<String>> =
2477            std::collections::HashMap::new();
2478        for (entity_id, event) in &entity_latest {
2479            let payload = event.payload();
2480            let mut key_parts = serde_json::Map::new();
2481            for field in &group_by_fields {
2482                let value = payload
2483                    .get(*field)
2484                    .cloned()
2485                    .unwrap_or(serde_json::Value::Null);
2486                key_parts.insert((*field).to_string(), value);
2487            }
2488            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2489            groups.entry(key_str).or_default().push(entity_id.clone());
2490        }
2491
2492        let duplicate_groups: Vec<_> = groups
2493            .into_iter()
2494            .filter(|(_, ids)| ids.len() > 1)
2495            .collect();
2496
2497        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2498        for (_, ids) in &duplicate_groups {
2499            assert_eq!(ids.len(), 2);
2500        }
2501    }
2502
2503    #[tokio::test]
2504    async fn test_detect_duplicates_no_duplicates() {
2505        let store = create_test_store();
2506
2507        // All unique names
2508        store
2509            .ingest(&create_test_event_with_payload(
2510                "idx-1",
2511                "index.created",
2512                serde_json::json!({"name": "A"}),
2513            ))
2514            .unwrap();
2515        store
2516            .ingest(&create_test_event_with_payload(
2517                "idx-2",
2518                "index.created",
2519                serde_json::json!({"name": "B"}),
2520            ))
2521            .unwrap();
2522
2523        let query_req = QueryEventsRequest {
2524            entity_id: None,
2525            event_type: None,
2526            tenant_id: None,
2527            as_of: None,
2528            since: None,
2529            until: None,
2530            limit: None,
2531            event_type_prefix: Some("index.".to_string()),
2532            payload_filter: None,
2533        };
2534        let events = store.query(&query_req).unwrap();
2535
2536        let mut entity_latest: std::collections::HashMap<String, &Event> =
2537            std::collections::HashMap::new();
2538        for event in &events {
2539            entity_latest
2540                .entry(event.entity_id().to_string())
2541                .or_insert(event);
2542        }
2543
2544        let mut groups: std::collections::HashMap<String, Vec<String>> =
2545            std::collections::HashMap::new();
2546        for (entity_id, event) in &entity_latest {
2547            let key_str =
2548                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2549                    .unwrap();
2550            groups.entry(key_str).or_default().push(entity_id.clone());
2551        }
2552
2553        let duplicate_groups: Vec<_> = groups
2554            .into_iter()
2555            .filter(|(_, ids)| ids.len() > 1)
2556            .collect();
2557
2558        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2559    }
2560
2561    #[tokio::test]
2562    async fn test_detect_duplicates_multi_field_group_by() {
2563        let store = create_test_store();
2564
2565        // Two entities with same name AND user_id = true duplicate
2566        store
2567            .ingest(&create_test_event_with_payload(
2568                "idx-1",
2569                "index.created",
2570                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2571            ))
2572            .unwrap();
2573        store
2574            .ingest(&create_test_event_with_payload(
2575                "idx-2",
2576                "index.created",
2577                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2578            ))
2579            .unwrap();
2580        // Same name but different user_id = NOT a duplicate in multi-field group
2581        store
2582            .ingest(&create_test_event_with_payload(
2583                "idx-3",
2584                "index.created",
2585                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2586            ))
2587            .unwrap();
2588
2589        let query_req = QueryEventsRequest {
2590            entity_id: None,
2591            event_type: None,
2592            tenant_id: None,
2593            as_of: None,
2594            since: None,
2595            until: None,
2596            limit: None,
2597            event_type_prefix: Some("index.".to_string()),
2598            payload_filter: None,
2599        };
2600        let events = store.query(&query_req).unwrap();
2601
2602        let group_by_fields = vec!["name", "user_id"];
2603        let mut entity_latest: std::collections::HashMap<String, &Event> =
2604            std::collections::HashMap::new();
2605        for event in &events {
2606            entity_latest
2607                .entry(event.entity_id().to_string())
2608                .and_modify(|existing| {
2609                    if event.timestamp() > existing.timestamp() {
2610                        *existing = event;
2611                    }
2612                })
2613                .or_insert(event);
2614        }
2615
2616        let mut groups: std::collections::HashMap<String, Vec<String>> =
2617            std::collections::HashMap::new();
2618        for (entity_id, event) in &entity_latest {
2619            let payload = event.payload();
2620            let mut key_parts = serde_json::Map::new();
2621            for field in &group_by_fields {
2622                let value = payload
2623                    .get(*field)
2624                    .cloned()
2625                    .unwrap_or(serde_json::Value::Null);
2626                key_parts.insert((*field).to_string(), value);
2627            }
2628            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2629            groups.entry(key_str).or_default().push(entity_id.clone());
2630        }
2631
2632        let duplicate_groups: Vec<_> = groups
2633            .into_iter()
2634            .filter(|(_, ids)| ids.len() > 1)
2635            .collect();
2636
2637        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2638        assert_eq!(duplicate_groups.len(), 1);
2639        let (_, ref ids) = duplicate_groups[0];
2640        assert_eq!(ids.len(), 2);
2641        let mut sorted_ids = ids.clone();
2642        sorted_ids.sort();
2643        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2644    }
2645
2646    #[tokio::test]
2647    async fn test_projection_state_cache() {
2648        let store = create_test_store();
2649
2650        // Test cache insertion
2651        let cache = store.projection_state_cache();
2652        cache.insert(
2653            "entity_snapshots:user-123".to_string(),
2654            serde_json::json!({"name": "Test User", "age": 30}),
2655        );
2656
2657        // Test cache retrieval
2658        let state = cache.get("entity_snapshots:user-123");
2659        assert!(state.is_some());
2660        let state = state.unwrap();
2661        assert_eq!(state["name"], "Test User");
2662        assert_eq!(state["age"], 30);
2663    }
2664
2665    #[tokio::test]
2666    async fn test_projection_manager_list_projections() {
2667        let store = create_test_store();
2668
2669        // List projections (built-in projections should be available)
2670        let projection_manager = store.projection_manager();
2671        let projections = projection_manager.list_projections();
2672
2673        // Should have entity_snapshots and event_counters
2674        assert!(projections.len() >= 2);
2675
2676        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2677        assert!(names.contains(&"entity_snapshots"));
2678        assert!(names.contains(&"event_counters"));
2679    }
2680
2681    #[tokio::test]
2682    async fn test_projection_state_after_event_ingestion() {
2683        let store = create_test_store();
2684
2685        // Ingest an event
2686        let event = create_test_event("user-456", "user.created");
2687        store.ingest(&event).unwrap();
2688
2689        // Get projection state
2690        let projection_manager = store.projection_manager();
2691        let snapshot_projection = projection_manager
2692            .get_projection("entity_snapshots")
2693            .unwrap();
2694
2695        let state = snapshot_projection.get_state("user-456");
2696        assert!(state.is_some());
2697        let state = state.unwrap();
2698        assert_eq!(state["name"], "Test");
2699        assert_eq!(state["value"], 42);
2700    }
2701
2702    #[tokio::test]
2703    async fn test_projection_state_cache_multiple_entities() {
2704        let store = create_test_store();
2705        let cache = store.projection_state_cache();
2706
2707        // Insert multiple entities
2708        for i in 0..10 {
2709            cache.insert(
2710                format!("entity_snapshots:entity-{i}"),
2711                serde_json::json!({"id": i, "status": "active"}),
2712            );
2713        }
2714
2715        // Verify all insertions
2716        assert_eq!(cache.len(), 10);
2717
2718        // Verify each entity
2719        for i in 0..10 {
2720            let key = format!("entity_snapshots:entity-{i}");
2721            let state = cache.get(&key);
2722            assert!(state.is_some());
2723            assert_eq!(state.unwrap()["id"], i);
2724        }
2725    }
2726
2727    #[tokio::test]
2728    async fn test_projection_state_update() {
2729        let store = create_test_store();
2730        let cache = store.projection_state_cache();
2731
2732        // Initial state
2733        cache.insert(
2734            "entity_snapshots:user-789".to_string(),
2735            serde_json::json!({"balance": 100}),
2736        );
2737
2738        // Update state
2739        cache.insert(
2740            "entity_snapshots:user-789".to_string(),
2741            serde_json::json!({"balance": 150}),
2742        );
2743
2744        // Verify update
2745        let state = cache.get("entity_snapshots:user-789").unwrap();
2746        assert_eq!(state["balance"], 150);
2747    }
2748
2749    #[tokio::test]
2750    async fn test_event_counter_projection() {
2751        let store = create_test_store();
2752
2753        // Ingest events of different types
2754        store
2755            .ingest(&create_test_event("user-1", "user.created"))
2756            .unwrap();
2757        store
2758            .ingest(&create_test_event("user-2", "user.created"))
2759            .unwrap();
2760        store
2761            .ingest(&create_test_event("user-1", "user.updated"))
2762            .unwrap();
2763
2764        // Get event counter projection
2765        let projection_manager = store.projection_manager();
2766        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2767
2768        // Check counts
2769        let created_state = counter_projection.get_state("user.created");
2770        assert!(created_state.is_some());
2771        assert_eq!(created_state.unwrap()["count"], 2);
2772
2773        let updated_state = counter_projection.get_state("user.updated");
2774        assert!(updated_state.is_some());
2775        assert_eq!(updated_state.unwrap()["count"], 1);
2776    }
2777
2778    #[tokio::test]
2779    async fn test_projection_state_cache_key_format() {
2780        let store = create_test_store();
2781        let cache = store.projection_state_cache();
2782
2783        // Test standard key format: {projection_name}:{entity_id}
2784        let key = "orders:order-12345".to_string();
2785        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2786
2787        let state = cache.get(&key).unwrap();
2788        assert_eq!(state["total"], 99.99);
2789    }
2790
2791    #[tokio::test]
2792    async fn test_projection_state_cache_removal() {
2793        let store = create_test_store();
2794        let cache = store.projection_state_cache();
2795
2796        // Insert and then remove
2797        cache.insert(
2798            "test:entity-1".to_string(),
2799            serde_json::json!({"data": "value"}),
2800        );
2801        assert_eq!(cache.len(), 1);
2802
2803        cache.remove("test:entity-1");
2804        assert_eq!(cache.len(), 0);
2805        assert!(cache.get("test:entity-1").is_none());
2806    }
2807
2808    #[tokio::test]
2809    async fn test_get_nonexistent_projection() {
2810        let store = create_test_store();
2811        let projection_manager = store.projection_manager();
2812
2813        // Requesting a non-existent projection should return None
2814        let projection = projection_manager.get_projection("nonexistent_projection");
2815        assert!(projection.is_none());
2816    }
2817
2818    #[tokio::test]
2819    async fn test_get_nonexistent_entity_state() {
2820        let store = create_test_store();
2821        let projection_manager = store.projection_manager();
2822
2823        // Get state for non-existent entity
2824        let snapshot_projection = projection_manager
2825            .get_projection("entity_snapshots")
2826            .unwrap();
2827        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2828        assert!(state.is_none());
2829    }
2830
2831    #[tokio::test]
2832    async fn test_projection_state_cache_concurrent_access() {
2833        let store = create_test_store();
2834        let cache = store.projection_state_cache();
2835
2836        // Simulate concurrent writes
2837        let handles: Vec<_> = (0..10)
2838            .map(|i| {
2839                let cache_clone = cache.clone();
2840                tokio::spawn(async move {
2841                    cache_clone.insert(
2842                        format!("concurrent:entity-{i}"),
2843                        serde_json::json!({"thread": i}),
2844                    );
2845                })
2846            })
2847            .collect();
2848
2849        for handle in handles {
2850            handle.await.unwrap();
2851        }
2852
2853        // All 10 entries should be present
2854        assert_eq!(cache.len(), 10);
2855    }
2856
2857    #[tokio::test]
2858    async fn test_projection_state_large_payload() {
2859        let store = create_test_store();
2860        let cache = store.projection_state_cache();
2861
2862        // Create a large JSON payload (~10KB)
2863        let large_array: Vec<serde_json::Value> = (0..1000)
2864            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2865            .collect();
2866
2867        cache.insert(
2868            "large:entity-1".to_string(),
2869            serde_json::json!({"items": large_array}),
2870        );
2871
2872        let state = cache.get("large:entity-1").unwrap();
2873        let items = state["items"].as_array().unwrap();
2874        assert_eq!(items.len(), 1000);
2875    }
2876
2877    #[tokio::test]
2878    async fn test_projection_state_complex_json() {
2879        let store = create_test_store();
2880        let cache = store.projection_state_cache();
2881
2882        // Complex nested JSON structure
2883        let complex_state = serde_json::json!({
2884            "user": {
2885                "id": "user-123",
2886                "profile": {
2887                    "name": "John Doe",
2888                    "email": "john@example.com",
2889                    "settings": {
2890                        "theme": "dark",
2891                        "notifications": true
2892                    }
2893                },
2894                "roles": ["admin", "user"],
2895                "metadata": {
2896                    "created_at": "2025-01-01T00:00:00Z",
2897                    "last_login": null
2898                }
2899            }
2900        });
2901
2902        cache.insert("complex:user-123".to_string(), complex_state);
2903
2904        let state = cache.get("complex:user-123").unwrap();
2905        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2906        assert_eq!(state["user"]["roles"][0], "admin");
2907        assert!(state["user"]["metadata"]["last_login"].is_null());
2908    }
2909
2910    #[tokio::test]
2911    async fn test_projection_state_cache_iteration() {
2912        let store = create_test_store();
2913        let cache = store.projection_state_cache();
2914
2915        // Insert entries
2916        for i in 0..5 {
2917            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2918        }
2919
2920        // Iterate over all entries
2921        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2922        assert_eq!(entries.len(), 5);
2923    }
2924
2925    #[tokio::test]
2926    async fn test_projection_manager_get_entity_snapshots() {
2927        let store = create_test_store();
2928        let projection_manager = store.projection_manager();
2929
2930        // Get entity_snapshots projection specifically
2931        let projection = projection_manager.get_projection("entity_snapshots");
2932        assert!(projection.is_some());
2933        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2934    }
2935
2936    #[tokio::test]
2937    async fn test_projection_manager_get_event_counters() {
2938        let store = create_test_store();
2939        let projection_manager = store.projection_manager();
2940
2941        // Get event_counters projection specifically
2942        let projection = projection_manager.get_projection("event_counters");
2943        assert!(projection.is_some());
2944        assert_eq!(projection.unwrap().name(), "event_counters");
2945    }
2946
2947    #[tokio::test]
2948    async fn test_projection_state_cache_overwrite() {
2949        let store = create_test_store();
2950        let cache = store.projection_state_cache();
2951
2952        // Initial value
2953        cache.insert(
2954            "overwrite:entity-1".to_string(),
2955            serde_json::json!({"version": 1}),
2956        );
2957
2958        // Overwrite with new value
2959        cache.insert(
2960            "overwrite:entity-1".to_string(),
2961            serde_json::json!({"version": 2}),
2962        );
2963
2964        // Overwrite again
2965        cache.insert(
2966            "overwrite:entity-1".to_string(),
2967            serde_json::json!({"version": 3}),
2968        );
2969
2970        let state = cache.get("overwrite:entity-1").unwrap();
2971        assert_eq!(state["version"], 3);
2972
2973        // Should still be only 1 entry
2974        assert_eq!(cache.len(), 1);
2975    }
2976
2977    #[tokio::test]
2978    async fn test_projection_state_multiple_projections() {
2979        let store = create_test_store();
2980        let cache = store.projection_state_cache();
2981
2982        // Store states for different projections
2983        cache.insert(
2984            "entity_snapshots:user-1".to_string(),
2985            serde_json::json!({"name": "Alice"}),
2986        );
2987        cache.insert(
2988            "event_counters:user.created".to_string(),
2989            serde_json::json!({"count": 5}),
2990        );
2991        cache.insert(
2992            "custom_projection:order-1".to_string(),
2993            serde_json::json!({"total": 150.0}),
2994        );
2995
2996        // Verify each projection's state
2997        assert_eq!(
2998            cache.get("entity_snapshots:user-1").unwrap()["name"],
2999            "Alice"
3000        );
3001        assert_eq!(
3002            cache.get("event_counters:user.created").unwrap()["count"],
3003            5
3004        );
3005        assert_eq!(
3006            cache.get("custom_projection:order-1").unwrap()["total"],
3007            150.0
3008        );
3009    }
3010
3011    #[tokio::test]
3012    async fn test_bulk_projection_state_access() {
3013        let store = create_test_store();
3014
3015        // Ingest multiple events for different entities
3016        for i in 0..5 {
3017            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3018            store.ingest(&event).unwrap();
3019        }
3020
3021        // Get projection and verify bulk access
3022        let projection_manager = store.projection_manager();
3023        let snapshot_projection = projection_manager
3024            .get_projection("entity_snapshots")
3025            .unwrap();
3026
3027        // Verify we can access all entities
3028        for i in 0..5 {
3029            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3030            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3031        }
3032    }
3033
3034    #[tokio::test]
3035    async fn test_bulk_save_projection_states() {
3036        let store = create_test_store();
3037        let cache = store.projection_state_cache();
3038
3039        // Simulate bulk save request
3040        let states = vec![
3041            BulkSaveStateItem {
3042                entity_id: "bulk-entity-1".to_string(),
3043                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3044            },
3045            BulkSaveStateItem {
3046                entity_id: "bulk-entity-2".to_string(),
3047                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3048            },
3049            BulkSaveStateItem {
3050                entity_id: "bulk-entity-3".to_string(),
3051                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3052            },
3053        ];
3054
3055        let projection_name = "test_projection";
3056
3057        // Save states to cache (simulating bulk_save_projection_states handler)
3058        for item in &states {
3059            cache.insert(
3060                format!("{projection_name}:{}", item.entity_id),
3061                item.state.clone(),
3062            );
3063        }
3064
3065        // Verify all states were saved
3066        assert_eq!(cache.len(), 3);
3067
3068        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3069        assert_eq!(state1["name"], "Entity 1");
3070        assert_eq!(state1["value"], 100);
3071
3072        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3073        assert_eq!(state2["name"], "Entity 2");
3074        assert_eq!(state2["value"], 200);
3075
3076        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3077        assert_eq!(state3["name"], "Entity 3");
3078        assert_eq!(state3["value"], 300);
3079    }
3080
3081    #[tokio::test]
3082    async fn test_bulk_save_empty_states() {
3083        let store = create_test_store();
3084        let cache = store.projection_state_cache();
3085
3086        // Clear cache
3087        cache.clear();
3088
3089        // Empty states should work fine
3090        let states: Vec<BulkSaveStateItem> = vec![];
3091        assert_eq!(states.len(), 0);
3092
3093        // Cache should remain empty
3094        assert_eq!(cache.len(), 0);
3095    }
3096
3097    #[tokio::test]
3098    async fn test_bulk_save_overwrites_existing() {
3099        let store = create_test_store();
3100        let cache = store.projection_state_cache();
3101
3102        // Insert initial state
3103        cache.insert(
3104            "test:entity-1".to_string(),
3105            serde_json::json!({"version": 1, "data": "initial"}),
3106        );
3107
3108        // Bulk save with updated state
3109        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3110        cache.insert("test:entity-1".to_string(), new_state);
3111
3112        // Verify overwrite
3113        let state = cache.get("test:entity-1").unwrap();
3114        assert_eq!(state["version"], 2);
3115        assert_eq!(state["data"], "updated");
3116    }
3117
3118    #[tokio::test]
3119    async fn test_bulk_save_high_volume() {
3120        let store = create_test_store();
3121        let cache = store.projection_state_cache();
3122
3123        // Simulate high volume save (1000 entities)
3124        for i in 0..1000 {
3125            cache.insert(
3126                format!("volume_test:entity-{i}"),
3127                serde_json::json!({"index": i, "status": "active"}),
3128            );
3129        }
3130
3131        // Verify count
3132        assert_eq!(cache.len(), 1000);
3133
3134        // Spot check some entries
3135        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3136        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3137        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3138    }
3139
3140    #[tokio::test]
3141    async fn test_bulk_save_different_projections() {
3142        let store = create_test_store();
3143        let cache = store.projection_state_cache();
3144
3145        // Save to multiple projections in bulk
3146        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3147
3148        for proj in &projections {
3149            for i in 0..5 {
3150                cache.insert(
3151                    format!("{proj}:entity-{i}"),
3152                    serde_json::json!({"projection": proj, "id": i}),
3153                );
3154            }
3155        }
3156
3157        // Verify total count (3 projections * 5 entities)
3158        assert_eq!(cache.len(), 15);
3159
3160        // Verify each projection
3161        for proj in &projections {
3162            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3163            assert_eq!(state["projection"], *proj);
3164        }
3165    }
3166}