Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        web::api_v1::AppState,
41    },
42    store::{EventStore, EventTypeInfo, StreamInfo},
43};
44use axum::{
45    Json, Router,
46    extract::{Path, Query, State, WebSocketUpgrade},
47    response::{IntoResponse, Response},
48    routing::{get, post, put},
49};
50use serde::Deserialize;
51use std::sync::Arc;
52use tower_http::{
53    cors::{Any, CorsLayer},
54    trace::TraceLayer,
55};
56
/// Shared, reference-counted handle to the [`EventStore`]. `serve` installs it
/// as router state and the non-v1 handlers in this file extract it via `State`.
type SharedStore = Arc<EventStore>;
58
59/// Wait for follower ACK(s) in semi-sync/sync replication modes.
60///
61/// In async mode (default), returns immediately. In semi-sync mode, waits for
62/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
63/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
64#[cfg(feature = "replication")]
65async fn await_replication_ack(state: &AppState) {
66    let shipper_guard = state.wal_shipper.read().await;
67    if let Some(ref shipper) = *shipper_guard {
68        let mode = shipper.replication_mode();
69        if mode == ReplicationMode::Async {
70            return;
71        }
72
73        let target_offset = shipper.current_leader_offset();
74        if target_offset == 0 {
75            return;
76        }
77
78        let shipper = Arc::clone(shipper);
79        // Drop the read guard before the async wait to avoid holding it across await
80        drop(shipper_guard);
81
82        let timer = state
83            .store
84            .metrics()
85            .replication_ack_wait_seconds
86            .start_timer();
87        let acked = shipper.wait_for_ack(target_offset).await;
88        timer.observe_duration();
89
90        if !acked {
91            tracing::warn!(
92                "Replication ACK timeout in {} mode (offset {}). \
93                 Write succeeded locally but follower confirmation pending.",
94                mode,
95                target_offset,
96            );
97        }
98    }
99}
100#[cfg(not(feature = "replication"))]
101async fn await_replication_ack(_state: &AppState) {
102    // No-op in community edition
103}
104
105pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
106    let app = Router::new()
107        .route("/health", get(health))
108        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
109        .route("/api/v1/events", post(ingest_event))
110        .route("/api/v1/events/batch", post(ingest_events_batch))
111        .route("/api/v1/events/query", get(query_events))
112        .route("/api/v1/events/{event_id}", get(get_event_by_id))
113        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
114        // v0.10: Stream and event type discovery endpoints
115        .route("/api/v1/streams", get(list_streams))
116        .route("/api/v1/event-types", get(list_event_types))
117        .route("/api/v1/entities/duplicates", get(detect_duplicates))
118        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
119        .route(
120            "/api/v1/entities/{entity_id}/snapshot",
121            get(get_entity_snapshot),
122        )
123        .route("/api/v1/stats", get(get_stats))
124        // v0.2: Advanced analytics endpoints
125        .route("/api/v1/analytics/frequency", get(analytics_frequency))
126        .route("/api/v1/analytics/summary", get(analytics_summary))
127        .route("/api/v1/analytics/correlation", get(analytics_correlation))
128        // v0.2: Snapshot management endpoints
129        .route("/api/v1/snapshots", post(create_snapshot))
130        .route("/api/v1/snapshots", get(list_snapshots))
131        .route(
132            "/api/v1/snapshots/{entity_id}/latest",
133            get(get_latest_snapshot),
134        )
135        // v0.2: Compaction endpoints
136        .route("/api/v1/compaction/trigger", post(trigger_compaction))
137        .route("/api/v1/compaction/stats", get(compaction_stats))
138        // v0.5: Schema registry endpoints
139        .route("/api/v1/schemas", post(register_schema))
140        .route("/api/v1/schemas", get(list_subjects))
141        .route("/api/v1/schemas/{subject}", get(get_schema))
142        .route(
143            "/api/v1/schemas/{subject}/versions",
144            get(list_schema_versions),
145        )
146        .route("/api/v1/schemas/validate", post(validate_event_schema))
147        .route(
148            "/api/v1/schemas/{subject}/compatibility",
149            put(set_compatibility_mode),
150        )
151        // v0.5: Replay and projection rebuild endpoints
152        .route("/api/v1/replay", post(start_replay))
153        .route("/api/v1/replay", get(list_replays))
154        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
155        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
156        .route(
157            "/api/v1/replay/{replay_id}",
158            axum::routing::delete(delete_replay),
159        )
160        // v0.5: Stream processing pipeline endpoints
161        .route("/api/v1/pipelines", post(register_pipeline))
162        .route("/api/v1/pipelines", get(list_pipelines))
163        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
164        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
165        .route(
166            "/api/v1/pipelines/{pipeline_id}",
167            axum::routing::delete(remove_pipeline),
168        )
169        .route(
170            "/api/v1/pipelines/{pipeline_id}/stats",
171            get(get_pipeline_stats),
172        )
173        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
174        // v0.7: Projection State API for Query Service integration
175        .route("/api/v1/projections", get(list_projections))
176        .route("/api/v1/projections/{name}", get(get_projection))
177        .route(
178            "/api/v1/projections/{name}",
179            axum::routing::delete(delete_projection),
180        )
181        .route(
182            "/api/v1/projections/{name}/state",
183            get(get_projection_state_summary),
184        )
185        .route("/api/v1/projections/{name}/reset", post(reset_projection))
186        .route(
187            "/api/v1/projections/{name}/{entity_id}/state",
188            get(get_projection_state),
189        )
190        .route(
191            "/api/v1/projections/{name}/{entity_id}/state",
192            post(save_projection_state),
193        )
194        .route(
195            "/api/v1/projections/{name}/{entity_id}/state",
196            put(save_projection_state),
197        )
198        .route(
199            "/api/v1/projections/{name}/bulk",
200            post(bulk_get_projection_states),
201        )
202        .route(
203            "/api/v1/projections/{name}/bulk/save",
204            post(bulk_save_projection_states),
205        )
206        // v0.11: Webhook management endpoints
207        .route("/api/v1/webhooks", post(register_webhook))
208        .route("/api/v1/webhooks", get(list_webhooks))
209        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
210        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
211        .route(
212            "/api/v1/webhooks/{webhook_id}",
213            axum::routing::delete(delete_webhook),
214        )
215        .route(
216            "/api/v1/webhooks/{webhook_id}/deliveries",
217            get(list_webhook_deliveries),
218        )
219        // v2.0: Advanced query features
220        .route("/api/v1/graphql", post(graphql_query))
221        .route("/api/v1/geospatial/query", post(geo_query))
222        .route("/api/v1/geospatial/stats", get(geo_stats))
223        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
224        .route(
225            "/api/v1/schema-evolution/history/{event_type}",
226            get(schema_evolution_history),
227        )
228        .route(
229            "/api/v1/schema-evolution/schema/{event_type}",
230            get(schema_evolution_schema),
231        )
232        .route(
233            "/api/v1/schema-evolution/stats",
234            get(schema_evolution_stats),
235        )
236        .layer(
237            CorsLayer::new()
238                .allow_origin(Any)
239                .allow_methods(Any)
240                .allow_headers(Any),
241        )
242        .layer(TraceLayer::new_for_http())
243        .with_state(store);
244
245    let listener = tokio::net::TcpListener::bind(addr).await?;
246    axum::serve(listener, app).await?;
247
248    Ok(())
249}
250
251pub async fn health() -> impl IntoResponse {
252    Json(serde_json::json!({
253        "status": "healthy",
254        "service": "allsource-core",
255        "version": env!("CARGO_PKG_VERSION")
256    }))
257}
258
259// v0.6: Prometheus metrics endpoint
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
261    let metrics = store.metrics();
262
263    match metrics.encode() {
264        Ok(encoded) => Response::builder()
265            .status(200)
266            .header("Content-Type", "text/plain; version=0.0.4")
267            .body(encoded)
268            .unwrap()
269            .into_response(),
270        Err(e) => Response::builder()
271            .status(500)
272            .body(format!("Error encoding metrics: {e}"))
273            .unwrap()
274            .into_response(),
275    }
276}
277
278pub async fn ingest_event(
279    State(store): State<SharedStore>,
280    Json(req): Json<IngestEventRequest>,
281) -> Result<Json<IngestEventResponse>> {
282    let expected_version = req.expected_version;
283
284    // Create event using from_strings with default tenant
285    let event = Event::from_strings(
286        req.event_type,
287        req.entity_id,
288        "default".to_string(),
289        req.payload,
290        req.metadata,
291    )?;
292
293    let event_id = event.id;
294    let timestamp = event.timestamp;
295
296    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
297
298    tracing::info!("Event ingested: {}", event_id);
299
300    Ok(Json(IngestEventResponse {
301        event_id,
302        timestamp,
303        version: Some(new_version),
304    }))
305}
306
307/// Ingest a single event with semi-sync/sync replication ACK waiting.
308///
309/// Used by the v1 API (with auth and replication support).
310pub async fn ingest_event_v1(
311    State(state): State<AppState>,
312    Json(req): Json<IngestEventRequest>,
313) -> Result<Json<IngestEventResponse>> {
314    let expected_version = req.expected_version;
315
316    let event = Event::from_strings(
317        req.event_type,
318        req.entity_id,
319        "default".to_string(),
320        req.payload,
321        req.metadata,
322    )?;
323
324    let event_id = event.id;
325    let timestamp = event.timestamp;
326
327    let new_version = state
328        .store
329        .ingest_with_expected_version(&event, expected_version)?;
330
331    // Semi-sync/sync: wait for follower ACK(s) before returning
332    await_replication_ack(&state).await;
333
334    tracing::info!("Event ingested: {}", event_id);
335
336    Ok(Json(IngestEventResponse {
337        event_id,
338        timestamp,
339        version: Some(new_version),
340    }))
341}
342
343/// Batch ingest multiple events in a single request
344///
345/// This endpoint allows ingesting multiple events atomically, which is more
346/// efficient than making individual requests for each event.
347pub async fn ingest_events_batch(
348    State(store): State<SharedStore>,
349    Json(req): Json<IngestEventsBatchRequest>,
350) -> Result<Json<IngestEventsBatchResponse>> {
351    let total = req.events.len();
352    let mut ingested_events = Vec::with_capacity(total);
353
354    for event_req in req.events {
355        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
356        let expected_version = event_req.expected_version;
357
358        let event = Event::from_strings(
359            event_req.event_type,
360            event_req.entity_id,
361            tenant_id,
362            event_req.payload,
363            event_req.metadata,
364        )?;
365
366        let event_id = event.id;
367        let timestamp = event.timestamp;
368
369        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
370
371        ingested_events.push(IngestEventResponse {
372            event_id,
373            timestamp,
374            version: Some(new_version),
375        });
376    }
377
378    let ingested = ingested_events.len();
379    tracing::info!("Batch ingested {} events", ingested);
380
381    Ok(Json(IngestEventsBatchResponse {
382        total,
383        ingested,
384        events: ingested_events,
385    }))
386}
387
388/// Batch ingest with semi-sync/sync replication ACK waiting.
389///
390/// Used by the v1 API (with auth and replication support).
391pub async fn ingest_events_batch_v1(
392    State(state): State<AppState>,
393    Json(req): Json<IngestEventsBatchRequest>,
394) -> Result<Json<IngestEventsBatchResponse>> {
395    let total = req.events.len();
396    let mut ingested_events = Vec::with_capacity(total);
397
398    for event_req in req.events {
399        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
400        let expected_version = event_req.expected_version;
401
402        let event = Event::from_strings(
403            event_req.event_type,
404            event_req.entity_id,
405            tenant_id,
406            event_req.payload,
407            event_req.metadata,
408        )?;
409
410        let event_id = event.id;
411        let timestamp = event.timestamp;
412
413        let new_version = state
414            .store
415            .ingest_with_expected_version(&event, expected_version)?;
416
417        ingested_events.push(IngestEventResponse {
418            event_id,
419            timestamp,
420            version: Some(new_version),
421        });
422    }
423
424    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
425    await_replication_ack(&state).await;
426
427    let ingested = ingested_events.len();
428    tracing::info!("Batch ingested {} events", ingested);
429
430    Ok(Json(IngestEventsBatchResponse {
431        total,
432        ingested,
433        events: ingested_events,
434    }))
435}
436
437pub async fn query_events(
438    State(store): State<SharedStore>,
439    Query(req): Query<QueryEventsRequest>,
440) -> Result<Json<QueryEventsResponse>> {
441    let requested_limit = req.limit;
442    let queried_entity_id = req.entity_id.clone();
443
444    // Query without limit to get total count
445    let unlimited_req = QueryEventsRequest {
446        entity_id: req.entity_id,
447        event_type: req.event_type,
448        tenant_id: req.tenant_id,
449        as_of: req.as_of,
450        since: req.since,
451        until: req.until,
452        limit: None,
453        event_type_prefix: req.event_type_prefix,
454        payload_filter: req.payload_filter,
455    };
456    let all_events = store.query(&unlimited_req)?;
457    let total_count = all_events.len();
458
459    // Apply limit
460    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
461        all_events.into_iter().take(limit).collect()
462    } else {
463        all_events
464    };
465
466    let count = limited_events.len();
467    let has_more = count < total_count;
468    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
469
470    // Include entity_version only when filtering by a single entity_id
471    let entity_version = queried_entity_id
472        .as_deref()
473        .map(|eid| store.get_entity_version(eid));
474
475    tracing::debug!("Query returned {} events (total: {})", count, total_count);
476
477    Ok(Json(QueryEventsResponse {
478        events,
479        count,
480        total_count,
481        has_more,
482        entity_version,
483    }))
484}
485
486pub async fn list_entities(
487    State(store): State<SharedStore>,
488    Query(req): Query<ListEntitiesRequest>,
489) -> Result<Json<ListEntitiesResponse>> {
490    use std::collections::HashMap;
491
492    // Get all events matching the filters
493    let query_req = QueryEventsRequest {
494        entity_id: None,
495        event_type: None,
496        tenant_id: None,
497        as_of: None,
498        since: None,
499        until: None,
500        limit: None,
501        event_type_prefix: req.event_type_prefix,
502        payload_filter: req.payload_filter,
503    };
504    let events = store.query(&query_req)?;
505
506    // Group by entity_id
507    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
508    for event in &events {
509        entity_map
510            .entry(event.entity_id().to_string())
511            .or_default()
512            .push(event);
513    }
514
515    // Build entity summaries sorted by last event time (descending)
516    let mut summaries: Vec<EntitySummary> = entity_map
517        .into_iter()
518        .map(|(entity_id, events)| {
519            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
520            EntitySummary {
521                entity_id,
522                event_count: events.len(),
523                last_event_type: last.event_type_str().to_string(),
524                last_event_at: last.timestamp(),
525            }
526        })
527        .collect();
528    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
529
530    let total = summaries.len();
531
532    // Apply offset and limit
533    let offset = req.offset.unwrap_or(0);
534    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
535    let summaries = if let Some(limit) = req.limit {
536        let has_more = summaries.len() > limit;
537        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
538        return Ok(Json(ListEntitiesResponse {
539            entities: truncated,
540            total,
541            has_more,
542        }));
543    } else {
544        summaries
545    };
546
547    Ok(Json(ListEntitiesResponse {
548        entities: summaries,
549        total,
550        has_more: false,
551    }))
552}
553
554pub async fn detect_duplicates(
555    State(store): State<SharedStore>,
556    Query(req): Query<DetectDuplicatesRequest>,
557) -> Result<Json<DetectDuplicatesResponse>> {
558    use std::collections::HashMap;
559
560    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
561
562    // Query events scoped by the required prefix
563    let query_req = QueryEventsRequest {
564        entity_id: None,
565        event_type: None,
566        tenant_id: None,
567        as_of: None,
568        since: None,
569        until: None,
570        limit: None,
571        event_type_prefix: Some(req.event_type_prefix),
572        payload_filter: None,
573    };
574    let events = store.query(&query_req)?;
575
576    // For each entity, extract the latest event's payload fields specified by group_by
577    // Then group entities by those field values
578    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
579    for event in &events {
580        let eid = event.entity_id().to_string();
581        entity_latest
582            .entry(eid)
583            .and_modify(|existing| {
584                if event.timestamp() > existing.timestamp() {
585                    *existing = event;
586                }
587            })
588            .or_insert(event);
589    }
590
591    // Group entities by their payload field values
592    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
593    for (entity_id, event) in &entity_latest {
594        let payload = event.payload();
595        let mut key_parts = serde_json::Map::new();
596        for field in &group_by_fields {
597            let value = payload
598                .get(*field)
599                .cloned()
600                .unwrap_or(serde_json::Value::Null);
601            key_parts.insert((*field).to_string(), value);
602        }
603        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
604        groups.entry(key_str).or_default().push(entity_id.clone());
605    }
606
607    // Filter to groups with count > 1 (actual duplicates)
608    let mut duplicate_groups: Vec<DuplicateGroup> = groups
609        .into_iter()
610        .filter(|(_, ids)| ids.len() > 1)
611        .map(|(key_str, mut ids)| {
612            ids.sort();
613            let key: serde_json::Value =
614                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
615            let count = ids.len();
616            DuplicateGroup {
617                key,
618                entity_ids: ids,
619                count,
620            }
621        })
622        .collect();
623
624    // Sort by count descending for consistent output
625    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
626
627    let total = duplicate_groups.len();
628
629    // Apply offset and limit
630    let offset = req.offset.unwrap_or(0);
631    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
632
633    if let Some(limit) = req.limit {
634        let has_more = duplicate_groups.len() > limit;
635        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
636        return Ok(Json(DetectDuplicatesResponse {
637            duplicates: truncated,
638            total,
639            has_more,
640        }));
641    }
642
643    Ok(Json(DetectDuplicatesResponse {
644        duplicates: duplicate_groups,
645        total,
646        has_more: false,
647    }))
648}
649
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp, passed straight through to the store's
    /// `reconstruct_state` call.
    /// NOTE(review): presumably selects a point-in-time view of the entity — confirm.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
