Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        security::middleware::OptionalAuth,
41        web::api_v1::AppState,
42    },
43    store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46    Json, Router,
47    extract::{Path, Query, State, WebSocketUpgrade},
48    response::{IntoResponse, Response},
49    routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57
58type SharedStore = Arc<EventStore>;
59
60/// Wait for follower ACK(s) in semi-sync/sync replication modes.
61///
62/// In async mode (default), returns immediately. In semi-sync mode, waits for
63/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
64/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
65#[cfg(feature = "replication")]
66async fn await_replication_ack(state: &AppState) {
67    let shipper_guard = state.wal_shipper.read().await;
68    if let Some(ref shipper) = *shipper_guard {
69        let mode = shipper.replication_mode();
70        if mode == ReplicationMode::Async {
71            return;
72        }
73
74        let target_offset = shipper.current_leader_offset();
75        if target_offset == 0 {
76            return;
77        }
78
79        let shipper = Arc::clone(shipper);
80        // Drop the read guard before the async wait to avoid holding it across await
81        drop(shipper_guard);
82
83        let timer = state
84            .store
85            .metrics()
86            .replication_ack_wait_seconds
87            .start_timer();
88        let acked = shipper.wait_for_ack(target_offset).await;
89        timer.observe_duration();
90
91        if !acked {
92            tracing::warn!(
93                "Replication ACK timeout in {} mode (offset {}). \
94                 Write succeeded locally but follower confirmation pending.",
95                mode,
96                target_offset,
97            );
98        }
99    }
100}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103    // No-op in community edition
104}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107    let app = Router::new()
108        .route("/health", get(health))
109        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
110        .route("/api/v1/events", post(ingest_event))
111        .route("/api/v1/events/batch", post(ingest_events_batch))
112        .route("/api/v1/events/query", get(query_events))
113        .route("/api/v1/events/{event_id}", get(get_event_by_id))
114        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
115        // v0.10: Stream and event type discovery endpoints
116        .route("/api/v1/streams", get(list_streams))
117        .route("/api/v1/event-types", get(list_event_types))
118        .route("/api/v1/entities/duplicates", get(detect_duplicates))
119        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120        .route(
121            "/api/v1/entities/{entity_id}/snapshot",
122            get(get_entity_snapshot),
123        )
124        .route("/api/v1/stats", get(get_stats))
125        // v0.2: Advanced analytics endpoints
126        .route("/api/v1/analytics/frequency", get(analytics_frequency))
127        .route("/api/v1/analytics/summary", get(analytics_summary))
128        .route("/api/v1/analytics/correlation", get(analytics_correlation))
129        // v0.2: Snapshot management endpoints
130        .route("/api/v1/snapshots", post(create_snapshot))
131        .route("/api/v1/snapshots", get(list_snapshots))
132        .route(
133            "/api/v1/snapshots/{entity_id}/latest",
134            get(get_latest_snapshot),
135        )
136        // v0.2: Compaction endpoints
137        .route("/api/v1/compaction/trigger", post(trigger_compaction))
138        .route("/api/v1/compaction/stats", get(compaction_stats))
139        // v0.5: Schema registry endpoints
140        .route("/api/v1/schemas", post(register_schema))
141        .route("/api/v1/schemas", get(list_subjects))
142        .route("/api/v1/schemas/{subject}", get(get_schema))
143        .route(
144            "/api/v1/schemas/{subject}/versions",
145            get(list_schema_versions),
146        )
147        .route("/api/v1/schemas/validate", post(validate_event_schema))
148        .route(
149            "/api/v1/schemas/{subject}/compatibility",
150            put(set_compatibility_mode),
151        )
152        // v0.5: Replay and projection rebuild endpoints
153        .route("/api/v1/replay", post(start_replay))
154        .route("/api/v1/replay", get(list_replays))
155        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157        .route(
158            "/api/v1/replay/{replay_id}",
159            axum::routing::delete(delete_replay),
160        )
161        // v0.5: Stream processing pipeline endpoints
162        .route("/api/v1/pipelines", post(register_pipeline))
163        .route("/api/v1/pipelines", get(list_pipelines))
164        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166        .route(
167            "/api/v1/pipelines/{pipeline_id}",
168            axum::routing::delete(remove_pipeline),
169        )
170        .route(
171            "/api/v1/pipelines/{pipeline_id}/stats",
172            get(get_pipeline_stats),
173        )
174        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175        // v0.7: Projection State API for Query Service integration
176        .route("/api/v1/projections", get(list_projections))
177        .route("/api/v1/projections/{name}", get(get_projection))
178        .route(
179            "/api/v1/projections/{name}",
180            axum::routing::delete(delete_projection),
181        )
182        .route(
183            "/api/v1/projections/{name}/state",
184            get(get_projection_state_summary),
185        )
186        .route("/api/v1/projections/{name}/reset", post(reset_projection))
187        .route(
188            "/api/v1/projections/{name}/{entity_id}/state",
189            get(get_projection_state),
190        )
191        .route(
192            "/api/v1/projections/{name}/{entity_id}/state",
193            post(save_projection_state),
194        )
195        .route(
196            "/api/v1/projections/{name}/{entity_id}/state",
197            put(save_projection_state),
198        )
199        .route(
200            "/api/v1/projections/{name}/bulk",
201            post(bulk_get_projection_states),
202        )
203        .route(
204            "/api/v1/projections/{name}/bulk/save",
205            post(bulk_save_projection_states),
206        )
207        // v0.11: Webhook management endpoints
208        .route("/api/v1/webhooks", post(register_webhook))
209        .route("/api/v1/webhooks", get(list_webhooks))
210        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212        .route(
213            "/api/v1/webhooks/{webhook_id}",
214            axum::routing::delete(delete_webhook),
215        )
216        .route(
217            "/api/v1/webhooks/{webhook_id}/deliveries",
218            get(list_webhook_deliveries),
219        )
220        // v2.0: Advanced query features
221        .route("/api/v1/graphql", post(graphql_query))
222        .route("/api/v1/geospatial/query", post(geo_query))
223        .route("/api/v1/geospatial/stats", get(geo_stats))
224        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225        .route(
226            "/api/v1/schema-evolution/history/{event_type}",
227            get(schema_evolution_history),
228        )
229        .route(
230            "/api/v1/schema-evolution/schema/{event_type}",
231            get(schema_evolution_schema),
232        )
233        .route(
234            "/api/v1/schema-evolution/stats",
235            get(schema_evolution_stats),
236        )
237        .layer(
238            CorsLayer::new()
239                .allow_origin(Any)
240                .allow_methods(Any)
241                .allow_headers(Any),
242        )
243        .layer(TraceLayer::new_for_http())
244        .with_state(store);
245
246    let listener = tokio::net::TcpListener::bind(addr).await?;
247    axum::serve(listener, app).await?;
248
249    Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253    Json(serde_json::json!({
254        "status": "healthy",
255        "service": "allsource-core",
256        "version": env!("CARGO_PKG_VERSION")
257    }))
258}
259
260// v0.6: Prometheus metrics endpoint
261pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262    let metrics = store.metrics();
263
264    match metrics.encode() {
265        Ok(encoded) => Response::builder()
266            .status(200)
267            .header("Content-Type", "text/plain; version=0.0.4")
268            .body(encoded)
269            .unwrap()
270            .into_response(),
271        Err(e) => Response::builder()
272            .status(500)
273            .body(format!("Error encoding metrics: {e}"))
274            .unwrap()
275            .into_response(),
276    }
277}
278
279pub async fn ingest_event(
280    State(store): State<SharedStore>,
281    Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283    let expected_version = req.expected_version;
284
285    // Create event using from_strings with default tenant
286    let event = Event::from_strings(
287        req.event_type,
288        req.entity_id,
289        "default".to_string(),
290        req.payload,
291        req.metadata,
292    )?;
293
294    let event_id = event.id;
295    let timestamp = event.timestamp;
296
297    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299    tracing::info!("Event ingested: {}", event_id);
300
301    Ok(Json(IngestEventResponse {
302        event_id,
303        timestamp,
304        version: Some(new_version),
305    }))
306}
307
308/// Ingest a single event with semi-sync/sync replication ACK waiting.
309///
310/// Used by the v1 API (with auth and replication support).
311pub async fn ingest_event_v1(
312    State(state): State<AppState>,
313    Json(req): Json<IngestEventRequest>,
314) -> Result<Json<IngestEventResponse>> {
315    let expected_version = req.expected_version;
316
317    let event = Event::from_strings(
318        req.event_type,
319        req.entity_id,
320        "default".to_string(),
321        req.payload,
322        req.metadata,
323    )?;
324
325    let event_id = event.id;
326    let timestamp = event.timestamp;
327
328    let new_version = state
329        .store
330        .ingest_with_expected_version(&event, expected_version)?;
331
332    // Semi-sync/sync: wait for follower ACK(s) before returning
333    await_replication_ack(&state).await;
334
335    tracing::info!("Event ingested: {}", event_id);
336
337    Ok(Json(IngestEventResponse {
338        event_id,
339        timestamp,
340        version: Some(new_version),
341    }))
342}
343
344/// Batch ingest multiple events in a single request
345///
346/// This endpoint allows ingesting multiple events atomically, which is more
347/// efficient than making individual requests for each event.
348pub async fn ingest_events_batch(
349    State(store): State<SharedStore>,
350    Json(req): Json<IngestEventsBatchRequest>,
351) -> Result<Json<IngestEventsBatchResponse>> {
352    let total = req.events.len();
353    let mut ingested_events = Vec::with_capacity(total);
354
355    for event_req in req.events {
356        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
357        let expected_version = event_req.expected_version;
358
359        let event = Event::from_strings(
360            event_req.event_type,
361            event_req.entity_id,
362            tenant_id,
363            event_req.payload,
364            event_req.metadata,
365        )?;
366
367        let event_id = event.id;
368        let timestamp = event.timestamp;
369
370        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
371
372        ingested_events.push(IngestEventResponse {
373            event_id,
374            timestamp,
375            version: Some(new_version),
376        });
377    }
378
379    let ingested = ingested_events.len();
380    tracing::info!("Batch ingested {} events", ingested);
381
382    Ok(Json(IngestEventsBatchResponse {
383        total,
384        ingested,
385        events: ingested_events,
386    }))
387}
388
389/// Batch ingest with semi-sync/sync replication ACK waiting.
390///
391/// Used by the v1 API (with auth and replication support).
392pub async fn ingest_events_batch_v1(
393    State(state): State<AppState>,
394    Json(req): Json<IngestEventsBatchRequest>,
395) -> Result<Json<IngestEventsBatchResponse>> {
396    let total = req.events.len();
397    let mut ingested_events = Vec::with_capacity(total);
398
399    for event_req in req.events {
400        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
401        let expected_version = event_req.expected_version;
402
403        let event = Event::from_strings(
404            event_req.event_type,
405            event_req.entity_id,
406            tenant_id,
407            event_req.payload,
408            event_req.metadata,
409        )?;
410
411        let event_id = event.id;
412        let timestamp = event.timestamp;
413
414        let new_version = state
415            .store
416            .ingest_with_expected_version(&event, expected_version)?;
417
418        ingested_events.push(IngestEventResponse {
419            event_id,
420            timestamp,
421            version: Some(new_version),
422        });
423    }
424
425    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
426    await_replication_ack(&state).await;
427
428    let ingested = ingested_events.len();
429    tracing::info!("Batch ingested {} events", ingested);
430
431    Ok(Json(IngestEventsBatchResponse {
432        total,
433        ingested,
434        events: ingested_events,
435    }))
436}
437
438pub async fn query_events(
439    OptionalAuth(auth): OptionalAuth,
440    Query(req): Query<QueryEventsRequest>,
441    State(store): State<SharedStore>,
442) -> Result<Json<QueryEventsResponse>> {
443    let requested_limit = req.limit;
444    let queried_entity_id = req.entity_id.clone();
445
446    // Enforce tenant isolation: authenticated users can only see their own
447    // tenant's events. Unauthenticated (skipped-auth paths) use request param.
448    let enforced_tenant = auth
449        .as_ref()
450        .map(|a| a.tenant_id().to_string())
451        .or(req.tenant_id.clone());
452
453    // Query without limit to get total count
454    let unlimited_req = QueryEventsRequest {
455        entity_id: req.entity_id,
456        event_type: req.event_type,
457        tenant_id: enforced_tenant,
458        as_of: req.as_of,
459        since: req.since,
460        until: req.until,
461        limit: None,
462        event_type_prefix: req.event_type_prefix,
463        payload_filter: req.payload_filter,
464    };
465    let all_events = store.query(&unlimited_req)?;
466    let total_count = all_events.len();
467
468    // Apply limit
469    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
470        all_events.into_iter().take(limit).collect()
471    } else {
472        all_events
473    };
474
475    let count = limited_events.len();
476    let has_more = count < total_count;
477    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
478
479    // Include entity_version only when filtering by a single entity_id
480    let entity_version = queried_entity_id
481        .as_deref()
482        .map(|eid| store.get_entity_version(eid));
483
484    tracing::debug!("Query returned {} events (total: {})", count, total_count);
485
486    Ok(Json(QueryEventsResponse {
487        events,
488        count,
489        total_count,
490        has_more,
491        entity_version,
492    }))
493}
494
495pub async fn list_entities(
496    State(store): State<SharedStore>,
497    Query(req): Query<ListEntitiesRequest>,
498) -> Result<Json<ListEntitiesResponse>> {
499    use std::collections::HashMap;
500
501    // Get all events matching the filters
502    let query_req = QueryEventsRequest {
503        entity_id: None,
504        event_type: None,
505        tenant_id: None,
506        as_of: None,
507        since: None,
508        until: None,
509        limit: None,
510        event_type_prefix: req.event_type_prefix,
511        payload_filter: req.payload_filter,
512    };
513    let events = store.query(&query_req)?;
514
515    // Group by entity_id
516    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
517    for event in &events {
518        entity_map
519            .entry(event.entity_id().to_string())
520            .or_default()
521            .push(event);
522    }
523
524    // Build entity summaries sorted by last event time (descending)
525    let mut summaries: Vec<EntitySummary> = entity_map
526        .into_iter()
527        .map(|(entity_id, events)| {
528            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
529            EntitySummary {
530                entity_id,
531                event_count: events.len(),
532                last_event_type: last.event_type_str().to_string(),
533                last_event_at: last.timestamp(),
534            }
535        })
536        .collect();
537    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
538
539    let total = summaries.len();
540
541    // Apply offset and limit
542    let offset = req.offset.unwrap_or(0);
543    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
544    let summaries = if let Some(limit) = req.limit {
545        let has_more = summaries.len() > limit;
546        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
547        return Ok(Json(ListEntitiesResponse {
548            entities: truncated,
549            total,
550            has_more,
551        }));
552    } else {
553        summaries
554    };
555
556    Ok(Json(ListEntitiesResponse {
557        entities: summaries,
558        total,
559        has_more: false,
560    }))
561}
562
563pub async fn detect_duplicates(
564    State(store): State<SharedStore>,
565    Query(req): Query<DetectDuplicatesRequest>,
566) -> Result<Json<DetectDuplicatesResponse>> {
567    use std::collections::HashMap;
568
569    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
570
571    // Query events scoped by the required prefix
572    let query_req = QueryEventsRequest {
573        entity_id: None,
574        event_type: None,
575        tenant_id: None,
576        as_of: None,
577        since: None,
578        until: None,
579        limit: None,
580        event_type_prefix: Some(req.event_type_prefix),
581        payload_filter: None,
582    };
583    let events = store.query(&query_req)?;
584
585    // For each entity, extract the latest event's payload fields specified by group_by
586    // Then group entities by those field values
587    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
588    for event in &events {
589        let eid = event.entity_id().to_string();
590        entity_latest
591            .entry(eid)
592            .and_modify(|existing| {
593                if event.timestamp() > existing.timestamp() {
594                    *existing = event;
595                }
596            })
597            .or_insert(event);
598    }
599
600    // Group entities by their payload field values
601    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
602    for (entity_id, event) in &entity_latest {
603        let payload = event.payload();
604        let mut key_parts = serde_json::Map::new();
605        for field in &group_by_fields {
606            let value = payload
607                .get(*field)
608                .cloned()
609                .unwrap_or(serde_json::Value::Null);
610            key_parts.insert((*field).to_string(), value);
611        }
612        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
613        groups.entry(key_str).or_default().push(entity_id.clone());
614    }
615
616    // Filter to groups with count > 1 (actual duplicates)
617    let mut duplicate_groups: Vec<DuplicateGroup> = groups
618        .into_iter()
619        .filter(|(_, ids)| ids.len() > 1)
620        .map(|(key_str, mut ids)| {
621            ids.sort();
622            let key: serde_json::Value =
623                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
624            let count = ids.len();
625            DuplicateGroup {
626                key,
627                entity_ids: ids,
628                count,
629            }
630        })
631        .collect();
632
633    // Sort by count descending for consistent output
634    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
635
636    let total = duplicate_groups.len();
637
638    // Apply offset and limit
639    let offset = req.offset.unwrap_or(0);
640    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
641
642    if let Some(limit) = req.limit {
643        let has_more = duplicate_groups.len() > limit;
644        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
645        return Ok(Json(DetectDuplicatesResponse {
646            duplicates: truncated,
647            total,
648            has_more,
649        }));
650    }
651
652    Ok(Json(DetectDuplicatesResponse {
653        duplicates: duplicate_groups,
654        total,
655        has_more: false,
656    }))
657}
658
659#[derive(Deserialize)]
660pub struct EntityStateParams {
661    as_of: Option<chrono::DateTime<chrono::Utc>>,
662}
663
664pub async fn get_entity_state(
665    State(store): State<SharedStore>,
666    Path(entity_id): Path<String>,
667    Query(params): Query<EntityStateParams>,
668) -> Result<Json<serde_json::Value>> {
669    let state = store.reconstruct_state(&entity_id, params.as_of)?;
670
671    tracing::info!("State reconstructed for entity: {}", entity_id);
672
673    Ok(Json(state))
674}
675
676pub async fn get_entity_snapshot(
677    State(store): State<SharedStore>,
678    Path(entity_id): Path<String>,
679) -> Result<Json<serde_json::Value>> {
680    let snapshot = store.get_snapshot(&entity_id)?;
681
682    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
683
684    Ok(Json(snapshot))
685}
686
687pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
688    let stats = store.stats();
689    Json(stats)
690}
691
692// v0.10: List all streams (entity_ids) in the event store
693/// Query parameters for listing streams
694#[derive(Debug, Deserialize)]
695pub struct ListStreamsParams {
696    /// Optional limit on number of streams to return
697    pub limit: Option<usize>,
698    /// Optional offset for pagination
699    pub offset: Option<usize>,
700}
701
702/// Response for listing streams
703#[derive(Debug, serde::Serialize)]
704pub struct ListStreamsResponse {
705    pub streams: Vec<StreamInfo>,
706    pub total: usize,
707}
708
709pub async fn list_streams(
710    State(store): State<SharedStore>,
711    Query(params): Query<ListStreamsParams>,
712) -> Json<ListStreamsResponse> {
713    let mut streams = store.list_streams();
714    let total = streams.len();
715
716    // Sort by last_event_at descending (most recent first)
717    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
718
719    // Apply pagination
720    if let Some(offset) = params.offset {
721        if offset < streams.len() {
722            streams = streams[offset..].to_vec();
723        } else {
724            streams = vec![];
725        }
726    }
727
728    if let Some(limit) = params.limit {
729        streams.truncate(limit);
730    }
731
732    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
733
734    Json(ListStreamsResponse { streams, total })
735}
736
737// v0.10: List all event types in the event store
738/// Query parameters for listing event types
739#[derive(Debug, Deserialize)]
740pub struct ListEventTypesParams {
741    /// Optional limit on number of event types to return
742    pub limit: Option<usize>,
743    /// Optional offset for pagination
744    pub offset: Option<usize>,
745}
746
747/// Response for listing event types
748#[derive(Debug, serde::Serialize)]
749pub struct ListEventTypesResponse {
750    pub event_types: Vec<EventTypeInfo>,
751    pub total: usize,
752}
753
754pub async fn list_event_types(
755    State(store): State<SharedStore>,
756    Query(params): Query<ListEventTypesParams>,
757) -> Json<ListEventTypesResponse> {
758    let mut event_types = store.list_event_types();
759    let total = event_types.len();
760
761    // Sort by event_count descending (most used first)
762    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
763
764    // Apply pagination
765    if let Some(offset) = params.offset {
766        if offset < event_types.len() {
767            event_types = event_types[offset..].to_vec();
768        } else {
769            event_types = vec![];
770        }
771    }
772
773    if let Some(limit) = params.limit {
774        event_types.truncate(limit);
775    }
776
777    tracing::debug!(
778        "Listed {} event types (total: {})",
779        event_types.len(),
780        total
781    );
782
783    Json(ListEventTypesResponse { event_types, total })
784}
785
786// v0.2: WebSocket endpoint for real-time event streaming
787#[derive(Debug, Deserialize)]
788pub struct WebSocketParams {
789    pub consumer_id: Option<String>,
790}
791
792pub async fn events_websocket(
793    ws: WebSocketUpgrade,
794    State(store): State<SharedStore>,
795    Query(params): Query<WebSocketParams>,
796) -> Response {
797    let websocket_manager = store.websocket_manager();
798
799    ws.on_upgrade(move |socket| async move {
800        if let Some(consumer_id) = params.consumer_id {
801            websocket_manager
802                .handle_socket_with_consumer(socket, consumer_id, store)
803                .await;
804        } else {
805            websocket_manager.handle_socket(socket).await;
806        }
807    })
808}
809
810// v0.2: Event frequency analytics endpoint
811pub async fn analytics_frequency(
812    State(store): State<SharedStore>,
813    Query(req): Query<EventFrequencyRequest>,
814) -> Result<Json<EventFrequencyResponse>> {
815    let response = AnalyticsEngine::event_frequency(&store, &req)?;
816
817    tracing::debug!(
818        "Frequency analysis returned {} buckets",
819        response.buckets.len()
820    );
821
822    Ok(Json(response))
823}
824
825// v0.2: Statistical summary endpoint
826pub async fn analytics_summary(
827    State(store): State<SharedStore>,
828    Query(req): Query<StatsSummaryRequest>,
829) -> Result<Json<StatsSummaryResponse>> {
830    let response = AnalyticsEngine::stats_summary(&store, &req)?;
831
832    tracing::debug!(
833        "Stats summary: {} events across {} entities",
834        response.total_events,
835        response.unique_entities
836    );
837
838    Ok(Json(response))
839}
840
841// v0.2: Event correlation analysis endpoint
842pub async fn analytics_correlation(
843    State(store): State<SharedStore>,
844    Query(req): Query<CorrelationRequest>,
845) -> Result<Json<CorrelationResponse>> {
846    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
847
848    tracing::debug!(
849        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
850        response.correlated_pairs,
851        response.total_a,
852        response.correlation_percentage
853    );
854
855    Ok(Json(response))
856}
857
858// v0.2: Create a snapshot for an entity
859pub async fn create_snapshot(
860    State(store): State<SharedStore>,
861    Json(req): Json<CreateSnapshotRequest>,
862) -> Result<Json<CreateSnapshotResponse>> {
863    store.create_snapshot(&req.entity_id)?;
864
865    let snapshot_manager = store.snapshot_manager();
866    let snapshot = snapshot_manager
867        .get_latest_snapshot(&req.entity_id)
868        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
869
870    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
871
872    Ok(Json(CreateSnapshotResponse {
873        snapshot_id: snapshot.id,
874        entity_id: snapshot.entity_id,
875        created_at: snapshot.created_at,
876        event_count: snapshot.event_count,
877        size_bytes: snapshot.metadata.size_bytes,
878    }))
879}
880
881// v0.2: List snapshots
882pub async fn list_snapshots(
883    State(store): State<SharedStore>,
884    Query(req): Query<ListSnapshotsRequest>,
885) -> Result<Json<ListSnapshotsResponse>> {
886    let snapshot_manager = store.snapshot_manager();
887
888    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
889        snapshot_manager
890            .get_all_snapshots(&entity_id)
891            .into_iter()
892            .map(SnapshotInfo::from)
893            .collect()
894    } else {
895        // List all entities with snapshots
896        let entities = snapshot_manager.list_entities();
897        entities
898            .iter()
899            .flat_map(|entity_id| {
900                snapshot_manager
901                    .get_all_snapshots(entity_id)
902                    .into_iter()
903                    .map(SnapshotInfo::from)
904            })
905            .collect()
906    };
907
908    let total = snapshots.len();
909
910    tracing::debug!("Listed {} snapshots", total);
911
912    Ok(Json(ListSnapshotsResponse { snapshots, total }))
913}
914
915// v0.2: Get latest snapshot for an entity
916pub async fn get_latest_snapshot(
917    State(store): State<SharedStore>,
918    Path(entity_id): Path<String>,
919) -> Result<Json<serde_json::Value>> {
920    let snapshot_manager = store.snapshot_manager();
921
922    let snapshot = snapshot_manager
923        .get_latest_snapshot(&entity_id)
924        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
925
926    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
927
928    Ok(Json(serde_json::json!({
929        "snapshot_id": snapshot.id,
930        "entity_id": snapshot.entity_id,
931        "created_at": snapshot.created_at,
932        "as_of": snapshot.as_of,
933        "event_count": snapshot.event_count,
934        "size_bytes": snapshot.metadata.size_bytes,
935        "snapshot_type": snapshot.metadata.snapshot_type,
936        "state": snapshot.state
937    })))
938}
939
940// v0.2: Trigger manual compaction
941pub async fn trigger_compaction(
942    State(store): State<SharedStore>,
943) -> Result<Json<CompactionResult>> {
944    let compaction_manager = store.compaction_manager().ok_or_else(|| {
945        crate::error::AllSourceError::InternalError(
946            "Compaction not enabled (no Parquet storage)".to_string(),
947        )
948    })?;
949
950    tracing::info!("📦 Manual compaction triggered via API");
951
952    let result = compaction_manager.compact_now()?;
953
954    Ok(Json(result))
955}
956
957// v0.2: Get compaction statistics
958pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
959    let compaction_manager = store.compaction_manager().ok_or_else(|| {
960        crate::error::AllSourceError::InternalError(
961            "Compaction not enabled (no Parquet storage)".to_string(),
962        )
963    })?;
964
965    let stats = compaction_manager.stats();
966    let config = compaction_manager.config();
967
968    Ok(Json(serde_json::json!({
969        "stats": stats,
970        "config": {
971            "min_files_to_compact": config.min_files_to_compact,
972            "target_file_size": config.target_file_size,
973            "max_file_size": config.max_file_size,
974            "small_file_threshold": config.small_file_threshold,
975            "compaction_interval_seconds": config.compaction_interval_seconds,
976            "auto_compact": config.auto_compact,
977            "strategy": config.strategy
978        }
979    })))
980}
981
982// v0.5: Register a new schema
983pub async fn register_schema(
984    State(store): State<SharedStore>,
985    Json(req): Json<RegisterSchemaRequest>,
986) -> Result<Json<RegisterSchemaResponse>> {
987    let schema_registry = store.schema_registry();
988
989    let response =
990        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
991
992    tracing::info!(
993        "📋 Schema registered: v{} for '{}'",
994        response.version,
995        response.subject
996    );
997
998    Ok(Json(response))
999}
1000
1001// v0.5: Get a schema by subject and optional version
1002#[derive(Deserialize)]
1003pub struct GetSchemaParams {
1004    version: Option<u32>,
1005}
1006
1007pub async fn get_schema(
1008    State(store): State<SharedStore>,
1009    Path(subject): Path<String>,
1010    Query(params): Query<GetSchemaParams>,
1011) -> Result<Json<serde_json::Value>> {
1012    let schema_registry = store.schema_registry();
1013
1014    let schema = schema_registry.get_schema(&subject, params.version)?;
1015
1016    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1017
1018    Ok(Json(serde_json::json!({
1019        "id": schema.id,
1020        "subject": schema.subject,
1021        "version": schema.version,
1022        "schema": schema.schema,
1023        "created_at": schema.created_at,
1024        "description": schema.description,
1025        "tags": schema.tags
1026    })))
1027}
1028
1029// v0.5: List all versions of a schema subject
1030pub async fn list_schema_versions(
1031    State(store): State<SharedStore>,
1032    Path(subject): Path<String>,
1033) -> Result<Json<serde_json::Value>> {
1034    let schema_registry = store.schema_registry();
1035
1036    let versions = schema_registry.list_versions(&subject)?;
1037
1038    Ok(Json(serde_json::json!({
1039        "subject": subject,
1040        "versions": versions
1041    })))
1042}
1043
1044// v0.5: List all schema subjects
1045pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1046    let schema_registry = store.schema_registry();
1047
1048    let subjects = schema_registry.list_subjects();
1049
1050    Json(serde_json::json!({
1051        "subjects": subjects,
1052        "total": subjects.len()
1053    }))
1054}
1055
1056// v0.5: Validate an event against a schema
1057pub async fn validate_event_schema(
1058    State(store): State<SharedStore>,
1059    Json(req): Json<ValidateEventRequest>,
1060) -> Result<Json<ValidateEventResponse>> {
1061    let schema_registry = store.schema_registry();
1062
1063    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1064
1065    if response.valid {
1066        tracing::debug!(
1067            "✅ Event validated against schema '{}' v{}",
1068            req.subject,
1069            response.schema_version
1070        );
1071    } else {
1072        tracing::warn!(
1073            "❌ Event validation failed for '{}': {:?}",
1074            req.subject,
1075            response.errors
1076        );
1077    }
1078
1079    Ok(Json(response))
1080}
1081
1082// v0.5: Set compatibility mode for a subject
1083#[derive(Deserialize)]
1084pub struct SetCompatibilityRequest {
1085    compatibility: CompatibilityMode,
1086}
1087
1088pub async fn set_compatibility_mode(
1089    State(store): State<SharedStore>,
1090    Path(subject): Path<String>,
1091    Json(req): Json<SetCompatibilityRequest>,
1092) -> Json<serde_json::Value> {
1093    let schema_registry = store.schema_registry();
1094
1095    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1096
1097    tracing::info!(
1098        "🔧 Set compatibility mode for '{}' to {:?}",
1099        subject,
1100        req.compatibility
1101    );
1102
1103    Json(serde_json::json!({
1104        "subject": subject,
1105        "compatibility": req.compatibility
1106    }))
1107}
1108
1109// v0.5: Start a replay operation
1110pub async fn start_replay(
1111    State(store): State<SharedStore>,
1112    Json(req): Json<StartReplayRequest>,
1113) -> Result<Json<StartReplayResponse>> {
1114    let replay_manager = store.replay_manager();
1115
1116    let response = replay_manager.start_replay(store, req)?;
1117
1118    tracing::info!(
1119        "🔄 Started replay {} with {} events",
1120        response.replay_id,
1121        response.total_events
1122    );
1123
1124    Ok(Json(response))
1125}
1126
1127// v0.5: Get replay progress
1128pub async fn get_replay_progress(
1129    State(store): State<SharedStore>,
1130    Path(replay_id): Path<uuid::Uuid>,
1131) -> Result<Json<ReplayProgress>> {
1132    let replay_manager = store.replay_manager();
1133
1134    let progress = replay_manager.get_progress(replay_id)?;
1135
1136    Ok(Json(progress))
1137}
1138
1139// v0.5: List all replay operations
1140pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1141    let replay_manager = store.replay_manager();
1142
1143    let replays = replay_manager.list_replays();
1144
1145    Json(serde_json::json!({
1146        "replays": replays,
1147        "total": replays.len()
1148    }))
1149}
1150
1151// v0.5: Cancel a running replay
1152pub async fn cancel_replay(
1153    State(store): State<SharedStore>,
1154    Path(replay_id): Path<uuid::Uuid>,
1155) -> Result<Json<serde_json::Value>> {
1156    let replay_manager = store.replay_manager();
1157
1158    replay_manager.cancel_replay(replay_id)?;
1159
1160    tracing::info!("🛑 Cancelled replay {}", replay_id);
1161
1162    Ok(Json(serde_json::json!({
1163        "replay_id": replay_id,
1164        "status": "cancelled"
1165    })))
1166}
1167
1168// v0.5: Delete a completed replay
1169pub async fn delete_replay(
1170    State(store): State<SharedStore>,
1171    Path(replay_id): Path<uuid::Uuid>,
1172) -> Result<Json<serde_json::Value>> {
1173    let replay_manager = store.replay_manager();
1174
1175    let deleted = replay_manager.delete_replay(replay_id)?;
1176
1177    if deleted {
1178        tracing::info!("🗑️  Deleted replay {}", replay_id);
1179    }
1180
1181    Ok(Json(serde_json::json!({
1182        "replay_id": replay_id,
1183        "deleted": deleted
1184    })))
1185}
1186
1187// v0.5: Register a new pipeline
1188pub async fn register_pipeline(
1189    State(store): State<SharedStore>,
1190    Json(config): Json<PipelineConfig>,
1191) -> Result<Json<serde_json::Value>> {
1192    let pipeline_manager = store.pipeline_manager();
1193
1194    let pipeline_id = pipeline_manager.register(config.clone());
1195
1196    tracing::info!(
1197        "🔀 Pipeline registered: {} (name: {})",
1198        pipeline_id,
1199        config.name
1200    );
1201
1202    Ok(Json(serde_json::json!({
1203        "pipeline_id": pipeline_id,
1204        "name": config.name,
1205        "enabled": config.enabled
1206    })))
1207}
1208
1209// v0.5: List all pipelines
1210pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1211    let pipeline_manager = store.pipeline_manager();
1212
1213    let pipelines = pipeline_manager.list();
1214
1215    tracing::debug!("Listed {} pipelines", pipelines.len());
1216
1217    Json(serde_json::json!({
1218        "pipelines": pipelines,
1219        "total": pipelines.len()
1220    }))
1221}
1222
1223// v0.5: Get a specific pipeline
1224pub async fn get_pipeline(
1225    State(store): State<SharedStore>,
1226    Path(pipeline_id): Path<uuid::Uuid>,
1227) -> Result<Json<PipelineConfig>> {
1228    let pipeline_manager = store.pipeline_manager();
1229
1230    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1231        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1232    })?;
1233
1234    Ok(Json(pipeline.config().clone()))
1235}
1236
1237// v0.5: Remove a pipeline
1238pub async fn remove_pipeline(
1239    State(store): State<SharedStore>,
1240    Path(pipeline_id): Path<uuid::Uuid>,
1241) -> Result<Json<serde_json::Value>> {
1242    let pipeline_manager = store.pipeline_manager();
1243
1244    let removed = pipeline_manager.remove(pipeline_id);
1245
1246    if removed {
1247        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1248    }
1249
1250    Ok(Json(serde_json::json!({
1251        "pipeline_id": pipeline_id,
1252        "removed": removed
1253    })))
1254}
1255
1256// v0.5: Get statistics for all pipelines
1257pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1258    let pipeline_manager = store.pipeline_manager();
1259
1260    let stats = pipeline_manager.all_stats();
1261
1262    Json(serde_json::json!({
1263        "stats": stats,
1264        "total": stats.len()
1265    }))
1266}
1267
1268// v0.5: Get statistics for a specific pipeline
1269pub async fn get_pipeline_stats(
1270    State(store): State<SharedStore>,
1271    Path(pipeline_id): Path<uuid::Uuid>,
1272) -> Result<Json<PipelineStats>> {
1273    let pipeline_manager = store.pipeline_manager();
1274
1275    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1276        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1277    })?;
1278
1279    Ok(Json(pipeline.stats()))
1280}
1281
1282// v0.5: Reset a pipeline's state
1283pub async fn reset_pipeline(
1284    State(store): State<SharedStore>,
1285    Path(pipeline_id): Path<uuid::Uuid>,
1286) -> Result<Json<serde_json::Value>> {
1287    let pipeline_manager = store.pipeline_manager();
1288
1289    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1290        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1291    })?;
1292
1293    pipeline.reset();
1294
1295    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1296
1297    Ok(Json(serde_json::json!({
1298        "pipeline_id": pipeline_id,
1299        "reset": true
1300    })))
1301}
1302
1303// =============================================================================
1304// v0.11: Single Event Lookup by ID
1305// =============================================================================
1306
1307/// Get a single event by UUID
1308pub async fn get_event_by_id(
1309    State(store): State<SharedStore>,
1310    Path(event_id): Path<uuid::Uuid>,
1311) -> Result<Json<serde_json::Value>> {
1312    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1313        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1314    })?;
1315
1316    let dto = EventDto::from(&event);
1317
1318    tracing::debug!("Event retrieved by ID: {}", event_id);
1319
1320    Ok(Json(serde_json::json!({
1321        "event": dto,
1322        "found": true
1323    })))
1324}
1325
1326// =============================================================================
1327// v0.7: Projection State API for Query Service Integration
1328// =============================================================================
1329
1330/// List all registered projections
1331pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1332    let projection_manager = store.projection_manager();
1333    let status_map = store.projection_status();
1334
1335    let projections: Vec<serde_json::Value> = projection_manager
1336        .list_projections()
1337        .iter()
1338        .map(|(name, projection)| {
1339            let status = status_map
1340                .get(name)
1341                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1342            serde_json::json!({
1343                "name": name,
1344                "type": format!("{:?}", projection.name()),
1345                "status": status,
1346            })
1347        })
1348        .collect();
1349
1350    tracing::debug!("Listed {} projections", projections.len());
1351
1352    Json(serde_json::json!({
1353        "projections": projections,
1354        "total": projections.len()
1355    }))
1356}
1357
1358/// Get projection metadata by name
1359pub async fn get_projection(
1360    State(store): State<SharedStore>,
1361    Path(name): Path<String>,
1362) -> Result<Json<serde_json::Value>> {
1363    let projection_manager = store.projection_manager();
1364
1365    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1366        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1367    })?;
1368
1369    Ok(Json(serde_json::json!({
1370        "name": projection.name(),
1371        "found": true
1372    })))
1373}
1374
1375/// Get projection state for a specific entity
1376///
1377/// This endpoint allows the Elixir Query Service to fetch projection state
1378/// from the Rust Core for synchronization.
1379pub async fn get_projection_state(
1380    State(store): State<SharedStore>,
1381    Path((name, entity_id)): Path<(String, String)>,
1382) -> Result<Json<serde_json::Value>> {
1383    let projection_manager = store.projection_manager();
1384
1385    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1386        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1387    })?;
1388
1389    let state = projection.get_state(&entity_id);
1390
1391    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1392
1393    Ok(Json(serde_json::json!({
1394        "projection": name,
1395        "entity_id": entity_id,
1396        "state": state,
1397        "found": state.is_some()
1398    })))
1399}
1400
1401/// Delete (clear) a projection by name
1402///
1403/// Removes all state from the projection. The projection definition remains
1404/// registered but its accumulated state is cleared.
1405pub async fn delete_projection(
1406    State(store): State<SharedStore>,
1407    Path(name): Path<String>,
1408) -> Result<Json<serde_json::Value>> {
1409    let projection_manager = store.projection_manager();
1410
1411    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1412        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1413    })?;
1414
1415    projection.clear();
1416
1417    // Also clear any cached state for this projection
1418    let cache = store.projection_state_cache();
1419    let prefix = format!("{name}:");
1420    let keys_to_remove: Vec<String> = cache
1421        .iter()
1422        .filter(|entry| entry.key().starts_with(&prefix))
1423        .map(|entry| entry.key().clone())
1424        .collect();
1425    for key in keys_to_remove {
1426        cache.remove(&key);
1427    }
1428
1429    tracing::info!("Projection deleted (cleared): {}", name);
1430
1431    Ok(Json(serde_json::json!({
1432        "projection": name,
1433        "deleted": true
1434    })))
1435}
1436
1437/// Get aggregate projection state (all entities)
1438///
1439/// Returns summary information about a projection's state across all entities.
1440pub async fn get_projection_state_summary(
1441    State(store): State<SharedStore>,
1442    Path(name): Path<String>,
1443) -> Result<Json<serde_json::Value>> {
1444    let projection_manager = store.projection_manager();
1445
1446    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1447        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1448    })?;
1449
1450    // Collect cached states for this projection
1451    let cache = store.projection_state_cache();
1452    let prefix = format!("{name}:");
1453    let states: Vec<serde_json::Value> = cache
1454        .iter()
1455        .filter(|entry| entry.key().starts_with(&prefix))
1456        .map(|entry| {
1457            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1458            serde_json::json!({
1459                "entity_id": entity_id,
1460                "state": entry.value().clone()
1461            })
1462        })
1463        .collect();
1464
1465    let total = states.len();
1466
1467    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1468
1469    Ok(Json(serde_json::json!({
1470        "projection": name,
1471        "states": states,
1472        "total": total
1473    })))
1474}
1475
1476/// Reset a projection to its initial state
1477///
1478/// Clears all accumulated state and reprocesses events from the beginning.
1479pub async fn reset_projection(
1480    State(store): State<SharedStore>,
1481    Path(name): Path<String>,
1482) -> Result<Json<serde_json::Value>> {
1483    let reprocessed = store.reset_projection(&name)?;
1484
1485    tracing::info!(
1486        "Projection reset: {} ({} events reprocessed)",
1487        name,
1488        reprocessed
1489    );
1490
1491    Ok(Json(serde_json::json!({
1492        "projection": name,
1493        "reset": true,
1494        "events_reprocessed": reprocessed
1495    })))
1496}
1497
1498/// Pause a projection
1499///
1500/// Sets the projection status to "paused" so it stops processing new events.
1501pub async fn pause_projection(
1502    State(store): State<SharedStore>,
1503    Path(name): Path<String>,
1504) -> Result<Json<serde_json::Value>> {
1505    let projection_manager = store.projection_manager();
1506
1507    // Verify projection exists
1508    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1509        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1510    })?;
1511
1512    store
1513        .projection_status()
1514        .insert(name.clone(), "paused".to_string());
1515
1516    tracing::info!("Projection paused: {}", name);
1517
1518    Ok(Json(serde_json::json!({
1519        "projection": name,
1520        "status": "paused"
1521    })))
1522}
1523
1524/// Start (resume) a projection
1525///
1526/// Sets the projection status to "running" so it resumes processing events.
1527pub async fn start_projection(
1528    State(store): State<SharedStore>,
1529    Path(name): Path<String>,
1530) -> Result<Json<serde_json::Value>> {
1531    let projection_manager = store.projection_manager();
1532
1533    // Verify projection exists
1534    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1535        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1536    })?;
1537
1538    store
1539        .projection_status()
1540        .insert(name.clone(), "running".to_string());
1541
1542    tracing::info!("Projection started: {}", name);
1543
1544    Ok(Json(serde_json::json!({
1545        "projection": name,
1546        "status": "running"
1547    })))
1548}
1549
1550/// Request body for saving projection state
1551#[derive(Debug, Deserialize)]
1552pub struct SaveProjectionStateRequest {
1553    pub state: serde_json::Value,
1554}
1555
1556/// Save/update projection state for an entity
1557///
1558/// This endpoint allows external services (like Elixir Query Service) to
1559/// store computed projection state back to the Core for persistence.
1560pub async fn save_projection_state(
1561    State(store): State<SharedStore>,
1562    Path((name, entity_id)): Path<(String, String)>,
1563    Json(req): Json<SaveProjectionStateRequest>,
1564) -> Result<Json<serde_json::Value>> {
1565    let projection_cache = store.projection_state_cache();
1566
1567    // Store in the projection state cache
1568    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1569
1570    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1571
1572    Ok(Json(serde_json::json!({
1573        "projection": name,
1574        "entity_id": entity_id,
1575        "saved": true
1576    })))
1577}
1578
1579/// Bulk get projection states for multiple entities
1580///
1581/// Efficient endpoint for fetching multiple entity states in a single request.
1582#[derive(Debug, Deserialize)]
1583pub struct BulkGetStateRequest {
1584    pub entity_ids: Vec<String>,
1585}
1586
1587/// Bulk save projection states for multiple entities
1588///
1589/// Efficient endpoint for saving multiple entity states in a single request.
1590#[derive(Debug, Deserialize)]
1591pub struct BulkSaveStateRequest {
1592    pub states: Vec<BulkSaveStateItem>,
1593}
1594
1595#[derive(Debug, Deserialize)]
1596pub struct BulkSaveStateItem {
1597    pub entity_id: String,
1598    pub state: serde_json::Value,
1599}
1600
1601pub async fn bulk_get_projection_states(
1602    State(store): State<SharedStore>,
1603    Path(name): Path<String>,
1604    Json(req): Json<BulkGetStateRequest>,
1605) -> Result<Json<serde_json::Value>> {
1606    let projection_manager = store.projection_manager();
1607
1608    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1609        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1610    })?;
1611
1612    let states: Vec<serde_json::Value> = req
1613        .entity_ids
1614        .iter()
1615        .map(|entity_id| {
1616            let state = projection.get_state(entity_id);
1617            serde_json::json!({
1618                "entity_id": entity_id,
1619                "state": state,
1620                "found": state.is_some()
1621            })
1622        })
1623        .collect();
1624
1625    tracing::debug!(
1626        "Bulk projection state retrieved: {} entities from {}",
1627        states.len(),
1628        name
1629    );
1630
1631    Ok(Json(serde_json::json!({
1632        "projection": name,
1633        "states": states,
1634        "total": states.len()
1635    })))
1636}
1637
1638/// Bulk save projection states for multiple entities
1639///
1640/// This endpoint allows efficient batch saving of projection states,
1641/// critical for high-throughput event processing pipelines.
1642pub async fn bulk_save_projection_states(
1643    State(store): State<SharedStore>,
1644    Path(name): Path<String>,
1645    Json(req): Json<BulkSaveStateRequest>,
1646) -> Result<Json<serde_json::Value>> {
1647    let projection_cache = store.projection_state_cache();
1648
1649    let mut saved_count = 0;
1650    for item in &req.states {
1651        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1652        saved_count += 1;
1653    }
1654
1655    tracing::info!(
1656        "Bulk projection state saved: {} entities for {}",
1657        saved_count,
1658        name
1659    );
1660
1661    Ok(Json(serde_json::json!({
1662        "projection": name,
1663        "saved": saved_count,
1664        "total": req.states.len()
1665    })))
1666}
1667
1668// =============================================================================
1669// v0.11: Webhook Management API
1670// =============================================================================
1671
1672/// Query parameters for listing webhooks
1673#[derive(Debug, Deserialize)]
1674pub struct ListWebhooksParams {
1675    pub tenant_id: Option<String>,
1676}
1677
1678/// Register a new webhook subscription
1679pub async fn register_webhook(
1680    State(store): State<SharedStore>,
1681    Json(req): Json<RegisterWebhookRequest>,
1682) -> Json<serde_json::Value> {
1683    let registry = store.webhook_registry();
1684    let webhook = registry.register(req);
1685
1686    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1687
1688    Json(serde_json::json!({
1689        "webhook": webhook,
1690        "created": true
1691    }))
1692}
1693
1694/// List webhooks, optionally filtered by tenant_id
1695pub async fn list_webhooks(
1696    State(store): State<SharedStore>,
1697    Query(params): Query<ListWebhooksParams>,
1698) -> Json<serde_json::Value> {
1699    let registry = store.webhook_registry();
1700
1701    let webhooks = if let Some(tenant_id) = params.tenant_id {
1702        registry.list_by_tenant(&tenant_id)
1703    } else {
1704        // Without tenant filter, return empty (tenants should always filter)
1705        vec![]
1706    };
1707
1708    let total = webhooks.len();
1709
1710    Json(serde_json::json!({
1711        "webhooks": webhooks,
1712        "total": total
1713    }))
1714}
1715
1716/// Get a specific webhook by ID
1717pub async fn get_webhook(
1718    State(store): State<SharedStore>,
1719    Path(webhook_id): Path<uuid::Uuid>,
1720) -> Result<Json<serde_json::Value>> {
1721    let registry = store.webhook_registry();
1722
1723    let webhook = registry.get(webhook_id).ok_or_else(|| {
1724        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1725    })?;
1726
1727    Ok(Json(serde_json::json!({
1728        "webhook": webhook,
1729        "found": true
1730    })))
1731}
1732
1733/// Update a webhook subscription
1734pub async fn update_webhook(
1735    State(store): State<SharedStore>,
1736    Path(webhook_id): Path<uuid::Uuid>,
1737    Json(req): Json<UpdateWebhookRequest>,
1738) -> Result<Json<serde_json::Value>> {
1739    let registry = store.webhook_registry();
1740
1741    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1742        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1743    })?;
1744
1745    tracing::info!("Webhook updated: {}", webhook_id);
1746
1747    Ok(Json(serde_json::json!({
1748        "webhook": webhook,
1749        "updated": true
1750    })))
1751}
1752
1753/// Delete a webhook subscription
1754pub async fn delete_webhook(
1755    State(store): State<SharedStore>,
1756    Path(webhook_id): Path<uuid::Uuid>,
1757) -> Result<Json<serde_json::Value>> {
1758    let registry = store.webhook_registry();
1759
1760    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1761        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1762    })?;
1763
1764    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1765
1766    Ok(Json(serde_json::json!({
1767        "webhook_id": webhook_id,
1768        "deleted": true
1769    })))
1770}
1771
1772/// Query parameters for listing webhook deliveries
1773#[derive(Debug, Deserialize)]
1774pub struct ListDeliveriesParams {
1775    pub limit: Option<usize>,
1776}
1777
1778/// List delivery history for a webhook
1779pub async fn list_webhook_deliveries(
1780    State(store): State<SharedStore>,
1781    Path(webhook_id): Path<uuid::Uuid>,
1782    Query(params): Query<ListDeliveriesParams>,
1783) -> Result<Json<serde_json::Value>> {
1784    let registry = store.webhook_registry();
1785
1786    // Verify webhook exists
1787    registry.get(webhook_id).ok_or_else(|| {
1788        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1789    })?;
1790
1791    let limit = params.limit.unwrap_or(50);
1792    let deliveries = registry.get_deliveries(webhook_id, limit);
1793    let total = deliveries.len();
1794
1795    Ok(Json(serde_json::json!({
1796        "webhook_id": webhook_id,
1797        "deliveries": deliveries,
1798        "total": total
1799    })))
1800}
1801
1802// =============================================================================
1803// v2.0: Advanced Query Features
1804// =============================================================================
1805
1806/// EventQL: Execute SQL queries over events using DataFusion
1807#[cfg(feature = "analytics")]
1808pub async fn eventql_query(
1809    State(store): State<SharedStore>,
1810    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
1811) -> Result<Json<serde_json::Value>> {
1812    let events = store.snapshot_events();
1813    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1814        Ok(response) => Ok(Json(serde_json::json!({
1815            "columns": response.columns,
1816            "rows": response.rows,
1817            "row_count": response.row_count,
1818        }))),
1819        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1820    }
1821}
1822
1823/// GraphQL: Execute GraphQL queries
1824pub async fn graphql_query(
1825    State(store): State<SharedStore>,
1826    Json(req): Json<GraphQLRequest>,
1827) -> Json<serde_json::Value> {
1828    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1829        Ok(f) => f,
1830        Err(e) => {
1831            return Json(
1832                serde_json::to_value(GraphQLResponse {
1833                    data: None,
1834                    errors: vec![GraphQLError { message: e }],
1835                })
1836                .unwrap(),
1837            );
1838        }
1839    };
1840
1841    let mut data = serde_json::Map::new();
1842    let mut errors = Vec::new();
1843
1844    for field in &fields {
1845        match field.name.as_str() {
1846            "events" => {
1847                let request = crate::application::dto::QueryEventsRequest {
1848                    entity_id: field.arguments.get("entity_id").cloned(),
1849                    event_type: field.arguments.get("event_type").cloned(),
1850                    tenant_id: field.arguments.get("tenant_id").cloned(),
1851                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1852                    as_of: None,
1853                    since: None,
1854                    until: None,
1855                    event_type_prefix: None,
1856                    payload_filter: None,
1857                };
1858                match store.query(&request) {
1859                    Ok(events) => {
1860                        let json_events: Vec<serde_json::Value> = events
1861                            .iter()
1862                            .map(|e| {
1863                                crate::infrastructure::query::graphql::event_to_json(
1864                                    e,
1865                                    &field.fields,
1866                                )
1867                            })
1868                            .collect();
1869                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1870                    }
1871                    Err(e) => errors.push(GraphQLError {
1872                        message: format!("events query failed: {e}"),
1873                    }),
1874                }
1875            }
1876            "event" => {
1877                if let Some(id_str) = field.arguments.get("id") {
1878                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1879                        match store.get_event_by_id(&id) {
1880                            Ok(Some(event)) => {
1881                                data.insert(
1882                                    "event".to_string(),
1883                                    crate::infrastructure::query::graphql::event_to_json(
1884                                        &event,
1885                                        &field.fields,
1886                                    ),
1887                                );
1888                            }
1889                            Ok(None) => {
1890                                data.insert("event".to_string(), serde_json::Value::Null);
1891                            }
1892                            Err(e) => errors.push(GraphQLError {
1893                                message: format!("event lookup failed: {e}"),
1894                            }),
1895                        }
1896                    } else {
1897                        errors.push(GraphQLError {
1898                            message: format!("Invalid UUID: {id_str}"),
1899                        });
1900                    }
1901                } else {
1902                    errors.push(GraphQLError {
1903                        message: "event query requires 'id' argument".to_string(),
1904                    });
1905                }
1906            }
1907            "projections" => {
1908                let pm = store.projection_manager();
1909                let names: Vec<serde_json::Value> = pm
1910                    .list_projections()
1911                    .iter()
1912                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1913                    .collect();
1914                data.insert("projections".to_string(), serde_json::Value::Array(names));
1915            }
1916            "stats" => {
1917                let stats = store.stats();
1918                data.insert(
1919                    "stats".to_string(),
1920                    serde_json::json!({
1921                        "total_events": stats.total_events,
1922                        "total_entities": stats.total_entities,
1923                        "total_event_types": stats.total_event_types,
1924                    }),
1925                );
1926            }
1927            "__schema" => {
1928                data.insert(
1929                    "__schema".to_string(),
1930                    crate::infrastructure::query::graphql::introspection_schema(),
1931                );
1932            }
1933            other => {
1934                errors.push(GraphQLError {
1935                    message: format!("Unknown field: {other}"),
1936                });
1937            }
1938        }
1939    }
1940
1941    Json(
1942        serde_json::to_value(GraphQLResponse {
1943            data: Some(serde_json::Value::Object(data)),
1944            errors,
1945        })
1946        .unwrap(),
1947    )
1948}
1949
1950/// Geospatial: Query events by location
1951pub async fn geo_query(
1952    State(store): State<SharedStore>,
1953    Json(req): Json<GeoQueryRequest>,
1954) -> Json<serde_json::Value> {
1955    let events = store.snapshot_events();
1956    let geo_index = store.geo_index();
1957    let results =
1958        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1959    let total = results.len();
1960    Json(serde_json::json!({
1961        "results": results,
1962        "total": total,
1963    }))
1964}
1965
1966/// Geospatial index stats
1967pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1968    let stats = store.geo_index().stats();
1969    Json(serde_json::json!(stats))
1970}
1971
1972/// Exactly-once processing stats
1973pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1974    let stats = store.exactly_once().stats();
1975    Json(serde_json::json!(stats))
1976}
1977
1978/// Schema evolution history for an event type
1979pub async fn schema_evolution_history(
1980    State(store): State<SharedStore>,
1981    Path(event_type): Path<String>,
1982) -> Json<serde_json::Value> {
1983    let mgr = store.schema_evolution();
1984    let history = mgr.get_history(&event_type);
1985    let version = mgr.get_version(&event_type);
1986    Json(serde_json::json!({
1987        "event_type": event_type,
1988        "current_version": version,
1989        "history": history,
1990    }))
1991}
1992
1993/// Current inferred schema for an event type
1994pub async fn schema_evolution_schema(
1995    State(store): State<SharedStore>,
1996    Path(event_type): Path<String>,
1997) -> Json<serde_json::Value> {
1998    let mgr = store.schema_evolution();
1999    if let Some(schema) = mgr.get_schema(&event_type) {
2000        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2001        Json(serde_json::json!({
2002            "event_type": event_type,
2003            "version": mgr.get_version(&event_type),
2004            "inferred_schema": schema,
2005            "json_schema": json_schema,
2006        }))
2007    } else {
2008        Json(serde_json::json!({
2009            "event_type": event_type,
2010            "error": "No schema inferred for this event type"
2011        }))
2012    }
2013}
2014
2015/// Schema evolution stats
2016pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2017    let stats = store.schema_evolution().stats();
2018    let event_types = store.schema_evolution().list_event_types();
2019    Json(serde_json::json!({
2020        "stats": stats,
2021        "tracked_event_types": event_types,
2022    }))
2023}
2024
2025// =============================================================================
2026// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2027// =============================================================================
2028
2029/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
2030#[cfg(feature = "embedded-sync")]
2031pub async fn sync_pull_handler(
2032    State(state): State<AppState>,
2033    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
2034) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
2035    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};
2036
2037    let store = &state.store;
2038
2039    // Compute "since" threshold from the client's version vector
2040    // We return all events the client hasn't seen yet
2041    let since = request
2042        .version_vector
2043        .values()
2044        .map(|ts| ts.physical_ms)
2045        .min()
2046        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));
2047
2048    let events = store.query(&crate::application::dto::QueryEventsRequest {
2049        entity_id: None,
2050        event_type: None,
2051        tenant_id: None,
2052        as_of: None,
2053        since,
2054        until: None,
2055        limit: None,
2056        event_type_prefix: None,
2057        payload_filter: None,
2058    })?;
2059
2060    // Convert domain events to ReplicatedEvent wire format
2061    let mut replicated = Vec::with_capacity(events.len());
2062    let mut last_ms = 0u64;
2063    let mut logical = 0u32;
2064
2065    for event in &events {
2066        let event_ms = event.timestamp().timestamp_millis() as u64;
2067        if event_ms == last_ms {
2068            logical += 1;
2069        } else {
2070            last_ms = event_ms;
2071            logical = 0;
2072        }
2073
2074        replicated.push(ReplicatedEvent {
2075            event_id: event.id().to_string(),
2076            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
2077            origin_region: "server".to_string(),
2078            event_data: serde_json::json!({
2079                "event_type": event.event_type_str(),
2080                "entity_id": event.entity_id_str(),
2081                "tenant_id": event.tenant_id_str(),
2082                "payload": event.payload,
2083                "metadata": event.metadata,
2084            }),
2085        });
2086    }
2087
2088    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
2089        events: replicated,
2090        version_vector: std::collections::BTreeMap::new(),
2091    }))
2092}
2093
2094/// POST /api/v1/sync/push — Client pushes events, server applies CRDT resolution.
2095#[cfg(feature = "embedded-sync")]
2096pub async fn sync_push_handler(
2097    State(state): State<AppState>,
2098    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
2099) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
2100    let store = &state.store;
2101
2102    let mut accepted = 0usize;
2103    let mut skipped = 0usize;
2104
2105    for rep_event in &request.events {
2106        let event_data = &rep_event.event_data;
2107        let event_type = event_data
2108            .get("event_type")
2109            .and_then(|v| v.as_str())
2110            .unwrap_or("unknown")
2111            .to_string();
2112        let entity_id = event_data
2113            .get("entity_id")
2114            .and_then(|v| v.as_str())
2115            .unwrap_or("unknown")
2116            .to_string();
2117        let tenant_id = event_data
2118            .get("tenant_id")
2119            .and_then(|v| v.as_str())
2120            .unwrap_or("default")
2121            .to_string();
2122        let payload = event_data
2123            .get("payload")
2124            .cloned()
2125            .unwrap_or(serde_json::json!({}));
2126        let metadata = event_data.get("metadata").cloned();
2127
2128        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
2129            Ok(domain_event) => {
2130                store.ingest(&domain_event)?;
2131                accepted += 1;
2132            }
2133            Err(_) => {
2134                skipped += 1;
2135            }
2136        }
2137    }
2138
2139    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
2140        accepted,
2141        skipped,
2142        version_vector: std::collections::BTreeMap::new(),
2143    }))
2144}
2145
2146// =============================================================================
2147// Consumer endpoints for durable subscriptions (v0.14)
2148// =============================================================================
2149
2150/// POST /api/v1/consumers — Register a durable consumer
2151pub async fn register_consumer(
2152    State(store): State<SharedStore>,
2153    Json(req): Json<RegisterConsumerRequest>,
2154) -> Result<Json<ConsumerResponse>> {
2155    let consumer = store
2156        .consumer_registry()
2157        .register(&req.consumer_id, &req.event_type_filters);
2158
2159    Ok(Json(ConsumerResponse {
2160        consumer_id: consumer.consumer_id,
2161        event_type_filters: consumer.event_type_filters,
2162        cursor_position: consumer.cursor_position,
2163    }))
2164}
2165
2166/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2167pub async fn get_consumer(
2168    State(store): State<SharedStore>,
2169    Path(consumer_id): Path<String>,
2170) -> Result<Json<ConsumerResponse>> {
2171    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2172
2173    Ok(Json(ConsumerResponse {
2174        consumer_id: consumer.consumer_id,
2175        event_type_filters: consumer.event_type_filters,
2176        cursor_position: consumer.cursor_position,
2177    }))
2178}
2179
2180/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2181#[derive(Debug, Deserialize)]
2182pub struct ConsumerPollQuery {
2183    pub limit: Option<usize>,
2184}
2185
2186pub async fn poll_consumer_events(
2187    State(store): State<SharedStore>,
2188    Path(consumer_id): Path<String>,
2189    Query(query): Query<ConsumerPollQuery>,
2190) -> Result<Json<ConsumerEventsResponse>> {
2191    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2192    let offset = consumer.cursor_position.unwrap_or(0);
2193    let limit = query.limit.unwrap_or(100);
2194
2195    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2196    let count = events.len();
2197
2198    let consumer_events: Vec<ConsumerEventDto> = events
2199        .into_iter()
2200        .map(|(position, event)| ConsumerEventDto {
2201            position,
2202            event: EventDto::from(&event),
2203        })
2204        .collect();
2205
2206    Ok(Json(ConsumerEventsResponse {
2207        events: consumer_events,
2208        count,
2209    }))
2210}
2211
2212/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2213pub async fn ack_consumer(
2214    State(store): State<SharedStore>,
2215    Path(consumer_id): Path<String>,
2216    Json(req): Json<AckRequest>,
2217) -> Result<Json<serde_json::Value>> {
2218    let max_offset = store.total_events() as u64;
2219
2220    store
2221        .consumer_registry()
2222        .ack(&consumer_id, req.position, max_offset)
2223        .map_err(crate::error::AllSourceError::InvalidInput)?;
2224
2225    Ok(Json(serde_json::json!({
2226        "status": "ok",
2227        "consumer_id": consumer_id,
2228        "position": req.position,
2229    })))
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234    use super::*;
2235    use crate::{domain::entities::Event, store::EventStore};
2236
2237    fn create_test_store() -> Arc<EventStore> {
2238        Arc::new(EventStore::new())
2239    }
2240
2241    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2242        Event::from_strings(
2243            event_type.to_string(),
2244            entity_id.to_string(),
2245            "test-stream".to_string(),
2246            serde_json::json!({
2247                "name": "Test",
2248                "value": 42
2249            }),
2250            None,
2251        )
2252        .unwrap()
2253    }
2254
2255    #[tokio::test]
2256    async fn test_query_events_has_more_and_total_count() {
2257        let store = create_test_store();
2258
2259        // Ingest 50 events
2260        for i in 0..50 {
2261            store
2262                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2263                .unwrap();
2264        }
2265
2266        // Query with limit=10 — should get has_more=true, total_count=50
2267        let req = QueryEventsRequest {
2268            entity_id: None,
2269            event_type: None,
2270            tenant_id: None,
2271            as_of: None,
2272            since: None,
2273            until: None,
2274            limit: Some(10),
2275            event_type_prefix: None,
2276            payload_filter: None,
2277        };
2278
2279        let requested_limit = req.limit;
2280        let unlimited_req = QueryEventsRequest {
2281            limit: None,
2282            ..QueryEventsRequest {
2283                entity_id: req.entity_id,
2284                event_type: req.event_type,
2285                tenant_id: req.tenant_id,
2286                as_of: req.as_of,
2287                since: req.since,
2288                until: req.until,
2289                limit: None,
2290                event_type_prefix: req.event_type_prefix,
2291                payload_filter: req.payload_filter,
2292            }
2293        };
2294        let all_events = store.query(&unlimited_req).unwrap();
2295        let total_count = all_events.len();
2296        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2297            all_events.into_iter().take(limit).collect()
2298        } else {
2299            all_events
2300        };
2301        let count = limited_events.len();
2302        let has_more = count < total_count;
2303
2304        assert_eq!(count, 10);
2305        assert_eq!(total_count, 50);
2306        assert!(has_more);
2307    }
2308
2309    #[tokio::test]
2310    async fn test_query_events_no_more_results() {
2311        let store = create_test_store();
2312
2313        // Ingest 5 events
2314        for i in 0..5 {
2315            store
2316                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2317                .unwrap();
2318        }
2319
2320        // Query with limit=100 — should get has_more=false, total_count=5
2321        let all_events = store
2322            .query(&QueryEventsRequest {
2323                entity_id: None,
2324                event_type: None,
2325                tenant_id: None,
2326                as_of: None,
2327                since: None,
2328                until: None,
2329                limit: None,
2330                event_type_prefix: None,
2331                payload_filter: None,
2332            })
2333            .unwrap();
2334        let total_count = all_events.len();
2335        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2336        let count = limited_events.len();
2337        let has_more = count < total_count;
2338
2339        assert_eq!(count, 5);
2340        assert_eq!(total_count, 5);
2341        assert!(!has_more);
2342    }
2343
2344    #[tokio::test]
2345    async fn test_list_entities_by_type_prefix() {
2346        let store = create_test_store();
2347
2348        // 3 index entities
2349        store
2350            .ingest(&create_test_event("idx-1", "index.created"))
2351            .unwrap();
2352        store
2353            .ingest(&create_test_event("idx-1", "index.updated"))
2354            .unwrap();
2355        store
2356            .ingest(&create_test_event("idx-2", "index.created"))
2357            .unwrap();
2358        store
2359            .ingest(&create_test_event("idx-3", "index.created"))
2360            .unwrap();
2361        // 2 trade entities
2362        store
2363            .ingest(&create_test_event("trade-1", "trade.created"))
2364            .unwrap();
2365        store
2366            .ingest(&create_test_event("trade-2", "trade.created"))
2367            .unwrap();
2368
2369        // List entities for index.*
2370        let req = ListEntitiesRequest {
2371            event_type_prefix: Some("index.".to_string()),
2372            payload_filter: None,
2373            limit: None,
2374            offset: None,
2375        };
2376        let query_req = QueryEventsRequest {
2377            entity_id: None,
2378            event_type: None,
2379            tenant_id: None,
2380            as_of: None,
2381            since: None,
2382            until: None,
2383            limit: None,
2384            event_type_prefix: req.event_type_prefix,
2385            payload_filter: req.payload_filter,
2386        };
2387        let events = store.query(&query_req).unwrap();
2388
2389        // Group and verify
2390        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2391            std::collections::HashMap::new();
2392        for event in &events {
2393            entity_map
2394                .entry(event.entity_id().to_string())
2395                .or_default()
2396                .push(event);
2397        }
2398
2399        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2400        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2401        assert_eq!(entity_map["idx-2"].len(), 1);
2402        assert_eq!(entity_map["idx-3"].len(), 1);
2403    }
2404
2405    fn create_test_event_with_payload(
2406        entity_id: &str,
2407        event_type: &str,
2408        payload: serde_json::Value,
2409    ) -> Event {
2410        Event::from_strings(
2411            event_type.to_string(),
2412            entity_id.to_string(),
2413            "test-stream".to_string(),
2414            payload,
2415            None,
2416        )
2417        .unwrap()
2418    }
2419
2420    #[tokio::test]
2421    async fn test_detect_duplicates_by_payload_fields() {
2422        let store = create_test_store();
2423
2424        // Create entities with duplicate "name" field values
2425        store
2426            .ingest(&create_test_event_with_payload(
2427                "idx-1",
2428                "index.created",
2429                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2430            ))
2431            .unwrap();
2432        store
2433            .ingest(&create_test_event_with_payload(
2434                "idx-2",
2435                "index.created",
2436                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2437            ))
2438            .unwrap();
2439        store
2440            .ingest(&create_test_event_with_payload(
2441                "idx-3",
2442                "index.created",
2443                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2444            ))
2445            .unwrap();
2446        store
2447            .ingest(&create_test_event_with_payload(
2448                "idx-4",
2449                "index.created",
2450                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2451            ))
2452            .unwrap();
2453        store
2454            .ingest(&create_test_event_with_payload(
2455                "idx-5",
2456                "index.created",
2457                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2458            ))
2459            .unwrap();
2460
2461        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2462        let query_req = QueryEventsRequest {
2463            entity_id: None,
2464            event_type: None,
2465            tenant_id: None,
2466            as_of: None,
2467            since: None,
2468            until: None,
2469            limit: None,
2470            event_type_prefix: Some("index.".to_string()),
2471            payload_filter: None,
2472        };
2473        let events = store.query(&query_req).unwrap();
2474
2475        // Manually replicate the handler logic for testing
2476        let group_by_fields = vec!["name"];
2477        let mut entity_latest: std::collections::HashMap<String, &Event> =
2478            std::collections::HashMap::new();
2479        for event in &events {
2480            let eid = event.entity_id().to_string();
2481            entity_latest
2482                .entry(eid)
2483                .and_modify(|existing| {
2484                    if event.timestamp() > existing.timestamp() {
2485                        *existing = event;
2486                    }
2487                })
2488                .or_insert(event);
2489        }
2490
2491        let mut groups: std::collections::HashMap<String, Vec<String>> =
2492            std::collections::HashMap::new();
2493        for (entity_id, event) in &entity_latest {
2494            let payload = event.payload();
2495            let mut key_parts = serde_json::Map::new();
2496            for field in &group_by_fields {
2497                let value = payload
2498                    .get(*field)
2499                    .cloned()
2500                    .unwrap_or(serde_json::Value::Null);
2501                key_parts.insert((*field).to_string(), value);
2502            }
2503            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2504            groups.entry(key_str).or_default().push(entity_id.clone());
2505        }
2506
2507        let duplicate_groups: Vec<_> = groups
2508            .into_iter()
2509            .filter(|(_, ids)| ids.len() > 1)
2510            .collect();
2511
2512        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2513        for (_, ids) in &duplicate_groups {
2514            assert_eq!(ids.len(), 2);
2515        }
2516    }
2517
2518    #[tokio::test]
2519    async fn test_detect_duplicates_no_duplicates() {
2520        let store = create_test_store();
2521
2522        // All unique names
2523        store
2524            .ingest(&create_test_event_with_payload(
2525                "idx-1",
2526                "index.created",
2527                serde_json::json!({"name": "A"}),
2528            ))
2529            .unwrap();
2530        store
2531            .ingest(&create_test_event_with_payload(
2532                "idx-2",
2533                "index.created",
2534                serde_json::json!({"name": "B"}),
2535            ))
2536            .unwrap();
2537
2538        let query_req = QueryEventsRequest {
2539            entity_id: None,
2540            event_type: None,
2541            tenant_id: None,
2542            as_of: None,
2543            since: None,
2544            until: None,
2545            limit: None,
2546            event_type_prefix: Some("index.".to_string()),
2547            payload_filter: None,
2548        };
2549        let events = store.query(&query_req).unwrap();
2550
2551        let mut entity_latest: std::collections::HashMap<String, &Event> =
2552            std::collections::HashMap::new();
2553        for event in &events {
2554            entity_latest
2555                .entry(event.entity_id().to_string())
2556                .or_insert(event);
2557        }
2558
2559        let mut groups: std::collections::HashMap<String, Vec<String>> =
2560            std::collections::HashMap::new();
2561        for (entity_id, event) in &entity_latest {
2562            let key_str =
2563                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2564                    .unwrap();
2565            groups.entry(key_str).or_default().push(entity_id.clone());
2566        }
2567
2568        let duplicate_groups: Vec<_> = groups
2569            .into_iter()
2570            .filter(|(_, ids)| ids.len() > 1)
2571            .collect();
2572
2573        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2574    }
2575
2576    #[tokio::test]
2577    async fn test_detect_duplicates_multi_field_group_by() {
2578        let store = create_test_store();
2579
2580        // Two entities with same name AND user_id = true duplicate
2581        store
2582            .ingest(&create_test_event_with_payload(
2583                "idx-1",
2584                "index.created",
2585                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2586            ))
2587            .unwrap();
2588        store
2589            .ingest(&create_test_event_with_payload(
2590                "idx-2",
2591                "index.created",
2592                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2593            ))
2594            .unwrap();
2595        // Same name but different user_id = NOT a duplicate in multi-field group
2596        store
2597            .ingest(&create_test_event_with_payload(
2598                "idx-3",
2599                "index.created",
2600                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2601            ))
2602            .unwrap();
2603
2604        let query_req = QueryEventsRequest {
2605            entity_id: None,
2606            event_type: None,
2607            tenant_id: None,
2608            as_of: None,
2609            since: None,
2610            until: None,
2611            limit: None,
2612            event_type_prefix: Some("index.".to_string()),
2613            payload_filter: None,
2614        };
2615        let events = store.query(&query_req).unwrap();
2616
2617        let group_by_fields = vec!["name", "user_id"];
2618        let mut entity_latest: std::collections::HashMap<String, &Event> =
2619            std::collections::HashMap::new();
2620        for event in &events {
2621            entity_latest
2622                .entry(event.entity_id().to_string())
2623                .and_modify(|existing| {
2624                    if event.timestamp() > existing.timestamp() {
2625                        *existing = event;
2626                    }
2627                })
2628                .or_insert(event);
2629        }
2630
2631        let mut groups: std::collections::HashMap<String, Vec<String>> =
2632            std::collections::HashMap::new();
2633        for (entity_id, event) in &entity_latest {
2634            let payload = event.payload();
2635            let mut key_parts = serde_json::Map::new();
2636            for field in &group_by_fields {
2637                let value = payload
2638                    .get(*field)
2639                    .cloned()
2640                    .unwrap_or(serde_json::Value::Null);
2641                key_parts.insert((*field).to_string(), value);
2642            }
2643            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2644            groups.entry(key_str).or_default().push(entity_id.clone());
2645        }
2646
2647        let duplicate_groups: Vec<_> = groups
2648            .into_iter()
2649            .filter(|(_, ids)| ids.len() > 1)
2650            .collect();
2651
2652        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2653        assert_eq!(duplicate_groups.len(), 1);
2654        let (_, ref ids) = duplicate_groups[0];
2655        assert_eq!(ids.len(), 2);
2656        let mut sorted_ids = ids.clone();
2657        sorted_ids.sort();
2658        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2659    }
2660
2661    #[tokio::test]
2662    async fn test_projection_state_cache() {
2663        let store = create_test_store();
2664
2665        // Test cache insertion
2666        let cache = store.projection_state_cache();
2667        cache.insert(
2668            "entity_snapshots:user-123".to_string(),
2669            serde_json::json!({"name": "Test User", "age": 30}),
2670        );
2671
2672        // Test cache retrieval
2673        let state = cache.get("entity_snapshots:user-123");
2674        assert!(state.is_some());
2675        let state = state.unwrap();
2676        assert_eq!(state["name"], "Test User");
2677        assert_eq!(state["age"], 30);
2678    }
2679
2680    #[tokio::test]
2681    async fn test_projection_manager_list_projections() {
2682        let store = create_test_store();
2683
2684        // List projections (built-in projections should be available)
2685        let projection_manager = store.projection_manager();
2686        let projections = projection_manager.list_projections();
2687
2688        // Should have entity_snapshots and event_counters
2689        assert!(projections.len() >= 2);
2690
2691        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2692        assert!(names.contains(&"entity_snapshots"));
2693        assert!(names.contains(&"event_counters"));
2694    }
2695
2696    #[tokio::test]
2697    async fn test_projection_state_after_event_ingestion() {
2698        let store = create_test_store();
2699
2700        // Ingest an event
2701        let event = create_test_event("user-456", "user.created");
2702        store.ingest(&event).unwrap();
2703
2704        // Get projection state
2705        let projection_manager = store.projection_manager();
2706        let snapshot_projection = projection_manager
2707            .get_projection("entity_snapshots")
2708            .unwrap();
2709
2710        let state = snapshot_projection.get_state("user-456");
2711        assert!(state.is_some());
2712        let state = state.unwrap();
2713        assert_eq!(state["name"], "Test");
2714        assert_eq!(state["value"], 42);
2715    }
2716
2717    #[tokio::test]
2718    async fn test_projection_state_cache_multiple_entities() {
2719        let store = create_test_store();
2720        let cache = store.projection_state_cache();
2721
2722        // Insert multiple entities
2723        for i in 0..10 {
2724            cache.insert(
2725                format!("entity_snapshots:entity-{i}"),
2726                serde_json::json!({"id": i, "status": "active"}),
2727            );
2728        }
2729
2730        // Verify all insertions
2731        assert_eq!(cache.len(), 10);
2732
2733        // Verify each entity
2734        for i in 0..10 {
2735            let key = format!("entity_snapshots:entity-{i}");
2736            let state = cache.get(&key);
2737            assert!(state.is_some());
2738            assert_eq!(state.unwrap()["id"], i);
2739        }
2740    }
2741
2742    #[tokio::test]
2743    async fn test_projection_state_update() {
2744        let store = create_test_store();
2745        let cache = store.projection_state_cache();
2746
2747        // Initial state
2748        cache.insert(
2749            "entity_snapshots:user-789".to_string(),
2750            serde_json::json!({"balance": 100}),
2751        );
2752
2753        // Update state
2754        cache.insert(
2755            "entity_snapshots:user-789".to_string(),
2756            serde_json::json!({"balance": 150}),
2757        );
2758
2759        // Verify update
2760        let state = cache.get("entity_snapshots:user-789").unwrap();
2761        assert_eq!(state["balance"], 150);
2762    }
2763
2764    #[tokio::test]
2765    async fn test_event_counter_projection() {
2766        let store = create_test_store();
2767
2768        // Ingest events of different types
2769        store
2770            .ingest(&create_test_event("user-1", "user.created"))
2771            .unwrap();
2772        store
2773            .ingest(&create_test_event("user-2", "user.created"))
2774            .unwrap();
2775        store
2776            .ingest(&create_test_event("user-1", "user.updated"))
2777            .unwrap();
2778
2779        // Get event counter projection
2780        let projection_manager = store.projection_manager();
2781        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2782
2783        // Check counts
2784        let created_state = counter_projection.get_state("user.created");
2785        assert!(created_state.is_some());
2786        assert_eq!(created_state.unwrap()["count"], 2);
2787
2788        let updated_state = counter_projection.get_state("user.updated");
2789        assert!(updated_state.is_some());
2790        assert_eq!(updated_state.unwrap()["count"], 1);
2791    }
2792
2793    #[tokio::test]
2794    async fn test_projection_state_cache_key_format() {
2795        let store = create_test_store();
2796        let cache = store.projection_state_cache();
2797
2798        // Test standard key format: {projection_name}:{entity_id}
2799        let key = "orders:order-12345".to_string();
2800        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2801
2802        let state = cache.get(&key).unwrap();
2803        assert_eq!(state["total"], 99.99);
2804    }
2805
2806    #[tokio::test]
2807    async fn test_projection_state_cache_removal() {
2808        let store = create_test_store();
2809        let cache = store.projection_state_cache();
2810
2811        // Insert and then remove
2812        cache.insert(
2813            "test:entity-1".to_string(),
2814            serde_json::json!({"data": "value"}),
2815        );
2816        assert_eq!(cache.len(), 1);
2817
2818        cache.remove("test:entity-1");
2819        assert_eq!(cache.len(), 0);
2820        assert!(cache.get("test:entity-1").is_none());
2821    }
2822
2823    #[tokio::test]
2824    async fn test_get_nonexistent_projection() {
2825        let store = create_test_store();
2826        let projection_manager = store.projection_manager();
2827
2828        // Requesting a non-existent projection should return None
2829        let projection = projection_manager.get_projection("nonexistent_projection");
2830        assert!(projection.is_none());
2831    }
2832
2833    #[tokio::test]
2834    async fn test_get_nonexistent_entity_state() {
2835        let store = create_test_store();
2836        let projection_manager = store.projection_manager();
2837
2838        // Get state for non-existent entity
2839        let snapshot_projection = projection_manager
2840            .get_projection("entity_snapshots")
2841            .unwrap();
2842        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2843        assert!(state.is_none());
2844    }
2845
2846    #[tokio::test]
2847    async fn test_projection_state_cache_concurrent_access() {
2848        let store = create_test_store();
2849        let cache = store.projection_state_cache();
2850
2851        // Simulate concurrent writes
2852        let handles: Vec<_> = (0..10)
2853            .map(|i| {
2854                let cache_clone = cache.clone();
2855                tokio::spawn(async move {
2856                    cache_clone.insert(
2857                        format!("concurrent:entity-{i}"),
2858                        serde_json::json!({"thread": i}),
2859                    );
2860                })
2861            })
2862            .collect();
2863
2864        for handle in handles {
2865            handle.await.unwrap();
2866        }
2867
2868        // All 10 entries should be present
2869        assert_eq!(cache.len(), 10);
2870    }
2871
2872    #[tokio::test]
2873    async fn test_projection_state_large_payload() {
2874        let store = create_test_store();
2875        let cache = store.projection_state_cache();
2876
2877        // Create a large JSON payload (~10KB)
2878        let large_array: Vec<serde_json::Value> = (0..1000)
2879            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2880            .collect();
2881
2882        cache.insert(
2883            "large:entity-1".to_string(),
2884            serde_json::json!({"items": large_array}),
2885        );
2886
2887        let state = cache.get("large:entity-1").unwrap();
2888        let items = state["items"].as_array().unwrap();
2889        assert_eq!(items.len(), 1000);
2890    }
2891
2892    #[tokio::test]
2893    async fn test_projection_state_complex_json() {
2894        let store = create_test_store();
2895        let cache = store.projection_state_cache();
2896
2897        // Complex nested JSON structure
2898        let complex_state = serde_json::json!({
2899            "user": {
2900                "id": "user-123",
2901                "profile": {
2902                    "name": "John Doe",
2903                    "email": "john@example.com",
2904                    "settings": {
2905                        "theme": "dark",
2906                        "notifications": true
2907                    }
2908                },
2909                "roles": ["admin", "user"],
2910                "metadata": {
2911                    "created_at": "2025-01-01T00:00:00Z",
2912                    "last_login": null
2913                }
2914            }
2915        });
2916
2917        cache.insert("complex:user-123".to_string(), complex_state);
2918
2919        let state = cache.get("complex:user-123").unwrap();
2920        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2921        assert_eq!(state["user"]["roles"][0], "admin");
2922        assert!(state["user"]["metadata"]["last_login"].is_null());
2923    }
2924
2925    #[tokio::test]
2926    async fn test_projection_state_cache_iteration() {
2927        let store = create_test_store();
2928        let cache = store.projection_state_cache();
2929
2930        // Insert entries
2931        for i in 0..5 {
2932            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2933        }
2934
2935        // Iterate over all entries
2936        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2937        assert_eq!(entries.len(), 5);
2938    }
2939
2940    #[tokio::test]
2941    async fn test_projection_manager_get_entity_snapshots() {
2942        let store = create_test_store();
2943        let projection_manager = store.projection_manager();
2944
2945        // Get entity_snapshots projection specifically
2946        let projection = projection_manager.get_projection("entity_snapshots");
2947        assert!(projection.is_some());
2948        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2949    }
2950
2951    #[tokio::test]
2952    async fn test_projection_manager_get_event_counters() {
2953        let store = create_test_store();
2954        let projection_manager = store.projection_manager();
2955
2956        // Get event_counters projection specifically
2957        let projection = projection_manager.get_projection("event_counters");
2958        assert!(projection.is_some());
2959        assert_eq!(projection.unwrap().name(), "event_counters");
2960    }
2961
2962    #[tokio::test]
2963    async fn test_projection_state_cache_overwrite() {
2964        let store = create_test_store();
2965        let cache = store.projection_state_cache();
2966
2967        // Initial value
2968        cache.insert(
2969            "overwrite:entity-1".to_string(),
2970            serde_json::json!({"version": 1}),
2971        );
2972
2973        // Overwrite with new value
2974        cache.insert(
2975            "overwrite:entity-1".to_string(),
2976            serde_json::json!({"version": 2}),
2977        );
2978
2979        // Overwrite again
2980        cache.insert(
2981            "overwrite:entity-1".to_string(),
2982            serde_json::json!({"version": 3}),
2983        );
2984
2985        let state = cache.get("overwrite:entity-1").unwrap();
2986        assert_eq!(state["version"], 3);
2987
2988        // Should still be only 1 entry
2989        assert_eq!(cache.len(), 1);
2990    }
2991
2992    #[tokio::test]
2993    async fn test_projection_state_multiple_projections() {
2994        let store = create_test_store();
2995        let cache = store.projection_state_cache();
2996
2997        // Store states for different projections
2998        cache.insert(
2999            "entity_snapshots:user-1".to_string(),
3000            serde_json::json!({"name": "Alice"}),
3001        );
3002        cache.insert(
3003            "event_counters:user.created".to_string(),
3004            serde_json::json!({"count": 5}),
3005        );
3006        cache.insert(
3007            "custom_projection:order-1".to_string(),
3008            serde_json::json!({"total": 150.0}),
3009        );
3010
3011        // Verify each projection's state
3012        assert_eq!(
3013            cache.get("entity_snapshots:user-1").unwrap()["name"],
3014            "Alice"
3015        );
3016        assert_eq!(
3017            cache.get("event_counters:user.created").unwrap()["count"],
3018            5
3019        );
3020        assert_eq!(
3021            cache.get("custom_projection:order-1").unwrap()["total"],
3022            150.0
3023        );
3024    }
3025
3026    #[tokio::test]
3027    async fn test_bulk_projection_state_access() {
3028        let store = create_test_store();
3029
3030        // Ingest multiple events for different entities
3031        for i in 0..5 {
3032            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3033            store.ingest(&event).unwrap();
3034        }
3035
3036        // Get projection and verify bulk access
3037        let projection_manager = store.projection_manager();
3038        let snapshot_projection = projection_manager
3039            .get_projection("entity_snapshots")
3040            .unwrap();
3041
3042        // Verify we can access all entities
3043        for i in 0..5 {
3044            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3045            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3046        }
3047    }
3048
3049    #[tokio::test]
3050    async fn test_bulk_save_projection_states() {
3051        let store = create_test_store();
3052        let cache = store.projection_state_cache();
3053
3054        // Simulate bulk save request
3055        let states = vec![
3056            BulkSaveStateItem {
3057                entity_id: "bulk-entity-1".to_string(),
3058                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3059            },
3060            BulkSaveStateItem {
3061                entity_id: "bulk-entity-2".to_string(),
3062                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3063            },
3064            BulkSaveStateItem {
3065                entity_id: "bulk-entity-3".to_string(),
3066                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3067            },
3068        ];
3069
3070        let projection_name = "test_projection";
3071
3072        // Save states to cache (simulating bulk_save_projection_states handler)
3073        for item in &states {
3074            cache.insert(
3075                format!("{projection_name}:{}", item.entity_id),
3076                item.state.clone(),
3077            );
3078        }
3079
3080        // Verify all states were saved
3081        assert_eq!(cache.len(), 3);
3082
3083        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3084        assert_eq!(state1["name"], "Entity 1");
3085        assert_eq!(state1["value"], 100);
3086
3087        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3088        assert_eq!(state2["name"], "Entity 2");
3089        assert_eq!(state2["value"], 200);
3090
3091        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3092        assert_eq!(state3["name"], "Entity 3");
3093        assert_eq!(state3["value"], 300);
3094    }
3095
3096    #[tokio::test]
3097    async fn test_bulk_save_empty_states() {
3098        let store = create_test_store();
3099        let cache = store.projection_state_cache();
3100
3101        // Clear cache
3102        cache.clear();
3103
3104        // Empty states should work fine
3105        let states: Vec<BulkSaveStateItem> = vec![];
3106        assert_eq!(states.len(), 0);
3107
3108        // Cache should remain empty
3109        assert_eq!(cache.len(), 0);
3110    }
3111
3112    #[tokio::test]
3113    async fn test_bulk_save_overwrites_existing() {
3114        let store = create_test_store();
3115        let cache = store.projection_state_cache();
3116
3117        // Insert initial state
3118        cache.insert(
3119            "test:entity-1".to_string(),
3120            serde_json::json!({"version": 1, "data": "initial"}),
3121        );
3122
3123        // Bulk save with updated state
3124        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3125        cache.insert("test:entity-1".to_string(), new_state);
3126
3127        // Verify overwrite
3128        let state = cache.get("test:entity-1").unwrap();
3129        assert_eq!(state["version"], 2);
3130        assert_eq!(state["data"], "updated");
3131    }
3132
3133    #[tokio::test]
3134    async fn test_bulk_save_high_volume() {
3135        let store = create_test_store();
3136        let cache = store.projection_state_cache();
3137
3138        // Simulate high volume save (1000 entities)
3139        for i in 0..1000 {
3140            cache.insert(
3141                format!("volume_test:entity-{i}"),
3142                serde_json::json!({"index": i, "status": "active"}),
3143            );
3144        }
3145
3146        // Verify count
3147        assert_eq!(cache.len(), 1000);
3148
3149        // Spot check some entries
3150        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3151        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3152        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3153    }
3154
3155    #[tokio::test]
3156    async fn test_bulk_save_different_projections() {
3157        let store = create_test_store();
3158        let cache = store.projection_state_cache();
3159
3160        // Save to multiple projections in bulk
3161        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3162
3163        for proj in &projections {
3164            for i in 0..5 {
3165                cache.insert(
3166                    format!("{proj}:entity-{i}"),
3167                    serde_json::json!({"projection": proj, "id": i}),
3168                );
3169            }
3170        }
3171
3172        // Verify total count (3 projections * 5 entities)
3173        assert_eq!(cache.len(), 15);
3174
3175        // Verify each projection
3176        for proj in &projections {
3177            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3178            assert_eq!(state["projection"], *proj);
3179        }
3180    }
3181}