Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        security::middleware::OptionalAuth,
41        web::api_v1::AppState,
42    },
43    store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46    Json, Router,
47    extract::{Path, Query, State, WebSocketUpgrade},
48    response::{IntoResponse, Response},
49    routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57
/// Reference-counted handle to the [`EventStore`]; this is the router state
/// type that `serve` installs and that the `State<SharedStore>` handlers in
/// this file extract.
type SharedStore = Arc<EventStore>;
59
60/// Wait for follower ACK(s) in semi-sync/sync replication modes.
61///
62/// In async mode (default), returns immediately. In semi-sync mode, waits for
63/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
64/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
65#[cfg(feature = "replication")]
66async fn await_replication_ack(state: &AppState) {
67    let shipper_guard = state.wal_shipper.read().await;
68    if let Some(ref shipper) = *shipper_guard {
69        let mode = shipper.replication_mode();
70        if mode == ReplicationMode::Async {
71            return;
72        }
73
74        let target_offset = shipper.current_leader_offset();
75        if target_offset == 0 {
76            return;
77        }
78
79        let shipper = Arc::clone(shipper);
80        // Drop the read guard before the async wait to avoid holding it across await
81        drop(shipper_guard);
82
83        let timer = state
84            .store
85            .metrics()
86            .replication_ack_wait_seconds
87            .start_timer();
88        let acked = shipper.wait_for_ack(target_offset).await;
89        timer.observe_duration();
90
91        if !acked {
92            tracing::warn!(
93                "Replication ACK timeout in {} mode (offset {}). \
94                 Write succeeded locally but follower confirmation pending.",
95                mode,
96                target_offset,
97            );
98        }
99    }
100}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103    // No-op in community edition
104}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107    let app = Router::new()
108        .route("/health", get(health))
109        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
110        .route("/api/v1/events", post(ingest_event))
111        .route("/api/v1/events/batch", post(ingest_events_batch))
112        .route("/api/v1/events/query", get(query_events))
113        .route("/api/v1/events/{event_id}", get(get_event_by_id))
114        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
115        // v0.10: Stream and event type discovery endpoints
116        .route("/api/v1/streams", get(list_streams))
117        .route("/api/v1/event-types", get(list_event_types))
118        .route("/api/v1/entities/duplicates", get(detect_duplicates))
119        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120        .route(
121            "/api/v1/entities/{entity_id}/snapshot",
122            get(get_entity_snapshot),
123        )
124        .route("/api/v1/stats", get(get_stats))
125        // v0.2: Advanced analytics endpoints
126        .route("/api/v1/analytics/frequency", get(analytics_frequency))
127        .route("/api/v1/analytics/summary", get(analytics_summary))
128        .route("/api/v1/analytics/correlation", get(analytics_correlation))
129        // v0.2: Snapshot management endpoints
130        .route("/api/v1/snapshots", post(create_snapshot))
131        .route("/api/v1/snapshots", get(list_snapshots))
132        .route(
133            "/api/v1/snapshots/{entity_id}/latest",
134            get(get_latest_snapshot),
135        )
136        // v0.2: Compaction endpoints
137        .route("/api/v1/compaction/trigger", post(trigger_compaction))
138        .route("/api/v1/compaction/stats", get(compaction_stats))
139        // v0.5: Schema registry endpoints
140        .route("/api/v1/schemas", post(register_schema))
141        .route("/api/v1/schemas", get(list_subjects))
142        .route("/api/v1/schemas/{subject}", get(get_schema))
143        .route(
144            "/api/v1/schemas/{subject}/versions",
145            get(list_schema_versions),
146        )
147        .route("/api/v1/schemas/validate", post(validate_event_schema))
148        .route(
149            "/api/v1/schemas/{subject}/compatibility",
150            put(set_compatibility_mode),
151        )
152        // v0.5: Replay and projection rebuild endpoints
153        .route("/api/v1/replay", post(start_replay))
154        .route("/api/v1/replay", get(list_replays))
155        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157        .route(
158            "/api/v1/replay/{replay_id}",
159            axum::routing::delete(delete_replay),
160        )
161        // v0.5: Stream processing pipeline endpoints
162        .route("/api/v1/pipelines", post(register_pipeline))
163        .route("/api/v1/pipelines", get(list_pipelines))
164        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166        .route(
167            "/api/v1/pipelines/{pipeline_id}",
168            axum::routing::delete(remove_pipeline),
169        )
170        .route(
171            "/api/v1/pipelines/{pipeline_id}/stats",
172            get(get_pipeline_stats),
173        )
174        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175        // v0.7: Projection State API for Query Service integration
176        .route("/api/v1/projections", get(list_projections))
177        .route("/api/v1/projections/{name}", get(get_projection))
178        .route(
179            "/api/v1/projections/{name}",
180            axum::routing::delete(delete_projection),
181        )
182        .route(
183            "/api/v1/projections/{name}/state",
184            get(get_projection_state_summary),
185        )
186        .route("/api/v1/projections/{name}/reset", post(reset_projection))
187        .route(
188            "/api/v1/projections/{name}/{entity_id}/state",
189            get(get_projection_state),
190        )
191        .route(
192            "/api/v1/projections/{name}/{entity_id}/state",
193            post(save_projection_state),
194        )
195        .route(
196            "/api/v1/projections/{name}/{entity_id}/state",
197            put(save_projection_state),
198        )
199        .route(
200            "/api/v1/projections/{name}/bulk",
201            post(bulk_get_projection_states),
202        )
203        .route(
204            "/api/v1/projections/{name}/bulk/save",
205            post(bulk_save_projection_states),
206        )
207        // v0.11: Webhook management endpoints
208        .route("/api/v1/webhooks", post(register_webhook))
209        .route("/api/v1/webhooks", get(list_webhooks))
210        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212        .route(
213            "/api/v1/webhooks/{webhook_id}",
214            axum::routing::delete(delete_webhook),
215        )
216        .route(
217            "/api/v1/webhooks/{webhook_id}/deliveries",
218            get(list_webhook_deliveries),
219        )
220        // v2.0: Advanced query features
221        .route("/api/v1/graphql", post(graphql_query))
222        .route("/api/v1/geospatial/query", post(geo_query))
223        .route("/api/v1/geospatial/stats", get(geo_stats))
224        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225        .route(
226            "/api/v1/schema-evolution/history/{event_type}",
227            get(schema_evolution_history),
228        )
229        .route(
230            "/api/v1/schema-evolution/schema/{event_type}",
231            get(schema_evolution_schema),
232        )
233        .route(
234            "/api/v1/schema-evolution/stats",
235            get(schema_evolution_stats),
236        )
237        .layer(
238            CorsLayer::new()
239                .allow_origin(Any)
240                .allow_methods(Any)
241                .allow_headers(Any),
242        )
243        .layer(TraceLayer::new_for_http())
244        .with_state(store);
245
246    let listener = tokio::net::TcpListener::bind(addr).await?;
247    axum::serve(listener, app).await?;
248
249    Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253    Json(serde_json::json!({
254        "status": "healthy",
255        "service": "allsource-core",
256        "version": env!("CARGO_PKG_VERSION")
257    }))
258}
259
260// v0.6: Prometheus metrics endpoint
261pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262    let metrics = store.metrics();
263
264    match metrics.encode() {
265        Ok(encoded) => Response::builder()
266            .status(200)
267            .header("Content-Type", "text/plain; version=0.0.4")
268            .body(encoded)
269            .unwrap()
270            .into_response(),
271        Err(e) => Response::builder()
272            .status(500)
273            .body(format!("Error encoding metrics: {e}"))
274            .unwrap()
275            .into_response(),
276    }
277}
278
279pub async fn ingest_event(
280    State(store): State<SharedStore>,
281    Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283    let expected_version = req.expected_version;
284
285    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286    let event = Event::from_strings(
287        req.event_type,
288        req.entity_id,
289        tenant_id,
290        req.payload,
291        req.metadata,
292    )?;
293
294    let event_id = event.id;
295    let timestamp = event.timestamp;
296
297    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299    tracing::info!("Event ingested: {}", event_id);
300
301    Ok(Json(IngestEventResponse {
302        event_id,
303        timestamp,
304        version: Some(new_version),
305    }))
306}
307
308/// Ingest a single event with semi-sync/sync replication ACK waiting.
309///
310/// Used by the v1 API. Tenant comes from `req.tenant_id` — the Control Plane
311/// delegation layer sets it from the authenticated caller before forwarding.
312/// Core is internal-only and does not authenticate public traffic.
313pub async fn ingest_event_v1(
314    State(state): State<AppState>,
315    Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317    let expected_version = req.expected_version;
318
319    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321    let event = Event::from_strings(
322        req.event_type,
323        req.entity_id,
324        tenant_id,
325        req.payload,
326        req.metadata,
327    )?;
328
329    let event_id = event.id;
330    let timestamp = event.timestamp;
331
332    let new_version = state
333        .store
334        .ingest_with_expected_version(&event, expected_version)?;
335
336    // Semi-sync/sync: wait for follower ACK(s) before returning
337    await_replication_ack(&state).await;
338
339    tracing::info!("Event ingested: {}", event_id);
340
341    Ok(Json(IngestEventResponse {
342        event_id,
343        timestamp,
344        version: Some(new_version),
345    }))
346}
347
348/// Batch ingest multiple events in a single request
349///
350/// This endpoint allows ingesting multiple events atomically, which is more
351/// efficient than making individual requests for each event.
352pub async fn ingest_events_batch(
353    State(store): State<SharedStore>,
354    Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356    let total = req.events.len();
357    let mut ingested_events = Vec::with_capacity(total);
358
359    for event_req in req.events {
360        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361        let expected_version = event_req.expected_version;
362
363        let event = Event::from_strings(
364            event_req.event_type,
365            event_req.entity_id,
366            tenant_id,
367            event_req.payload,
368            event_req.metadata,
369        )?;
370
371        let event_id = event.id;
372        let timestamp = event.timestamp;
373
374        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376        ingested_events.push(IngestEventResponse {
377            event_id,
378            timestamp,
379            version: Some(new_version),
380        });
381    }
382
383    let ingested = ingested_events.len();
384    tracing::info!("Batch ingested {} events", ingested);
385
386    Ok(Json(IngestEventsBatchResponse {
387        total,
388        ingested,
389        events: ingested_events,
390    }))
391}
392
393/// Batch ingest with semi-sync/sync replication ACK waiting.
394///
395/// Used by the v1 API. Per-event tenant comes from `event_req.tenant_id` — the
396/// Control Plane delegation layer sets it from the authenticated caller before
397/// forwarding. Core is internal-only and does not authenticate public traffic.
398pub async fn ingest_events_batch_v1(
399    State(state): State<AppState>,
400    Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402    let total = req.events.len();
403    let mut ingested_events = Vec::with_capacity(total);
404
405    for event_req in req.events {
406        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407        let expected_version = event_req.expected_version;
408
409        let event = Event::from_strings(
410            event_req.event_type,
411            event_req.entity_id,
412            tenant_id,
413            event_req.payload,
414            event_req.metadata,
415        )?;
416
417        let event_id = event.id;
418        let timestamp = event.timestamp;
419
420        let new_version = state
421            .store
422            .ingest_with_expected_version(&event, expected_version)?;
423
424        ingested_events.push(IngestEventResponse {
425            event_id,
426            timestamp,
427            version: Some(new_version),
428        });
429    }
430
431    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
432    await_replication_ack(&state).await;
433
434    let ingested = ingested_events.len();
435    tracing::info!("Batch ingested {} events", ingested);
436
437    Ok(Json(IngestEventsBatchResponse {
438        total,
439        ingested,
440        events: ingested_events,
441    }))
442}
443
444pub async fn query_events(
445    OptionalAuth(auth): OptionalAuth,
446    Query(req): Query<QueryEventsRequest>,
447    State(store): State<SharedStore>,
448) -> Result<Json<QueryEventsResponse>> {
449    let requested_limit = req.limit;
450    let queried_entity_id = req.entity_id.clone();
451
452    // Enforce tenant isolation: authenticated users can only see their own
453    // tenant's events. Unauthenticated (skipped-auth paths) use request param.
454    let enforced_tenant = auth
455        .as_ref()
456        .map(|a| a.tenant_id().to_string())
457        .or(req.tenant_id.clone());
458
459    // Query without limit to get total count
460    let unlimited_req = QueryEventsRequest {
461        entity_id: req.entity_id,
462        event_type: req.event_type,
463        tenant_id: enforced_tenant,
464        as_of: req.as_of,
465        since: req.since,
466        until: req.until,
467        limit: None,
468        event_type_prefix: req.event_type_prefix,
469        payload_filter: req.payload_filter,
470    };
471    let all_events = store.query(&unlimited_req)?;
472    let total_count = all_events.len();
473
474    // Apply limit
475    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
476        all_events.into_iter().take(limit).collect()
477    } else {
478        all_events
479    };
480
481    let count = limited_events.len();
482    let has_more = count < total_count;
483    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
484
485    // Include entity_version only when filtering by a single entity_id
486    let entity_version = queried_entity_id
487        .as_deref()
488        .map(|eid| store.get_entity_version(eid));
489
490    tracing::debug!("Query returned {} events (total: {})", count, total_count);
491
492    Ok(Json(QueryEventsResponse {
493        events,
494        count,
495        total_count,
496        has_more,
497        entity_version,
498    }))
499}
500
501pub async fn list_entities(
502    State(store): State<SharedStore>,
503    Query(req): Query<ListEntitiesRequest>,
504) -> Result<Json<ListEntitiesResponse>> {
505    use std::collections::HashMap;
506
507    // Get all events matching the filters
508    let query_req = QueryEventsRequest {
509        entity_id: None,
510        event_type: None,
511        tenant_id: None,
512        as_of: None,
513        since: None,
514        until: None,
515        limit: None,
516        event_type_prefix: req.event_type_prefix,
517        payload_filter: req.payload_filter,
518    };
519    let events = store.query(&query_req)?;
520
521    // Group by entity_id
522    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
523    for event in &events {
524        entity_map
525            .entry(event.entity_id().to_string())
526            .or_default()
527            .push(event);
528    }
529
530    // Build entity summaries sorted by last event time (descending)
531    let mut summaries: Vec<EntitySummary> = entity_map
532        .into_iter()
533        .map(|(entity_id, events)| {
534            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
535            EntitySummary {
536                entity_id,
537                event_count: events.len(),
538                last_event_type: last.event_type_str().to_string(),
539                last_event_at: last.timestamp(),
540            }
541        })
542        .collect();
543    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
544
545    let total = summaries.len();
546
547    // Apply offset and limit
548    let offset = req.offset.unwrap_or(0);
549    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
550    let summaries = if let Some(limit) = req.limit {
551        let has_more = summaries.len() > limit;
552        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
553        return Ok(Json(ListEntitiesResponse {
554            entities: truncated,
555            total,
556            has_more,
557        }));
558    } else {
559        summaries
560    };
561
562    Ok(Json(ListEntitiesResponse {
563        entities: summaries,
564        total,
565        has_more: false,
566    }))
567}
568
569pub async fn detect_duplicates(
570    State(store): State<SharedStore>,
571    Query(req): Query<DetectDuplicatesRequest>,
572) -> Result<Json<DetectDuplicatesResponse>> {
573    use std::collections::HashMap;
574
575    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
576
577    // Query events scoped by the required prefix
578    let query_req = QueryEventsRequest {
579        entity_id: None,
580        event_type: None,
581        tenant_id: None,
582        as_of: None,
583        since: None,
584        until: None,
585        limit: None,
586        event_type_prefix: Some(req.event_type_prefix),
587        payload_filter: None,
588    };
589    let events = store.query(&query_req)?;
590
591    // For each entity, extract the latest event's payload fields specified by group_by
592    // Then group entities by those field values
593    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
594    for event in &events {
595        let eid = event.entity_id().to_string();
596        entity_latest
597            .entry(eid)
598            .and_modify(|existing| {
599                if event.timestamp() > existing.timestamp() {
600                    *existing = event;
601                }
602            })
603            .or_insert(event);
604    }
605
606    // Group entities by their payload field values
607    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
608    for (entity_id, event) in &entity_latest {
609        let payload = event.payload();
610        let mut key_parts = serde_json::Map::new();
611        for field in &group_by_fields {
612            let value = payload
613                .get(*field)
614                .cloned()
615                .unwrap_or(serde_json::Value::Null);
616            key_parts.insert((*field).to_string(), value);
617        }
618        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
619        groups.entry(key_str).or_default().push(entity_id.clone());
620    }
621
622    // Filter to groups with count > 1 (actual duplicates)
623    let mut duplicate_groups: Vec<DuplicateGroup> = groups
624        .into_iter()
625        .filter(|(_, ids)| ids.len() > 1)
626        .map(|(key_str, mut ids)| {
627            ids.sort();
628            let key: serde_json::Value =
629                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
630            let count = ids.len();
631            DuplicateGroup {
632                key,
633                entity_ids: ids,
634                count,
635            }
636        })
637        .collect();
638
639    // Sort by count descending for consistent output
640    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
641
642    let total = duplicate_groups.len();
643
644    // Apply offset and limit
645    let offset = req.offset.unwrap_or(0);
646    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
647
648    if let Some(limit) = req.limit {
649        let has_more = duplicate_groups.len() > limit;
650        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
651        return Ok(Json(DetectDuplicatesResponse {
652            duplicates: truncated,
653            total,
654            has_more,
655        }));
656    }
657
658    Ok(Json(DetectDuplicatesResponse {
659        duplicates: duplicate_groups,
660        total,
661        has_more: false,
662    }))
663}
664
665#[derive(Deserialize)]
666pub struct EntityStateParams {
667    as_of: Option<chrono::DateTime<chrono::Utc>>,
668}
669
670pub async fn get_entity_state(
671    State(store): State<SharedStore>,
672    Path(entity_id): Path<String>,
673    Query(params): Query<EntityStateParams>,
674) -> Result<Json<serde_json::Value>> {
675    let state = store.reconstruct_state(&entity_id, params.as_of)?;
676
677    tracing::info!("State reconstructed for entity: {}", entity_id);
678
679    Ok(Json(state))
680}
681
682pub async fn get_entity_snapshot(
683    State(store): State<SharedStore>,
684    Path(entity_id): Path<String>,
685) -> Result<Json<serde_json::Value>> {
686    let snapshot = store.get_snapshot(&entity_id)?;
687
688    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
689
690    Ok(Json(snapshot))
691}
692
693pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
694    let stats = store.stats();
695    Json(stats)
696}
697
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// Pagination is applied to the list after it has been sorted: `offset`
/// first, then `limit`.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Optional limit on number of streams to return
    pub limit: Option<usize>,
    /// Optional offset for pagination; an offset at or past the end of the
    /// list yields an empty page
    pub offset: Option<usize>,
}
707
/// Response for listing streams
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before pagination was applied
    pub total: usize,
}
714
715pub async fn list_streams(
716    State(store): State<SharedStore>,
717    Query(params): Query<ListStreamsParams>,
718) -> Json<ListStreamsResponse> {
719    let mut streams = store.list_streams();
720    let total = streams.len();
721
722    // Sort by last_event_at descending (most recent first)
723    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
724
725    // Apply pagination
726    if let Some(offset) = params.offset {
727        if offset < streams.len() {
728            streams = streams[offset..].to_vec();
729        } else {
730            streams = vec![];
731        }
732    }
733
734    if let Some(limit) = params.limit {
735        streams.truncate(limit);
736    }
737
738    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
739
740    Json(ListStreamsResponse { streams, total })
741}
742
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// Pagination is applied to the list after it has been sorted: `offset`
/// first, then `limit`.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Optional limit on number of event types to return
    pub limit: Option<usize>,
    /// Optional offset for pagination; an offset at or past the end of the
    /// list yields an empty page
    pub offset: Option<usize>,
}
752
/// Response for listing event types
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before pagination was applied
    pub total: usize,
}
759
760pub async fn list_event_types(
761    State(store): State<SharedStore>,
762    Query(params): Query<ListEventTypesParams>,
763) -> Json<ListEventTypesResponse> {
764    let mut event_types = store.list_event_types();
765    let total = event_types.len();
766
767    // Sort by event_count descending (most used first)
768    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
769
770    // Apply pagination
771    if let Some(offset) = params.offset {
772        if offset < event_types.len() {
773            event_types = event_types[offset..].to_vec();
774        } else {
775            event_types = vec![];
776        }
777    }
778
779    if let Some(limit) = params.limit {
780        event_types.truncate(limit);
781    }
782
783    tracing::debug!(
784        "Listed {} event types (total: {})",
785        event_types.len(),
786        total
787    );
788
789    Json(ListEventTypesResponse { event_types, total })
790}
791
// v0.2: WebSocket endpoint for real-time event streaming
/// Query parameters accepted by the `events_websocket` upgrade request.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handed to the WebSocket manager together
    /// with this consumer id and the store; when absent, the plain socket
    /// handler is used.
    pub consumer_id: Option<String>,
}
797
798pub async fn events_websocket(
799    ws: WebSocketUpgrade,
800    State(store): State<SharedStore>,
801    Query(params): Query<WebSocketParams>,
802) -> Response {
803    let websocket_manager = store.websocket_manager();
804
805    ws.on_upgrade(move |socket| async move {
806        if let Some(consumer_id) = params.consumer_id {
807            websocket_manager
808                .handle_socket_with_consumer(socket, consumer_id, store)
809                .await;
810        } else {
811            websocket_manager.handle_socket(socket).await;
812        }
813    })
814}
815
816// v0.2: Event frequency analytics endpoint
817pub async fn analytics_frequency(
818    State(store): State<SharedStore>,
819    Query(req): Query<EventFrequencyRequest>,
820) -> Result<Json<EventFrequencyResponse>> {
821    let response = AnalyticsEngine::event_frequency(&store, &req)?;
822
823    tracing::debug!(
824        "Frequency analysis returned {} buckets",
825        response.buckets.len()
826    );
827
828    Ok(Json(response))
829}
830
831// v0.2: Statistical summary endpoint
832pub async fn analytics_summary(
833    State(store): State<SharedStore>,
834    Query(req): Query<StatsSummaryRequest>,
835) -> Result<Json<StatsSummaryResponse>> {
836    let response = AnalyticsEngine::stats_summary(&store, &req)?;
837
838    tracing::debug!(
839        "Stats summary: {} events across {} entities",
840        response.total_events,
841        response.unique_entities
842    );
843
844    Ok(Json(response))
845}
846
847// v0.2: Event correlation analysis endpoint
848pub async fn analytics_correlation(
849    State(store): State<SharedStore>,
850    Query(req): Query<CorrelationRequest>,
851) -> Result<Json<CorrelationResponse>> {
852    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
853
854    tracing::debug!(
855        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
856        response.correlated_pairs,
857        response.total_a,
858        response.correlation_percentage
859    );
860
861    Ok(Json(response))
862}
863
864// v0.2: Create a snapshot for an entity
865pub async fn create_snapshot(
866    State(store): State<SharedStore>,
867    Json(req): Json<CreateSnapshotRequest>,
868) -> Result<Json<CreateSnapshotResponse>> {
869    store.create_snapshot(&req.entity_id)?;
870
871    let snapshot_manager = store.snapshot_manager();
872    let snapshot = snapshot_manager
873        .get_latest_snapshot(&req.entity_id)
874        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
875
876    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
877
878    Ok(Json(CreateSnapshotResponse {
879        snapshot_id: snapshot.id,
880        entity_id: snapshot.entity_id,
881        created_at: snapshot.created_at,
882        event_count: snapshot.event_count,
883        size_bytes: snapshot.metadata.size_bytes,
884    }))
885}
886
887// v0.2: List snapshots
888pub async fn list_snapshots(
889    State(store): State<SharedStore>,
890    Query(req): Query<ListSnapshotsRequest>,
891) -> Result<Json<ListSnapshotsResponse>> {
892    let snapshot_manager = store.snapshot_manager();
893
894    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
895        snapshot_manager
896            .get_all_snapshots(&entity_id)
897            .into_iter()
898            .map(SnapshotInfo::from)
899            .collect()
900    } else {
901        // List all entities with snapshots
902        let entities = snapshot_manager.list_entities();
903        entities
904            .iter()
905            .flat_map(|entity_id| {
906                snapshot_manager
907                    .get_all_snapshots(entity_id)
908                    .into_iter()
909                    .map(SnapshotInfo::from)
910            })
911            .collect()
912    };
913
914    let total = snapshots.len();
915
916    tracing::debug!("Listed {} snapshots", total);
917
918    Ok(Json(ListSnapshotsResponse { snapshots, total }))
919}
920
921// v0.2: Get latest snapshot for an entity
922pub async fn get_latest_snapshot(
923    State(store): State<SharedStore>,
924    Path(entity_id): Path<String>,
925) -> Result<Json<serde_json::Value>> {
926    let snapshot_manager = store.snapshot_manager();
927
928    let snapshot = snapshot_manager
929        .get_latest_snapshot(&entity_id)
930        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
931
932    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
933
934    Ok(Json(serde_json::json!({
935        "snapshot_id": snapshot.id,
936        "entity_id": snapshot.entity_id,
937        "created_at": snapshot.created_at,
938        "as_of": snapshot.as_of,
939        "event_count": snapshot.event_count,
940        "size_bytes": snapshot.metadata.size_bytes,
941        "snapshot_type": snapshot.metadata.snapshot_type,
942        "state": snapshot.state
943    })))
944}
945
946// v0.2: Trigger manual compaction
947pub async fn trigger_compaction(
948    State(store): State<SharedStore>,
949) -> Result<Json<CompactionResult>> {
950    let compaction_manager = store.compaction_manager().ok_or_else(|| {
951        crate::error::AllSourceError::InternalError(
952            "Compaction not enabled (no Parquet storage)".to_string(),
953        )
954    })?;
955
956    tracing::info!("📦 Manual compaction triggered via API");
957
958    let result = compaction_manager.compact_now()?;
959
960    Ok(Json(result))
961}
962
963// v0.2: Get compaction statistics
964pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
965    let compaction_manager = store.compaction_manager().ok_or_else(|| {
966        crate::error::AllSourceError::InternalError(
967            "Compaction not enabled (no Parquet storage)".to_string(),
968        )
969    })?;
970
971    let stats = compaction_manager.stats();
972    let config = compaction_manager.config();
973
974    Ok(Json(serde_json::json!({
975        "stats": stats,
976        "config": {
977            "min_files_to_compact": config.min_files_to_compact,
978            "target_file_size": config.target_file_size,
979            "max_file_size": config.max_file_size,
980            "small_file_threshold": config.small_file_threshold,
981            "compaction_interval_seconds": config.compaction_interval_seconds,
982            "auto_compact": config.auto_compact,
983            "strategy": config.strategy
984        }
985    })))
986}
987
988// v0.5: Register a new schema
989pub async fn register_schema(
990    State(store): State<SharedStore>,
991    Json(req): Json<RegisterSchemaRequest>,
992) -> Result<Json<RegisterSchemaResponse>> {
993    let schema_registry = store.schema_registry();
994
995    let response =
996        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
997
998    tracing::info!(
999        "📋 Schema registered: v{} for '{}'",
1000        response.version,
1001        response.subject
1002    );
1003
1004    Ok(Json(response))
1005}
1006
1007// v0.5: Get a schema by subject and optional version
1008#[derive(Deserialize)]
1009pub struct GetSchemaParams {
1010    version: Option<u32>,
1011}
1012
1013pub async fn get_schema(
1014    State(store): State<SharedStore>,
1015    Path(subject): Path<String>,
1016    Query(params): Query<GetSchemaParams>,
1017) -> Result<Json<serde_json::Value>> {
1018    let schema_registry = store.schema_registry();
1019
1020    let schema = schema_registry.get_schema(&subject, params.version)?;
1021
1022    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1023
1024    Ok(Json(serde_json::json!({
1025        "id": schema.id,
1026        "subject": schema.subject,
1027        "version": schema.version,
1028        "schema": schema.schema,
1029        "created_at": schema.created_at,
1030        "description": schema.description,
1031        "tags": schema.tags
1032    })))
1033}
1034
1035// v0.5: List all versions of a schema subject
1036pub async fn list_schema_versions(
1037    State(store): State<SharedStore>,
1038    Path(subject): Path<String>,
1039) -> Result<Json<serde_json::Value>> {
1040    let schema_registry = store.schema_registry();
1041
1042    let versions = schema_registry.list_versions(&subject)?;
1043
1044    Ok(Json(serde_json::json!({
1045        "subject": subject,
1046        "versions": versions
1047    })))
1048}
1049
1050// v0.5: List all schema subjects
1051pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1052    let schema_registry = store.schema_registry();
1053
1054    let subjects = schema_registry.list_subjects();
1055
1056    Json(serde_json::json!({
1057        "subjects": subjects,
1058        "total": subjects.len()
1059    }))
1060}
1061
1062// v0.5: Validate an event against a schema
1063pub async fn validate_event_schema(
1064    State(store): State<SharedStore>,
1065    Json(req): Json<ValidateEventRequest>,
1066) -> Result<Json<ValidateEventResponse>> {
1067    let schema_registry = store.schema_registry();
1068
1069    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1070
1071    if response.valid {
1072        tracing::debug!(
1073            "✅ Event validated against schema '{}' v{}",
1074            req.subject,
1075            response.schema_version
1076        );
1077    } else {
1078        tracing::warn!(
1079            "❌ Event validation failed for '{}': {:?}",
1080            req.subject,
1081            response.errors
1082        );
1083    }
1084
1085    Ok(Json(response))
1086}
1087
1088// v0.5: Set compatibility mode for a subject
1089#[derive(Deserialize)]
1090pub struct SetCompatibilityRequest {
1091    compatibility: CompatibilityMode,
1092}
1093
1094pub async fn set_compatibility_mode(
1095    State(store): State<SharedStore>,
1096    Path(subject): Path<String>,
1097    Json(req): Json<SetCompatibilityRequest>,
1098) -> Json<serde_json::Value> {
1099    let schema_registry = store.schema_registry();
1100
1101    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1102
1103    tracing::info!(
1104        "🔧 Set compatibility mode for '{}' to {:?}",
1105        subject,
1106        req.compatibility
1107    );
1108
1109    Json(serde_json::json!({
1110        "subject": subject,
1111        "compatibility": req.compatibility
1112    }))
1113}
1114
1115// v0.5: Start a replay operation
1116pub async fn start_replay(
1117    State(store): State<SharedStore>,
1118    Json(req): Json<StartReplayRequest>,
1119) -> Result<Json<StartReplayResponse>> {
1120    let replay_manager = store.replay_manager();
1121
1122    let response = replay_manager.start_replay(store, req)?;
1123
1124    tracing::info!(
1125        "🔄 Started replay {} with {} events",
1126        response.replay_id,
1127        response.total_events
1128    );
1129
1130    Ok(Json(response))
1131}
1132
1133// v0.5: Get replay progress
1134pub async fn get_replay_progress(
1135    State(store): State<SharedStore>,
1136    Path(replay_id): Path<uuid::Uuid>,
1137) -> Result<Json<ReplayProgress>> {
1138    let replay_manager = store.replay_manager();
1139
1140    let progress = replay_manager.get_progress(replay_id)?;
1141
1142    Ok(Json(progress))
1143}
1144
1145// v0.5: List all replay operations
1146pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1147    let replay_manager = store.replay_manager();
1148
1149    let replays = replay_manager.list_replays();
1150
1151    Json(serde_json::json!({
1152        "replays": replays,
1153        "total": replays.len()
1154    }))
1155}
1156
1157// v0.5: Cancel a running replay
1158pub async fn cancel_replay(
1159    State(store): State<SharedStore>,
1160    Path(replay_id): Path<uuid::Uuid>,
1161) -> Result<Json<serde_json::Value>> {
1162    let replay_manager = store.replay_manager();
1163
1164    replay_manager.cancel_replay(replay_id)?;
1165
1166    tracing::info!("🛑 Cancelled replay {}", replay_id);
1167
1168    Ok(Json(serde_json::json!({
1169        "replay_id": replay_id,
1170        "status": "cancelled"
1171    })))
1172}
1173
1174// v0.5: Delete a completed replay
1175pub async fn delete_replay(
1176    State(store): State<SharedStore>,
1177    Path(replay_id): Path<uuid::Uuid>,
1178) -> Result<Json<serde_json::Value>> {
1179    let replay_manager = store.replay_manager();
1180
1181    let deleted = replay_manager.delete_replay(replay_id)?;
1182
1183    if deleted {
1184        tracing::info!("🗑️  Deleted replay {}", replay_id);
1185    }
1186
1187    Ok(Json(serde_json::json!({
1188        "replay_id": replay_id,
1189        "deleted": deleted
1190    })))
1191}
1192
1193// v0.5: Register a new pipeline
1194pub async fn register_pipeline(
1195    State(store): State<SharedStore>,
1196    Json(config): Json<PipelineConfig>,
1197) -> Result<Json<serde_json::Value>> {
1198    let pipeline_manager = store.pipeline_manager();
1199
1200    let pipeline_id = pipeline_manager.register(config.clone());
1201
1202    tracing::info!(
1203        "🔀 Pipeline registered: {} (name: {})",
1204        pipeline_id,
1205        config.name
1206    );
1207
1208    Ok(Json(serde_json::json!({
1209        "pipeline_id": pipeline_id,
1210        "name": config.name,
1211        "enabled": config.enabled
1212    })))
1213}
1214
1215// v0.5: List all pipelines
1216pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1217    let pipeline_manager = store.pipeline_manager();
1218
1219    let pipelines = pipeline_manager.list();
1220
1221    tracing::debug!("Listed {} pipelines", pipelines.len());
1222
1223    Json(serde_json::json!({
1224        "pipelines": pipelines,
1225        "total": pipelines.len()
1226    }))
1227}
1228
1229// v0.5: Get a specific pipeline
1230pub async fn get_pipeline(
1231    State(store): State<SharedStore>,
1232    Path(pipeline_id): Path<uuid::Uuid>,
1233) -> Result<Json<PipelineConfig>> {
1234    let pipeline_manager = store.pipeline_manager();
1235
1236    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1237        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1238    })?;
1239
1240    Ok(Json(pipeline.config().clone()))
1241}
1242
1243// v0.5: Remove a pipeline
1244pub async fn remove_pipeline(
1245    State(store): State<SharedStore>,
1246    Path(pipeline_id): Path<uuid::Uuid>,
1247) -> Result<Json<serde_json::Value>> {
1248    let pipeline_manager = store.pipeline_manager();
1249
1250    let removed = pipeline_manager.remove(pipeline_id);
1251
1252    if removed {
1253        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1254    }
1255
1256    Ok(Json(serde_json::json!({
1257        "pipeline_id": pipeline_id,
1258        "removed": removed
1259    })))
1260}
1261
1262// v0.5: Get statistics for all pipelines
1263pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1264    let pipeline_manager = store.pipeline_manager();
1265
1266    let stats = pipeline_manager.all_stats();
1267
1268    Json(serde_json::json!({
1269        "stats": stats,
1270        "total": stats.len()
1271    }))
1272}
1273
1274// v0.5: Get statistics for a specific pipeline
1275pub async fn get_pipeline_stats(
1276    State(store): State<SharedStore>,
1277    Path(pipeline_id): Path<uuid::Uuid>,
1278) -> Result<Json<PipelineStats>> {
1279    let pipeline_manager = store.pipeline_manager();
1280
1281    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1282        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1283    })?;
1284
1285    Ok(Json(pipeline.stats()))
1286}
1287
1288// v0.5: Reset a pipeline's state
1289pub async fn reset_pipeline(
1290    State(store): State<SharedStore>,
1291    Path(pipeline_id): Path<uuid::Uuid>,
1292) -> Result<Json<serde_json::Value>> {
1293    let pipeline_manager = store.pipeline_manager();
1294
1295    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1296        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1297    })?;
1298
1299    pipeline.reset();
1300
1301    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1302
1303    Ok(Json(serde_json::json!({
1304        "pipeline_id": pipeline_id,
1305        "reset": true
1306    })))
1307}
1308
1309// =============================================================================
1310// v0.11: Single Event Lookup by ID
1311// =============================================================================
1312
1313/// Get a single event by UUID
1314pub async fn get_event_by_id(
1315    State(store): State<SharedStore>,
1316    Path(event_id): Path<uuid::Uuid>,
1317) -> Result<Json<serde_json::Value>> {
1318    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1319        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1320    })?;
1321
1322    let dto = EventDto::from(&event);
1323
1324    tracing::debug!("Event retrieved by ID: {}", event_id);
1325
1326    Ok(Json(serde_json::json!({
1327        "event": dto,
1328        "found": true
1329    })))
1330}
1331
1332// =============================================================================
1333// v0.7: Projection State API for Query Service Integration
1334// =============================================================================
1335
1336/// List all registered projections
1337pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1338    let projection_manager = store.projection_manager();
1339    let status_map = store.projection_status();
1340
1341    let projections: Vec<serde_json::Value> = projection_manager
1342        .list_projections()
1343        .iter()
1344        .map(|(name, projection)| {
1345            let status = status_map
1346                .get(name)
1347                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1348            serde_json::json!({
1349                "name": name,
1350                "type": format!("{:?}", projection.name()),
1351                "status": status,
1352            })
1353        })
1354        .collect();
1355
1356    tracing::debug!("Listed {} projections", projections.len());
1357
1358    Json(serde_json::json!({
1359        "projections": projections,
1360        "total": projections.len()
1361    }))
1362}
1363
1364/// Get projection metadata by name
1365pub async fn get_projection(
1366    State(store): State<SharedStore>,
1367    Path(name): Path<String>,
1368) -> Result<Json<serde_json::Value>> {
1369    let projection_manager = store.projection_manager();
1370
1371    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1372        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1373    })?;
1374
1375    Ok(Json(serde_json::json!({
1376        "name": projection.name(),
1377        "found": true
1378    })))
1379}
1380
1381/// Get projection state for a specific entity.
1382///
1383/// Resolution order:
1384/// 1. **Registered projection** — if `name` is registered with the projection
1385///    manager, return the projection's own `get_state(entity_id)` output.
1386/// 2. **Projection state cache** — otherwise fall back to whatever was written
1387///    via `save_projection_state` / `bulk_save_projection_states`. This supports
1388///    SDK-managed projections (e.g. the Rust SDK's `ProjectionWorker`) that
1389///    compute state client-side and push it back without registering a
1390///    projection in Core's manager.
1391///
1392/// Returns `found: false` with `state: null` when neither source has state.
1393pub async fn get_projection_state(
1394    State(store): State<SharedStore>,
1395    Path((name, entity_id)): Path<(String, String)>,
1396) -> Result<Json<serde_json::Value>> {
1397    let state = store
1398        .projection_manager()
1399        .get_projection(&name)
1400        .and_then(|p| p.get_state(&entity_id))
1401        .or_else(|| {
1402            store
1403                .projection_state_cache()
1404                .get(&format!("{name}:{entity_id}"))
1405                .map(|entry| entry.value().clone())
1406        });
1407
1408    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1409
1410    Ok(Json(serde_json::json!({
1411        "projection": name,
1412        "entity_id": entity_id,
1413        "state": state,
1414        "found": state.is_some()
1415    })))
1416}
1417
1418/// Delete (clear) a projection by name
1419///
1420/// Removes all state from the projection. The projection definition remains
1421/// registered but its accumulated state is cleared.
1422pub async fn delete_projection(
1423    State(store): State<SharedStore>,
1424    Path(name): Path<String>,
1425) -> Result<Json<serde_json::Value>> {
1426    let projection_manager = store.projection_manager();
1427
1428    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1429        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1430    })?;
1431
1432    projection.clear();
1433
1434    // Also clear any cached state for this projection
1435    let cache = store.projection_state_cache();
1436    let prefix = format!("{name}:");
1437    let keys_to_remove: Vec<String> = cache
1438        .iter()
1439        .filter(|entry| entry.key().starts_with(&prefix))
1440        .map(|entry| entry.key().clone())
1441        .collect();
1442    for key in keys_to_remove {
1443        cache.remove(&key);
1444    }
1445
1446    tracing::info!("Projection deleted (cleared): {}", name);
1447
1448    Ok(Json(serde_json::json!({
1449        "projection": name,
1450        "deleted": true
1451    })))
1452}
1453
1454/// Get aggregate projection state (all entities).
1455///
1456/// Returns the cached states written via `save_projection_state` /
1457/// `bulk_save_projection_states`. The projection does NOT need to be
1458/// registered with the projection manager — this supports SDK-managed
1459/// projections that push state without server-side registration.
1460///
1461/// Returns an empty list when no state has been written.
1462pub async fn get_projection_state_summary(
1463    State(store): State<SharedStore>,
1464    Path(name): Path<String>,
1465) -> Result<Json<serde_json::Value>> {
1466    let cache = store.projection_state_cache();
1467    let prefix = format!("{name}:");
1468    let states: Vec<serde_json::Value> = cache
1469        .iter()
1470        .filter(|entry| entry.key().starts_with(&prefix))
1471        .map(|entry| {
1472            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1473            serde_json::json!({
1474                "entity_id": entity_id,
1475                "state": entry.value().clone()
1476            })
1477        })
1478        .collect();
1479
1480    let total = states.len();
1481
1482    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1483
1484    Ok(Json(serde_json::json!({
1485        "projection": name,
1486        "states": states,
1487        "total": total
1488    })))
1489}
1490
1491/// Reset a projection to its initial state
1492///
1493/// Clears all accumulated state and reprocesses events from the beginning.
1494pub async fn reset_projection(
1495    State(store): State<SharedStore>,
1496    Path(name): Path<String>,
1497) -> Result<Json<serde_json::Value>> {
1498    let reprocessed = store.reset_projection(&name)?;
1499
1500    tracing::info!(
1501        "Projection reset: {} ({} events reprocessed)",
1502        name,
1503        reprocessed
1504    );
1505
1506    Ok(Json(serde_json::json!({
1507        "projection": name,
1508        "reset": true,
1509        "events_reprocessed": reprocessed
1510    })))
1511}
1512
1513/// Pause a projection
1514///
1515/// Sets the projection status to "paused" so it stops processing new events.
1516pub async fn pause_projection(
1517    State(store): State<SharedStore>,
1518    Path(name): Path<String>,
1519) -> Result<Json<serde_json::Value>> {
1520    let projection_manager = store.projection_manager();
1521
1522    // Verify projection exists
1523    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1524        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1525    })?;
1526
1527    store
1528        .projection_status()
1529        .insert(name.clone(), "paused".to_string());
1530
1531    tracing::info!("Projection paused: {}", name);
1532
1533    Ok(Json(serde_json::json!({
1534        "projection": name,
1535        "status": "paused"
1536    })))
1537}
1538
1539/// Start (resume) a projection
1540///
1541/// Sets the projection status to "running" so it resumes processing events.
1542pub async fn start_projection(
1543    State(store): State<SharedStore>,
1544    Path(name): Path<String>,
1545) -> Result<Json<serde_json::Value>> {
1546    let projection_manager = store.projection_manager();
1547
1548    // Verify projection exists
1549    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1550        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1551    })?;
1552
1553    store
1554        .projection_status()
1555        .insert(name.clone(), "running".to_string());
1556
1557    tracing::info!("Projection started: {}", name);
1558
1559    Ok(Json(serde_json::json!({
1560        "projection": name,
1561        "status": "running"
1562    })))
1563}
1564
1565/// Request body for saving projection state
1566#[derive(Debug, Deserialize)]
1567pub struct SaveProjectionStateRequest {
1568    pub state: serde_json::Value,
1569}
1570
1571/// Save/update projection state for an entity
1572///
1573/// This endpoint allows external services (like Elixir Query Service) to
1574/// store computed projection state back to the Core for persistence.
1575pub async fn save_projection_state(
1576    State(store): State<SharedStore>,
1577    Path((name, entity_id)): Path<(String, String)>,
1578    Json(req): Json<SaveProjectionStateRequest>,
1579) -> Result<Json<serde_json::Value>> {
1580    let projection_cache = store.projection_state_cache();
1581
1582    // Store in the projection state cache
1583    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1584
1585    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1586
1587    Ok(Json(serde_json::json!({
1588        "projection": name,
1589        "entity_id": entity_id,
1590        "saved": true
1591    })))
1592}
1593
1594/// Bulk get projection states for multiple entities
1595///
1596/// Efficient endpoint for fetching multiple entity states in a single request.
1597#[derive(Debug, Deserialize)]
1598pub struct BulkGetStateRequest {
1599    pub entity_ids: Vec<String>,
1600}
1601
1602/// Bulk save projection states for multiple entities
1603///
1604/// Efficient endpoint for saving multiple entity states in a single request.
1605#[derive(Debug, Deserialize)]
1606pub struct BulkSaveStateRequest {
1607    pub states: Vec<BulkSaveStateItem>,
1608}
1609
1610#[derive(Debug, Deserialize)]
1611pub struct BulkSaveStateItem {
1612    pub entity_id: String,
1613    pub state: serde_json::Value,
1614}
1615
1616pub async fn bulk_get_projection_states(
1617    State(store): State<SharedStore>,
1618    Path(name): Path<String>,
1619    Json(req): Json<BulkGetStateRequest>,
1620) -> Result<Json<serde_json::Value>> {
1621    // Same fallback rule as `get_projection_state`: registered projection
1622    // wins, cache is the fallback. This lets SDK-managed projections read
1623    // their pushed-back state without registering in Core's projection manager.
1624    let projection = store.projection_manager().get_projection(&name);
1625    let cache = store.projection_state_cache();
1626
1627    let states: Vec<serde_json::Value> = req
1628        .entity_ids
1629        .iter()
1630        .map(|entity_id| {
1631            let state = projection
1632                .as_ref()
1633                .and_then(|p| p.get_state(entity_id))
1634                .or_else(|| {
1635                    cache
1636                        .get(&format!("{name}:{entity_id}"))
1637                        .map(|entry| entry.value().clone())
1638                });
1639            serde_json::json!({
1640                "entity_id": entity_id,
1641                "state": state,
1642                "found": state.is_some()
1643            })
1644        })
1645        .collect();
1646
1647    tracing::debug!(
1648        "Bulk projection state retrieved: {} entities from {}",
1649        states.len(),
1650        name
1651    );
1652
1653    Ok(Json(serde_json::json!({
1654        "projection": name,
1655        "states": states,
1656        "total": states.len()
1657    })))
1658}
1659
1660/// Bulk save projection states for multiple entities
1661///
1662/// This endpoint allows efficient batch saving of projection states,
1663/// critical for high-throughput event processing pipelines.
1664pub async fn bulk_save_projection_states(
1665    State(store): State<SharedStore>,
1666    Path(name): Path<String>,
1667    Json(req): Json<BulkSaveStateRequest>,
1668) -> Result<Json<serde_json::Value>> {
1669    let projection_cache = store.projection_state_cache();
1670
1671    let mut saved_count = 0;
1672    for item in &req.states {
1673        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1674        saved_count += 1;
1675    }
1676
1677    tracing::info!(
1678        "Bulk projection state saved: {} entities for {}",
1679        saved_count,
1680        name
1681    );
1682
1683    Ok(Json(serde_json::json!({
1684        "projection": name,
1685        "saved": saved_count,
1686        "total": req.states.len()
1687    })))
1688}
1689
1690// =============================================================================
1691// v0.11: Webhook Management API
1692// =============================================================================
1693
1694/// Query parameters for listing webhooks
1695#[derive(Debug, Deserialize)]
1696pub struct ListWebhooksParams {
1697    pub tenant_id: Option<String>,
1698}
1699
1700/// Register a new webhook subscription
1701pub async fn register_webhook(
1702    State(store): State<SharedStore>,
1703    Json(req): Json<RegisterWebhookRequest>,
1704) -> Json<serde_json::Value> {
1705    let registry = store.webhook_registry();
1706    let webhook = registry.register(req);
1707
1708    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1709
1710    Json(serde_json::json!({
1711        "webhook": webhook,
1712        "created": true
1713    }))
1714}
1715
1716/// List webhooks, optionally filtered by tenant_id
1717pub async fn list_webhooks(
1718    State(store): State<SharedStore>,
1719    Query(params): Query<ListWebhooksParams>,
1720) -> Json<serde_json::Value> {
1721    let registry = store.webhook_registry();
1722
1723    let webhooks = if let Some(tenant_id) = params.tenant_id {
1724        registry.list_by_tenant(&tenant_id)
1725    } else {
1726        // Without tenant filter, return empty (tenants should always filter)
1727        vec![]
1728    };
1729
1730    let total = webhooks.len();
1731
1732    Json(serde_json::json!({
1733        "webhooks": webhooks,
1734        "total": total
1735    }))
1736}
1737
1738/// Get a specific webhook by ID
1739pub async fn get_webhook(
1740    State(store): State<SharedStore>,
1741    Path(webhook_id): Path<uuid::Uuid>,
1742) -> Result<Json<serde_json::Value>> {
1743    let registry = store.webhook_registry();
1744
1745    let webhook = registry.get(webhook_id).ok_or_else(|| {
1746        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1747    })?;
1748
1749    Ok(Json(serde_json::json!({
1750        "webhook": webhook,
1751        "found": true
1752    })))
1753}
1754
1755/// Update a webhook subscription
1756pub async fn update_webhook(
1757    State(store): State<SharedStore>,
1758    Path(webhook_id): Path<uuid::Uuid>,
1759    Json(req): Json<UpdateWebhookRequest>,
1760) -> Result<Json<serde_json::Value>> {
1761    let registry = store.webhook_registry();
1762
1763    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1764        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1765    })?;
1766
1767    tracing::info!("Webhook updated: {}", webhook_id);
1768
1769    Ok(Json(serde_json::json!({
1770        "webhook": webhook,
1771        "updated": true
1772    })))
1773}
1774
1775/// Delete a webhook subscription
1776pub async fn delete_webhook(
1777    State(store): State<SharedStore>,
1778    Path(webhook_id): Path<uuid::Uuid>,
1779) -> Result<Json<serde_json::Value>> {
1780    let registry = store.webhook_registry();
1781
1782    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1783        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1784    })?;
1785
1786    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1787
1788    Ok(Json(serde_json::json!({
1789        "webhook_id": webhook_id,
1790        "deleted": true
1791    })))
1792}
1793
1794/// Query parameters for listing webhook deliveries
1795#[derive(Debug, Deserialize)]
1796pub struct ListDeliveriesParams {
1797    pub limit: Option<usize>,
1798}
1799
1800/// List delivery history for a webhook
1801pub async fn list_webhook_deliveries(
1802    State(store): State<SharedStore>,
1803    Path(webhook_id): Path<uuid::Uuid>,
1804    Query(params): Query<ListDeliveriesParams>,
1805) -> Result<Json<serde_json::Value>> {
1806    let registry = store.webhook_registry();
1807
1808    // Verify webhook exists
1809    registry.get(webhook_id).ok_or_else(|| {
1810        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1811    })?;
1812
1813    let limit = params.limit.unwrap_or(50);
1814    let deliveries = registry.get_deliveries(webhook_id, limit);
1815    let total = deliveries.len();
1816
1817    Ok(Json(serde_json::json!({
1818        "webhook_id": webhook_id,
1819        "deliveries": deliveries,
1820        "total": total
1821    })))
1822}
1823
1824// =============================================================================
1825// v2.0: Advanced Query Features
1826// =============================================================================
1827
/// EventQL: Execute SQL queries over events using DataFusion
///
/// Takes a snapshot of the store's events, runs the request against it with
/// `execute_eventql`, and returns the result's `columns`, `rows` and
/// `row_count` as JSON. Only compiled when the `analytics` feature is on.
///
/// # Errors
/// Wraps any failure reported by `execute_eventql` in `InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();

    let result = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": result.columns,
        "rows": result.rows,
        "row_count": result.row_count,
    })))
}
1844
1845/// GraphQL: Execute GraphQL queries
1846pub async fn graphql_query(
1847    State(store): State<SharedStore>,
1848    Json(req): Json<GraphQLRequest>,
1849) -> Json<serde_json::Value> {
1850    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1851        Ok(f) => f,
1852        Err(e) => {
1853            return Json(
1854                serde_json::to_value(GraphQLResponse {
1855                    data: None,
1856                    errors: vec![GraphQLError { message: e }],
1857                })
1858                .unwrap(),
1859            );
1860        }
1861    };
1862
1863    let mut data = serde_json::Map::new();
1864    let mut errors = Vec::new();
1865
1866    for field in &fields {
1867        match field.name.as_str() {
1868            "events" => {
1869                let request = crate::application::dto::QueryEventsRequest {
1870                    entity_id: field.arguments.get("entity_id").cloned(),
1871                    event_type: field.arguments.get("event_type").cloned(),
1872                    tenant_id: field.arguments.get("tenant_id").cloned(),
1873                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1874                    as_of: None,
1875                    since: None,
1876                    until: None,
1877                    event_type_prefix: None,
1878                    payload_filter: None,
1879                };
1880                match store.query(&request) {
1881                    Ok(events) => {
1882                        let json_events: Vec<serde_json::Value> = events
1883                            .iter()
1884                            .map(|e| {
1885                                crate::infrastructure::query::graphql::event_to_json(
1886                                    e,
1887                                    &field.fields,
1888                                )
1889                            })
1890                            .collect();
1891                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1892                    }
1893                    Err(e) => errors.push(GraphQLError {
1894                        message: format!("events query failed: {e}"),
1895                    }),
1896                }
1897            }
1898            "event" => {
1899                if let Some(id_str) = field.arguments.get("id") {
1900                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1901                        match store.get_event_by_id(&id) {
1902                            Ok(Some(event)) => {
1903                                data.insert(
1904                                    "event".to_string(),
1905                                    crate::infrastructure::query::graphql::event_to_json(
1906                                        &event,
1907                                        &field.fields,
1908                                    ),
1909                                );
1910                            }
1911                            Ok(None) => {
1912                                data.insert("event".to_string(), serde_json::Value::Null);
1913                            }
1914                            Err(e) => errors.push(GraphQLError {
1915                                message: format!("event lookup failed: {e}"),
1916                            }),
1917                        }
1918                    } else {
1919                        errors.push(GraphQLError {
1920                            message: format!("Invalid UUID: {id_str}"),
1921                        });
1922                    }
1923                } else {
1924                    errors.push(GraphQLError {
1925                        message: "event query requires 'id' argument".to_string(),
1926                    });
1927                }
1928            }
1929            "projections" => {
1930                let pm = store.projection_manager();
1931                let names: Vec<serde_json::Value> = pm
1932                    .list_projections()
1933                    .iter()
1934                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1935                    .collect();
1936                data.insert("projections".to_string(), serde_json::Value::Array(names));
1937            }
1938            "stats" => {
1939                let stats = store.stats();
1940                data.insert(
1941                    "stats".to_string(),
1942                    serde_json::json!({
1943                        "total_events": stats.total_events,
1944                        "total_entities": stats.total_entities,
1945                        "total_event_types": stats.total_event_types,
1946                    }),
1947                );
1948            }
1949            "__schema" => {
1950                data.insert(
1951                    "__schema".to_string(),
1952                    crate::infrastructure::query::graphql::introspection_schema(),
1953                );
1954            }
1955            other => {
1956                errors.push(GraphQLError {
1957                    message: format!("Unknown field: {other}"),
1958                });
1959            }
1960        }
1961    }
1962
1963    Json(
1964        serde_json::to_value(GraphQLResponse {
1965            data: Some(serde_json::Value::Object(data)),
1966            errors,
1967        })
1968        .unwrap(),
1969    )
1970}
1971
1972/// Geospatial: Query events by location
1973pub async fn geo_query(
1974    State(store): State<SharedStore>,
1975    Json(req): Json<GeoQueryRequest>,
1976) -> Json<serde_json::Value> {
1977    let events = store.snapshot_events();
1978    let geo_index = store.geo_index();
1979    let results =
1980        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1981    let total = results.len();
1982    Json(serde_json::json!({
1983        "results": results,
1984        "total": total,
1985    }))
1986}
1987
1988/// Geospatial index stats
1989pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1990    let stats = store.geo_index().stats();
1991    Json(serde_json::json!(stats))
1992}
1993
1994/// Exactly-once processing stats
1995pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1996    let stats = store.exactly_once().stats();
1997    Json(serde_json::json!(stats))
1998}
1999
2000/// Schema evolution history for an event type
2001pub async fn schema_evolution_history(
2002    State(store): State<SharedStore>,
2003    Path(event_type): Path<String>,
2004) -> Json<serde_json::Value> {
2005    let mgr = store.schema_evolution();
2006    let history = mgr.get_history(&event_type);
2007    let version = mgr.get_version(&event_type);
2008    Json(serde_json::json!({
2009        "event_type": event_type,
2010        "current_version": version,
2011        "history": history,
2012    }))
2013}
2014
2015/// Current inferred schema for an event type
2016pub async fn schema_evolution_schema(
2017    State(store): State<SharedStore>,
2018    Path(event_type): Path<String>,
2019) -> Json<serde_json::Value> {
2020    let mgr = store.schema_evolution();
2021    if let Some(schema) = mgr.get_schema(&event_type) {
2022        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2023        Json(serde_json::json!({
2024            "event_type": event_type,
2025            "version": mgr.get_version(&event_type),
2026            "inferred_schema": schema,
2027            "json_schema": json_schema,
2028        }))
2029    } else {
2030        Json(serde_json::json!({
2031            "event_type": event_type,
2032            "error": "No schema inferred for this event type"
2033        }))
2034    }
2035}
2036
2037/// Schema evolution stats
2038pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2039    let stats = store.schema_evolution().stats();
2040    let event_types = store.schema_evolution().list_event_types();
2041    Json(serde_json::json!({
2042        "stats": stats,
2043        "tracked_event_types": event_types,
2044    }))
2045}
2046
2047// =============================================================================
2048// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2049// =============================================================================
2050
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The smallest `physical_ms` found in the client's version vector is used as the `since`
/// bound of an otherwise unfiltered store query. An empty version vector (or a value that
/// cannot be represented as a timestamp) results in no `since` bound at all.
///
/// Each returned event is converted to the `ReplicatedEvent` wire format with an HLC
/// timestamp built from the event's millisecond timestamp and a logical counter that
/// increments for consecutive events sharing the same millisecond.
///
/// The response's `version_vector` is returned empty.
///
/// # Errors
///
/// Propagates any error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Compute the "since" threshold from the client's version vector.
    // A checked conversion is used so an out-of-range value drops the bound instead of
    // silently wrapping into a negative (pre-epoch) timestamp.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(i64::try_from(ms).ok()?));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Convert domain events to ReplicatedEvent wire format
    let mut replicated = Vec::with_capacity(events.len());
    // `None` until the first event is seen, so that an event at millisecond 0 still starts
    // with logical counter 0 rather than being mistaken for a repeat of a sentinel value.
    let mut previous_ms: Option<u64> = None;
    let mut logical = 0u32;

    for event in &events {
        // Pre-epoch timestamps are clamped to 0 instead of wrapping to a huge u64.
        let event_ms = u64::try_from(event.timestamp().timestamp_millis()).unwrap_or(0);
        if previous_ms == Some(event_ms) {
            logical += 1;
        } else {
            previous_ms = Some(event_ms);
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2115
/// POST /api/v1/sync/push — Client pushes events, server ingests them.
///
/// Every pushed event's `event_data` is unpacked into a domain [`Event`]. Missing or
/// non-string `event_type` / `entity_id` fall back to `"unknown"`, `tenant_id` to
/// `"default"`, and a missing `payload` to an empty JSON object. Events that cannot be
/// constructed are counted as `skipped`; successfully ingested ones as `accepted`.
///
/// The response's `version_vector` is returned empty.
///
/// # Errors
///
/// Returns the first ingestion error from the store; events ingested before that point are
/// not rolled back by this handler.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let (mut accepted, mut skipped) = (0usize, 0usize);

    for incoming in &request.events {
        let data = &incoming.event_data;

        // Reads a string field from the event data, substituting `fallback` when the field
        // is absent or not a string.
        let text_or = |key: &str, fallback: &str| -> String {
            data.get(key)
                .and_then(|value| value.as_str())
                .unwrap_or(fallback)
                .to_string()
        };

        let event_type = text_or("event_type", "unknown");
        let entity_id = text_or("entity_id", "unknown");
        let tenant_id = text_or("tenant_id", "default");
        let payload = data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = data.get("metadata").cloned();

        let Ok(domain_event) =
            Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
        else {
            skipped += 1;
            continue;
        };

        store.ingest(&domain_event)?;
        accepted += 1;
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2167
2168// =============================================================================
2169// Consumer endpoints for durable subscriptions (v0.14)
2170// =============================================================================
2171
2172/// POST /api/v1/consumers — Register a durable consumer
2173pub async fn register_consumer(
2174    State(store): State<SharedStore>,
2175    Json(req): Json<RegisterConsumerRequest>,
2176) -> Result<Json<ConsumerResponse>> {
2177    let consumer = store
2178        .consumer_registry()
2179        .register(&req.consumer_id, &req.event_type_filters);
2180
2181    Ok(Json(ConsumerResponse {
2182        consumer_id: consumer.consumer_id,
2183        event_type_filters: consumer.event_type_filters,
2184        cursor_position: consumer.cursor_position,
2185    }))
2186}
2187
2188/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2189pub async fn get_consumer(
2190    State(store): State<SharedStore>,
2191    Path(consumer_id): Path<String>,
2192) -> Result<Json<ConsumerResponse>> {
2193    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2194
2195    Ok(Json(ConsumerResponse {
2196        consumer_id: consumer.consumer_id,
2197        event_type_filters: consumer.event_type_filters,
2198        cursor_position: consumer.cursor_position,
2199    }))
2200}
2201
2202/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2203#[derive(Debug, Deserialize)]
2204pub struct ConsumerPollQuery {
2205    pub limit: Option<usize>,
2206}
2207
2208pub async fn poll_consumer_events(
2209    State(store): State<SharedStore>,
2210    Path(consumer_id): Path<String>,
2211    Query(query): Query<ConsumerPollQuery>,
2212) -> Result<Json<ConsumerEventsResponse>> {
2213    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2214    let offset = consumer.cursor_position.unwrap_or(0);
2215    let limit = query.limit.unwrap_or(100);
2216
2217    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2218    let count = events.len();
2219
2220    let consumer_events: Vec<ConsumerEventDto> = events
2221        .into_iter()
2222        .map(|(position, event)| ConsumerEventDto {
2223            position,
2224            event: EventDto::from(&event),
2225        })
2226        .collect();
2227
2228    Ok(Json(ConsumerEventsResponse {
2229        events: consumer_events,
2230        count,
2231    }))
2232}
2233
2234/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2235pub async fn ack_consumer(
2236    State(store): State<SharedStore>,
2237    Path(consumer_id): Path<String>,
2238    Json(req): Json<AckRequest>,
2239) -> Result<Json<serde_json::Value>> {
2240    let max_offset = store.total_events() as u64;
2241
2242    store
2243        .consumer_registry()
2244        .ack(&consumer_id, req.position, max_offset)
2245        .map_err(crate::error::AllSourceError::InvalidInput)?;
2246
2247    Ok(Json(serde_json::json!({
2248        "status": "ok",
2249        "consumer_id": consumer_id,
2250        "position": req.position,
2251    })))
2252}
2253
2254#[cfg(test)]
2255mod tests {
2256    use super::*;
2257    use crate::{domain::entities::Event, store::EventStore};
2258
2259    fn create_test_store() -> Arc<EventStore> {
2260        Arc::new(EventStore::new())
2261    }
2262
2263    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2264        Event::from_strings(
2265            event_type.to_string(),
2266            entity_id.to_string(),
2267            "test-stream".to_string(),
2268            serde_json::json!({
2269                "name": "Test",
2270                "value": 42
2271            }),
2272            None,
2273        )
2274        .unwrap()
2275    }
2276
2277    #[tokio::test]
2278    async fn test_query_events_has_more_and_total_count() {
2279        let store = create_test_store();
2280
2281        // Ingest 50 events
2282        for i in 0..50 {
2283            store
2284                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2285                .unwrap();
2286        }
2287
2288        // Query with limit=10 — should get has_more=true, total_count=50
2289        let req = QueryEventsRequest {
2290            entity_id: None,
2291            event_type: None,
2292            tenant_id: None,
2293            as_of: None,
2294            since: None,
2295            until: None,
2296            limit: Some(10),
2297            event_type_prefix: None,
2298            payload_filter: None,
2299        };
2300
2301        let requested_limit = req.limit;
2302        let unlimited_req = QueryEventsRequest {
2303            limit: None,
2304            ..QueryEventsRequest {
2305                entity_id: req.entity_id,
2306                event_type: req.event_type,
2307                tenant_id: req.tenant_id,
2308                as_of: req.as_of,
2309                since: req.since,
2310                until: req.until,
2311                limit: None,
2312                event_type_prefix: req.event_type_prefix,
2313                payload_filter: req.payload_filter,
2314            }
2315        };
2316        let all_events = store.query(&unlimited_req).unwrap();
2317        let total_count = all_events.len();
2318        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2319            all_events.into_iter().take(limit).collect()
2320        } else {
2321            all_events
2322        };
2323        let count = limited_events.len();
2324        let has_more = count < total_count;
2325
2326        assert_eq!(count, 10);
2327        assert_eq!(total_count, 50);
2328        assert!(has_more);
2329    }
2330
2331    #[tokio::test]
2332    async fn test_query_events_no_more_results() {
2333        let store = create_test_store();
2334
2335        // Ingest 5 events
2336        for i in 0..5 {
2337            store
2338                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2339                .unwrap();
2340        }
2341
2342        // Query with limit=100 — should get has_more=false, total_count=5
2343        let all_events = store
2344            .query(&QueryEventsRequest {
2345                entity_id: None,
2346                event_type: None,
2347                tenant_id: None,
2348                as_of: None,
2349                since: None,
2350                until: None,
2351                limit: None,
2352                event_type_prefix: None,
2353                payload_filter: None,
2354            })
2355            .unwrap();
2356        let total_count = all_events.len();
2357        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2358        let count = limited_events.len();
2359        let has_more = count < total_count;
2360
2361        assert_eq!(count, 5);
2362        assert_eq!(total_count, 5);
2363        assert!(!has_more);
2364    }
2365
2366    #[tokio::test]
2367    async fn test_list_entities_by_type_prefix() {
2368        let store = create_test_store();
2369
2370        // 3 index entities
2371        store
2372            .ingest(&create_test_event("idx-1", "index.created"))
2373            .unwrap();
2374        store
2375            .ingest(&create_test_event("idx-1", "index.updated"))
2376            .unwrap();
2377        store
2378            .ingest(&create_test_event("idx-2", "index.created"))
2379            .unwrap();
2380        store
2381            .ingest(&create_test_event("idx-3", "index.created"))
2382            .unwrap();
2383        // 2 trade entities
2384        store
2385            .ingest(&create_test_event("trade-1", "trade.created"))
2386            .unwrap();
2387        store
2388            .ingest(&create_test_event("trade-2", "trade.created"))
2389            .unwrap();
2390
2391        // List entities for index.*
2392        let req = ListEntitiesRequest {
2393            event_type_prefix: Some("index.".to_string()),
2394            payload_filter: None,
2395            limit: None,
2396            offset: None,
2397        };
2398        let query_req = QueryEventsRequest {
2399            entity_id: None,
2400            event_type: None,
2401            tenant_id: None,
2402            as_of: None,
2403            since: None,
2404            until: None,
2405            limit: None,
2406            event_type_prefix: req.event_type_prefix,
2407            payload_filter: req.payload_filter,
2408        };
2409        let events = store.query(&query_req).unwrap();
2410
2411        // Group and verify
2412        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2413            std::collections::HashMap::new();
2414        for event in &events {
2415            entity_map
2416                .entry(event.entity_id().to_string())
2417                .or_default()
2418                .push(event);
2419        }
2420
2421        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2422        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2423        assert_eq!(entity_map["idx-2"].len(), 1);
2424        assert_eq!(entity_map["idx-3"].len(), 1);
2425    }
2426
2427    fn create_test_event_with_payload(
2428        entity_id: &str,
2429        event_type: &str,
2430        payload: serde_json::Value,
2431    ) -> Event {
2432        Event::from_strings(
2433            event_type.to_string(),
2434            entity_id.to_string(),
2435            "test-stream".to_string(),
2436            payload,
2437            None,
2438        )
2439        .unwrap()
2440    }
2441
2442    #[tokio::test]
2443    async fn test_detect_duplicates_by_payload_fields() {
2444        let store = create_test_store();
2445
2446        // Create entities with duplicate "name" field values
2447        store
2448            .ingest(&create_test_event_with_payload(
2449                "idx-1",
2450                "index.created",
2451                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2452            ))
2453            .unwrap();
2454        store
2455            .ingest(&create_test_event_with_payload(
2456                "idx-2",
2457                "index.created",
2458                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2459            ))
2460            .unwrap();
2461        store
2462            .ingest(&create_test_event_with_payload(
2463                "idx-3",
2464                "index.created",
2465                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2466            ))
2467            .unwrap();
2468        store
2469            .ingest(&create_test_event_with_payload(
2470                "idx-4",
2471                "index.created",
2472                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2473            ))
2474            .unwrap();
2475        store
2476            .ingest(&create_test_event_with_payload(
2477                "idx-5",
2478                "index.created",
2479                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2480            ))
2481            .unwrap();
2482
2483        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2484        let query_req = QueryEventsRequest {
2485            entity_id: None,
2486            event_type: None,
2487            tenant_id: None,
2488            as_of: None,
2489            since: None,
2490            until: None,
2491            limit: None,
2492            event_type_prefix: Some("index.".to_string()),
2493            payload_filter: None,
2494        };
2495        let events = store.query(&query_req).unwrap();
2496
2497        // Manually replicate the handler logic for testing
2498        let group_by_fields = vec!["name"];
2499        let mut entity_latest: std::collections::HashMap<String, &Event> =
2500            std::collections::HashMap::new();
2501        for event in &events {
2502            let eid = event.entity_id().to_string();
2503            entity_latest
2504                .entry(eid)
2505                .and_modify(|existing| {
2506                    if event.timestamp() > existing.timestamp() {
2507                        *existing = event;
2508                    }
2509                })
2510                .or_insert(event);
2511        }
2512
2513        let mut groups: std::collections::HashMap<String, Vec<String>> =
2514            std::collections::HashMap::new();
2515        for (entity_id, event) in &entity_latest {
2516            let payload = event.payload();
2517            let mut key_parts = serde_json::Map::new();
2518            for field in &group_by_fields {
2519                let value = payload
2520                    .get(*field)
2521                    .cloned()
2522                    .unwrap_or(serde_json::Value::Null);
2523                key_parts.insert((*field).to_string(), value);
2524            }
2525            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2526            groups.entry(key_str).or_default().push(entity_id.clone());
2527        }
2528
2529        let duplicate_groups: Vec<_> = groups
2530            .into_iter()
2531            .filter(|(_, ids)| ids.len() > 1)
2532            .collect();
2533
2534        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2535        for (_, ids) in &duplicate_groups {
2536            assert_eq!(ids.len(), 2);
2537        }
2538    }
2539
2540    #[tokio::test]
2541    async fn test_detect_duplicates_no_duplicates() {
2542        let store = create_test_store();
2543
2544        // All unique names
2545        store
2546            .ingest(&create_test_event_with_payload(
2547                "idx-1",
2548                "index.created",
2549                serde_json::json!({"name": "A"}),
2550            ))
2551            .unwrap();
2552        store
2553            .ingest(&create_test_event_with_payload(
2554                "idx-2",
2555                "index.created",
2556                serde_json::json!({"name": "B"}),
2557            ))
2558            .unwrap();
2559
2560        let query_req = QueryEventsRequest {
2561            entity_id: None,
2562            event_type: None,
2563            tenant_id: None,
2564            as_of: None,
2565            since: None,
2566            until: None,
2567            limit: None,
2568            event_type_prefix: Some("index.".to_string()),
2569            payload_filter: None,
2570        };
2571        let events = store.query(&query_req).unwrap();
2572
2573        let mut entity_latest: std::collections::HashMap<String, &Event> =
2574            std::collections::HashMap::new();
2575        for event in &events {
2576            entity_latest
2577                .entry(event.entity_id().to_string())
2578                .or_insert(event);
2579        }
2580
2581        let mut groups: std::collections::HashMap<String, Vec<String>> =
2582            std::collections::HashMap::new();
2583        for (entity_id, event) in &entity_latest {
2584            let key_str =
2585                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2586                    .unwrap();
2587            groups.entry(key_str).or_default().push(entity_id.clone());
2588        }
2589
2590        let duplicate_groups: Vec<_> = groups
2591            .into_iter()
2592            .filter(|(_, ids)| ids.len() > 1)
2593            .collect();
2594
2595        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2596    }
2597
2598    #[tokio::test]
2599    async fn test_detect_duplicates_multi_field_group_by() {
2600        let store = create_test_store();
2601
2602        // Two entities with same name AND user_id = true duplicate
2603        store
2604            .ingest(&create_test_event_with_payload(
2605                "idx-1",
2606                "index.created",
2607                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2608            ))
2609            .unwrap();
2610        store
2611            .ingest(&create_test_event_with_payload(
2612                "idx-2",
2613                "index.created",
2614                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2615            ))
2616            .unwrap();
2617        // Same name but different user_id = NOT a duplicate in multi-field group
2618        store
2619            .ingest(&create_test_event_with_payload(
2620                "idx-3",
2621                "index.created",
2622                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2623            ))
2624            .unwrap();
2625
2626        let query_req = QueryEventsRequest {
2627            entity_id: None,
2628            event_type: None,
2629            tenant_id: None,
2630            as_of: None,
2631            since: None,
2632            until: None,
2633            limit: None,
2634            event_type_prefix: Some("index.".to_string()),
2635            payload_filter: None,
2636        };
2637        let events = store.query(&query_req).unwrap();
2638
2639        let group_by_fields = vec!["name", "user_id"];
2640        let mut entity_latest: std::collections::HashMap<String, &Event> =
2641            std::collections::HashMap::new();
2642        for event in &events {
2643            entity_latest
2644                .entry(event.entity_id().to_string())
2645                .and_modify(|existing| {
2646                    if event.timestamp() > existing.timestamp() {
2647                        *existing = event;
2648                    }
2649                })
2650                .or_insert(event);
2651        }
2652
2653        let mut groups: std::collections::HashMap<String, Vec<String>> =
2654            std::collections::HashMap::new();
2655        for (entity_id, event) in &entity_latest {
2656            let payload = event.payload();
2657            let mut key_parts = serde_json::Map::new();
2658            for field in &group_by_fields {
2659                let value = payload
2660                    .get(*field)
2661                    .cloned()
2662                    .unwrap_or(serde_json::Value::Null);
2663                key_parts.insert((*field).to_string(), value);
2664            }
2665            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2666            groups.entry(key_str).or_default().push(entity_id.clone());
2667        }
2668
2669        let duplicate_groups: Vec<_> = groups
2670            .into_iter()
2671            .filter(|(_, ids)| ids.len() > 1)
2672            .collect();
2673
2674        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2675        assert_eq!(duplicate_groups.len(), 1);
2676        let (_, ref ids) = duplicate_groups[0];
2677        assert_eq!(ids.len(), 2);
2678        let mut sorted_ids = ids.clone();
2679        sorted_ids.sort();
2680        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2681    }
2682
2683    #[tokio::test]
2684    async fn test_projection_state_cache() {
2685        let store = create_test_store();
2686
2687        // Test cache insertion
2688        let cache = store.projection_state_cache();
2689        cache.insert(
2690            "entity_snapshots:user-123".to_string(),
2691            serde_json::json!({"name": "Test User", "age": 30}),
2692        );
2693
2694        // Test cache retrieval
2695        let state = cache.get("entity_snapshots:user-123");
2696        assert!(state.is_some());
2697        let state = state.unwrap();
2698        assert_eq!(state["name"], "Test User");
2699        assert_eq!(state["age"], 30);
2700    }
2701
2702    #[tokio::test]
2703    async fn test_projection_manager_list_projections() {
2704        let store = create_test_store();
2705
2706        // List projections (built-in projections should be available)
2707        let projection_manager = store.projection_manager();
2708        let projections = projection_manager.list_projections();
2709
2710        // Should have entity_snapshots and event_counters
2711        assert!(projections.len() >= 2);
2712
2713        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2714        assert!(names.contains(&"entity_snapshots"));
2715        assert!(names.contains(&"event_counters"));
2716    }
2717
2718    #[tokio::test]
2719    async fn test_projection_state_after_event_ingestion() {
2720        let store = create_test_store();
2721
2722        // Ingest an event
2723        let event = create_test_event("user-456", "user.created");
2724        store.ingest(&event).unwrap();
2725
2726        // Get projection state
2727        let projection_manager = store.projection_manager();
2728        let snapshot_projection = projection_manager
2729            .get_projection("entity_snapshots")
2730            .unwrap();
2731
2732        let state = snapshot_projection.get_state("user-456");
2733        assert!(state.is_some());
2734        let state = state.unwrap();
2735        assert_eq!(state["name"], "Test");
2736        assert_eq!(state["value"], 42);
2737    }
2738
2739    #[tokio::test]
2740    async fn test_projection_state_cache_multiple_entities() {
2741        let store = create_test_store();
2742        let cache = store.projection_state_cache();
2743
2744        // Insert multiple entities
2745        for i in 0..10 {
2746            cache.insert(
2747                format!("entity_snapshots:entity-{i}"),
2748                serde_json::json!({"id": i, "status": "active"}),
2749            );
2750        }
2751
2752        // Verify all insertions
2753        assert_eq!(cache.len(), 10);
2754
2755        // Verify each entity
2756        for i in 0..10 {
2757            let key = format!("entity_snapshots:entity-{i}");
2758            let state = cache.get(&key);
2759            assert!(state.is_some());
2760            assert_eq!(state.unwrap()["id"], i);
2761        }
2762    }
2763
2764    #[tokio::test]
2765    async fn test_projection_state_update() {
2766        let store = create_test_store();
2767        let cache = store.projection_state_cache();
2768
2769        // Initial state
2770        cache.insert(
2771            "entity_snapshots:user-789".to_string(),
2772            serde_json::json!({"balance": 100}),
2773        );
2774
2775        // Update state
2776        cache.insert(
2777            "entity_snapshots:user-789".to_string(),
2778            serde_json::json!({"balance": 150}),
2779        );
2780
2781        // Verify update
2782        let state = cache.get("entity_snapshots:user-789").unwrap();
2783        assert_eq!(state["balance"], 150);
2784    }
2785
2786    #[tokio::test]
2787    async fn test_event_counter_projection() {
2788        let store = create_test_store();
2789
2790        // Ingest events of different types
2791        store
2792            .ingest(&create_test_event("user-1", "user.created"))
2793            .unwrap();
2794        store
2795            .ingest(&create_test_event("user-2", "user.created"))
2796            .unwrap();
2797        store
2798            .ingest(&create_test_event("user-1", "user.updated"))
2799            .unwrap();
2800
2801        // Get event counter projection
2802        let projection_manager = store.projection_manager();
2803        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2804
2805        // Check counts
2806        let created_state = counter_projection.get_state("user.created");
2807        assert!(created_state.is_some());
2808        assert_eq!(created_state.unwrap()["count"], 2);
2809
2810        let updated_state = counter_projection.get_state("user.updated");
2811        assert!(updated_state.is_some());
2812        assert_eq!(updated_state.unwrap()["count"], 1);
2813    }
2814
2815    #[tokio::test]
2816    async fn test_projection_state_cache_key_format() {
2817        let store = create_test_store();
2818        let cache = store.projection_state_cache();
2819
2820        // Test standard key format: {projection_name}:{entity_id}
2821        let key = "orders:order-12345".to_string();
2822        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2823
2824        let state = cache.get(&key).unwrap();
2825        assert_eq!(state["total"], 99.99);
2826    }
2827
2828    #[tokio::test]
2829    async fn test_projection_state_cache_removal() {
2830        let store = create_test_store();
2831        let cache = store.projection_state_cache();
2832
2833        // Insert and then remove
2834        cache.insert(
2835            "test:entity-1".to_string(),
2836            serde_json::json!({"data": "value"}),
2837        );
2838        assert_eq!(cache.len(), 1);
2839
2840        cache.remove("test:entity-1");
2841        assert_eq!(cache.len(), 0);
2842        assert!(cache.get("test:entity-1").is_none());
2843    }
2844
2845    #[tokio::test]
2846    async fn test_get_nonexistent_projection() {
2847        let store = create_test_store();
2848        let projection_manager = store.projection_manager();
2849
2850        // Requesting a non-existent projection should return None
2851        let projection = projection_manager.get_projection("nonexistent_projection");
2852        assert!(projection.is_none());
2853    }
2854
2855    #[tokio::test]
2856    async fn test_get_nonexistent_entity_state() {
2857        let store = create_test_store();
2858        let projection_manager = store.projection_manager();
2859
2860        // Get state for non-existent entity
2861        let snapshot_projection = projection_manager
2862            .get_projection("entity_snapshots")
2863            .unwrap();
2864        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2865        assert!(state.is_none());
2866    }
2867
2868    #[tokio::test]
2869    async fn test_projection_state_cache_concurrent_access() {
2870        let store = create_test_store();
2871        let cache = store.projection_state_cache();
2872
2873        // Simulate concurrent writes
2874        let handles: Vec<_> = (0..10)
2875            .map(|i| {
2876                let cache_clone = cache.clone();
2877                tokio::spawn(async move {
2878                    cache_clone.insert(
2879                        format!("concurrent:entity-{i}"),
2880                        serde_json::json!({"thread": i}),
2881                    );
2882                })
2883            })
2884            .collect();
2885
2886        for handle in handles {
2887            handle.await.unwrap();
2888        }
2889
2890        // All 10 entries should be present
2891        assert_eq!(cache.len(), 10);
2892    }
2893
2894    #[tokio::test]
2895    async fn test_projection_state_large_payload() {
2896        let store = create_test_store();
2897        let cache = store.projection_state_cache();
2898
2899        // Create a large JSON payload (~10KB)
2900        let large_array: Vec<serde_json::Value> = (0..1000)
2901            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2902            .collect();
2903
2904        cache.insert(
2905            "large:entity-1".to_string(),
2906            serde_json::json!({"items": large_array}),
2907        );
2908
2909        let state = cache.get("large:entity-1").unwrap();
2910        let items = state["items"].as_array().unwrap();
2911        assert_eq!(items.len(), 1000);
2912    }
2913
2914    #[tokio::test]
2915    async fn test_projection_state_complex_json() {
2916        let store = create_test_store();
2917        let cache = store.projection_state_cache();
2918
2919        // Complex nested JSON structure
2920        let complex_state = serde_json::json!({
2921            "user": {
2922                "id": "user-123",
2923                "profile": {
2924                    "name": "John Doe",
2925                    "email": "john@example.com",
2926                    "settings": {
2927                        "theme": "dark",
2928                        "notifications": true
2929                    }
2930                },
2931                "roles": ["admin", "user"],
2932                "metadata": {
2933                    "created_at": "2025-01-01T00:00:00Z",
2934                    "last_login": null
2935                }
2936            }
2937        });
2938
2939        cache.insert("complex:user-123".to_string(), complex_state);
2940
2941        let state = cache.get("complex:user-123").unwrap();
2942        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2943        assert_eq!(state["user"]["roles"][0], "admin");
2944        assert!(state["user"]["metadata"]["last_login"].is_null());
2945    }
2946
2947    #[tokio::test]
2948    async fn test_projection_state_cache_iteration() {
2949        let store = create_test_store();
2950        let cache = store.projection_state_cache();
2951
2952        // Insert entries
2953        for i in 0..5 {
2954            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2955        }
2956
2957        // Iterate over all entries
2958        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2959        assert_eq!(entries.len(), 5);
2960    }
2961
2962    #[tokio::test]
2963    async fn test_projection_manager_get_entity_snapshots() {
2964        let store = create_test_store();
2965        let projection_manager = store.projection_manager();
2966
2967        // Get entity_snapshots projection specifically
2968        let projection = projection_manager.get_projection("entity_snapshots");
2969        assert!(projection.is_some());
2970        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2971    }
2972
2973    #[tokio::test]
2974    async fn test_projection_manager_get_event_counters() {
2975        let store = create_test_store();
2976        let projection_manager = store.projection_manager();
2977
2978        // Get event_counters projection specifically
2979        let projection = projection_manager.get_projection("event_counters");
2980        assert!(projection.is_some());
2981        assert_eq!(projection.unwrap().name(), "event_counters");
2982    }
2983
2984    #[tokio::test]
2985    async fn test_projection_state_cache_overwrite() {
2986        let store = create_test_store();
2987        let cache = store.projection_state_cache();
2988
2989        // Initial value
2990        cache.insert(
2991            "overwrite:entity-1".to_string(),
2992            serde_json::json!({"version": 1}),
2993        );
2994
2995        // Overwrite with new value
2996        cache.insert(
2997            "overwrite:entity-1".to_string(),
2998            serde_json::json!({"version": 2}),
2999        );
3000
3001        // Overwrite again
3002        cache.insert(
3003            "overwrite:entity-1".to_string(),
3004            serde_json::json!({"version": 3}),
3005        );
3006
3007        let state = cache.get("overwrite:entity-1").unwrap();
3008        assert_eq!(state["version"], 3);
3009
3010        // Should still be only 1 entry
3011        assert_eq!(cache.len(), 1);
3012    }
3013
3014    #[tokio::test]
3015    async fn test_projection_state_multiple_projections() {
3016        let store = create_test_store();
3017        let cache = store.projection_state_cache();
3018
3019        // Store states for different projections
3020        cache.insert(
3021            "entity_snapshots:user-1".to_string(),
3022            serde_json::json!({"name": "Alice"}),
3023        );
3024        cache.insert(
3025            "event_counters:user.created".to_string(),
3026            serde_json::json!({"count": 5}),
3027        );
3028        cache.insert(
3029            "custom_projection:order-1".to_string(),
3030            serde_json::json!({"total": 150.0}),
3031        );
3032
3033        // Verify each projection's state
3034        assert_eq!(
3035            cache.get("entity_snapshots:user-1").unwrap()["name"],
3036            "Alice"
3037        );
3038        assert_eq!(
3039            cache.get("event_counters:user.created").unwrap()["count"],
3040            5
3041        );
3042        assert_eq!(
3043            cache.get("custom_projection:order-1").unwrap()["total"],
3044            150.0
3045        );
3046    }
3047
3048    #[tokio::test]
3049    async fn test_bulk_projection_state_access() {
3050        let store = create_test_store();
3051
3052        // Ingest multiple events for different entities
3053        for i in 0..5 {
3054            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3055            store.ingest(&event).unwrap();
3056        }
3057
3058        // Get projection and verify bulk access
3059        let projection_manager = store.projection_manager();
3060        let snapshot_projection = projection_manager
3061            .get_projection("entity_snapshots")
3062            .unwrap();
3063
3064        // Verify we can access all entities
3065        for i in 0..5 {
3066            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3067            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3068        }
3069    }
3070
3071    #[tokio::test]
3072    async fn test_bulk_save_projection_states() {
3073        let store = create_test_store();
3074        let cache = store.projection_state_cache();
3075
3076        // Simulate bulk save request
3077        let states = vec![
3078            BulkSaveStateItem {
3079                entity_id: "bulk-entity-1".to_string(),
3080                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3081            },
3082            BulkSaveStateItem {
3083                entity_id: "bulk-entity-2".to_string(),
3084                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3085            },
3086            BulkSaveStateItem {
3087                entity_id: "bulk-entity-3".to_string(),
3088                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3089            },
3090        ];
3091
3092        let projection_name = "test_projection";
3093
3094        // Save states to cache (simulating bulk_save_projection_states handler)
3095        for item in &states {
3096            cache.insert(
3097                format!("{projection_name}:{}", item.entity_id),
3098                item.state.clone(),
3099            );
3100        }
3101
3102        // Verify all states were saved
3103        assert_eq!(cache.len(), 3);
3104
3105        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3106        assert_eq!(state1["name"], "Entity 1");
3107        assert_eq!(state1["value"], 100);
3108
3109        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3110        assert_eq!(state2["name"], "Entity 2");
3111        assert_eq!(state2["value"], 200);
3112
3113        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3114        assert_eq!(state3["name"], "Entity 3");
3115        assert_eq!(state3["value"], 300);
3116    }
3117
3118    #[tokio::test]
3119    async fn test_bulk_save_empty_states() {
3120        let store = create_test_store();
3121        let cache = store.projection_state_cache();
3122
3123        // Clear cache
3124        cache.clear();
3125
3126        // Empty states should work fine
3127        let states: Vec<BulkSaveStateItem> = vec![];
3128        assert_eq!(states.len(), 0);
3129
3130        // Cache should remain empty
3131        assert_eq!(cache.len(), 0);
3132    }
3133
3134    #[tokio::test]
3135    async fn test_bulk_save_overwrites_existing() {
3136        let store = create_test_store();
3137        let cache = store.projection_state_cache();
3138
3139        // Insert initial state
3140        cache.insert(
3141            "test:entity-1".to_string(),
3142            serde_json::json!({"version": 1, "data": "initial"}),
3143        );
3144
3145        // Bulk save with updated state
3146        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3147        cache.insert("test:entity-1".to_string(), new_state);
3148
3149        // Verify overwrite
3150        let state = cache.get("test:entity-1").unwrap();
3151        assert_eq!(state["version"], 2);
3152        assert_eq!(state["data"], "updated");
3153    }
3154
3155    #[tokio::test]
3156    async fn test_bulk_save_high_volume() {
3157        let store = create_test_store();
3158        let cache = store.projection_state_cache();
3159
3160        // Simulate high volume save (1000 entities)
3161        for i in 0..1000 {
3162            cache.insert(
3163                format!("volume_test:entity-{i}"),
3164                serde_json::json!({"index": i, "status": "active"}),
3165            );
3166        }
3167
3168        // Verify count
3169        assert_eq!(cache.len(), 1000);
3170
3171        // Spot check some entries
3172        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3173        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3174        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3175    }
3176
3177    #[tokio::test]
3178    async fn test_bulk_save_different_projections() {
3179        let store = create_test_store();
3180        let cache = store.projection_state_cache();
3181
3182        // Save to multiple projections in bulk
3183        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3184
3185        for proj in &projections {
3186            for i in 0..5 {
3187                cache.insert(
3188                    format!("{proj}:entity-{i}"),
3189                    serde_json::json!({"projection": proj, "id": i}),
3190                );
3191            }
3192        }
3193
3194        // Verify total count (3 projections * 5 entities)
3195        assert_eq!(cache.len(), 15);
3196
3197        // Verify each projection
3198        for proj in &projections {
3199            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3200            assert_eq!(state["projection"], *proj);
3201        }
3202    }
3203
3204    // -------------------------------------------------------------------------
3205    // Cache-fallback tests for the projection-state read handlers (v0.19.1).
3206    //
3207    // SDK-managed projections write state via save_projection_state /
3208    // bulk_save_projection_states without registering in projection_manager.
3209    // These tests verify the read handlers fall back to the cache instead of
3210    // 404-ing. Registered projection wins where both exist; cache fills the
3211    // gap when not.
3212    // -------------------------------------------------------------------------
3213
3214    #[tokio::test]
3215    async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3216        let store = create_test_store();
3217        store.projection_state_cache().insert(
3218            "assets:BTC".to_string(),
3219            serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3220        );
3221
3222        let resp = get_projection_state(
3223            State(Arc::clone(&store)),
3224            Path(("assets".to_string(), "BTC".to_string())),
3225        )
3226        .await
3227        .expect("should not error when projection is not registered");
3228
3229        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3230        assert_eq!(resp.0["state"]["symbol"], "BTC");
3231        assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3232    }
3233
3234    #[tokio::test]
3235    async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3236        let store = create_test_store();
3237
3238        let resp = get_projection_state(
3239            State(Arc::clone(&store)),
3240            Path(("assets".to_string(), "UNKNOWN".to_string())),
3241        )
3242        .await
3243        .unwrap();
3244
3245        assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3246        assert_eq!(resp.0["state"], serde_json::Value::Null);
3247    }
3248
3249    #[tokio::test]
3250    async fn get_projection_state_registered_wins_over_cache() {
3251        let store = create_test_store();
3252
3253        // Ingest an event so entity_snapshots (a registered projection) has state.
3254        let event = create_test_event("user-777", "user.created");
3255        store.ingest(&event).unwrap();
3256
3257        // Plant a conflicting cache entry for the same (projection, entity).
3258        store.projection_state_cache().insert(
3259            "entity_snapshots:user-777".to_string(),
3260            serde_json::json!({"stolen": "value"}),
3261        );
3262
3263        let resp = get_projection_state(
3264            State(Arc::clone(&store)),
3265            Path(("entity_snapshots".to_string(), "user-777".to_string())),
3266        )
3267        .await
3268        .unwrap();
3269
3270        // Registered projection wins — cache fallback is only consulted when
3271        // the registered projection has no state for this entity.
3272        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3273        assert!(
3274            resp.0["state"].get("stolen").is_none(),
3275            "cache entry must not shadow registered projection state: got {:?}",
3276            resp.0["state"]
3277        );
3278    }
3279
3280    #[tokio::test]
3281    async fn get_projection_state_summary_returns_cache_without_registration() {
3282        let store = create_test_store();
3283        let cache = store.projection_state_cache();
3284        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3285        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3286        // Different projection name — must not appear in the summary.
3287        cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3288
3289        let resp =
3290            get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3291                .await
3292                .unwrap();
3293
3294        assert_eq!(resp.0["total"], 2);
3295        let states = resp.0["states"].as_array().unwrap();
3296        let entity_ids: Vec<&str> = states
3297            .iter()
3298            .map(|s| s["entity_id"].as_str().unwrap())
3299            .collect();
3300        assert!(entity_ids.contains(&"BTC"));
3301        assert!(entity_ids.contains(&"ETH"));
3302    }
3303
3304    #[tokio::test]
3305    async fn bulk_get_projection_states_falls_back_to_cache() {
3306        let store = create_test_store();
3307        let cache = store.projection_state_cache();
3308        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3309        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3310
3311        let req = BulkGetStateRequest {
3312            entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3313        };
3314
3315        let resp = bulk_get_projection_states(
3316            State(Arc::clone(&store)),
3317            Path("assets".to_string()),
3318            Json(req),
3319        )
3320        .await
3321        .unwrap();
3322
3323        assert_eq!(resp.0["total"], 3);
3324        let states = resp.0["states"].as_array().unwrap();
3325        let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3326            .iter()
3327            .map(|s| (s["entity_id"].as_str().unwrap(), s))
3328            .collect();
3329
3330        assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3331        assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3332        assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3333        assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3334    }
3335}