654
655pub async fn get_entity_state(
656    State(store): State<SharedStore>,
657    Path(entity_id): Path<String>,
658    Query(params): Query<EntityStateParams>,
659) -> Result<Json<serde_json::Value>> {
660    let state = store.reconstruct_state(&entity_id, params.as_of)?;
661
662    tracing::info!("State reconstructed for entity: {}", entity_id);
663
664    Ok(Json(state))
665}
666
667pub async fn get_entity_snapshot(
668    State(store): State<SharedStore>,
669    Path(entity_id): Path<String>,
670) -> Result<Json<serde_json::Value>> {
671    let snapshot = store.get_snapshot(&entity_id)?;
672
673    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
674
675    Ok(Json(snapshot))
676}
677
678pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
679    let stats = store.stats();
680    Json(stats)
681}
682
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// Both fields are optional; when omitted, `list_streams` applies no offset
/// and no limit respectively.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return (applied after `offset`).
    pub limit: Option<usize>,
    /// Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
692
/// Response for listing streams.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before offset/limit were applied.
    pub total: usize,
}
699
700pub async fn list_streams(
701    State(store): State<SharedStore>,
702    Query(params): Query<ListStreamsParams>,
703) -> Json<ListStreamsResponse> {
704    let mut streams = store.list_streams();
705    let total = streams.len();
706
707    // Sort by last_event_at descending (most recent first)
708    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
709
710    // Apply pagination
711    if let Some(offset) = params.offset {
712        if offset < streams.len() {
713            streams = streams[offset..].to_vec();
714        } else {
715            streams = vec![];
716        }
717    }
718
719    if let Some(limit) = params.limit {
720        streams.truncate(limit);
721    }
722
723    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
724
725    Json(ListStreamsResponse { streams, total })
726}
727
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// Both fields are optional; when omitted, `list_event_types` applies no
/// offset and no limit respectively.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return (applied after `offset`).
    pub limit: Option<usize>,
    /// Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
737
/// Response for listing event types.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before offset/limit were applied.
    pub total: usize,
}
744
745pub async fn list_event_types(
746    State(store): State<SharedStore>,
747    Query(params): Query<ListEventTypesParams>,
748) -> Json<ListEventTypesResponse> {
749    let mut event_types = store.list_event_types();
750    let total = event_types.len();
751
752    // Sort by event_count descending (most used first)
753    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
754
755    // Apply pagination
756    if let Some(offset) = params.offset {
757        if offset < event_types.len() {
758            event_types = event_types[offset..].to_vec();
759        } else {
760            event_types = vec![];
761        }
762    }
763
764    if let Some(limit) = params.limit {
765        event_types.truncate(limit);
766    }
767
768    tracing::debug!(
769        "Listed {} event types (total: {})",
770        event_types.len(),
771        total
772    );
773
774    Json(ListEventTypesResponse { event_types, total })
775}
776
// v0.2: WebSocket endpoint for real-time event streaming
/// Query-string parameters accepted by `events_websocket`.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handed to `handle_socket_with_consumer`
    /// together with this id and the store; otherwise the plain
    /// `handle_socket` path is used.
    pub consumer_id: Option<String>,
}
782
783pub async fn events_websocket(
784    ws: WebSocketUpgrade,
785    State(store): State<SharedStore>,
786    Query(params): Query<WebSocketParams>,
787) -> Response {
788    let websocket_manager = store.websocket_manager();
789
790    ws.on_upgrade(move |socket| async move {
791        if let Some(consumer_id) = params.consumer_id {
792            websocket_manager
793                .handle_socket_with_consumer(socket, consumer_id, store)
794                .await;
795        } else {
796            websocket_manager.handle_socket(socket).await;
797        }
798    })
799}
800
801// v0.2: Event frequency analytics endpoint
802pub async fn analytics_frequency(
803    State(store): State<SharedStore>,
804    Query(req): Query<EventFrequencyRequest>,
805) -> Result<Json<EventFrequencyResponse>> {
806    let response = AnalyticsEngine::event_frequency(&store, &req)?;
807
808    tracing::debug!(
809        "Frequency analysis returned {} buckets",
810        response.buckets.len()
811    );
812
813    Ok(Json(response))
814}
815
816// v0.2: Statistical summary endpoint
817pub async fn analytics_summary(
818    State(store): State<SharedStore>,
819    Query(req): Query<StatsSummaryRequest>,
820) -> Result<Json<StatsSummaryResponse>> {
821    let response = AnalyticsEngine::stats_summary(&store, &req)?;
822
823    tracing::debug!(
824        "Stats summary: {} events across {} entities",
825        response.total_events,
826        response.unique_entities
827    );
828
829    Ok(Json(response))
830}
831
832// v0.2: Event correlation analysis endpoint
833pub async fn analytics_correlation(
834    State(store): State<SharedStore>,
835    Query(req): Query<CorrelationRequest>,
836) -> Result<Json<CorrelationResponse>> {
837    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
838
839    tracing::debug!(
840        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
841        response.correlated_pairs,
842        response.total_a,
843        response.correlation_percentage
844    );
845
846    Ok(Json(response))
847}
848
849// v0.2: Create a snapshot for an entity
850pub async fn create_snapshot(
851    State(store): State<SharedStore>,
852    Json(req): Json<CreateSnapshotRequest>,
853) -> Result<Json<CreateSnapshotResponse>> {
854    store.create_snapshot(&req.entity_id)?;
855
856    let snapshot_manager = store.snapshot_manager();
857    let snapshot = snapshot_manager
858        .get_latest_snapshot(&req.entity_id)
859        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
860
861    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
862
863    Ok(Json(CreateSnapshotResponse {
864        snapshot_id: snapshot.id,
865        entity_id: snapshot.entity_id,
866        created_at: snapshot.created_at,
867        event_count: snapshot.event_count,
868        size_bytes: snapshot.metadata.size_bytes,
869    }))
870}
871
872// v0.2: List snapshots
873pub async fn list_snapshots(
874    State(store): State<SharedStore>,
875    Query(req): Query<ListSnapshotsRequest>,
876) -> Result<Json<ListSnapshotsResponse>> {
877    let snapshot_manager = store.snapshot_manager();
878
879    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
880        snapshot_manager
881            .get_all_snapshots(&entity_id)
882            .into_iter()
883            .map(SnapshotInfo::from)
884            .collect()
885    } else {
886        // List all entities with snapshots
887        let entities = snapshot_manager.list_entities();
888        entities
889            .iter()
890            .flat_map(|entity_id| {
891                snapshot_manager
892                    .get_all_snapshots(entity_id)
893                    .into_iter()
894                    .map(SnapshotInfo::from)
895            })
896            .collect()
897    };
898
899    let total = snapshots.len();
900
901    tracing::debug!("Listed {} snapshots", total);
902
903    Ok(Json(ListSnapshotsResponse { snapshots, total }))
904}
905
906// v0.2: Get latest snapshot for an entity
907pub async fn get_latest_snapshot(
908    State(store): State<SharedStore>,
909    Path(entity_id): Path<String>,
910) -> Result<Json<serde_json::Value>> {
911    let snapshot_manager = store.snapshot_manager();
912
913    let snapshot = snapshot_manager
914        .get_latest_snapshot(&entity_id)
915        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
916
917    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
918
919    Ok(Json(serde_json::json!({
920        "snapshot_id": snapshot.id,
921        "entity_id": snapshot.entity_id,
922        "created_at": snapshot.created_at,
923        "as_of": snapshot.as_of,
924        "event_count": snapshot.event_count,
925        "size_bytes": snapshot.metadata.size_bytes,
926        "snapshot_type": snapshot.metadata.snapshot_type,
927        "state": snapshot.state
928    })))
929}
930
931// v0.2: Trigger manual compaction
932pub async fn trigger_compaction(
933    State(store): State<SharedStore>,
934) -> Result<Json<CompactionResult>> {
935    let compaction_manager = store.compaction_manager().ok_or_else(|| {
936        crate::error::AllSourceError::InternalError(
937            "Compaction not enabled (no Parquet storage)".to_string(),
938        )
939    })?;
940
941    tracing::info!("📦 Manual compaction triggered via API");
942
943    let result = compaction_manager.compact_now()?;
944
945    Ok(Json(result))
946}
947
948// v0.2: Get compaction statistics
949pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
950    let compaction_manager = store.compaction_manager().ok_or_else(|| {
951        crate::error::AllSourceError::InternalError(
952            "Compaction not enabled (no Parquet storage)".to_string(),
953        )
954    })?;
955
956    let stats = compaction_manager.stats();
957    let config = compaction_manager.config();
958
959    Ok(Json(serde_json::json!({
960        "stats": stats,
961        "config": {
962            "min_files_to_compact": config.min_files_to_compact,
963            "target_file_size": config.target_file_size,
964            "max_file_size": config.max_file_size,
965            "small_file_threshold": config.small_file_threshold,
966            "compaction_interval_seconds": config.compaction_interval_seconds,
967            "auto_compact": config.auto_compact,
968            "strategy": config.strategy
969        }
970    })))
971}
972
973// v0.5: Register a new schema
974pub async fn register_schema(
975    State(store): State<SharedStore>,
976    Json(req): Json<RegisterSchemaRequest>,
977) -> Result<Json<RegisterSchemaResponse>> {
978    let schema_registry = store.schema_registry();
979
980    let response =
981        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
982
983    tracing::info!(
984        "📋 Schema registered: v{} for '{}'",
985        response.version,
986        response.subject
987    );
988
989    Ok(Json(response))
990}
991
992// v0.5: Get a schema by subject and optional version
993#[derive(Deserialize)]
994pub struct GetSchemaParams {
995    version: Option<u32>,
996}
997
998pub async fn get_schema(
999    State(store): State<SharedStore>,
1000    Path(subject): Path<String>,
1001    Query(params): Query<GetSchemaParams>,
1002) -> Result<Json<serde_json::Value>> {
1003    let schema_registry = store.schema_registry();
1004
1005    let schema = schema_registry.get_schema(&subject, params.version)?;
1006
1007    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1008
1009    Ok(Json(serde_json::json!({
1010        "id": schema.id,
1011        "subject": schema.subject,
1012        "version": schema.version,
1013        "schema": schema.schema,
1014        "created_at": schema.created_at,
1015        "description": schema.description,
1016        "tags": schema.tags
1017    })))
1018}
1019
1020// v0.5: List all versions of a schema subject
1021pub async fn list_schema_versions(
1022    State(store): State<SharedStore>,
1023    Path(subject): Path<String>,
1024) -> Result<Json<serde_json::Value>> {
1025    let schema_registry = store.schema_registry();
1026
1027    let versions = schema_registry.list_versions(&subject)?;
1028
1029    Ok(Json(serde_json::json!({
1030        "subject": subject,
1031        "versions": versions
1032    })))
1033}
1034
1035// v0.5: List all schema subjects
1036pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1037    let schema_registry = store.schema_registry();
1038
1039    let subjects = schema_registry.list_subjects();
1040
1041    Json(serde_json::json!({
1042        "subjects": subjects,
1043        "total": subjects.len()
1044    }))
1045}
1046
1047// v0.5: Validate an event against a schema
1048pub async fn validate_event_schema(
1049    State(store): State<SharedStore>,
1050    Json(req): Json<ValidateEventRequest>,
1051) -> Result<Json<ValidateEventResponse>> {
1052    let schema_registry = store.schema_registry();
1053
1054    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1055
1056    if response.valid {
1057        tracing::debug!(
1058            "✅ Event validated against schema '{}' v{}",
1059            req.subject,
1060            response.schema_version
1061        );
1062    } else {
1063        tracing::warn!(
1064            "❌ Event validation failed for '{}': {:?}",
1065            req.subject,
1066            response.errors
1067        );
1068    }
1069
1070    Ok(Json(response))
1071}
1072
1073// v0.5: Set compatibility mode for a subject
1074#[derive(Deserialize)]
1075pub struct SetCompatibilityRequest {
1076    compatibility: CompatibilityMode,
1077}
1078
1079pub async fn set_compatibility_mode(
1080    State(store): State<SharedStore>,
1081    Path(subject): Path<String>,
1082    Json(req): Json<SetCompatibilityRequest>,
1083) -> Json<serde_json::Value> {
1084    let schema_registry = store.schema_registry();
1085
1086    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1087
1088    tracing::info!(
1089        "🔧 Set compatibility mode for '{}' to {:?}",
1090        subject,
1091        req.compatibility
1092    );
1093
1094    Json(serde_json::json!({
1095        "subject": subject,
1096        "compatibility": req.compatibility
1097    }))
1098}
1099
1100// v0.5: Start a replay operation
1101pub async fn start_replay(
1102    State(store): State<SharedStore>,
1103    Json(req): Json<StartReplayRequest>,
1104) -> Result<Json<StartReplayResponse>> {
1105    let replay_manager = store.replay_manager();
1106
1107    let response = replay_manager.start_replay(store, req)?;
1108
1109    tracing::info!(
1110        "🔄 Started replay {} with {} events",
1111        response.replay_id,
1112        response.total_events
1113    );
1114
1115    Ok(Json(response))
1116}
1117
1118// v0.5: Get replay progress
1119pub async fn get_replay_progress(
1120    State(store): State<SharedStore>,
1121    Path(replay_id): Path<uuid::Uuid>,
1122) -> Result<Json<ReplayProgress>> {
1123    let replay_manager = store.replay_manager();
1124
1125    let progress = replay_manager.get_progress(replay_id)?;
1126
1127    Ok(Json(progress))
1128}
1129
1130// v0.5: List all replay operations
1131pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1132    let replay_manager = store.replay_manager();
1133
1134    let replays = replay_manager.list_replays();
1135
1136    Json(serde_json::json!({
1137        "replays": replays,
1138        "total": replays.len()
1139    }))
1140}
1141
1142// v0.5: Cancel a running replay
1143pub async fn cancel_replay(
1144    State(store): State<SharedStore>,
1145    Path(replay_id): Path<uuid::Uuid>,
1146) -> Result<Json<serde_json::Value>> {
1147    let replay_manager = store.replay_manager();
1148
1149    replay_manager.cancel_replay(replay_id)?;
1150
1151    tracing::info!("🛑 Cancelled replay {}", replay_id);
1152
1153    Ok(Json(serde_json::json!({
1154        "replay_id": replay_id,
1155        "status": "cancelled"
1156    })))
1157}
1158
1159// v0.5: Delete a completed replay
1160pub async fn delete_replay(
1161    State(store): State<SharedStore>,
1162    Path(replay_id): Path<uuid::Uuid>,
1163) -> Result<Json<serde_json::Value>> {
1164    let replay_manager = store.replay_manager();
1165
1166    let deleted = replay_manager.delete_replay(replay_id)?;
1167
1168    if deleted {
1169        tracing::info!("🗑️  Deleted replay {}", replay_id);
1170    }
1171
1172    Ok(Json(serde_json::json!({
1173        "replay_id": replay_id,
1174        "deleted": deleted
1175    })))
1176}
1177
1178// v0.5: Register a new pipeline
1179pub async fn register_pipeline(
1180    State(store): State<SharedStore>,
1181    Json(config): Json<PipelineConfig>,
1182) -> Result<Json<serde_json::Value>> {
1183    let pipeline_manager = store.pipeline_manager();
1184
1185    let pipeline_id = pipeline_manager.register(config.clone());
1186
1187    tracing::info!(
1188        "🔀 Pipeline registered: {} (name: {})",
1189        pipeline_id,
1190        config.name
1191    );
1192
1193    Ok(Json(serde_json::json!({
1194        "pipeline_id": pipeline_id,
1195        "name": config.name,
1196        "enabled": config.enabled
1197    })))
1198}
1199
1200// v0.5: List all pipelines
1201pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1202    let pipeline_manager = store.pipeline_manager();
1203
1204    let pipelines = pipeline_manager.list();
1205
1206    tracing::debug!("Listed {} pipelines", pipelines.len());
1207
1208    Json(serde_json::json!({
1209        "pipelines": pipelines,
1210        "total": pipelines.len()
1211    }))
1212}
1213
1214// v0.5: Get a specific pipeline
1215pub async fn get_pipeline(
1216    State(store): State<SharedStore>,
1217    Path(pipeline_id): Path<uuid::Uuid>,
1218) -> Result<Json<PipelineConfig>> {
1219    let pipeline_manager = store.pipeline_manager();
1220
1221    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1222        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1223    })?;
1224
1225    Ok(Json(pipeline.config().clone()))
1226}
1227
1228// v0.5: Remove a pipeline
1229pub async fn remove_pipeline(
1230    State(store): State<SharedStore>,
1231    Path(pipeline_id): Path<uuid::Uuid>,
1232) -> Result<Json<serde_json::Value>> {
1233    let pipeline_manager = store.pipeline_manager();
1234
1235    let removed = pipeline_manager.remove(pipeline_id);
1236
1237    if removed {
1238        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1239    }
1240
1241    Ok(Json(serde_json::json!({
1242        "pipeline_id": pipeline_id,
1243        "removed": removed
1244    })))
1245}
1246
1247// v0.5: Get statistics for all pipelines
1248pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1249    let pipeline_manager = store.pipeline_manager();
1250
1251    let stats = pipeline_manager.all_stats();
1252
1253    Json(serde_json::json!({
1254        "stats": stats,
1255        "total": stats.len()
1256    }))
1257}
1258
1259// v0.5: Get statistics for a specific pipeline
1260pub async fn get_pipeline_stats(
1261    State(store): State<SharedStore>,
1262    Path(pipeline_id): Path<uuid::Uuid>,
1263) -> Result<Json<PipelineStats>> {
1264    let pipeline_manager = store.pipeline_manager();
1265
1266    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1267        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1268    })?;
1269
1270    Ok(Json(pipeline.stats()))
1271}
1272
1273// v0.5: Reset a pipeline's state
1274pub async fn reset_pipeline(
1275    State(store): State<SharedStore>,
1276    Path(pipeline_id): Path<uuid::Uuid>,
1277) -> Result<Json<serde_json::Value>> {
1278    let pipeline_manager = store.pipeline_manager();
1279
1280    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1281        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1282    })?;
1283
1284    pipeline.reset();
1285
1286    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1287
1288    Ok(Json(serde_json::json!({
1289        "pipeline_id": pipeline_id,
1290        "reset": true
1291    })))
1292}
1293
1294// =============================================================================
1295// v0.11: Single Event Lookup by ID
1296// =============================================================================
1297
1298/// Get a single event by UUID
1299pub async fn get_event_by_id(
1300    State(store): State<SharedStore>,
1301    Path(event_id): Path<uuid::Uuid>,
1302) -> Result<Json<serde_json::Value>> {
1303    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1304        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1305    })?;
1306
1307    let dto = EventDto::from(&event);
1308
1309    tracing::debug!("Event retrieved by ID: {}", event_id);
1310
1311    Ok(Json(serde_json::json!({
1312        "event": dto,
1313        "found": true
1314    })))
1315}
1316
1317// =============================================================================
1318// v0.7: Projection State API for Query Service Integration
1319// =============================================================================
1320
1321/// List all registered projections
1322pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1323    let projection_manager = store.projection_manager();
1324    let status_map = store.projection_status();
1325
1326    let projections: Vec<serde_json::Value> = projection_manager
1327        .list_projections()
1328        .iter()
1329        .map(|(name, projection)| {
1330            let status = status_map
1331                .get(name)
1332                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1333            serde_json::json!({
1334                "name": name,
1335                "type": format!("{:?}", projection.name()),
1336                "status": status,
1337            })
1338        })
1339        .collect();
1340
1341    tracing::debug!("Listed {} projections", projections.len());
1342
1343    Json(serde_json::json!({
1344        "projections": projections,
1345        "total": projections.len()
1346    }))
1347}
1348
1349/// Get projection metadata by name
1350pub async fn get_projection(
1351    State(store): State<SharedStore>,
1352    Path(name): Path<String>,
1353) -> Result<Json<serde_json::Value>> {
1354    let projection_manager = store.projection_manager();
1355
1356    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1357        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1358    })?;
1359
1360    Ok(Json(serde_json::json!({
1361        "name": projection.name(),
1362        "found": true
1363    })))
1364}
1365
1366/// Get projection state for a specific entity
1367///
1368/// This endpoint allows the Elixir Query Service to fetch projection state
1369/// from the Rust Core for synchronization.
1370pub async fn get_projection_state(
1371    State(store): State<SharedStore>,
1372    Path((name, entity_id)): Path<(String, String)>,
1373) -> Result<Json<serde_json::Value>> {
1374    let projection_manager = store.projection_manager();
1375
1376    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1377        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1378    })?;
1379
1380    let state = projection.get_state(&entity_id);
1381
1382    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1383
1384    Ok(Json(serde_json::json!({
1385        "projection": name,
1386        "entity_id": entity_id,
1387        "state": state,
1388        "found": state.is_some()
1389    })))
1390}
1391
1392/// Delete (clear) a projection by name
1393///
1394/// Removes all state from the projection. The projection definition remains
1395/// registered but its accumulated state is cleared.
1396pub async fn delete_projection(
1397    State(store): State<SharedStore>,
1398    Path(name): Path<String>,
1399) -> Result<Json<serde_json::Value>> {
1400    let projection_manager = store.projection_manager();
1401
1402    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1403        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1404    })?;
1405
1406    projection.clear();
1407
1408    // Also clear any cached state for this projection
1409    let cache = store.projection_state_cache();
1410    let prefix = format!("{name}:");
1411    let keys_to_remove: Vec<String> = cache
1412        .iter()
1413        .filter(|entry| entry.key().starts_with(&prefix))
1414        .map(|entry| entry.key().clone())
1415        .collect();
1416    for key in keys_to_remove {
1417        cache.remove(&key);
1418    }
1419
1420    tracing::info!("Projection deleted (cleared): {}", name);
1421
1422    Ok(Json(serde_json::json!({
1423        "projection": name,
1424        "deleted": true
1425    })))
1426}
1427
1428/// Get aggregate projection state (all entities)
1429///
1430/// Returns summary information about a projection's state across all entities.
1431pub async fn get_projection_state_summary(
1432    State(store): State<SharedStore>,
1433    Path(name): Path<String>,
1434) -> Result<Json<serde_json::Value>> {
1435    let projection_manager = store.projection_manager();
1436
1437    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1438        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1439    })?;
1440
1441    // Collect cached states for this projection
1442    let cache = store.projection_state_cache();
1443    let prefix = format!("{name}:");
1444    let states: Vec<serde_json::Value> = cache
1445        .iter()
1446        .filter(|entry| entry.key().starts_with(&prefix))
1447        .map(|entry| {
1448            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1449            serde_json::json!({
1450                "entity_id": entity_id,
1451                "state": entry.value().clone()
1452            })
1453        })
1454        .collect();
1455
1456    let total = states.len();
1457
1458    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1459
1460    Ok(Json(serde_json::json!({
1461        "projection": name,
1462        "states": states,
1463        "total": total
1464    })))
1465}
1466
1467/// Reset a projection to its initial state
1468///
1469/// Clears all accumulated state and reprocesses events from the beginning.
1470pub async fn reset_projection(
1471    State(store): State<SharedStore>,
1472    Path(name): Path<String>,
1473) -> Result<Json<serde_json::Value>> {
1474    let reprocessed = store.reset_projection(&name)?;
1475
1476    tracing::info!(
1477        "Projection reset: {} ({} events reprocessed)",
1478        name,
1479        reprocessed
1480    );
1481
1482    Ok(Json(serde_json::json!({
1483        "projection": name,
1484        "reset": true,
1485        "events_reprocessed": reprocessed
1486    })))
1487}
1488
1489/// Pause a projection
1490///
1491/// Sets the projection status to "paused" so it stops processing new events.
1492pub async fn pause_projection(
1493    State(store): State<SharedStore>,
1494    Path(name): Path<String>,
1495) -> Result<Json<serde_json::Value>> {
1496    let projection_manager = store.projection_manager();
1497
1498    // Verify projection exists
1499    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1500        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1501    })?;
1502
1503    store
1504        .projection_status()
1505        .insert(name.clone(), "paused".to_string());
1506
1507    tracing::info!("Projection paused: {}", name);
1508
1509    Ok(Json(serde_json::json!({
1510        "projection": name,
1511        "status": "paused"
1512    })))
1513}
1514
1515/// Start (resume) a projection
1516///
1517/// Sets the projection status to "running" so it resumes processing events.
1518pub async fn start_projection(
1519    State(store): State<SharedStore>,
1520    Path(name): Path<String>,
1521) -> Result<Json<serde_json::Value>> {
1522    let projection_manager = store.projection_manager();
1523
1524    // Verify projection exists
1525    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1526        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1527    })?;
1528
1529    store
1530        .projection_status()
1531        .insert(name.clone(), "running".to_string());
1532
1533    tracing::info!("Projection started: {}", name);
1534
1535    Ok(Json(serde_json::json!({
1536        "projection": name,
1537        "status": "running"
1538    })))
1539}
1540
/// Request body for saving projection state (see `save_projection_state`).
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state; the handler stores it in the projection state
    /// cache under the key `"{name}:{entity_id}"`.
    pub state: serde_json::Value,
}
1546
1547/// Save/update projection state for an entity
1548///
1549/// This endpoint allows external services (like Elixir Query Service) to
1550/// store computed projection state back to the Core for persistence.
1551pub async fn save_projection_state(
1552    State(store): State<SharedStore>,
1553    Path((name, entity_id)): Path<(String, String)>,
1554    Json(req): Json<SaveProjectionStateRequest>,
1555) -> Result<Json<serde_json::Value>> {
1556    let projection_cache = store.projection_state_cache();
1557
1558    // Store in the projection state cache
1559    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1560
1561    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1562
1563    Ok(Json(serde_json::json!({
1564        "projection": name,
1565        "entity_id": entity_id,
1566        "saved": true
1567    })))
1568}
1569
/// Request body for `bulk_get_projection_states`.
///
/// Lets a caller fetch the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entities to look up; the handler emits one response entry per id, in
    /// the order given.
    pub entity_ids: Vec<String>,
}
1577
/// Request body for `bulk_save_projection_states`.
///
/// Lets a caller save the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// One item per entity whose state should be stored.
    pub states: Vec<BulkSaveStateItem>,
}
1585
/// A single entity/state pair inside a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity the state belongs to; becomes the suffix of the cache key.
    pub entity_id: String,
    /// Arbitrary JSON state to store for that entity.
    pub state: serde_json::Value,
}
1591
1592pub async fn bulk_get_projection_states(
1593    State(store): State<SharedStore>,
1594    Path(name): Path<String>,
1595    Json(req): Json<BulkGetStateRequest>,
1596) -> Result<Json<serde_json::Value>> {
1597    let projection_manager = store.projection_manager();
1598
1599    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1600        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1601    })?;
1602
1603    let states: Vec<serde_json::Value> = req
1604        .entity_ids
1605        .iter()
1606        .map(|entity_id| {
1607            let state = projection.get_state(entity_id);
1608            serde_json::json!({
1609                "entity_id": entity_id,
1610                "state": state,
1611                "found": state.is_some()
1612            })
1613        })
1614        .collect();
1615
1616    tracing::debug!(
1617        "Bulk projection state retrieved: {} entities from {}",
1618        states.len(),
1619        name
1620    );
1621
1622    Ok(Json(serde_json::json!({
1623        "projection": name,
1624        "states": states,
1625        "total": states.len()
1626    })))
1627}
1628
1629/// Bulk save projection states for multiple entities
1630///
1631/// This endpoint allows efficient batch saving of projection states,
1632/// critical for high-throughput event processing pipelines.
1633pub async fn bulk_save_projection_states(
1634    State(store): State<SharedStore>,
1635    Path(name): Path<String>,
1636    Json(req): Json<BulkSaveStateRequest>,
1637) -> Result<Json<serde_json::Value>> {
1638    let projection_cache = store.projection_state_cache();
1639
1640    let mut saved_count = 0;
1641    for item in &req.states {
1642        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1643        saved_count += 1;
1644    }
1645
1646    tracing::info!(
1647        "Bulk projection state saved: {} entities for {}",
1648        saved_count,
1649        name
1650    );
1651
1652    Ok(Json(serde_json::json!({
1653        "projection": name,
1654        "saved": saved_count,
1655        "total": req.states.len()
1656    })))
1657}
1658
1659// =============================================================================
1660// v0.11: Webhook Management API
1661// =============================================================================
1662
/// Query parameters for listing webhooks
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When omitted, `list_webhooks`
    /// returns an empty list rather than every webhook.
    pub tenant_id: Option<String>,
}
1668
1669/// Register a new webhook subscription
1670pub async fn register_webhook(
1671    State(store): State<SharedStore>,
1672    Json(req): Json<RegisterWebhookRequest>,
1673) -> Json<serde_json::Value> {
1674    let registry = store.webhook_registry();
1675    let webhook = registry.register(req);
1676
1677    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1678
1679    Json(serde_json::json!({
1680        "webhook": webhook,
1681        "created": true
1682    }))
1683}
1684
1685/// List webhooks, optionally filtered by tenant_id
1686pub async fn list_webhooks(
1687    State(store): State<SharedStore>,
1688    Query(params): Query<ListWebhooksParams>,
1689) -> Json<serde_json::Value> {
1690    let registry = store.webhook_registry();
1691
1692    let webhooks = if let Some(tenant_id) = params.tenant_id {
1693        registry.list_by_tenant(&tenant_id)
1694    } else {
1695        // Without tenant filter, return empty (tenants should always filter)
1696        vec![]
1697    };
1698
1699    let total = webhooks.len();
1700
1701    Json(serde_json::json!({
1702        "webhooks": webhooks,
1703        "total": total
1704    }))
1705}
1706
1707/// Get a specific webhook by ID
1708pub async fn get_webhook(
1709    State(store): State<SharedStore>,
1710    Path(webhook_id): Path<uuid::Uuid>,
1711) -> Result<Json<serde_json::Value>> {
1712    let registry = store.webhook_registry();
1713
1714    let webhook = registry.get(webhook_id).ok_or_else(|| {
1715        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1716    })?;
1717
1718    Ok(Json(serde_json::json!({
1719        "webhook": webhook,
1720        "found": true
1721    })))
1722}
1723
1724/// Update a webhook subscription
1725pub async fn update_webhook(
1726    State(store): State<SharedStore>,
1727    Path(webhook_id): Path<uuid::Uuid>,
1728    Json(req): Json<UpdateWebhookRequest>,
1729) -> Result<Json<serde_json::Value>> {
1730    let registry = store.webhook_registry();
1731
1732    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1733        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1734    })?;
1735
1736    tracing::info!("Webhook updated: {}", webhook_id);
1737
1738    Ok(Json(serde_json::json!({
1739        "webhook": webhook,
1740        "updated": true
1741    })))
1742}
1743
1744/// Delete a webhook subscription
1745pub async fn delete_webhook(
1746    State(store): State<SharedStore>,
1747    Path(webhook_id): Path<uuid::Uuid>,
1748) -> Result<Json<serde_json::Value>> {
1749    let registry = store.webhook_registry();
1750
1751    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1752        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1753    })?;
1754
1755    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1756
1757    Ok(Json(serde_json::json!({
1758        "webhook_id": webhook_id,
1759        "deleted": true
1760    })))
1761}
1762
/// Query parameters for listing webhook deliveries
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Limit passed to the registry when fetching deliveries;
    /// `list_webhook_deliveries` uses 50 when this is omitted.
    pub limit: Option<usize>,
}
1768
1769/// List delivery history for a webhook
1770pub async fn list_webhook_deliveries(
1771    State(store): State<SharedStore>,
1772    Path(webhook_id): Path<uuid::Uuid>,
1773    Query(params): Query<ListDeliveriesParams>,
1774) -> Result<Json<serde_json::Value>> {
1775    let registry = store.webhook_registry();
1776
1777    // Verify webhook exists
1778    registry.get(webhook_id).ok_or_else(|| {
1779        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1780    })?;
1781
1782    let limit = params.limit.unwrap_or(50);
1783    let deliveries = registry.get_deliveries(webhook_id, limit);
1784    let total = deliveries.len();
1785
1786    Ok(Json(serde_json::json!({
1787        "webhook_id": webhook_id,
1788        "deliveries": deliveries,
1789        "total": total
1790    })))
1791}
1792
1793// =============================================================================
1794// v2.0: Advanced Query Features
1795// =============================================================================
1796
/// EventQL: Execute SQL queries over events using DataFusion.
///
/// Runs the request against the events returned by `store.snapshot_events()`
/// and responds with the result's `columns`, `rows` and `row_count`.
///
/// # Errors
/// `AllSourceError::InvalidQuery` wrapping the error returned by
/// `execute_eventql`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    use crate::infrastructure::query::eventql::execute_eventql;

    let events = store.snapshot_events();
    let result = execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    let body = serde_json::json!({
        "columns": result.columns,
        "rows": result.rows,
        "row_count": result.row_count,
    });
    Ok(Json(body))
}
1813
1814/// GraphQL: Execute GraphQL queries
1815pub async fn graphql_query(
1816    State(store): State<SharedStore>,
1817    Json(req): Json<GraphQLRequest>,
1818) -> Json<serde_json::Value> {
1819    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1820        Ok(f) => f,
1821        Err(e) => {
1822            return Json(
1823                serde_json::to_value(GraphQLResponse {
1824                    data: None,
1825                    errors: vec![GraphQLError { message: e }],
1826                })
1827                .unwrap(),
1828            );
1829        }
1830    };
1831
1832    let mut data = serde_json::Map::new();
1833    let mut errors = Vec::new();
1834
1835    for field in &fields {
1836        match field.name.as_str() {
1837            "events" => {
1838                let request = crate::application::dto::QueryEventsRequest {
1839                    entity_id: field.arguments.get("entity_id").cloned(),
1840                    event_type: field.arguments.get("event_type").cloned(),
1841                    tenant_id: field.arguments.get("tenant_id").cloned(),
1842                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1843                    as_of: None,
1844                    since: None,
1845                    until: None,
1846                    event_type_prefix: None,
1847                    payload_filter: None,
1848                };
1849                match store.query(&request) {
1850                    Ok(events) => {
1851                        let json_events: Vec<serde_json::Value> = events
1852                            .iter()
1853                            .map(|e| {
1854                                crate::infrastructure::query::graphql::event_to_json(
1855                                    e,
1856                                    &field.fields,
1857                                )
1858                            })
1859                            .collect();
1860                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1861                    }
1862                    Err(e) => errors.push(GraphQLError {
1863                        message: format!("events query failed: {e}"),
1864                    }),
1865                }
1866            }
1867            "event" => {
1868                if let Some(id_str) = field.arguments.get("id") {
1869                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1870                        match store.get_event_by_id(&id) {
1871                            Ok(Some(event)) => {
1872                                data.insert(
1873                                    "event".to_string(),
1874                                    crate::infrastructure::query::graphql::event_to_json(
1875                                        &event,
1876                                        &field.fields,
1877                                    ),
1878                                );
1879                            }
1880                            Ok(None) => {
1881                                data.insert("event".to_string(), serde_json::Value::Null);
1882                            }
1883                            Err(e) => errors.push(GraphQLError {
1884                                message: format!("event lookup failed: {e}"),
1885                            }),
1886                        }
1887                    } else {
1888                        errors.push(GraphQLError {
1889                            message: format!("Invalid UUID: {id_str}"),
1890                        });
1891                    }
1892                } else {
1893                    errors.push(GraphQLError {
1894                        message: "event query requires 'id' argument".to_string(),
1895                    });
1896                }
1897            }
1898            "projections" => {
1899                let pm = store.projection_manager();
1900                let names: Vec<serde_json::Value> = pm
1901                    .list_projections()
1902                    .iter()
1903                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1904                    .collect();
1905                data.insert("projections".to_string(), serde_json::Value::Array(names));
1906            }
1907            "stats" => {
1908                let stats = store.stats();
1909                data.insert(
1910                    "stats".to_string(),
1911                    serde_json::json!({
1912                        "total_events": stats.total_events,
1913                        "total_entities": stats.total_entities,
1914                        "total_event_types": stats.total_event_types,
1915                    }),
1916                );
1917            }
1918            "__schema" => {
1919                data.insert(
1920                    "__schema".to_string(),
1921                    crate::infrastructure::query::graphql::introspection_schema(),
1922                );
1923            }
1924            other => {
1925                errors.push(GraphQLError {
1926                    message: format!("Unknown field: {other}"),
1927                });
1928            }
1929        }
1930    }
1931
1932    Json(
1933        serde_json::to_value(GraphQLResponse {
1934            data: Some(serde_json::Value::Object(data)),
1935            errors,
1936        })
1937        .unwrap(),
1938    )
1939}
1940
1941/// Geospatial: Query events by location
1942pub async fn geo_query(
1943    State(store): State<SharedStore>,
1944    Json(req): Json<GeoQueryRequest>,
1945) -> Json<serde_json::Value> {
1946    let events = store.snapshot_events();
1947    let geo_index = store.geo_index();
1948    let results =
1949        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1950    let total = results.len();
1951    Json(serde_json::json!({
1952        "results": results,
1953        "total": total,
1954    }))
1955}
1956
1957/// Geospatial index stats
1958pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1959    let stats = store.geo_index().stats();
1960    Json(serde_json::json!(stats))
1961}
1962
1963/// Exactly-once processing stats
1964pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1965    let stats = store.exactly_once().stats();
1966    Json(serde_json::json!(stats))
1967}
1968
1969/// Schema evolution history for an event type
1970pub async fn schema_evolution_history(
1971    State(store): State<SharedStore>,
1972    Path(event_type): Path<String>,
1973) -> Json<serde_json::Value> {
1974    let mgr = store.schema_evolution();
1975    let history = mgr.get_history(&event_type);
1976    let version = mgr.get_version(&event_type);
1977    Json(serde_json::json!({
1978        "event_type": event_type,
1979        "current_version": version,
1980        "history": history,
1981    }))
1982}
1983
1984/// Current inferred schema for an event type
1985pub async fn schema_evolution_schema(
1986    State(store): State<SharedStore>,
1987    Path(event_type): Path<String>,
1988) -> Json<serde_json::Value> {
1989    let mgr = store.schema_evolution();
1990    if let Some(schema) = mgr.get_schema(&event_type) {
1991        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1992        Json(serde_json::json!({
1993            "event_type": event_type,
1994            "version": mgr.get_version(&event_type),
1995            "inferred_schema": schema,
1996            "json_schema": json_schema,
1997        }))
1998    } else {
1999        Json(serde_json::json!({
2000            "event_type": event_type,
2001            "error": "No schema inferred for this event type"
2002        }))
2003    }
2004}
2005
2006/// Schema evolution stats
2007pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2008    let stats = store.schema_evolution().stats();
2009    let event_types = store.schema_evolution().list_event_types();
2010    Json(serde_json::json!({
2011        "stats": stats,
2012        "tracked_event_types": event_types,
2013    }))
2014}
2015
2016// =============================================================================
2017// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2018// =============================================================================
2019
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The smallest `physical_ms` in the client's version vector is used as the
/// `since` bound of a store query; an empty vector means no lower bound. Each
/// returned event is wrapped in a `ReplicatedEvent` whose HLC timestamp is
/// derived from the event's own timestamp, with a logical counter that
/// increments across consecutive events sharing the same millisecond.
///
/// # Errors
///
/// Propagates any error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Compute the "since" threshold from the client's version vector.
    // The conversion to i64 is checked: a value that does not fit is treated
    // as "no threshold" rather than being wrapped into a negative timestamp.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(i64::try_from(ms).ok()?));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Convert domain events to the ReplicatedEvent wire format.
    let mut replicated = Vec::with_capacity(events.len());
    // `None` until the first event is seen, so an event stamped exactly at
    // millisecond 0 still starts its logical counter at 0.
    let mut previous_ms: Option<u64> = None;
    let mut logical = 0u32;

    for event in &events {
        // Pre-epoch timestamps clamp to 0 instead of wrapping to a huge u64.
        let event_ms = u64::try_from(event.timestamp().timestamp_millis()).unwrap_or(0);
        if previous_ms == Some(event_ms) {
            logical += 1;
        } else {
            previous_ms = Some(event_ms);
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    // NOTE(review): the response version vector is always empty, so the client
    // presumably cannot advance its own vector from this reply — confirm intent.
    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2084
/// POST /api/v1/sync/push — Client pushes events, server applies CRDT resolution.
///
/// Each pushed event's `event_data` is unpacked into a domain event and
/// ingested. Missing string fields fall back to `"unknown"` (event type,
/// entity id) or `"default"` (tenant id); a missing payload becomes `{}`.
/// Events that cannot be built are counted as skipped.
///
/// NOTE(review): nothing in this handler reads `event_id` or the HLC
/// timestamp, so any CRDT resolution or de-duplication presumably happens
/// inside `store.ingest` — confirm.
///
/// # Errors
///
/// Returns the first ingest error; events ingested before it stay ingested.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    // Reads a string field from the event data, substituting `fallback` when
    // the key is absent or not a JSON string.
    let text_field = |data: &serde_json::Value, key: &str, fallback: &str| -> String {
        data.get(key)
            .and_then(|value| value.as_str())
            .unwrap_or(fallback)
            .to_string()
    };

    let store = &state.store;
    let (mut accepted, mut skipped) = (0usize, 0usize);

    for pushed in &request.events {
        let data = &pushed.event_data;

        let event_type = text_field(data, "event_type", "unknown");
        let entity_id = text_field(data, "entity_id", "unknown");
        let tenant_id = text_field(data, "tenant_id", "default");
        let payload = data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = data.get("metadata").cloned();

        if let Ok(domain_event) =
            Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
        {
            store.ingest(&domain_event)?;
            accepted += 1;
        } else {
            skipped += 1;
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2136
2137// =============================================================================
2138// Consumer endpoints for durable subscriptions (v0.14)
2139// =============================================================================
2140
2141/// POST /api/v1/consumers — Register a durable consumer
2142pub async fn register_consumer(
2143    State(store): State<SharedStore>,
2144    Json(req): Json<RegisterConsumerRequest>,
2145) -> Result<Json<ConsumerResponse>> {
2146    let consumer = store
2147        .consumer_registry()
2148        .register(&req.consumer_id, &req.event_type_filters);
2149
2150    Ok(Json(ConsumerResponse {
2151        consumer_id: consumer.consumer_id,
2152        event_type_filters: consumer.event_type_filters,
2153        cursor_position: consumer.cursor_position,
2154    }))
2155}
2156
2157/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2158pub async fn get_consumer(
2159    State(store): State<SharedStore>,
2160    Path(consumer_id): Path<String>,
2161) -> Result<Json<ConsumerResponse>> {
2162    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2163
2164    Ok(Json(ConsumerResponse {
2165        consumer_id: consumer.consumer_id,
2166        event_type_filters: consumer.event_type_filters,
2167        cursor_position: consumer.cursor_position,
2168    }))
2169}
2170
2171/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2172#[derive(Debug, Deserialize)]
2173pub struct ConsumerPollQuery {
2174    pub limit: Option<usize>,
2175}
2176
2177pub async fn poll_consumer_events(
2178    State(store): State<SharedStore>,
2179    Path(consumer_id): Path<String>,
2180    Query(query): Query<ConsumerPollQuery>,
2181) -> Result<Json<ConsumerEventsResponse>> {
2182    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2183    let offset = consumer.cursor_position.unwrap_or(0);
2184    let limit = query.limit.unwrap_or(100);
2185
2186    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2187    let count = events.len();
2188
2189    let consumer_events: Vec<ConsumerEventDto> = events
2190        .into_iter()
2191        .map(|(position, event)| ConsumerEventDto {
2192            position,
2193            event: EventDto::from(&event),
2194        })
2195        .collect();
2196
2197    Ok(Json(ConsumerEventsResponse {
2198        events: consumer_events,
2199        count,
2200    }))
2201}
2202
2203/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2204pub async fn ack_consumer(
2205    State(store): State<SharedStore>,
2206    Path(consumer_id): Path<String>,
2207    Json(req): Json<AckRequest>,
2208) -> Result<Json<serde_json::Value>> {
2209    let max_offset = store.total_events() as u64;
2210
2211    store
2212        .consumer_registry()
2213        .ack(&consumer_id, req.position, max_offset)
2214        .map_err(crate::error::AllSourceError::InvalidInput)?;
2215
2216    Ok(Json(serde_json::json!({
2217        "status": "ok",
2218        "consumer_id": consumer_id,
2219        "position": req.position,
2220    })))
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225    use super::*;
2226    use crate::{domain::entities::Event, store::EventStore};
2227
2228    fn create_test_store() -> Arc<EventStore> {
2229        Arc::new(EventStore::new())
2230    }
2231
2232    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2233        Event::from_strings(
2234            event_type.to_string(),
2235            entity_id.to_string(),
2236            "test-stream".to_string(),
2237            serde_json::json!({
2238                "name": "Test",
2239                "value": 42
2240            }),
2241            None,
2242        )
2243        .unwrap()
2244    }
2245
2246    #[tokio::test]
2247    async fn test_query_events_has_more_and_total_count() {
2248        let store = create_test_store();
2249
2250        // Ingest 50 events
2251        for i in 0..50 {
2252            store
2253                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2254                .unwrap();
2255        }
2256
2257        // Query with limit=10 — should get has_more=true, total_count=50
2258        let req = QueryEventsRequest {
2259            entity_id: None,
2260            event_type: None,
2261            tenant_id: None,
2262            as_of: None,
2263            since: None,
2264            until: None,
2265            limit: Some(10),
2266            event_type_prefix: None,
2267            payload_filter: None,
2268        };
2269
2270        let requested_limit = req.limit;
2271        let unlimited_req = QueryEventsRequest {
2272            limit: None,
2273            ..QueryEventsRequest {
2274                entity_id: req.entity_id,
2275                event_type: req.event_type,
2276                tenant_id: req.tenant_id,
2277                as_of: req.as_of,
2278                since: req.since,
2279                until: req.until,
2280                limit: None,
2281                event_type_prefix: req.event_type_prefix,
2282                payload_filter: req.payload_filter,
2283            }
2284        };
2285        let all_events = store.query(&unlimited_req).unwrap();
2286        let total_count = all_events.len();
2287        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2288            all_events.into_iter().take(limit).collect()
2289        } else {
2290            all_events
2291        };
2292        let count = limited_events.len();
2293        let has_more = count < total_count;
2294
2295        assert_eq!(count, 10);
2296        assert_eq!(total_count, 50);
2297        assert!(has_more);
2298    }
2299
2300    #[tokio::test]
2301    async fn test_query_events_no_more_results() {
2302        let store = create_test_store();
2303
2304        // Ingest 5 events
2305        for i in 0..5 {
2306            store
2307                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2308                .unwrap();
2309        }
2310
2311        // Query with limit=100 — should get has_more=false, total_count=5
2312        let all_events = store
2313            .query(&QueryEventsRequest {
2314                entity_id: None,
2315                event_type: None,
2316                tenant_id: None,
2317                as_of: None,
2318                since: None,
2319                until: None,
2320                limit: None,
2321                event_type_prefix: None,
2322                payload_filter: None,
2323            })
2324            .unwrap();
2325        let total_count = all_events.len();
2326        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2327        let count = limited_events.len();
2328        let has_more = count < total_count;
2329
2330        assert_eq!(count, 5);
2331        assert_eq!(total_count, 5);
2332        assert!(!has_more);
2333    }
2334
2335    #[tokio::test]
2336    async fn test_list_entities_by_type_prefix() {
2337        let store = create_test_store();
2338
2339        // 3 index entities
2340        store
2341            .ingest(&create_test_event("idx-1", "index.created"))
2342            .unwrap();
2343        store
2344            .ingest(&create_test_event("idx-1", "index.updated"))
2345            .unwrap();
2346        store
2347            .ingest(&create_test_event("idx-2", "index.created"))
2348            .unwrap();
2349        store
2350            .ingest(&create_test_event("idx-3", "index.created"))
2351            .unwrap();
2352        // 2 trade entities
2353        store
2354            .ingest(&create_test_event("trade-1", "trade.created"))
2355            .unwrap();
2356        store
2357            .ingest(&create_test_event("trade-2", "trade.created"))
2358            .unwrap();
2359
2360        // List entities for index.*
2361        let req = ListEntitiesRequest {
2362            event_type_prefix: Some("index.".to_string()),
2363            payload_filter: None,
2364            limit: None,
2365            offset: None,
2366        };
2367        let query_req = QueryEventsRequest {
2368            entity_id: None,
2369            event_type: None,
2370            tenant_id: None,
2371            as_of: None,
2372            since: None,
2373            until: None,
2374            limit: None,
2375            event_type_prefix: req.event_type_prefix,
2376            payload_filter: req.payload_filter,
2377        };
2378        let events = store.query(&query_req).unwrap();
2379
2380        // Group and verify
2381        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2382            std::collections::HashMap::new();
2383        for event in &events {
2384            entity_map
2385                .entry(event.entity_id().to_string())
2386                .or_default()
2387                .push(event);
2388        }
2389
2390        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2391        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2392        assert_eq!(entity_map["idx-2"].len(), 1);
2393        assert_eq!(entity_map["idx-3"].len(), 1);
2394    }
2395
2396    fn create_test_event_with_payload(
2397        entity_id: &str,
2398        event_type: &str,
2399        payload: serde_json::Value,
2400    ) -> Event {
2401        Event::from_strings(
2402            event_type.to_string(),
2403            entity_id.to_string(),
2404            "test-stream".to_string(),
2405            payload,
2406            None,
2407        )
2408        .unwrap()
2409    }
2410
2411    #[tokio::test]
2412    async fn test_detect_duplicates_by_payload_fields() {
2413        let store = create_test_store();
2414
2415        // Create entities with duplicate "name" field values
2416        store
2417            .ingest(&create_test_event_with_payload(
2418                "idx-1",
2419                "index.created",
2420                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2421            ))
2422            .unwrap();
2423        store
2424            .ingest(&create_test_event_with_payload(
2425                "idx-2",
2426                "index.created",
2427                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2428            ))
2429            .unwrap();
2430        store
2431            .ingest(&create_test_event_with_payload(
2432                "idx-3",
2433                "index.created",
2434                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2435            ))
2436            .unwrap();
2437        store
2438            .ingest(&create_test_event_with_payload(
2439                "idx-4",
2440                "index.created",
2441                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2442            ))
2443            .unwrap();
2444        store
2445            .ingest(&create_test_event_with_payload(
2446                "idx-5",
2447                "index.created",
2448                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2449            ))
2450            .unwrap();
2451
2452        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2453        let query_req = QueryEventsRequest {
2454            entity_id: None,
2455            event_type: None,
2456            tenant_id: None,
2457            as_of: None,
2458            since: None,
2459            until: None,
2460            limit: None,
2461            event_type_prefix: Some("index.".to_string()),
2462            payload_filter: None,
2463        };
2464        let events = store.query(&query_req).unwrap();
2465
2466        // Manually replicate the handler logic for testing
2467        let group_by_fields = vec!["name"];
2468        let mut entity_latest: std::collections::HashMap<String, &Event> =
2469            std::collections::HashMap::new();
2470        for event in &events {
2471            let eid = event.entity_id().to_string();
2472            entity_latest
2473                .entry(eid)
2474                .and_modify(|existing| {
2475                    if event.timestamp() > existing.timestamp() {
2476                        *existing = event;
2477                    }
2478                })
2479                .or_insert(event);
2480        }
2481
2482        let mut groups: std::collections::HashMap<String, Vec<String>> =
2483            std::collections::HashMap::new();
2484        for (entity_id, event) in &entity_latest {
2485            let payload = event.payload();
2486            let mut key_parts = serde_json::Map::new();
2487            for field in &group_by_fields {
2488                let value = payload
2489                    .get(*field)
2490                    .cloned()
2491                    .unwrap_or(serde_json::Value::Null);
2492                key_parts.insert((*field).to_string(), value);
2493            }
2494            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2495            groups.entry(key_str).or_default().push(entity_id.clone());
2496        }
2497
2498        let duplicate_groups: Vec<_> = groups
2499            .into_iter()
2500            .filter(|(_, ids)| ids.len() > 1)
2501            .collect();
2502
2503        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2504        for (_, ids) in &duplicate_groups {
2505            assert_eq!(ids.len(), 2);
2506        }
2507    }
2508
2509    #[tokio::test]
2510    async fn test_detect_duplicates_no_duplicates() {
2511        let store = create_test_store();
2512
2513        // All unique names
2514        store
2515            .ingest(&create_test_event_with_payload(
2516                "idx-1",
2517                "index.created",
2518                serde_json::json!({"name": "A"}),
2519            ))
2520            .unwrap();
2521        store
2522            .ingest(&create_test_event_with_payload(
2523                "idx-2",
2524                "index.created",
2525                serde_json::json!({"name": "B"}),
2526            ))
2527            .unwrap();
2528
2529        let query_req = QueryEventsRequest {
2530            entity_id: None,
2531            event_type: None,
2532            tenant_id: None,
2533            as_of: None,
2534            since: None,
2535            until: None,
2536            limit: None,
2537            event_type_prefix: Some("index.".to_string()),
2538            payload_filter: None,
2539        };
2540        let events = store.query(&query_req).unwrap();
2541
2542        let mut entity_latest: std::collections::HashMap<String, &Event> =
2543            std::collections::HashMap::new();
2544        for event in &events {
2545            entity_latest
2546                .entry(event.entity_id().to_string())
2547                .or_insert(event);
2548        }
2549
2550        let mut groups: std::collections::HashMap<String, Vec<String>> =
2551            std::collections::HashMap::new();
2552        for (entity_id, event) in &entity_latest {
2553            let key_str =
2554                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2555                    .unwrap();
2556            groups.entry(key_str).or_default().push(entity_id.clone());
2557        }
2558
2559        let duplicate_groups: Vec<_> = groups
2560            .into_iter()
2561            .filter(|(_, ids)| ids.len() > 1)
2562            .collect();
2563
2564        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2565    }
2566
2567    #[tokio::test]
2568    async fn test_detect_duplicates_multi_field_group_by() {
2569        let store = create_test_store();
2570
2571        // Two entities with same name AND user_id = true duplicate
2572        store
2573            .ingest(&create_test_event_with_payload(
2574                "idx-1",
2575                "index.created",
2576                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2577            ))
2578            .unwrap();
2579        store
2580            .ingest(&create_test_event_with_payload(
2581                "idx-2",
2582                "index.created",
2583                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2584            ))
2585            .unwrap();
2586        // Same name but different user_id = NOT a duplicate in multi-field group
2587        store
2588            .ingest(&create_test_event_with_payload(
2589                "idx-3",
2590                "index.created",
2591                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2592            ))
2593            .unwrap();
2594
2595        let query_req = QueryEventsRequest {
2596            entity_id: None,
2597            event_type: None,
2598            tenant_id: None,
2599            as_of: None,
2600            since: None,
2601            until: None,
2602            limit: None,
2603            event_type_prefix: Some("index.".to_string()),
2604            payload_filter: None,
2605        };
2606        let events = store.query(&query_req).unwrap();
2607
2608        let group_by_fields = vec!["name", "user_id"];
2609        let mut entity_latest: std::collections::HashMap<String, &Event> =
2610            std::collections::HashMap::new();
2611        for event in &events {
2612            entity_latest
2613                .entry(event.entity_id().to_string())
2614                .and_modify(|existing| {
2615                    if event.timestamp() > existing.timestamp() {
2616                        *existing = event;
2617                    }
2618                })
2619                .or_insert(event);
2620        }
2621
2622        let mut groups: std::collections::HashMap<String, Vec<String>> =
2623            std::collections::HashMap::new();
2624        for (entity_id, event) in &entity_latest {
2625            let payload = event.payload();
2626            let mut key_parts = serde_json::Map::new();
2627            for field in &group_by_fields {
2628                let value = payload
2629                    .get(*field)
2630                    .cloned()
2631                    .unwrap_or(serde_json::Value::Null);
2632                key_parts.insert((*field).to_string(), value);
2633            }
2634            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2635            groups.entry(key_str).or_default().push(entity_id.clone());
2636        }
2637
2638        let duplicate_groups: Vec<_> = groups
2639            .into_iter()
2640            .filter(|(_, ids)| ids.len() > 1)
2641            .collect();
2642
2643        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2644        assert_eq!(duplicate_groups.len(), 1);
2645        let (_, ref ids) = duplicate_groups[0];
2646        assert_eq!(ids.len(), 2);
2647        let mut sorted_ids = ids.clone();
2648        sorted_ids.sort();
2649        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2650    }
2651
2652    #[tokio::test]
2653    async fn test_projection_state_cache() {
2654        let store = create_test_store();
2655
2656        // Test cache insertion
2657        let cache = store.projection_state_cache();
2658        cache.insert(
2659            "entity_snapshots:user-123".to_string(),
2660            serde_json::json!({"name": "Test User", "age": 30}),
2661        );
2662
2663        // Test cache retrieval
2664        let state = cache.get("entity_snapshots:user-123");
2665        assert!(state.is_some());
2666        let state = state.unwrap();
2667        assert_eq!(state["name"], "Test User");
2668        assert_eq!(state["age"], 30);
2669    }
2670
2671    #[tokio::test]
2672    async fn test_projection_manager_list_projections() {
2673        let store = create_test_store();
2674
2675        // List projections (built-in projections should be available)
2676        let projection_manager = store.projection_manager();
2677        let projections = projection_manager.list_projections();
2678
2679        // Should have entity_snapshots and event_counters
2680        assert!(projections.len() >= 2);
2681
2682        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2683        assert!(names.contains(&"entity_snapshots"));
2684        assert!(names.contains(&"event_counters"));
2685    }
2686
2687    #[tokio::test]
2688    async fn test_projection_state_after_event_ingestion() {
2689        let store = create_test_store();
2690
2691        // Ingest an event
2692        let event = create_test_event("user-456", "user.created");
2693        store.ingest(&event).unwrap();
2694
2695        // Get projection state
2696        let projection_manager = store.projection_manager();
2697        let snapshot_projection = projection_manager
2698            .get_projection("entity_snapshots")
2699            .unwrap();
2700
2701        let state = snapshot_projection.get_state("user-456");
2702        assert!(state.is_some());
2703        let state = state.unwrap();
2704        assert_eq!(state["name"], "Test");
2705        assert_eq!(state["value"], 42);
2706    }
2707
2708    #[tokio::test]
2709    async fn test_projection_state_cache_multiple_entities() {
2710        let store = create_test_store();
2711        let cache = store.projection_state_cache();
2712
2713        // Insert multiple entities
2714        for i in 0..10 {
2715            cache.insert(
2716                format!("entity_snapshots:entity-{i}"),
2717                serde_json::json!({"id": i, "status": "active"}),
2718            );
2719        }
2720
2721        // Verify all insertions
2722        assert_eq!(cache.len(), 10);
2723
2724        // Verify each entity
2725        for i in 0..10 {
2726            let key = format!("entity_snapshots:entity-{i}");
2727            let state = cache.get(&key);
2728            assert!(state.is_some());
2729            assert_eq!(state.unwrap()["id"], i);
2730        }
2731    }
2732
2733    #[tokio::test]
2734    async fn test_projection_state_update() {
2735        let store = create_test_store();
2736        let cache = store.projection_state_cache();
2737
2738        // Initial state
2739        cache.insert(
2740            "entity_snapshots:user-789".to_string(),
2741            serde_json::json!({"balance": 100}),
2742        );
2743
2744        // Update state
2745        cache.insert(
2746            "entity_snapshots:user-789".to_string(),
2747            serde_json::json!({"balance": 150}),
2748        );
2749
2750        // Verify update
2751        let state = cache.get("entity_snapshots:user-789").unwrap();
2752        assert_eq!(state["balance"], 150);
2753    }
2754
2755    #[tokio::test]
2756    async fn test_event_counter_projection() {
2757        let store = create_test_store();
2758
2759        // Ingest events of different types
2760        store
2761            .ingest(&create_test_event("user-1", "user.created"))
2762            .unwrap();
2763        store
2764            .ingest(&create_test_event("user-2", "user.created"))
2765            .unwrap();
2766        store
2767            .ingest(&create_test_event("user-1", "user.updated"))
2768            .unwrap();
2769
2770        // Get event counter projection
2771        let projection_manager = store.projection_manager();
2772        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2773
2774        // Check counts
2775        let created_state = counter_projection.get_state("user.created");
2776        assert!(created_state.is_some());
2777        assert_eq!(created_state.unwrap()["count"], 2);
2778
2779        let updated_state = counter_projection.get_state("user.updated");
2780        assert!(updated_state.is_some());
2781        assert_eq!(updated_state.unwrap()["count"], 1);
2782    }
2783
2784    #[tokio::test]
2785    async fn test_projection_state_cache_key_format() {
2786        let store = create_test_store();
2787        let cache = store.projection_state_cache();
2788
2789        // Test standard key format: {projection_name}:{entity_id}
2790        let key = "orders:order-12345".to_string();
2791        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2792
2793        let state = cache.get(&key).unwrap();
2794        assert_eq!(state["total"], 99.99);
2795    }
2796
2797    #[tokio::test]
2798    async fn test_projection_state_cache_removal() {
2799        let store = create_test_store();
2800        let cache = store.projection_state_cache();
2801
2802        // Insert and then remove
2803        cache.insert(
2804            "test:entity-1".to_string(),
2805            serde_json::json!({"data": "value"}),
2806        );
2807        assert_eq!(cache.len(), 1);
2808
2809        cache.remove("test:entity-1");
2810        assert_eq!(cache.len(), 0);
2811        assert!(cache.get("test:entity-1").is_none());
2812    }
2813
2814    #[tokio::test]
2815    async fn test_get_nonexistent_projection() {
2816        let store = create_test_store();
2817        let projection_manager = store.projection_manager();
2818
2819        // Requesting a non-existent projection should return None
2820        let projection = projection_manager.get_projection("nonexistent_projection");
2821        assert!(projection.is_none());
2822    }
2823
2824    #[tokio::test]
2825    async fn test_get_nonexistent_entity_state() {
2826        let store = create_test_store();
2827        let projection_manager = store.projection_manager();
2828
2829        // Get state for non-existent entity
2830        let snapshot_projection = projection_manager
2831            .get_projection("entity_snapshots")
2832            .unwrap();
2833        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2834        assert!(state.is_none());
2835    }
2836
2837    #[tokio::test]
2838    async fn test_projection_state_cache_concurrent_access() {
2839        let store = create_test_store();
2840        let cache = store.projection_state_cache();
2841
2842        // Simulate concurrent writes
2843        let handles: Vec<_> = (0..10)
2844            .map(|i| {
2845                let cache_clone = cache.clone();
2846                tokio::spawn(async move {
2847                    cache_clone.insert(
2848                        format!("concurrent:entity-{i}"),
2849                        serde_json::json!({"thread": i}),
2850                    );
2851                })
2852            })
2853            .collect();
2854
2855        for handle in handles {
2856            handle.await.unwrap();
2857        }
2858
2859        // All 10 entries should be present
2860        assert_eq!(cache.len(), 10);
2861    }
2862
2863    #[tokio::test]
2864    async fn test_projection_state_large_payload() {
2865        let store = create_test_store();
2866        let cache = store.projection_state_cache();
2867
2868        // Create a large JSON payload (~10KB)
2869        let large_array: Vec<serde_json::Value> = (0..1000)
2870            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2871            .collect();
2872
2873        cache.insert(
2874            "large:entity-1".to_string(),
2875            serde_json::json!({"items": large_array}),
2876        );
2877
2878        let state = cache.get("large:entity-1").unwrap();
2879        let items = state["items"].as_array().unwrap();
2880        assert_eq!(items.len(), 1000);
2881    }
2882
2883    #[tokio::test]
2884    async fn test_projection_state_complex_json() {
2885        let store = create_test_store();
2886        let cache = store.projection_state_cache();
2887
2888        // Complex nested JSON structure
2889        let complex_state = serde_json::json!({
2890            "user": {
2891                "id": "user-123",
2892                "profile": {
2893                    "name": "John Doe",
2894                    "email": "john@example.com",
2895                    "settings": {
2896                        "theme": "dark",
2897                        "notifications": true
2898                    }
2899                },
2900                "roles": ["admin", "user"],
2901                "metadata": {
2902                    "created_at": "2025-01-01T00:00:00Z",
2903                    "last_login": null
2904                }
2905            }
2906        });
2907
2908        cache.insert("complex:user-123".to_string(), complex_state);
2909
2910        let state = cache.get("complex:user-123").unwrap();
2911        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2912        assert_eq!(state["user"]["roles"][0], "admin");
2913        assert!(state["user"]["metadata"]["last_login"].is_null());
2914    }
2915
2916    #[tokio::test]
2917    async fn test_projection_state_cache_iteration() {
2918        let store = create_test_store();
2919        let cache = store.projection_state_cache();
2920
2921        // Insert entries
2922        for i in 0..5 {
2923            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2924        }
2925
2926        // Iterate over all entries
2927        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2928        assert_eq!(entries.len(), 5);
2929    }
2930
2931    #[tokio::test]
2932    async fn test_projection_manager_get_entity_snapshots() {
2933        let store = create_test_store();
2934        let projection_manager = store.projection_manager();
2935
2936        // Get entity_snapshots projection specifically
2937        let projection = projection_manager.get_projection("entity_snapshots");
2938        assert!(projection.is_some());
2939        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2940    }
2941
2942    #[tokio::test]
2943    async fn test_projection_manager_get_event_counters() {
2944        let store = create_test_store();
2945        let projection_manager = store.projection_manager();
2946
2947        // Get event_counters projection specifically
2948        let projection = projection_manager.get_projection("event_counters");
2949        assert!(projection.is_some());
2950        assert_eq!(projection.unwrap().name(), "event_counters");
2951    }
2952
2953    #[tokio::test]
2954    async fn test_projection_state_cache_overwrite() {
2955        let store = create_test_store();
2956        let cache = store.projection_state_cache();
2957
2958        // Initial value
2959        cache.insert(
2960            "overwrite:entity-1".to_string(),
2961            serde_json::json!({"version": 1}),
2962        );
2963
2964        // Overwrite with new value
2965        cache.insert(
2966            "overwrite:entity-1".to_string(),
2967            serde_json::json!({"version": 2}),
2968        );
2969
2970        // Overwrite again
2971        cache.insert(
2972            "overwrite:entity-1".to_string(),
2973            serde_json::json!({"version": 3}),
2974        );
2975
2976        let state = cache.get("overwrite:entity-1").unwrap();
2977        assert_eq!(state["version"], 3);
2978
2979        // Should still be only 1 entry
2980        assert_eq!(cache.len(), 1);
2981    }
2982
2983    #[tokio::test]
2984    async fn test_projection_state_multiple_projections() {
2985        let store = create_test_store();
2986        let cache = store.projection_state_cache();
2987
2988        // Store states for different projections
2989        cache.insert(
2990            "entity_snapshots:user-1".to_string(),
2991            serde_json::json!({"name": "Alice"}),
2992        );
2993        cache.insert(
2994            "event_counters:user.created".to_string(),
2995            serde_json::json!({"count": 5}),
2996        );
2997        cache.insert(
2998            "custom_projection:order-1".to_string(),
2999            serde_json::json!({"total": 150.0}),
3000        );
3001
3002        // Verify each projection's state
3003        assert_eq!(
3004            cache.get("entity_snapshots:user-1").unwrap()["name"],
3005            "Alice"
3006        );
3007        assert_eq!(
3008            cache.get("event_counters:user.created").unwrap()["count"],
3009            5
3010        );
3011        assert_eq!(
3012            cache.get("custom_projection:order-1").unwrap()["total"],
3013            150.0
3014        );
3015    }
3016
3017    #[tokio::test]
3018    async fn test_bulk_projection_state_access() {
3019        let store = create_test_store();
3020
3021        // Ingest multiple events for different entities
3022        for i in 0..5 {
3023            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3024            store.ingest(&event).unwrap();
3025        }
3026
3027        // Get projection and verify bulk access
3028        let projection_manager = store.projection_manager();
3029        let snapshot_projection = projection_manager
3030            .get_projection("entity_snapshots")
3031            .unwrap();
3032
3033        // Verify we can access all entities
3034        for i in 0..5 {
3035            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3036            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3037        }
3038    }
3039
3040    #[tokio::test]
3041    async fn test_bulk_save_projection_states() {
3042        let store = create_test_store();
3043        let cache = store.projection_state_cache();
3044
3045        // Simulate bulk save request
3046        let states = vec![
3047            BulkSaveStateItem {
3048                entity_id: "bulk-entity-1".to_string(),
3049                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3050            },
3051            BulkSaveStateItem {
3052                entity_id: "bulk-entity-2".to_string(),
3053                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3054            },
3055            BulkSaveStateItem {
3056                entity_id: "bulk-entity-3".to_string(),
3057                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3058            },
3059        ];
3060
3061        let projection_name = "test_projection";
3062
3063        // Save states to cache (simulating bulk_save_projection_states handler)
3064        for item in &states {
3065            cache.insert(
3066                format!("{projection_name}:{}", item.entity_id),
3067                item.state.clone(),
3068            );
3069        }
3070
3071        // Verify all states were saved
3072        assert_eq!(cache.len(), 3);
3073
3074        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3075        assert_eq!(state1["name"], "Entity 1");
3076        assert_eq!(state1["value"], 100);
3077
3078        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3079        assert_eq!(state2["name"], "Entity 2");
3080        assert_eq!(state2["value"], 200);
3081
3082        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3083        assert_eq!(state3["name"], "Entity 3");
3084        assert_eq!(state3["value"], 300);
3085    }
3086
3087    #[tokio::test]
3088    async fn test_bulk_save_empty_states() {
3089        let store = create_test_store();
3090        let cache = store.projection_state_cache();
3091
3092        // Clear cache
3093        cache.clear();
3094
3095        // Empty states should work fine
3096        let states: Vec<BulkSaveStateItem> = vec![];
3097        assert_eq!(states.len(), 0);
3098
3099        // Cache should remain empty
3100        assert_eq!(cache.len(), 0);
3101    }
3102
3103    #[tokio::test]
3104    async fn test_bulk_save_overwrites_existing() {
3105        let store = create_test_store();
3106        let cache = store.projection_state_cache();
3107
3108        // Insert initial state
3109        cache.insert(
3110            "test:entity-1".to_string(),
3111            serde_json::json!({"version": 1, "data": "initial"}),
3112        );
3113
3114        // Bulk save with updated state
3115        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3116        cache.insert("test:entity-1".to_string(), new_state);
3117
3118        // Verify overwrite
3119        let state = cache.get("test:entity-1").unwrap();
3120        assert_eq!(state["version"], 2);
3121        assert_eq!(state["data"], "updated");
3122    }
3123
3124    #[tokio::test]
3125    async fn test_bulk_save_high_volume() {
3126        let store = create_test_store();
3127        let cache = store.projection_state_cache();
3128
3129        // Simulate high volume save (1000 entities)
3130        for i in 0..1000 {
3131            cache.insert(
3132                format!("volume_test:entity-{i}"),
3133                serde_json::json!({"index": i, "status": "active"}),
3134            );
3135        }
3136
3137        // Verify count
3138        assert_eq!(cache.len(), 1000);
3139
3140        // Spot check some entries
3141        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3142        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3143        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3144    }
3145
3146    #[tokio::test]
3147    async fn test_bulk_save_different_projections() {
3148        let store = create_test_store();
3149        let cache = store.projection_state_cache();
3150
3151        // Save to multiple projections in bulk
3152        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3153
3154        for proj in &projections {
3155            for i in 0..5 {
3156                cache.insert(
3157                    format!("{proj}:entity-{i}"),
3158                    serde_json::json!({"projection": proj, "id": i}),
3159                );
3160            }
3161        }
3162
3163        // Verify total count (3 projections * 5 entities)
3164        assert_eq!(cache.len(), 15);
3165
3166        // Verify each projection
3167        for proj in &projections {
3168            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3169            assert_eq!(state["projection"], *proj);
3170        }
3171    }
3172}