Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        security::middleware::OptionalAuth,
41        web::api_v1::AppState,
42    },
43    store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46    Json, Router,
47    extract::{Path, Query, State, WebSocketUpgrade},
48    response::{IntoResponse, Response},
49    routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57
/// Reference-counted handle to the event store, used as the router state for
/// the handlers in this file.
type SharedStore = Arc<EventStore>;
59
/// Block until followers acknowledge the leader's current WAL offset, when the
/// configured replication mode asks for it.
///
/// Returns immediately when no WAL shipper is installed, when the shipper
/// reports `ReplicationMode::Async`, or when the current leader offset is 0.
/// Otherwise it awaits `wait_for_ack` for that offset and records the elapsed
/// time in the `replication_ack_wait_seconds` metric. A wait that comes back
/// unacknowledged only produces a warning; the caller continues regardless.
#[cfg(feature = "replication")]
async fn await_replication_ack(state: &AppState) {
    // Collect everything we need inside a short scope so the read guard is
    // released before the (potentially long) ACK wait below.
    let (shipper, mode, target_offset) = {
        let guard = state.wal_shipper.read().await;
        match &*guard {
            None => return,
            Some(active) => {
                let mode = active.replication_mode();
                if mode == ReplicationMode::Async {
                    return;
                }

                let offset = active.current_leader_offset();
                if offset == 0 {
                    return;
                }

                (Arc::clone(active), mode, offset)
            }
        }
    };

    // Time only the wait itself.
    let timer = state
        .store
        .metrics()
        .replication_ack_wait_seconds
        .start_timer();
    let acked = shipper.wait_for_ack(target_offset).await;
    timer.observe_duration();

    if acked {
        return;
    }

    tracing::warn!(
        "Replication ACK timeout in {} mode (offset {}). \
         Write succeeded locally but follower confirmation pending.",
        mode,
        target_offset,
    );
}
/// Stand-in used when the `replication` feature is disabled: it does nothing,
/// so call sites can invoke it unconditionally.
#[cfg(not(feature = "replication"))]
async fn await_replication_ack(_state: &AppState) {
    // No-op in community edition
}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107    let app = Router::new()
108        .route("/health", get(health))
109        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
110        .route("/api/v1/events", post(ingest_event))
111        .route("/api/v1/events/batch", post(ingest_events_batch))
112        .route("/api/v1/events/query", get(query_events))
113        .route("/api/v1/events/{event_id}", get(get_event_by_id))
114        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
115        // v0.10: Stream and event type discovery endpoints
116        .route("/api/v1/streams", get(list_streams))
117        .route("/api/v1/event-types", get(list_event_types))
118        .route("/api/v1/entities/duplicates", get(detect_duplicates))
119        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120        .route(
121            "/api/v1/entities/{entity_id}/snapshot",
122            get(get_entity_snapshot),
123        )
124        .route("/api/v1/stats", get(get_stats))
125        // v0.2: Advanced analytics endpoints
126        .route("/api/v1/analytics/frequency", get(analytics_frequency))
127        .route("/api/v1/analytics/summary", get(analytics_summary))
128        .route("/api/v1/analytics/correlation", get(analytics_correlation))
129        // v0.2: Snapshot management endpoints
130        .route("/api/v1/snapshots", post(create_snapshot))
131        .route("/api/v1/snapshots", get(list_snapshots))
132        .route(
133            "/api/v1/snapshots/{entity_id}/latest",
134            get(get_latest_snapshot),
135        )
136        // v0.2: Compaction endpoints
137        .route("/api/v1/compaction/trigger", post(trigger_compaction))
138        .route("/api/v1/compaction/stats", get(compaction_stats))
139        // v0.5: Schema registry endpoints
140        .route("/api/v1/schemas", post(register_schema))
141        .route("/api/v1/schemas", get(list_subjects))
142        .route("/api/v1/schemas/{subject}", get(get_schema))
143        .route(
144            "/api/v1/schemas/{subject}/versions",
145            get(list_schema_versions),
146        )
147        .route("/api/v1/schemas/validate", post(validate_event_schema))
148        .route(
149            "/api/v1/schemas/{subject}/compatibility",
150            put(set_compatibility_mode),
151        )
152        // v0.5: Replay and projection rebuild endpoints
153        .route("/api/v1/replay", post(start_replay))
154        .route("/api/v1/replay", get(list_replays))
155        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157        .route(
158            "/api/v1/replay/{replay_id}",
159            axum::routing::delete(delete_replay),
160        )
161        // v0.5: Stream processing pipeline endpoints
162        .route("/api/v1/pipelines", post(register_pipeline))
163        .route("/api/v1/pipelines", get(list_pipelines))
164        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166        .route(
167            "/api/v1/pipelines/{pipeline_id}",
168            axum::routing::delete(remove_pipeline),
169        )
170        .route(
171            "/api/v1/pipelines/{pipeline_id}/stats",
172            get(get_pipeline_stats),
173        )
174        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175        // v0.7: Projection State API for Query Service integration
176        .route("/api/v1/projections", get(list_projections))
177        .route("/api/v1/projections/{name}", get(get_projection))
178        .route(
179            "/api/v1/projections/{name}",
180            axum::routing::delete(delete_projection),
181        )
182        .route(
183            "/api/v1/projections/{name}/state",
184            get(get_projection_state_summary),
185        )
186        .route("/api/v1/projections/{name}/reset", post(reset_projection))
187        .route(
188            "/api/v1/projections/{name}/{entity_id}/state",
189            get(get_projection_state),
190        )
191        .route(
192            "/api/v1/projections/{name}/{entity_id}/state",
193            post(save_projection_state),
194        )
195        .route(
196            "/api/v1/projections/{name}/{entity_id}/state",
197            put(save_projection_state),
198        )
199        .route(
200            "/api/v1/projections/{name}/bulk",
201            post(bulk_get_projection_states),
202        )
203        .route(
204            "/api/v1/projections/{name}/bulk/save",
205            post(bulk_save_projection_states),
206        )
207        // v0.11: Webhook management endpoints
208        .route("/api/v1/webhooks", post(register_webhook))
209        .route("/api/v1/webhooks", get(list_webhooks))
210        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212        .route(
213            "/api/v1/webhooks/{webhook_id}",
214            axum::routing::delete(delete_webhook),
215        )
216        .route(
217            "/api/v1/webhooks/{webhook_id}/deliveries",
218            get(list_webhook_deliveries),
219        )
220        // v2.0: Advanced query features
221        .route("/api/v1/graphql", post(graphql_query))
222        .route("/api/v1/geospatial/query", post(geo_query))
223        .route("/api/v1/geospatial/stats", get(geo_stats))
224        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225        .route(
226            "/api/v1/schema-evolution/history/{event_type}",
227            get(schema_evolution_history),
228        )
229        .route(
230            "/api/v1/schema-evolution/schema/{event_type}",
231            get(schema_evolution_schema),
232        )
233        .route(
234            "/api/v1/schema-evolution/stats",
235            get(schema_evolution_stats),
236        )
237        .layer(
238            CorsLayer::new()
239                .allow_origin(Any)
240                .allow_methods(Any)
241                .allow_headers(Any),
242        )
243        .layer(TraceLayer::new_for_http())
244        .with_state(store);
245
246    let listener = tokio::net::TcpListener::bind(addr).await?;
247    axum::serve(listener, app).await?;
248
249    Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253    Json(serde_json::json!({
254        "status": "healthy",
255        "service": "allsource-core",
256        "version": env!("CARGO_PKG_VERSION")
257    }))
258}
259
260// v0.6: Prometheus metrics endpoint
261pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262    let metrics = store.metrics();
263
264    match metrics.encode() {
265        Ok(encoded) => Response::builder()
266            .status(200)
267            .header("Content-Type", "text/plain; version=0.0.4")
268            .body(encoded)
269            .unwrap()
270            .into_response(),
271        Err(e) => Response::builder()
272            .status(500)
273            .body(format!("Error encoding metrics: {e}"))
274            .unwrap()
275            .into_response(),
276    }
277}
278
279pub async fn ingest_event(
280    State(store): State<SharedStore>,
281    Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283    let expected_version = req.expected_version;
284
285    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286    let event = Event::from_strings(
287        req.event_type,
288        req.entity_id,
289        tenant_id,
290        req.payload,
291        req.metadata,
292    )?;
293
294    let event_id = event.id;
295    let timestamp = event.timestamp;
296
297    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299    tracing::info!("Event ingested: {}", event_id);
300
301    Ok(Json(IngestEventResponse {
302        event_id,
303        timestamp,
304        version: Some(new_version),
305    }))
306}
307
308/// Ingest a single event with semi-sync/sync replication ACK waiting.
309///
310/// Used by the v1 API. Tenant comes from `req.tenant_id` — the Control Plane
311/// delegation layer sets it from the authenticated caller before forwarding.
312/// Core is internal-only and does not authenticate public traffic.
313pub async fn ingest_event_v1(
314    State(state): State<AppState>,
315    Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317    let expected_version = req.expected_version;
318
319    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321    let event = Event::from_strings(
322        req.event_type,
323        req.entity_id,
324        tenant_id,
325        req.payload,
326        req.metadata,
327    )?;
328
329    let event_id = event.id;
330    let timestamp = event.timestamp;
331
332    let new_version = state
333        .store
334        .ingest_with_expected_version(&event, expected_version)?;
335
336    // Semi-sync/sync: wait for follower ACK(s) before returning
337    await_replication_ack(&state).await;
338
339    tracing::info!("Event ingested: {}", event_id);
340
341    Ok(Json(IngestEventResponse {
342        event_id,
343        timestamp,
344        version: Some(new_version),
345    }))
346}
347
348/// Batch ingest multiple events in a single request
349///
350/// This endpoint allows ingesting multiple events atomically, which is more
351/// efficient than making individual requests for each event.
352pub async fn ingest_events_batch(
353    State(store): State<SharedStore>,
354    Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356    let total = req.events.len();
357    let mut ingested_events = Vec::with_capacity(total);
358
359    for event_req in req.events {
360        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361        let expected_version = event_req.expected_version;
362
363        let event = Event::from_strings(
364            event_req.event_type,
365            event_req.entity_id,
366            tenant_id,
367            event_req.payload,
368            event_req.metadata,
369        )?;
370
371        let event_id = event.id;
372        let timestamp = event.timestamp;
373
374        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376        ingested_events.push(IngestEventResponse {
377            event_id,
378            timestamp,
379            version: Some(new_version),
380        });
381    }
382
383    let ingested = ingested_events.len();
384    tracing::info!("Batch ingested {} events", ingested);
385
386    Ok(Json(IngestEventsBatchResponse {
387        total,
388        ingested,
389        events: ingested_events,
390    }))
391}
392
393/// Batch ingest with semi-sync/sync replication ACK waiting.
394///
395/// Used by the v1 API. Per-event tenant comes from `event_req.tenant_id` — the
396/// Control Plane delegation layer sets it from the authenticated caller before
397/// forwarding. Core is internal-only and does not authenticate public traffic.
398pub async fn ingest_events_batch_v1(
399    State(state): State<AppState>,
400    Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402    let total = req.events.len();
403    let mut ingested_events = Vec::with_capacity(total);
404
405    for event_req in req.events {
406        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407        let expected_version = event_req.expected_version;
408
409        let event = Event::from_strings(
410            event_req.event_type,
411            event_req.entity_id,
412            tenant_id,
413            event_req.payload,
414            event_req.metadata,
415        )?;
416
417        let event_id = event.id;
418        let timestamp = event.timestamp;
419
420        let new_version = state
421            .store
422            .ingest_with_expected_version(&event, expected_version)?;
423
424        ingested_events.push(IngestEventResponse {
425            event_id,
426            timestamp,
427            version: Some(new_version),
428        });
429    }
430
431    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
432    await_replication_ack(&state).await;
433
434    let ingested = ingested_events.len();
435    tracing::info!("Batch ingested {} events", ingested);
436
437    Ok(Json(IngestEventsBatchResponse {
438        total,
439        ingested,
440        events: ingested_events,
441    }))
442}
443
444pub async fn query_events(
445    OptionalAuth(auth): OptionalAuth,
446    Query(req): Query<QueryEventsRequest>,
447    State(store): State<SharedStore>,
448) -> Result<Json<QueryEventsResponse>> {
449    let requested_limit = req.limit;
450    let queried_entity_id = req.entity_id.clone();
451
452    // Tenant resolution. Since Core is internal-only (bead t-0ff8), the only
453    // callers are Control Plane's delegation layer and other internal Fly
454    // services. Request param wins — that's what the gateway sets authoritatively
455    // from the authenticated caller's identity. Auth-context fallback is kept
456    // as a defense-in-depth for any internal caller that forgets to pass
457    // tenant_id but is authenticated (legacy; audit and remove once all
458    // internal callers are confirmed to set tenant_id explicitly).
459    let enforced_tenant = req
460        .tenant_id
461        .clone()
462        .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
463
464    // Query without limit to get total count
465    let unlimited_req = QueryEventsRequest {
466        entity_id: req.entity_id,
467        event_type: req.event_type,
468        tenant_id: enforced_tenant,
469        as_of: req.as_of,
470        since: req.since,
471        until: req.until,
472        limit: None,
473        event_type_prefix: req.event_type_prefix,
474        payload_filter: req.payload_filter,
475    };
476    let all_events = store.query(&unlimited_req)?;
477    let total_count = all_events.len();
478
479    // Apply limit
480    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
481        all_events.into_iter().take(limit).collect()
482    } else {
483        all_events
484    };
485
486    let count = limited_events.len();
487    let has_more = count < total_count;
488    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
489
490    // Include entity_version only when filtering by a single entity_id
491    let entity_version = queried_entity_id
492        .as_deref()
493        .map(|eid| store.get_entity_version(eid));
494
495    tracing::debug!("Query returned {} events (total: {})", count, total_count);
496
497    Ok(Json(QueryEventsResponse {
498        events,
499        count,
500        total_count,
501        has_more,
502        entity_version,
503    }))
504}
505
506pub async fn list_entities(
507    State(store): State<SharedStore>,
508    Query(req): Query<ListEntitiesRequest>,
509) -> Result<Json<ListEntitiesResponse>> {
510    use std::collections::HashMap;
511
512    // Get all events matching the filters
513    let query_req = QueryEventsRequest {
514        entity_id: None,
515        event_type: None,
516        tenant_id: None,
517        as_of: None,
518        since: None,
519        until: None,
520        limit: None,
521        event_type_prefix: req.event_type_prefix,
522        payload_filter: req.payload_filter,
523    };
524    let events = store.query(&query_req)?;
525
526    // Group by entity_id
527    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
528    for event in &events {
529        entity_map
530            .entry(event.entity_id().to_string())
531            .or_default()
532            .push(event);
533    }
534
535    // Build entity summaries sorted by last event time (descending)
536    let mut summaries: Vec<EntitySummary> = entity_map
537        .into_iter()
538        .map(|(entity_id, events)| {
539            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
540            EntitySummary {
541                entity_id,
542                event_count: events.len(),
543                last_event_type: last.event_type_str().to_string(),
544                last_event_at: last.timestamp(),
545            }
546        })
547        .collect();
548    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
549
550    let total = summaries.len();
551
552    // Apply offset and limit
553    let offset = req.offset.unwrap_or(0);
554    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
555    let summaries = if let Some(limit) = req.limit {
556        let has_more = summaries.len() > limit;
557        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
558        return Ok(Json(ListEntitiesResponse {
559            entities: truncated,
560            total,
561            has_more,
562        }));
563    } else {
564        summaries
565    };
566
567    Ok(Json(ListEntitiesResponse {
568        entities: summaries,
569        total,
570        has_more: false,
571    }))
572}
573
574pub async fn detect_duplicates(
575    State(store): State<SharedStore>,
576    Query(req): Query<DetectDuplicatesRequest>,
577) -> Result<Json<DetectDuplicatesResponse>> {
578    use std::collections::HashMap;
579
580    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
581
582    // Query events scoped by the required prefix
583    let query_req = QueryEventsRequest {
584        entity_id: None,
585        event_type: None,
586        tenant_id: None,
587        as_of: None,
588        since: None,
589        until: None,
590        limit: None,
591        event_type_prefix: Some(req.event_type_prefix),
592        payload_filter: None,
593    };
594    let events = store.query(&query_req)?;
595
596    // For each entity, extract the latest event's payload fields specified by group_by
597    // Then group entities by those field values
598    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
599    for event in &events {
600        let eid = event.entity_id().to_string();
601        entity_latest
602            .entry(eid)
603            .and_modify(|existing| {
604                if event.timestamp() > existing.timestamp() {
605                    *existing = event;
606                }
607            })
608            .or_insert(event);
609    }
610
611    // Group entities by their payload field values
612    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
613    for (entity_id, event) in &entity_latest {
614        let payload = event.payload();
615        let mut key_parts = serde_json::Map::new();
616        for field in &group_by_fields {
617            let value = payload
618                .get(*field)
619                .cloned()
620                .unwrap_or(serde_json::Value::Null);
621            key_parts.insert((*field).to_string(), value);
622        }
623        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
624        groups.entry(key_str).or_default().push(entity_id.clone());
625    }
626
627    // Filter to groups with count > 1 (actual duplicates)
628    let mut duplicate_groups: Vec<DuplicateGroup> = groups
629        .into_iter()
630        .filter(|(_, ids)| ids.len() > 1)
631        .map(|(key_str, mut ids)| {
632            ids.sort();
633            let key: serde_json::Value =
634                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
635            let count = ids.len();
636            DuplicateGroup {
637                key,
638                entity_ids: ids,
639                count,
640            }
641        })
642        .collect();
643
644    // Sort by count descending for consistent output
645    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
646
647    let total = duplicate_groups.len();
648
649    // Apply offset and limit
650    let offset = req.offset.unwrap_or(0);
651    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
652
653    if let Some(limit) = req.limit {
654        let has_more = duplicate_groups.len() > limit;
655        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
656        return Ok(Json(DetectDuplicatesResponse {
657            duplicates: truncated,
658            total,
659            has_more,
660        }));
661    }
662
663    Ok(Json(DetectDuplicatesResponse {
664        duplicates: duplicate_groups,
665        total,
666        has_more: false,
667    }))
668}
669
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp forwarded to `reconstruct_state`; presumably
    /// selects a point-in-time view of the entity — confirm against the store.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
674
675pub async fn get_entity_state(
676    State(store): State<SharedStore>,
677    Path(entity_id): Path<String>,
678    Query(params): Query<EntityStateParams>,
679) -> Result<Json<serde_json::Value>> {
680    let state = store.reconstruct_state(&entity_id, params.as_of)?;
681
682    tracing::info!("State reconstructed for entity: {}", entity_id);
683
684    Ok(Json(state))
685}
686
687pub async fn get_entity_snapshot(
688    State(store): State<SharedStore>,
689    Path(entity_id): Path<String>,
690) -> Result<Json<serde_json::Value>> {
691    let snapshot = store.get_snapshot(&entity_id)?;
692
693    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
694
695    Ok(Json(snapshot))
696}
697
698pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
699    let stats = store.stats();
700    Json(stats)
701}
702
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// Both fields are optional; `list_streams` applies `offset` first and then
/// `limit`.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Optional limit on number of streams to return
    pub limit: Option<usize>,
    /// Optional offset for pagination (number of streams to skip)
    pub offset: Option<usize>,
}
712
/// Response for listing streams
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before pagination was applied.
    pub total: usize,
}
719
720pub async fn list_streams(
721    State(store): State<SharedStore>,
722    Query(params): Query<ListStreamsParams>,
723) -> Json<ListStreamsResponse> {
724    let mut streams = store.list_streams();
725    let total = streams.len();
726
727    // Sort by last_event_at descending (most recent first)
728    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
729
730    // Apply pagination
731    if let Some(offset) = params.offset {
732        if offset < streams.len() {
733            streams = streams[offset..].to_vec();
734        } else {
735            streams = vec![];
736        }
737    }
738
739    if let Some(limit) = params.limit {
740        streams.truncate(limit);
741    }
742
743    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
744
745    Json(ListStreamsResponse { streams, total })
746}
747
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// Both fields are optional; `list_event_types` applies `offset` first and
/// then `limit`.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Optional limit on number of event types to return
    pub limit: Option<usize>,
    /// Optional offset for pagination (number of event types to skip)
    pub offset: Option<usize>,
}
757
/// Response for listing event types
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before pagination was applied.
    pub total: usize,
}
764
765pub async fn list_event_types(
766    State(store): State<SharedStore>,
767    Query(params): Query<ListEventTypesParams>,
768) -> Json<ListEventTypesResponse> {
769    let mut event_types = store.list_event_types();
770    let total = event_types.len();
771
772    // Sort by event_count descending (most used first)
773    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
774
775    // Apply pagination
776    if let Some(offset) = params.offset {
777        if offset < event_types.len() {
778            event_types = event_types[offset..].to_vec();
779        } else {
780            event_types = vec![];
781        }
782    }
783
784    if let Some(limit) = params.limit {
785        event_types.truncate(limit);
786    }
787
788    tracing::debug!(
789        "Listed {} event types (total: {})",
790        event_types.len(),
791        total
792    );
793
794    Json(ListEventTypesResponse { event_types, total })
795}
796
// v0.2: WebSocket endpoint for real-time event streaming
/// Query parameters accepted by the event-stream WebSocket endpoint.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handled via `handle_socket_with_consumer`
    /// with this id; otherwise the plain `handle_socket` path is used.
    pub consumer_id: Option<String>,
}
802
803pub async fn events_websocket(
804    ws: WebSocketUpgrade,
805    State(store): State<SharedStore>,
806    Query(params): Query<WebSocketParams>,
807) -> Response {
808    let websocket_manager = store.websocket_manager();
809
810    ws.on_upgrade(move |socket| async move {
811        if let Some(consumer_id) = params.consumer_id {
812            websocket_manager
813                .handle_socket_with_consumer(socket, consumer_id, store)
814                .await;
815        } else {
816            websocket_manager.handle_socket(socket).await;
817        }
818    })
819}
820
821// v0.2: Event frequency analytics endpoint
822pub async fn analytics_frequency(
823    State(store): State<SharedStore>,
824    Query(req): Query<EventFrequencyRequest>,
825) -> Result<Json<EventFrequencyResponse>> {
826    let response = AnalyticsEngine::event_frequency(&store, &req)?;
827
828    tracing::debug!(
829        "Frequency analysis returned {} buckets",
830        response.buckets.len()
831    );
832
833    Ok(Json(response))
834}
835
836// v0.2: Statistical summary endpoint
837pub async fn analytics_summary(
838    State(store): State<SharedStore>,
839    Query(req): Query<StatsSummaryRequest>,
840) -> Result<Json<StatsSummaryResponse>> {
841    let response = AnalyticsEngine::stats_summary(&store, &req)?;
842
843    tracing::debug!(
844        "Stats summary: {} events across {} entities",
845        response.total_events,
846        response.unique_entities
847    );
848
849    Ok(Json(response))
850}
851
852// v0.2: Event correlation analysis endpoint
853pub async fn analytics_correlation(
854    State(store): State<SharedStore>,
855    Query(req): Query<CorrelationRequest>,
856) -> Result<Json<CorrelationResponse>> {
857    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
858
859    tracing::debug!(
860        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
861        response.correlated_pairs,
862        response.total_a,
863        response.correlation_percentage
864    );
865
866    Ok(Json(response))
867}
868
869// v0.2: Create a snapshot for an entity
870pub async fn create_snapshot(
871    State(store): State<SharedStore>,
872    Json(req): Json<CreateSnapshotRequest>,
873) -> Result<Json<CreateSnapshotResponse>> {
874    store.create_snapshot(&req.entity_id)?;
875
876    let snapshot_manager = store.snapshot_manager();
877    let snapshot = snapshot_manager
878        .get_latest_snapshot(&req.entity_id)
879        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
880
881    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
882
883    Ok(Json(CreateSnapshotResponse {
884        snapshot_id: snapshot.id,
885        entity_id: snapshot.entity_id,
886        created_at: snapshot.created_at,
887        event_count: snapshot.event_count,
888        size_bytes: snapshot.metadata.size_bytes,
889    }))
890}
891
892// v0.2: List snapshots
893pub async fn list_snapshots(
894    State(store): State<SharedStore>,
895    Query(req): Query<ListSnapshotsRequest>,
896) -> Result<Json<ListSnapshotsResponse>> {
897    let snapshot_manager = store.snapshot_manager();
898
899    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
900        snapshot_manager
901            .get_all_snapshots(&entity_id)
902            .into_iter()
903            .map(SnapshotInfo::from)
904            .collect()
905    } else {
906        // List all entities with snapshots
907        let entities = snapshot_manager.list_entities();
908        entities
909            .iter()
910            .flat_map(|entity_id| {
911                snapshot_manager
912                    .get_all_snapshots(entity_id)
913                    .into_iter()
914                    .map(SnapshotInfo::from)
915            })
916            .collect()
917    };
918
919    let total = snapshots.len();
920
921    tracing::debug!("Listed {} snapshots", total);
922
923    Ok(Json(ListSnapshotsResponse { snapshots, total }))
924}
925
926// v0.2: Get latest snapshot for an entity
927pub async fn get_latest_snapshot(
928    State(store): State<SharedStore>,
929    Path(entity_id): Path<String>,
930) -> Result<Json<serde_json::Value>> {
931    let snapshot_manager = store.snapshot_manager();
932
933    let snapshot = snapshot_manager
934        .get_latest_snapshot(&entity_id)
935        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
936
937    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
938
939    Ok(Json(serde_json::json!({
940        "snapshot_id": snapshot.id,
941        "entity_id": snapshot.entity_id,
942        "created_at": snapshot.created_at,
943        "as_of": snapshot.as_of,
944        "event_count": snapshot.event_count,
945        "size_bytes": snapshot.metadata.size_bytes,
946        "snapshot_type": snapshot.metadata.snapshot_type,
947        "state": snapshot.state
948    })))
949}
950
951// v0.2: Trigger manual compaction
952pub async fn trigger_compaction(
953    State(store): State<SharedStore>,
954) -> Result<Json<CompactionResult>> {
955    let compaction_manager = store.compaction_manager().ok_or_else(|| {
956        crate::error::AllSourceError::InternalError(
957            "Compaction not enabled (no Parquet storage)".to_string(),
958        )
959    })?;
960
961    tracing::info!("📦 Manual compaction triggered via API");
962
963    let result = compaction_manager.compact_now()?;
964
965    Ok(Json(result))
966}
967
968// v0.2: Get compaction statistics
969pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
970    let compaction_manager = store.compaction_manager().ok_or_else(|| {
971        crate::error::AllSourceError::InternalError(
972            "Compaction not enabled (no Parquet storage)".to_string(),
973        )
974    })?;
975
976    let stats = compaction_manager.stats();
977    let config = compaction_manager.config();
978
979    Ok(Json(serde_json::json!({
980        "stats": stats,
981        "config": {
982            "min_files_to_compact": config.min_files_to_compact,
983            "target_file_size": config.target_file_size,
984            "max_file_size": config.max_file_size,
985            "small_file_threshold": config.small_file_threshold,
986            "compaction_interval_seconds": config.compaction_interval_seconds,
987            "auto_compact": config.auto_compact,
988            "strategy": config.strategy
989        }
990    })))
991}
992
993// v0.5: Register a new schema
994pub async fn register_schema(
995    State(store): State<SharedStore>,
996    Json(req): Json<RegisterSchemaRequest>,
997) -> Result<Json<RegisterSchemaResponse>> {
998    let schema_registry = store.schema_registry();
999
1000    let response =
1001        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1002
1003    tracing::info!(
1004        "📋 Schema registered: v{} for '{}'",
1005        response.version,
1006        response.subject
1007    );
1008
1009    Ok(Json(response))
1010}
1011
1012// v0.5: Get a schema by subject and optional version
1013#[derive(Deserialize)]
1014pub struct GetSchemaParams {
1015    version: Option<u32>,
1016}
1017
1018pub async fn get_schema(
1019    State(store): State<SharedStore>,
1020    Path(subject): Path<String>,
1021    Query(params): Query<GetSchemaParams>,
1022) -> Result<Json<serde_json::Value>> {
1023    let schema_registry = store.schema_registry();
1024
1025    let schema = schema_registry.get_schema(&subject, params.version)?;
1026
1027    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1028
1029    Ok(Json(serde_json::json!({
1030        "id": schema.id,
1031        "subject": schema.subject,
1032        "version": schema.version,
1033        "schema": schema.schema,
1034        "created_at": schema.created_at,
1035        "description": schema.description,
1036        "tags": schema.tags
1037    })))
1038}
1039
1040// v0.5: List all versions of a schema subject
1041pub async fn list_schema_versions(
1042    State(store): State<SharedStore>,
1043    Path(subject): Path<String>,
1044) -> Result<Json<serde_json::Value>> {
1045    let schema_registry = store.schema_registry();
1046
1047    let versions = schema_registry.list_versions(&subject)?;
1048
1049    Ok(Json(serde_json::json!({
1050        "subject": subject,
1051        "versions": versions
1052    })))
1053}
1054
1055// v0.5: List all schema subjects
1056pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1057    let schema_registry = store.schema_registry();
1058
1059    let subjects = schema_registry.list_subjects();
1060
1061    Json(serde_json::json!({
1062        "subjects": subjects,
1063        "total": subjects.len()
1064    }))
1065}
1066
1067// v0.5: Validate an event against a schema
1068pub async fn validate_event_schema(
1069    State(store): State<SharedStore>,
1070    Json(req): Json<ValidateEventRequest>,
1071) -> Result<Json<ValidateEventResponse>> {
1072    let schema_registry = store.schema_registry();
1073
1074    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1075
1076    if response.valid {
1077        tracing::debug!(
1078            "✅ Event validated against schema '{}' v{}",
1079            req.subject,
1080            response.schema_version
1081        );
1082    } else {
1083        tracing::warn!(
1084            "❌ Event validation failed for '{}': {:?}",
1085            req.subject,
1086            response.errors
1087        );
1088    }
1089
1090    Ok(Json(response))
1091}
1092
1093// v0.5: Set compatibility mode for a subject
1094#[derive(Deserialize)]
1095pub struct SetCompatibilityRequest {
1096    compatibility: CompatibilityMode,
1097}
1098
1099pub async fn set_compatibility_mode(
1100    State(store): State<SharedStore>,
1101    Path(subject): Path<String>,
1102    Json(req): Json<SetCompatibilityRequest>,
1103) -> Json<serde_json::Value> {
1104    let schema_registry = store.schema_registry();
1105
1106    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1107
1108    tracing::info!(
1109        "🔧 Set compatibility mode for '{}' to {:?}",
1110        subject,
1111        req.compatibility
1112    );
1113
1114    Json(serde_json::json!({
1115        "subject": subject,
1116        "compatibility": req.compatibility
1117    }))
1118}
1119
1120// v0.5: Start a replay operation
1121pub async fn start_replay(
1122    State(store): State<SharedStore>,
1123    Json(req): Json<StartReplayRequest>,
1124) -> Result<Json<StartReplayResponse>> {
1125    let replay_manager = store.replay_manager();
1126
1127    let response = replay_manager.start_replay(store, req)?;
1128
1129    tracing::info!(
1130        "🔄 Started replay {} with {} events",
1131        response.replay_id,
1132        response.total_events
1133    );
1134
1135    Ok(Json(response))
1136}
1137
1138// v0.5: Get replay progress
1139pub async fn get_replay_progress(
1140    State(store): State<SharedStore>,
1141    Path(replay_id): Path<uuid::Uuid>,
1142) -> Result<Json<ReplayProgress>> {
1143    let replay_manager = store.replay_manager();
1144
1145    let progress = replay_manager.get_progress(replay_id)?;
1146
1147    Ok(Json(progress))
1148}
1149
1150// v0.5: List all replay operations
1151pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1152    let replay_manager = store.replay_manager();
1153
1154    let replays = replay_manager.list_replays();
1155
1156    Json(serde_json::json!({
1157        "replays": replays,
1158        "total": replays.len()
1159    }))
1160}
1161
1162// v0.5: Cancel a running replay
1163pub async fn cancel_replay(
1164    State(store): State<SharedStore>,
1165    Path(replay_id): Path<uuid::Uuid>,
1166) -> Result<Json<serde_json::Value>> {
1167    let replay_manager = store.replay_manager();
1168
1169    replay_manager.cancel_replay(replay_id)?;
1170
1171    tracing::info!("🛑 Cancelled replay {}", replay_id);
1172
1173    Ok(Json(serde_json::json!({
1174        "replay_id": replay_id,
1175        "status": "cancelled"
1176    })))
1177}
1178
1179// v0.5: Delete a completed replay
1180pub async fn delete_replay(
1181    State(store): State<SharedStore>,
1182    Path(replay_id): Path<uuid::Uuid>,
1183) -> Result<Json<serde_json::Value>> {
1184    let replay_manager = store.replay_manager();
1185
1186    let deleted = replay_manager.delete_replay(replay_id)?;
1187
1188    if deleted {
1189        tracing::info!("🗑️  Deleted replay {}", replay_id);
1190    }
1191
1192    Ok(Json(serde_json::json!({
1193        "replay_id": replay_id,
1194        "deleted": deleted
1195    })))
1196}
1197
1198// v0.5: Register a new pipeline
1199pub async fn register_pipeline(
1200    State(store): State<SharedStore>,
1201    Json(config): Json<PipelineConfig>,
1202) -> Result<Json<serde_json::Value>> {
1203    let pipeline_manager = store.pipeline_manager();
1204
1205    let pipeline_id = pipeline_manager.register(config.clone());
1206
1207    tracing::info!(
1208        "🔀 Pipeline registered: {} (name: {})",
1209        pipeline_id,
1210        config.name
1211    );
1212
1213    Ok(Json(serde_json::json!({
1214        "pipeline_id": pipeline_id,
1215        "name": config.name,
1216        "enabled": config.enabled
1217    })))
1218}
1219
1220// v0.5: List all pipelines
1221pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1222    let pipeline_manager = store.pipeline_manager();
1223
1224    let pipelines = pipeline_manager.list();
1225
1226    tracing::debug!("Listed {} pipelines", pipelines.len());
1227
1228    Json(serde_json::json!({
1229        "pipelines": pipelines,
1230        "total": pipelines.len()
1231    }))
1232}
1233
1234// v0.5: Get a specific pipeline
1235pub async fn get_pipeline(
1236    State(store): State<SharedStore>,
1237    Path(pipeline_id): Path<uuid::Uuid>,
1238) -> Result<Json<PipelineConfig>> {
1239    let pipeline_manager = store.pipeline_manager();
1240
1241    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1242        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1243    })?;
1244
1245    Ok(Json(pipeline.config().clone()))
1246}
1247
1248// v0.5: Remove a pipeline
1249pub async fn remove_pipeline(
1250    State(store): State<SharedStore>,
1251    Path(pipeline_id): Path<uuid::Uuid>,
1252) -> Result<Json<serde_json::Value>> {
1253    let pipeline_manager = store.pipeline_manager();
1254
1255    let removed = pipeline_manager.remove(pipeline_id);
1256
1257    if removed {
1258        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1259    }
1260
1261    Ok(Json(serde_json::json!({
1262        "pipeline_id": pipeline_id,
1263        "removed": removed
1264    })))
1265}
1266
1267// v0.5: Get statistics for all pipelines
1268pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1269    let pipeline_manager = store.pipeline_manager();
1270
1271    let stats = pipeline_manager.all_stats();
1272
1273    Json(serde_json::json!({
1274        "stats": stats,
1275        "total": stats.len()
1276    }))
1277}
1278
1279// v0.5: Get statistics for a specific pipeline
1280pub async fn get_pipeline_stats(
1281    State(store): State<SharedStore>,
1282    Path(pipeline_id): Path<uuid::Uuid>,
1283) -> Result<Json<PipelineStats>> {
1284    let pipeline_manager = store.pipeline_manager();
1285
1286    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1287        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1288    })?;
1289
1290    Ok(Json(pipeline.stats()))
1291}
1292
1293// v0.5: Reset a pipeline's state
1294pub async fn reset_pipeline(
1295    State(store): State<SharedStore>,
1296    Path(pipeline_id): Path<uuid::Uuid>,
1297) -> Result<Json<serde_json::Value>> {
1298    let pipeline_manager = store.pipeline_manager();
1299
1300    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1301        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1302    })?;
1303
1304    pipeline.reset();
1305
1306    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1307
1308    Ok(Json(serde_json::json!({
1309        "pipeline_id": pipeline_id,
1310        "reset": true
1311    })))
1312}
1313
1314// =============================================================================
1315// v0.11: Single Event Lookup by ID
1316// =============================================================================
1317
1318/// Get a single event by UUID
1319pub async fn get_event_by_id(
1320    State(store): State<SharedStore>,
1321    Path(event_id): Path<uuid::Uuid>,
1322) -> Result<Json<serde_json::Value>> {
1323    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1324        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1325    })?;
1326
1327    let dto = EventDto::from(&event);
1328
1329    tracing::debug!("Event retrieved by ID: {}", event_id);
1330
1331    Ok(Json(serde_json::json!({
1332        "event": dto,
1333        "found": true
1334    })))
1335}
1336
1337// =============================================================================
1338// v0.7: Projection State API for Query Service Integration
1339// =============================================================================
1340
1341/// List all registered projections
1342pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1343    let projection_manager = store.projection_manager();
1344    let status_map = store.projection_status();
1345
1346    let projections: Vec<serde_json::Value> = projection_manager
1347        .list_projections()
1348        .iter()
1349        .map(|(name, projection)| {
1350            let status = status_map
1351                .get(name)
1352                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1353            serde_json::json!({
1354                "name": name,
1355                "type": format!("{:?}", projection.name()),
1356                "status": status,
1357            })
1358        })
1359        .collect();
1360
1361    tracing::debug!("Listed {} projections", projections.len());
1362
1363    Json(serde_json::json!({
1364        "projections": projections,
1365        "total": projections.len()
1366    }))
1367}
1368
1369/// Get projection metadata by name
1370pub async fn get_projection(
1371    State(store): State<SharedStore>,
1372    Path(name): Path<String>,
1373) -> Result<Json<serde_json::Value>> {
1374    let projection_manager = store.projection_manager();
1375
1376    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1377        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1378    })?;
1379
1380    Ok(Json(serde_json::json!({
1381        "name": projection.name(),
1382        "found": true
1383    })))
1384}
1385
1386/// Get projection state for a specific entity.
1387///
1388/// Resolution order:
1389/// 1. **Registered projection** — if `name` is registered with the projection
1390///    manager, return the projection's own `get_state(entity_id)` output.
1391/// 2. **Projection state cache** — otherwise fall back to whatever was written
1392///    via `save_projection_state` / `bulk_save_projection_states`. This supports
1393///    SDK-managed projections (e.g. the Rust SDK's `ProjectionWorker`) that
1394///    compute state client-side and push it back without registering a
1395///    projection in Core's manager.
1396///
1397/// Returns `found: false` with `state: null` when neither source has state.
1398pub async fn get_projection_state(
1399    State(store): State<SharedStore>,
1400    Path((name, entity_id)): Path<(String, String)>,
1401) -> Result<Json<serde_json::Value>> {
1402    let state = store
1403        .projection_manager()
1404        .get_projection(&name)
1405        .and_then(|p| p.get_state(&entity_id))
1406        .or_else(|| {
1407            store
1408                .projection_state_cache()
1409                .get(&format!("{name}:{entity_id}"))
1410                .map(|entry| entry.value().clone())
1411        });
1412
1413    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1414
1415    Ok(Json(serde_json::json!({
1416        "projection": name,
1417        "entity_id": entity_id,
1418        "state": state,
1419        "found": state.is_some()
1420    })))
1421}
1422
1423/// Delete (clear) a projection by name
1424///
1425/// Removes all state from the projection. The projection definition remains
1426/// registered but its accumulated state is cleared.
1427pub async fn delete_projection(
1428    State(store): State<SharedStore>,
1429    Path(name): Path<String>,
1430) -> Result<Json<serde_json::Value>> {
1431    let projection_manager = store.projection_manager();
1432
1433    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1434        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1435    })?;
1436
1437    projection.clear();
1438
1439    // Also clear any cached state for this projection
1440    let cache = store.projection_state_cache();
1441    let prefix = format!("{name}:");
1442    let keys_to_remove: Vec<String> = cache
1443        .iter()
1444        .filter(|entry| entry.key().starts_with(&prefix))
1445        .map(|entry| entry.key().clone())
1446        .collect();
1447    for key in keys_to_remove {
1448        cache.remove(&key);
1449    }
1450
1451    tracing::info!("Projection deleted (cleared): {}", name);
1452
1453    Ok(Json(serde_json::json!({
1454        "projection": name,
1455        "deleted": true
1456    })))
1457}
1458
1459/// Get aggregate projection state (all entities).
1460///
1461/// Returns the cached states written via `save_projection_state` /
1462/// `bulk_save_projection_states`. The projection does NOT need to be
1463/// registered with the projection manager — this supports SDK-managed
1464/// projections that push state without server-side registration.
1465///
1466/// Returns an empty list when no state has been written.
1467pub async fn get_projection_state_summary(
1468    State(store): State<SharedStore>,
1469    Path(name): Path<String>,
1470) -> Result<Json<serde_json::Value>> {
1471    let cache = store.projection_state_cache();
1472    let prefix = format!("{name}:");
1473    let states: Vec<serde_json::Value> = cache
1474        .iter()
1475        .filter(|entry| entry.key().starts_with(&prefix))
1476        .map(|entry| {
1477            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1478            serde_json::json!({
1479                "entity_id": entity_id,
1480                "state": entry.value().clone()
1481            })
1482        })
1483        .collect();
1484
1485    let total = states.len();
1486
1487    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1488
1489    Ok(Json(serde_json::json!({
1490        "projection": name,
1491        "states": states,
1492        "total": total
1493    })))
1494}
1495
1496/// Reset a projection to its initial state
1497///
1498/// Clears all accumulated state and reprocesses events from the beginning.
1499pub async fn reset_projection(
1500    State(store): State<SharedStore>,
1501    Path(name): Path<String>,
1502) -> Result<Json<serde_json::Value>> {
1503    let reprocessed = store.reset_projection(&name)?;
1504
1505    tracing::info!(
1506        "Projection reset: {} ({} events reprocessed)",
1507        name,
1508        reprocessed
1509    );
1510
1511    Ok(Json(serde_json::json!({
1512        "projection": name,
1513        "reset": true,
1514        "events_reprocessed": reprocessed
1515    })))
1516}
1517
1518/// Pause a projection
1519///
1520/// Sets the projection status to "paused" so it stops processing new events.
1521pub async fn pause_projection(
1522    State(store): State<SharedStore>,
1523    Path(name): Path<String>,
1524) -> Result<Json<serde_json::Value>> {
1525    let projection_manager = store.projection_manager();
1526
1527    // Verify projection exists
1528    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1529        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1530    })?;
1531
1532    store
1533        .projection_status()
1534        .insert(name.clone(), "paused".to_string());
1535
1536    tracing::info!("Projection paused: {}", name);
1537
1538    Ok(Json(serde_json::json!({
1539        "projection": name,
1540        "status": "paused"
1541    })))
1542}
1543
1544/// Start (resume) a projection
1545///
1546/// Sets the projection status to "running" so it resumes processing events.
1547pub async fn start_projection(
1548    State(store): State<SharedStore>,
1549    Path(name): Path<String>,
1550) -> Result<Json<serde_json::Value>> {
1551    let projection_manager = store.projection_manager();
1552
1553    // Verify projection exists
1554    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1555        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1556    })?;
1557
1558    store
1559        .projection_status()
1560        .insert(name.clone(), "running".to_string());
1561
1562    tracing::info!("Projection started: {}", name);
1563
1564    Ok(Json(serde_json::json!({
1565        "projection": name,
1566        "status": "running"
1567    })))
1568}
1569
/// Request body for `save_projection_state`.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state for the entity. The handler stores it in the
    /// projection state cache without inspecting its contents.
    pub state: serde_json::Value,
}
1575
1576/// Save/update projection state for an entity
1577///
1578/// This endpoint allows external services (like Elixir Query Service) to
1579/// store computed projection state back to the Core for persistence.
1580pub async fn save_projection_state(
1581    State(store): State<SharedStore>,
1582    Path((name, entity_id)): Path<(String, String)>,
1583    Json(req): Json<SaveProjectionStateRequest>,
1584) -> Result<Json<serde_json::Value>> {
1585    let projection_cache = store.projection_state_cache();
1586
1587    // Store in the projection state cache
1588    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1589
1590    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1591
1592    Ok(Json(serde_json::json!({
1593        "projection": name,
1594        "entity_id": entity_id,
1595        "saved": true
1596    })))
1597}
1598
/// Request body for `bulk_get_projection_states`.
///
/// Lets a client fetch the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entity IDs to look up. The handler emits one result per ID, in the
    /// order given.
    pub entity_ids: Vec<String>,
}
1606
/// Request body for `bulk_save_projection_states`.
///
/// Lets a client save the states of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// One entry per entity whose state should be stored.
    pub states: Vec<BulkSaveStateItem>,
}
1614
/// A single entity/state pair inside a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity the state belongs to; forms the second half of the cache key
    /// `"{projection}:{entity_id}"`.
    pub entity_id: String,
    /// Arbitrary JSON state to store for the entity.
    pub state: serde_json::Value,
}
1620
1621pub async fn bulk_get_projection_states(
1622    State(store): State<SharedStore>,
1623    Path(name): Path<String>,
1624    Json(req): Json<BulkGetStateRequest>,
1625) -> Result<Json<serde_json::Value>> {
1626    // Same fallback rule as `get_projection_state`: registered projection
1627    // wins, cache is the fallback. This lets SDK-managed projections read
1628    // their pushed-back state without registering in Core's projection manager.
1629    let projection = store.projection_manager().get_projection(&name);
1630    let cache = store.projection_state_cache();
1631
1632    let states: Vec<serde_json::Value> = req
1633        .entity_ids
1634        .iter()
1635        .map(|entity_id| {
1636            let state = projection
1637                .as_ref()
1638                .and_then(|p| p.get_state(entity_id))
1639                .or_else(|| {
1640                    cache
1641                        .get(&format!("{name}:{entity_id}"))
1642                        .map(|entry| entry.value().clone())
1643                });
1644            serde_json::json!({
1645                "entity_id": entity_id,
1646                "state": state,
1647                "found": state.is_some()
1648            })
1649        })
1650        .collect();
1651
1652    tracing::debug!(
1653        "Bulk projection state retrieved: {} entities from {}",
1654        states.len(),
1655        name
1656    );
1657
1658    Ok(Json(serde_json::json!({
1659        "projection": name,
1660        "states": states,
1661        "total": states.len()
1662    })))
1663}
1664
1665/// Bulk save projection states for multiple entities
1666///
1667/// This endpoint allows efficient batch saving of projection states,
1668/// critical for high-throughput event processing pipelines.
1669pub async fn bulk_save_projection_states(
1670    State(store): State<SharedStore>,
1671    Path(name): Path<String>,
1672    Json(req): Json<BulkSaveStateRequest>,
1673) -> Result<Json<serde_json::Value>> {
1674    let projection_cache = store.projection_state_cache();
1675
1676    let mut saved_count = 0;
1677    for item in &req.states {
1678        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1679        saved_count += 1;
1680    }
1681
1682    tracing::info!(
1683        "Bulk projection state saved: {} entities for {}",
1684        saved_count,
1685        name
1686    );
1687
1688    Ok(Json(serde_json::json!({
1689        "projection": name,
1690        "saved": saved_count,
1691        "total": req.states.len()
1692    })))
1693}
1694
1695// =============================================================================
1696// v0.11: Webhook Management API
1697// =============================================================================
1698
/// Query parameters for listing webhooks
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When omitted, `list_webhooks`
    /// returns an empty list rather than every tenant's webhooks.
    pub tenant_id: Option<String>,
}
1704
1705/// Register a new webhook subscription
1706pub async fn register_webhook(
1707    State(store): State<SharedStore>,
1708    Json(req): Json<RegisterWebhookRequest>,
1709) -> Json<serde_json::Value> {
1710    let registry = store.webhook_registry();
1711    let webhook = registry.register(req);
1712
1713    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1714
1715    Json(serde_json::json!({
1716        "webhook": webhook,
1717        "created": true
1718    }))
1719}
1720
1721/// List webhooks, optionally filtered by tenant_id
1722pub async fn list_webhooks(
1723    State(store): State<SharedStore>,
1724    Query(params): Query<ListWebhooksParams>,
1725) -> Json<serde_json::Value> {
1726    let registry = store.webhook_registry();
1727
1728    let webhooks = if let Some(tenant_id) = params.tenant_id {
1729        registry.list_by_tenant(&tenant_id)
1730    } else {
1731        // Without tenant filter, return empty (tenants should always filter)
1732        vec![]
1733    };
1734
1735    let total = webhooks.len();
1736
1737    Json(serde_json::json!({
1738        "webhooks": webhooks,
1739        "total": total
1740    }))
1741}
1742
1743/// Get a specific webhook by ID
1744pub async fn get_webhook(
1745    State(store): State<SharedStore>,
1746    Path(webhook_id): Path<uuid::Uuid>,
1747) -> Result<Json<serde_json::Value>> {
1748    let registry = store.webhook_registry();
1749
1750    let webhook = registry.get(webhook_id).ok_or_else(|| {
1751        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1752    })?;
1753
1754    Ok(Json(serde_json::json!({
1755        "webhook": webhook,
1756        "found": true
1757    })))
1758}
1759
1760/// Update a webhook subscription
1761pub async fn update_webhook(
1762    State(store): State<SharedStore>,
1763    Path(webhook_id): Path<uuid::Uuid>,
1764    Json(req): Json<UpdateWebhookRequest>,
1765) -> Result<Json<serde_json::Value>> {
1766    let registry = store.webhook_registry();
1767
1768    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1769        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1770    })?;
1771
1772    tracing::info!("Webhook updated: {}", webhook_id);
1773
1774    Ok(Json(serde_json::json!({
1775        "webhook": webhook,
1776        "updated": true
1777    })))
1778}
1779
1780/// Delete a webhook subscription
1781pub async fn delete_webhook(
1782    State(store): State<SharedStore>,
1783    Path(webhook_id): Path<uuid::Uuid>,
1784) -> Result<Json<serde_json::Value>> {
1785    let registry = store.webhook_registry();
1786
1787    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1788        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1789    })?;
1790
1791    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1792
1793    Ok(Json(serde_json::json!({
1794        "webhook_id": webhook_id,
1795        "deleted": true
1796    })))
1797}
1798
/// Query parameters for listing webhook deliveries
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Maximum number of deliveries to request from the registry;
    /// `list_webhook_deliveries` uses 50 when this is omitted.
    pub limit: Option<usize>,
}
1804
1805/// List delivery history for a webhook
1806pub async fn list_webhook_deliveries(
1807    State(store): State<SharedStore>,
1808    Path(webhook_id): Path<uuid::Uuid>,
1809    Query(params): Query<ListDeliveriesParams>,
1810) -> Result<Json<serde_json::Value>> {
1811    let registry = store.webhook_registry();
1812
1813    // Verify webhook exists
1814    registry.get(webhook_id).ok_or_else(|| {
1815        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1816    })?;
1817
1818    let limit = params.limit.unwrap_or(50);
1819    let deliveries = registry.get_deliveries(webhook_id, limit);
1820    let total = deliveries.len();
1821
1822    Ok(Json(serde_json::json!({
1823        "webhook_id": webhook_id,
1824        "deliveries": deliveries,
1825        "total": total
1826    })))
1827}
1828
1829// =============================================================================
1830// v2.0: Advanced Query Features
1831// =============================================================================
1832
/// EventQL: run a query over the store's events.
///
/// Takes the events returned by `store.snapshot_events()`, executes the
/// request against them with `execute_eventql`, and returns the resulting
/// columns, rows and row count. Only compiled with the `analytics` feature.
///
/// # Errors
///
/// A failure from `execute_eventql` is wrapped in `AllSourceError::InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();

    let outcome = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": outcome.columns,
        "rows": outcome.rows,
        "row_count": outcome.row_count,
    })))
}
1849
1850/// GraphQL: Execute GraphQL queries
1851pub async fn graphql_query(
1852    State(store): State<SharedStore>,
1853    Json(req): Json<GraphQLRequest>,
1854) -> Json<serde_json::Value> {
1855    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1856        Ok(f) => f,
1857        Err(e) => {
1858            return Json(
1859                serde_json::to_value(GraphQLResponse {
1860                    data: None,
1861                    errors: vec![GraphQLError { message: e }],
1862                })
1863                .unwrap(),
1864            );
1865        }
1866    };
1867
1868    let mut data = serde_json::Map::new();
1869    let mut errors = Vec::new();
1870
1871    for field in &fields {
1872        match field.name.as_str() {
1873            "events" => {
1874                let request = crate::application::dto::QueryEventsRequest {
1875                    entity_id: field.arguments.get("entity_id").cloned(),
1876                    event_type: field.arguments.get("event_type").cloned(),
1877                    tenant_id: field.arguments.get("tenant_id").cloned(),
1878                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1879                    as_of: None,
1880                    since: None,
1881                    until: None,
1882                    event_type_prefix: None,
1883                    payload_filter: None,
1884                };
1885                match store.query(&request) {
1886                    Ok(events) => {
1887                        let json_events: Vec<serde_json::Value> = events
1888                            .iter()
1889                            .map(|e| {
1890                                crate::infrastructure::query::graphql::event_to_json(
1891                                    e,
1892                                    &field.fields,
1893                                )
1894                            })
1895                            .collect();
1896                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1897                    }
1898                    Err(e) => errors.push(GraphQLError {
1899                        message: format!("events query failed: {e}"),
1900                    }),
1901                }
1902            }
1903            "event" => {
1904                if let Some(id_str) = field.arguments.get("id") {
1905                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1906                        match store.get_event_by_id(&id) {
1907                            Ok(Some(event)) => {
1908                                data.insert(
1909                                    "event".to_string(),
1910                                    crate::infrastructure::query::graphql::event_to_json(
1911                                        &event,
1912                                        &field.fields,
1913                                    ),
1914                                );
1915                            }
1916                            Ok(None) => {
1917                                data.insert("event".to_string(), serde_json::Value::Null);
1918                            }
1919                            Err(e) => errors.push(GraphQLError {
1920                                message: format!("event lookup failed: {e}"),
1921                            }),
1922                        }
1923                    } else {
1924                        errors.push(GraphQLError {
1925                            message: format!("Invalid UUID: {id_str}"),
1926                        });
1927                    }
1928                } else {
1929                    errors.push(GraphQLError {
1930                        message: "event query requires 'id' argument".to_string(),
1931                    });
1932                }
1933            }
1934            "projections" => {
1935                let pm = store.projection_manager();
1936                let names: Vec<serde_json::Value> = pm
1937                    .list_projections()
1938                    .iter()
1939                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1940                    .collect();
1941                data.insert("projections".to_string(), serde_json::Value::Array(names));
1942            }
1943            "stats" => {
1944                let stats = store.stats();
1945                data.insert(
1946                    "stats".to_string(),
1947                    serde_json::json!({
1948                        "total_events": stats.total_events,
1949                        "total_entities": stats.total_entities,
1950                        "total_event_types": stats.total_event_types,
1951                    }),
1952                );
1953            }
1954            "__schema" => {
1955                data.insert(
1956                    "__schema".to_string(),
1957                    crate::infrastructure::query::graphql::introspection_schema(),
1958                );
1959            }
1960            other => {
1961                errors.push(GraphQLError {
1962                    message: format!("Unknown field: {other}"),
1963                });
1964            }
1965        }
1966    }
1967
1968    Json(
1969        serde_json::to_value(GraphQLResponse {
1970            data: Some(serde_json::Value::Object(data)),
1971            errors,
1972        })
1973        .unwrap(),
1974    )
1975}
1976
1977/// Geospatial: Query events by location
1978pub async fn geo_query(
1979    State(store): State<SharedStore>,
1980    Json(req): Json<GeoQueryRequest>,
1981) -> Json<serde_json::Value> {
1982    let events = store.snapshot_events();
1983    let geo_index = store.geo_index();
1984    let results =
1985        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1986    let total = results.len();
1987    Json(serde_json::json!({
1988        "results": results,
1989        "total": total,
1990    }))
1991}
1992
1993/// Geospatial index stats
1994pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1995    let stats = store.geo_index().stats();
1996    Json(serde_json::json!(stats))
1997}
1998
1999/// Exactly-once processing stats
2000pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2001    let stats = store.exactly_once().stats();
2002    Json(serde_json::json!(stats))
2003}
2004
2005/// Schema evolution history for an event type
2006pub async fn schema_evolution_history(
2007    State(store): State<SharedStore>,
2008    Path(event_type): Path<String>,
2009) -> Json<serde_json::Value> {
2010    let mgr = store.schema_evolution();
2011    let history = mgr.get_history(&event_type);
2012    let version = mgr.get_version(&event_type);
2013    Json(serde_json::json!({
2014        "event_type": event_type,
2015        "current_version": version,
2016        "history": history,
2017    }))
2018}
2019
2020/// Current inferred schema for an event type
2021pub async fn schema_evolution_schema(
2022    State(store): State<SharedStore>,
2023    Path(event_type): Path<String>,
2024) -> Json<serde_json::Value> {
2025    let mgr = store.schema_evolution();
2026    if let Some(schema) = mgr.get_schema(&event_type) {
2027        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2028        Json(serde_json::json!({
2029            "event_type": event_type,
2030            "version": mgr.get_version(&event_type),
2031            "inferred_schema": schema,
2032            "json_schema": json_schema,
2033        }))
2034    } else {
2035        Json(serde_json::json!({
2036            "event_type": event_type,
2037            "error": "No schema inferred for this event type"
2038        }))
2039    }
2040}
2041
2042/// Schema evolution stats
2043pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2044    let stats = store.schema_evolution().stats();
2045    let event_types = store.schema_evolution().list_event_types();
2046    Json(serde_json::json!({
2047        "stats": stats,
2048        "tracked_event_types": event_types,
2049    }))
2050}
2051
2052// =============================================================================
2053// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2054// =============================================================================
2055
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The smallest `physical_ms` in the client's version vector becomes the
/// `since` bound of an otherwise unfiltered store query; an empty vector (or a
/// value that is not a representable timestamp) means no lower bound.
///
/// Every returned event is converted to the `ReplicatedEvent` wire format with
/// origin region `"server"` and a synthesized HLC timestamp: the event's
/// wall-clock milliseconds plus a logical counter that increases for
/// consecutive events sharing the same millisecond.
///
/// The `version_vector` in the response is currently always empty.
///
/// # Errors
/// Propagates any error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Compute the "since" threshold from the client's version vector.
    // The conversion is checked: a value above i64::MAX cannot be a valid
    // timestamp, so it is treated as "no bound" instead of wrapping negative.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(i64::try_from(ms).ok()?));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Convert domain events to the ReplicatedEvent wire format.
    let mut replicated = Vec::with_capacity(events.len());
    // `None` until the first event is seen, so a genuine timestamp of 0 is not
    // mistaken for a repeat of an initial sentinel value.
    let mut last_ms: Option<u64> = None;
    let mut logical = 0u32;

    for event in &events {
        // The HLC physical component is unsigned; clamp pre-epoch timestamps to
        // 0 rather than letting an `as` cast wrap them to a huge value.
        let event_ms = u64::try_from(event.timestamp().timestamp_millis()).unwrap_or(0);
        if last_ms == Some(event_ms) {
            logical += 1;
        } else {
            last_ms = Some(event_ms);
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2120
/// POST /api/v1/sync/push — Client pushes events, server ingests them.
///
/// For each pushed event the `event_type`, `entity_id`, `tenant_id`, `payload`
/// and `metadata` entries of `event_data` are read (missing strings fall back
/// to `"unknown"` / `"unknown"` / `"default"`, a missing payload to `{}`), a
/// domain event is built from them and ingested into the store.
///
/// Events that fail domain validation are counted as `skipped` and logged;
/// they do not fail the request. The `version_vector` in the response is
/// currently always empty.
///
/// NOTE(review): only `event_data` is consulted here — no CRDT/HLC-based
/// conflict resolution is visible in this handler; confirm whether that is
/// expected to happen elsewhere.
///
/// # Errors
/// Returns the store's error as soon as an ingest fails; the remaining events
/// of the batch are then not processed.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for (index, rep_event) in request.events.iter().enumerate() {
        let event_data = &rep_event.event_data;

        // Read a string entry from the wire payload, substituting `fallback`
        // when the key is absent or not a string.
        let text_or = |key: &str, fallback: &'static str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(fallback)
                .to_string()
        };

        let event_type = text_or("event_type", "unknown");
        let entity_id = text_or("entity_id", "unknown");
        let tenant_id = text_or("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                store.ingest(&domain_event)?;
                accepted += 1;
            }
            Err(err) => {
                // Leave a trace of why the event was dropped instead of
                // discarding the validation error silently.
                tracing::warn!("Sync push skipped event #{}: {:?}", index, err);
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2172
2173// =============================================================================
2174// Consumer endpoints for durable subscriptions (v0.14)
2175// =============================================================================
2176
2177/// POST /api/v1/consumers — Register a durable consumer
2178pub async fn register_consumer(
2179    State(store): State<SharedStore>,
2180    Json(req): Json<RegisterConsumerRequest>,
2181) -> Result<Json<ConsumerResponse>> {
2182    let consumer = store
2183        .consumer_registry()
2184        .register(&req.consumer_id, &req.event_type_filters);
2185
2186    Ok(Json(ConsumerResponse {
2187        consumer_id: consumer.consumer_id,
2188        event_type_filters: consumer.event_type_filters,
2189        cursor_position: consumer.cursor_position,
2190    }))
2191}
2192
2193/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2194pub async fn get_consumer(
2195    State(store): State<SharedStore>,
2196    Path(consumer_id): Path<String>,
2197) -> Result<Json<ConsumerResponse>> {
2198    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2199
2200    Ok(Json(ConsumerResponse {
2201        consumer_id: consumer.consumer_id,
2202        event_type_filters: consumer.event_type_filters,
2203        cursor_position: consumer.cursor_position,
2204    }))
2205}
2206
2207/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2208#[derive(Debug, Deserialize)]
2209pub struct ConsumerPollQuery {
2210    pub limit: Option<usize>,
2211}
2212
2213pub async fn poll_consumer_events(
2214    State(store): State<SharedStore>,
2215    Path(consumer_id): Path<String>,
2216    Query(query): Query<ConsumerPollQuery>,
2217) -> Result<Json<ConsumerEventsResponse>> {
2218    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2219    let offset = consumer.cursor_position.unwrap_or(0);
2220    let limit = query.limit.unwrap_or(100);
2221
2222    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2223    let count = events.len();
2224
2225    let consumer_events: Vec<ConsumerEventDto> = events
2226        .into_iter()
2227        .map(|(position, event)| ConsumerEventDto {
2228            position,
2229            event: EventDto::from(&event),
2230        })
2231        .collect();
2232
2233    Ok(Json(ConsumerEventsResponse {
2234        events: consumer_events,
2235        count,
2236    }))
2237}
2238
2239/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2240pub async fn ack_consumer(
2241    State(store): State<SharedStore>,
2242    Path(consumer_id): Path<String>,
2243    Json(req): Json<AckRequest>,
2244) -> Result<Json<serde_json::Value>> {
2245    let max_offset = store.total_events() as u64;
2246
2247    store
2248        .consumer_registry()
2249        .ack(&consumer_id, req.position, max_offset)
2250        .map_err(crate::error::AllSourceError::InvalidInput)?;
2251
2252    Ok(Json(serde_json::json!({
2253        "status": "ok",
2254        "consumer_id": consumer_id,
2255        "position": req.position,
2256    })))
2257}
2258
2259#[cfg(test)]
2260mod tests {
2261    use super::*;
2262    use crate::{domain::entities::Event, store::EventStore};
2263
2264    fn create_test_store() -> Arc<EventStore> {
2265        Arc::new(EventStore::new())
2266    }
2267
2268    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2269        Event::from_strings(
2270            event_type.to_string(),
2271            entity_id.to_string(),
2272            "test-stream".to_string(),
2273            serde_json::json!({
2274                "name": "Test",
2275                "value": 42
2276            }),
2277            None,
2278        )
2279        .unwrap()
2280    }
2281
2282    #[tokio::test]
2283    async fn test_query_events_has_more_and_total_count() {
2284        let store = create_test_store();
2285
2286        // Ingest 50 events
2287        for i in 0..50 {
2288            store
2289                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2290                .unwrap();
2291        }
2292
2293        // Query with limit=10 — should get has_more=true, total_count=50
2294        let req = QueryEventsRequest {
2295            entity_id: None,
2296            event_type: None,
2297            tenant_id: None,
2298            as_of: None,
2299            since: None,
2300            until: None,
2301            limit: Some(10),
2302            event_type_prefix: None,
2303            payload_filter: None,
2304        };
2305
2306        let requested_limit = req.limit;
2307        let unlimited_req = QueryEventsRequest {
2308            limit: None,
2309            ..QueryEventsRequest {
2310                entity_id: req.entity_id,
2311                event_type: req.event_type,
2312                tenant_id: req.tenant_id,
2313                as_of: req.as_of,
2314                since: req.since,
2315                until: req.until,
2316                limit: None,
2317                event_type_prefix: req.event_type_prefix,
2318                payload_filter: req.payload_filter,
2319            }
2320        };
2321        let all_events = store.query(&unlimited_req).unwrap();
2322        let total_count = all_events.len();
2323        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2324            all_events.into_iter().take(limit).collect()
2325        } else {
2326            all_events
2327        };
2328        let count = limited_events.len();
2329        let has_more = count < total_count;
2330
2331        assert_eq!(count, 10);
2332        assert_eq!(total_count, 50);
2333        assert!(has_more);
2334    }
2335
2336    #[tokio::test]
2337    async fn test_query_events_no_more_results() {
2338        let store = create_test_store();
2339
2340        // Ingest 5 events
2341        for i in 0..5 {
2342            store
2343                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2344                .unwrap();
2345        }
2346
2347        // Query with limit=100 — should get has_more=false, total_count=5
2348        let all_events = store
2349            .query(&QueryEventsRequest {
2350                entity_id: None,
2351                event_type: None,
2352                tenant_id: None,
2353                as_of: None,
2354                since: None,
2355                until: None,
2356                limit: None,
2357                event_type_prefix: None,
2358                payload_filter: None,
2359            })
2360            .unwrap();
2361        let total_count = all_events.len();
2362        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2363        let count = limited_events.len();
2364        let has_more = count < total_count;
2365
2366        assert_eq!(count, 5);
2367        assert_eq!(total_count, 5);
2368        assert!(!has_more);
2369    }
2370
2371    #[tokio::test]
2372    async fn test_list_entities_by_type_prefix() {
2373        let store = create_test_store();
2374
2375        // 3 index entities
2376        store
2377            .ingest(&create_test_event("idx-1", "index.created"))
2378            .unwrap();
2379        store
2380            .ingest(&create_test_event("idx-1", "index.updated"))
2381            .unwrap();
2382        store
2383            .ingest(&create_test_event("idx-2", "index.created"))
2384            .unwrap();
2385        store
2386            .ingest(&create_test_event("idx-3", "index.created"))
2387            .unwrap();
2388        // 2 trade entities
2389        store
2390            .ingest(&create_test_event("trade-1", "trade.created"))
2391            .unwrap();
2392        store
2393            .ingest(&create_test_event("trade-2", "trade.created"))
2394            .unwrap();
2395
2396        // List entities for index.*
2397        let req = ListEntitiesRequest {
2398            event_type_prefix: Some("index.".to_string()),
2399            payload_filter: None,
2400            limit: None,
2401            offset: None,
2402        };
2403        let query_req = QueryEventsRequest {
2404            entity_id: None,
2405            event_type: None,
2406            tenant_id: None,
2407            as_of: None,
2408            since: None,
2409            until: None,
2410            limit: None,
2411            event_type_prefix: req.event_type_prefix,
2412            payload_filter: req.payload_filter,
2413        };
2414        let events = store.query(&query_req).unwrap();
2415
2416        // Group and verify
2417        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2418            std::collections::HashMap::new();
2419        for event in &events {
2420            entity_map
2421                .entry(event.entity_id().to_string())
2422                .or_default()
2423                .push(event);
2424        }
2425
2426        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2427        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2428        assert_eq!(entity_map["idx-2"].len(), 1);
2429        assert_eq!(entity_map["idx-3"].len(), 1);
2430    }
2431
2432    fn create_test_event_with_payload(
2433        entity_id: &str,
2434        event_type: &str,
2435        payload: serde_json::Value,
2436    ) -> Event {
2437        Event::from_strings(
2438            event_type.to_string(),
2439            entity_id.to_string(),
2440            "test-stream".to_string(),
2441            payload,
2442            None,
2443        )
2444        .unwrap()
2445    }
2446
2447    #[tokio::test]
2448    async fn test_detect_duplicates_by_payload_fields() {
2449        let store = create_test_store();
2450
2451        // Create entities with duplicate "name" field values
2452        store
2453            .ingest(&create_test_event_with_payload(
2454                "idx-1",
2455                "index.created",
2456                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2457            ))
2458            .unwrap();
2459        store
2460            .ingest(&create_test_event_with_payload(
2461                "idx-2",
2462                "index.created",
2463                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2464            ))
2465            .unwrap();
2466        store
2467            .ingest(&create_test_event_with_payload(
2468                "idx-3",
2469                "index.created",
2470                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2471            ))
2472            .unwrap();
2473        store
2474            .ingest(&create_test_event_with_payload(
2475                "idx-4",
2476                "index.created",
2477                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2478            ))
2479            .unwrap();
2480        store
2481            .ingest(&create_test_event_with_payload(
2482                "idx-5",
2483                "index.created",
2484                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2485            ))
2486            .unwrap();
2487
2488        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2489        let query_req = QueryEventsRequest {
2490            entity_id: None,
2491            event_type: None,
2492            tenant_id: None,
2493            as_of: None,
2494            since: None,
2495            until: None,
2496            limit: None,
2497            event_type_prefix: Some("index.".to_string()),
2498            payload_filter: None,
2499        };
2500        let events = store.query(&query_req).unwrap();
2501
2502        // Manually replicate the handler logic for testing
2503        let group_by_fields = vec!["name"];
2504        let mut entity_latest: std::collections::HashMap<String, &Event> =
2505            std::collections::HashMap::new();
2506        for event in &events {
2507            let eid = event.entity_id().to_string();
2508            entity_latest
2509                .entry(eid)
2510                .and_modify(|existing| {
2511                    if event.timestamp() > existing.timestamp() {
2512                        *existing = event;
2513                    }
2514                })
2515                .or_insert(event);
2516        }
2517
2518        let mut groups: std::collections::HashMap<String, Vec<String>> =
2519            std::collections::HashMap::new();
2520        for (entity_id, event) in &entity_latest {
2521            let payload = event.payload();
2522            let mut key_parts = serde_json::Map::new();
2523            for field in &group_by_fields {
2524                let value = payload
2525                    .get(*field)
2526                    .cloned()
2527                    .unwrap_or(serde_json::Value::Null);
2528                key_parts.insert((*field).to_string(), value);
2529            }
2530            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2531            groups.entry(key_str).or_default().push(entity_id.clone());
2532        }
2533
2534        let duplicate_groups: Vec<_> = groups
2535            .into_iter()
2536            .filter(|(_, ids)| ids.len() > 1)
2537            .collect();
2538
2539        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2540        for (_, ids) in &duplicate_groups {
2541            assert_eq!(ids.len(), 2);
2542        }
2543    }
2544
2545    #[tokio::test]
2546    async fn test_detect_duplicates_no_duplicates() {
2547        let store = create_test_store();
2548
2549        // All unique names
2550        store
2551            .ingest(&create_test_event_with_payload(
2552                "idx-1",
2553                "index.created",
2554                serde_json::json!({"name": "A"}),
2555            ))
2556            .unwrap();
2557        store
2558            .ingest(&create_test_event_with_payload(
2559                "idx-2",
2560                "index.created",
2561                serde_json::json!({"name": "B"}),
2562            ))
2563            .unwrap();
2564
2565        let query_req = QueryEventsRequest {
2566            entity_id: None,
2567            event_type: None,
2568            tenant_id: None,
2569            as_of: None,
2570            since: None,
2571            until: None,
2572            limit: None,
2573            event_type_prefix: Some("index.".to_string()),
2574            payload_filter: None,
2575        };
2576        let events = store.query(&query_req).unwrap();
2577
2578        let mut entity_latest: std::collections::HashMap<String, &Event> =
2579            std::collections::HashMap::new();
2580        for event in &events {
2581            entity_latest
2582                .entry(event.entity_id().to_string())
2583                .or_insert(event);
2584        }
2585
2586        let mut groups: std::collections::HashMap<String, Vec<String>> =
2587            std::collections::HashMap::new();
2588        for (entity_id, event) in &entity_latest {
2589            let key_str =
2590                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2591                    .unwrap();
2592            groups.entry(key_str).or_default().push(entity_id.clone());
2593        }
2594
2595        let duplicate_groups: Vec<_> = groups
2596            .into_iter()
2597            .filter(|(_, ids)| ids.len() > 1)
2598            .collect();
2599
2600        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2601    }
2602
2603    #[tokio::test]
2604    async fn test_detect_duplicates_multi_field_group_by() {
2605        let store = create_test_store();
2606
2607        // Two entities with same name AND user_id = true duplicate
2608        store
2609            .ingest(&create_test_event_with_payload(
2610                "idx-1",
2611                "index.created",
2612                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2613            ))
2614            .unwrap();
2615        store
2616            .ingest(&create_test_event_with_payload(
2617                "idx-2",
2618                "index.created",
2619                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2620            ))
2621            .unwrap();
2622        // Same name but different user_id = NOT a duplicate in multi-field group
2623        store
2624            .ingest(&create_test_event_with_payload(
2625                "idx-3",
2626                "index.created",
2627                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2628            ))
2629            .unwrap();
2630
2631        let query_req = QueryEventsRequest {
2632            entity_id: None,
2633            event_type: None,
2634            tenant_id: None,
2635            as_of: None,
2636            since: None,
2637            until: None,
2638            limit: None,
2639            event_type_prefix: Some("index.".to_string()),
2640            payload_filter: None,
2641        };
2642        let events = store.query(&query_req).unwrap();
2643
2644        let group_by_fields = vec!["name", "user_id"];
2645        let mut entity_latest: std::collections::HashMap<String, &Event> =
2646            std::collections::HashMap::new();
2647        for event in &events {
2648            entity_latest
2649                .entry(event.entity_id().to_string())
2650                .and_modify(|existing| {
2651                    if event.timestamp() > existing.timestamp() {
2652                        *existing = event;
2653                    }
2654                })
2655                .or_insert(event);
2656        }
2657
2658        let mut groups: std::collections::HashMap<String, Vec<String>> =
2659            std::collections::HashMap::new();
2660        for (entity_id, event) in &entity_latest {
2661            let payload = event.payload();
2662            let mut key_parts = serde_json::Map::new();
2663            for field in &group_by_fields {
2664                let value = payload
2665                    .get(*field)
2666                    .cloned()
2667                    .unwrap_or(serde_json::Value::Null);
2668                key_parts.insert((*field).to_string(), value);
2669            }
2670            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2671            groups.entry(key_str).or_default().push(entity_id.clone());
2672        }
2673
2674        let duplicate_groups: Vec<_> = groups
2675            .into_iter()
2676            .filter(|(_, ids)| ids.len() > 1)
2677            .collect();
2678
2679        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2680        assert_eq!(duplicate_groups.len(), 1);
2681        let (_, ref ids) = duplicate_groups[0];
2682        assert_eq!(ids.len(), 2);
2683        let mut sorted_ids = ids.clone();
2684        sorted_ids.sort();
2685        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2686    }
2687
2688    #[tokio::test]
2689    async fn test_projection_state_cache() {
2690        let store = create_test_store();
2691
2692        // Test cache insertion
2693        let cache = store.projection_state_cache();
2694        cache.insert(
2695            "entity_snapshots:user-123".to_string(),
2696            serde_json::json!({"name": "Test User", "age": 30}),
2697        );
2698
2699        // Test cache retrieval
2700        let state = cache.get("entity_snapshots:user-123");
2701        assert!(state.is_some());
2702        let state = state.unwrap();
2703        assert_eq!(state["name"], "Test User");
2704        assert_eq!(state["age"], 30);
2705    }
2706
2707    #[tokio::test]
2708    async fn test_projection_manager_list_projections() {
2709        let store = create_test_store();
2710
2711        // List projections (built-in projections should be available)
2712        let projection_manager = store.projection_manager();
2713        let projections = projection_manager.list_projections();
2714
2715        // Should have entity_snapshots and event_counters
2716        assert!(projections.len() >= 2);
2717
2718        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2719        assert!(names.contains(&"entity_snapshots"));
2720        assert!(names.contains(&"event_counters"));
2721    }
2722
2723    #[tokio::test]
2724    async fn test_projection_state_after_event_ingestion() {
2725        let store = create_test_store();
2726
2727        // Ingest an event
2728        let event = create_test_event("user-456", "user.created");
2729        store.ingest(&event).unwrap();
2730
2731        // Get projection state
2732        let projection_manager = store.projection_manager();
2733        let snapshot_projection = projection_manager
2734            .get_projection("entity_snapshots")
2735            .unwrap();
2736
2737        let state = snapshot_projection.get_state("user-456");
2738        assert!(state.is_some());
2739        let state = state.unwrap();
2740        assert_eq!(state["name"], "Test");
2741        assert_eq!(state["value"], 42);
2742    }
2743
2744    #[tokio::test]
2745    async fn test_projection_state_cache_multiple_entities() {
2746        let store = create_test_store();
2747        let cache = store.projection_state_cache();
2748
2749        // Insert multiple entities
2750        for i in 0..10 {
2751            cache.insert(
2752                format!("entity_snapshots:entity-{i}"),
2753                serde_json::json!({"id": i, "status": "active"}),
2754            );
2755        }
2756
2757        // Verify all insertions
2758        assert_eq!(cache.len(), 10);
2759
2760        // Verify each entity
2761        for i in 0..10 {
2762            let key = format!("entity_snapshots:entity-{i}");
2763            let state = cache.get(&key);
2764            assert!(state.is_some());
2765            assert_eq!(state.unwrap()["id"], i);
2766        }
2767    }
2768
2769    #[tokio::test]
2770    async fn test_projection_state_update() {
2771        let store = create_test_store();
2772        let cache = store.projection_state_cache();
2773
2774        // Initial state
2775        cache.insert(
2776            "entity_snapshots:user-789".to_string(),
2777            serde_json::json!({"balance": 100}),
2778        );
2779
2780        // Update state
2781        cache.insert(
2782            "entity_snapshots:user-789".to_string(),
2783            serde_json::json!({"balance": 150}),
2784        );
2785
2786        // Verify update
2787        let state = cache.get("entity_snapshots:user-789").unwrap();
2788        assert_eq!(state["balance"], 150);
2789    }
2790
2791    #[tokio::test]
2792    async fn test_event_counter_projection() {
2793        let store = create_test_store();
2794
2795        // Ingest events of different types
2796        store
2797            .ingest(&create_test_event("user-1", "user.created"))
2798            .unwrap();
2799        store
2800            .ingest(&create_test_event("user-2", "user.created"))
2801            .unwrap();
2802        store
2803            .ingest(&create_test_event("user-1", "user.updated"))
2804            .unwrap();
2805
2806        // Get event counter projection
2807        let projection_manager = store.projection_manager();
2808        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2809
2810        // Check counts
2811        let created_state = counter_projection.get_state("user.created");
2812        assert!(created_state.is_some());
2813        assert_eq!(created_state.unwrap()["count"], 2);
2814
2815        let updated_state = counter_projection.get_state("user.updated");
2816        assert!(updated_state.is_some());
2817        assert_eq!(updated_state.unwrap()["count"], 1);
2818    }
2819
2820    #[tokio::test]
2821    async fn test_projection_state_cache_key_format() {
2822        let store = create_test_store();
2823        let cache = store.projection_state_cache();
2824
2825        // Test standard key format: {projection_name}:{entity_id}
2826        let key = "orders:order-12345".to_string();
2827        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2828
2829        let state = cache.get(&key).unwrap();
2830        assert_eq!(state["total"], 99.99);
2831    }
2832
2833    #[tokio::test]
2834    async fn test_projection_state_cache_removal() {
2835        let store = create_test_store();
2836        let cache = store.projection_state_cache();
2837
2838        // Insert and then remove
2839        cache.insert(
2840            "test:entity-1".to_string(),
2841            serde_json::json!({"data": "value"}),
2842        );
2843        assert_eq!(cache.len(), 1);
2844
2845        cache.remove("test:entity-1");
2846        assert_eq!(cache.len(), 0);
2847        assert!(cache.get("test:entity-1").is_none());
2848    }
2849
2850    #[tokio::test]
2851    async fn test_get_nonexistent_projection() {
2852        let store = create_test_store();
2853        let projection_manager = store.projection_manager();
2854
2855        // Requesting a non-existent projection should return None
2856        let projection = projection_manager.get_projection("nonexistent_projection");
2857        assert!(projection.is_none());
2858    }
2859
2860    #[tokio::test]
2861    async fn test_get_nonexistent_entity_state() {
2862        let store = create_test_store();
2863        let projection_manager = store.projection_manager();
2864
2865        // Get state for non-existent entity
2866        let snapshot_projection = projection_manager
2867            .get_projection("entity_snapshots")
2868            .unwrap();
2869        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2870        assert!(state.is_none());
2871    }
2872
2873    #[tokio::test]
2874    async fn test_projection_state_cache_concurrent_access() {
2875        let store = create_test_store();
2876        let cache = store.projection_state_cache();
2877
2878        // Simulate concurrent writes
2879        let handles: Vec<_> = (0..10)
2880            .map(|i| {
2881                let cache_clone = cache.clone();
2882                tokio::spawn(async move {
2883                    cache_clone.insert(
2884                        format!("concurrent:entity-{i}"),
2885                        serde_json::json!({"thread": i}),
2886                    );
2887                })
2888            })
2889            .collect();
2890
2891        for handle in handles {
2892            handle.await.unwrap();
2893        }
2894
2895        // All 10 entries should be present
2896        assert_eq!(cache.len(), 10);
2897    }
2898
2899    #[tokio::test]
2900    async fn test_projection_state_large_payload() {
2901        let store = create_test_store();
2902        let cache = store.projection_state_cache();
2903
2904        // Create a large JSON payload (~10KB)
2905        let large_array: Vec<serde_json::Value> = (0..1000)
2906            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2907            .collect();
2908
2909        cache.insert(
2910            "large:entity-1".to_string(),
2911            serde_json::json!({"items": large_array}),
2912        );
2913
2914        let state = cache.get("large:entity-1").unwrap();
2915        let items = state["items"].as_array().unwrap();
2916        assert_eq!(items.len(), 1000);
2917    }
2918
2919    #[tokio::test]
2920    async fn test_projection_state_complex_json() {
2921        let store = create_test_store();
2922        let cache = store.projection_state_cache();
2923
2924        // Complex nested JSON structure
2925        let complex_state = serde_json::json!({
2926            "user": {
2927                "id": "user-123",
2928                "profile": {
2929                    "name": "John Doe",
2930                    "email": "john@example.com",
2931                    "settings": {
2932                        "theme": "dark",
2933                        "notifications": true
2934                    }
2935                },
2936                "roles": ["admin", "user"],
2937                "metadata": {
2938                    "created_at": "2025-01-01T00:00:00Z",
2939                    "last_login": null
2940                }
2941            }
2942        });
2943
2944        cache.insert("complex:user-123".to_string(), complex_state);
2945
2946        let state = cache.get("complex:user-123").unwrap();
2947        assert_eq!(state["user"]["profile"]["name"], "John Doe");
2948        assert_eq!(state["user"]["roles"][0], "admin");
2949        assert!(state["user"]["metadata"]["last_login"].is_null());
2950    }
2951
2952    #[tokio::test]
2953    async fn test_projection_state_cache_iteration() {
2954        let store = create_test_store();
2955        let cache = store.projection_state_cache();
2956
2957        // Insert entries
2958        for i in 0..5 {
2959            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2960        }
2961
2962        // Iterate over all entries
2963        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2964        assert_eq!(entries.len(), 5);
2965    }
2966
2967    #[tokio::test]
2968    async fn test_projection_manager_get_entity_snapshots() {
2969        let store = create_test_store();
2970        let projection_manager = store.projection_manager();
2971
2972        // Get entity_snapshots projection specifically
2973        let projection = projection_manager.get_projection("entity_snapshots");
2974        assert!(projection.is_some());
2975        assert_eq!(projection.unwrap().name(), "entity_snapshots");
2976    }
2977
2978    #[tokio::test]
2979    async fn test_projection_manager_get_event_counters() {
2980        let store = create_test_store();
2981        let projection_manager = store.projection_manager();
2982
2983        // Get event_counters projection specifically
2984        let projection = projection_manager.get_projection("event_counters");
2985        assert!(projection.is_some());
2986        assert_eq!(projection.unwrap().name(), "event_counters");
2987    }
2988
2989    #[tokio::test]
2990    async fn test_projection_state_cache_overwrite() {
2991        let store = create_test_store();
2992        let cache = store.projection_state_cache();
2993
2994        // Initial value
2995        cache.insert(
2996            "overwrite:entity-1".to_string(),
2997            serde_json::json!({"version": 1}),
2998        );
2999
3000        // Overwrite with new value
3001        cache.insert(
3002            "overwrite:entity-1".to_string(),
3003            serde_json::json!({"version": 2}),
3004        );
3005
3006        // Overwrite again
3007        cache.insert(
3008            "overwrite:entity-1".to_string(),
3009            serde_json::json!({"version": 3}),
3010        );
3011
3012        let state = cache.get("overwrite:entity-1").unwrap();
3013        assert_eq!(state["version"], 3);
3014
3015        // Should still be only 1 entry
3016        assert_eq!(cache.len(), 1);
3017    }
3018
3019    #[tokio::test]
3020    async fn test_projection_state_multiple_projections() {
3021        let store = create_test_store();
3022        let cache = store.projection_state_cache();
3023
3024        // Store states for different projections
3025        cache.insert(
3026            "entity_snapshots:user-1".to_string(),
3027            serde_json::json!({"name": "Alice"}),
3028        );
3029        cache.insert(
3030            "event_counters:user.created".to_string(),
3031            serde_json::json!({"count": 5}),
3032        );
3033        cache.insert(
3034            "custom_projection:order-1".to_string(),
3035            serde_json::json!({"total": 150.0}),
3036        );
3037
3038        // Verify each projection's state
3039        assert_eq!(
3040            cache.get("entity_snapshots:user-1").unwrap()["name"],
3041            "Alice"
3042        );
3043        assert_eq!(
3044            cache.get("event_counters:user.created").unwrap()["count"],
3045            5
3046        );
3047        assert_eq!(
3048            cache.get("custom_projection:order-1").unwrap()["total"],
3049            150.0
3050        );
3051    }
3052
3053    #[tokio::test]
3054    async fn test_bulk_projection_state_access() {
3055        let store = create_test_store();
3056
3057        // Ingest multiple events for different entities
3058        for i in 0..5 {
3059            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3060            store.ingest(&event).unwrap();
3061        }
3062
3063        // Get projection and verify bulk access
3064        let projection_manager = store.projection_manager();
3065        let snapshot_projection = projection_manager
3066            .get_projection("entity_snapshots")
3067            .unwrap();
3068
3069        // Verify we can access all entities
3070        for i in 0..5 {
3071            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3072            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3073        }
3074    }
3075
3076    #[tokio::test]
3077    async fn test_bulk_save_projection_states() {
3078        let store = create_test_store();
3079        let cache = store.projection_state_cache();
3080
3081        // Simulate bulk save request
3082        let states = vec![
3083            BulkSaveStateItem {
3084                entity_id: "bulk-entity-1".to_string(),
3085                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3086            },
3087            BulkSaveStateItem {
3088                entity_id: "bulk-entity-2".to_string(),
3089                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3090            },
3091            BulkSaveStateItem {
3092                entity_id: "bulk-entity-3".to_string(),
3093                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3094            },
3095        ];
3096
3097        let projection_name = "test_projection";
3098
3099        // Save states to cache (simulating bulk_save_projection_states handler)
3100        for item in &states {
3101            cache.insert(
3102                format!("{projection_name}:{}", item.entity_id),
3103                item.state.clone(),
3104            );
3105        }
3106
3107        // Verify all states were saved
3108        assert_eq!(cache.len(), 3);
3109
3110        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3111        assert_eq!(state1["name"], "Entity 1");
3112        assert_eq!(state1["value"], 100);
3113
3114        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3115        assert_eq!(state2["name"], "Entity 2");
3116        assert_eq!(state2["value"], 200);
3117
3118        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3119        assert_eq!(state3["name"], "Entity 3");
3120        assert_eq!(state3["value"], 300);
3121    }
3122
3123    #[tokio::test]
3124    async fn test_bulk_save_empty_states() {
3125        let store = create_test_store();
3126        let cache = store.projection_state_cache();
3127
3128        // Clear cache
3129        cache.clear();
3130
3131        // Empty states should work fine
3132        let states: Vec<BulkSaveStateItem> = vec![];
3133        assert_eq!(states.len(), 0);
3134
3135        // Cache should remain empty
3136        assert_eq!(cache.len(), 0);
3137    }
3138
3139    #[tokio::test]
3140    async fn test_bulk_save_overwrites_existing() {
3141        let store = create_test_store();
3142        let cache = store.projection_state_cache();
3143
3144        // Insert initial state
3145        cache.insert(
3146            "test:entity-1".to_string(),
3147            serde_json::json!({"version": 1, "data": "initial"}),
3148        );
3149
3150        // Bulk save with updated state
3151        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3152        cache.insert("test:entity-1".to_string(), new_state);
3153
3154        // Verify overwrite
3155        let state = cache.get("test:entity-1").unwrap();
3156        assert_eq!(state["version"], 2);
3157        assert_eq!(state["data"], "updated");
3158    }
3159
3160    #[tokio::test]
3161    async fn test_bulk_save_high_volume() {
3162        let store = create_test_store();
3163        let cache = store.projection_state_cache();
3164
3165        // Simulate high volume save (1000 entities)
3166        for i in 0..1000 {
3167            cache.insert(
3168                format!("volume_test:entity-{i}"),
3169                serde_json::json!({"index": i, "status": "active"}),
3170            );
3171        }
3172
3173        // Verify count
3174        assert_eq!(cache.len(), 1000);
3175
3176        // Spot check some entries
3177        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3178        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3179        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3180    }
3181
3182    #[tokio::test]
3183    async fn test_bulk_save_different_projections() {
3184        let store = create_test_store();
3185        let cache = store.projection_state_cache();
3186
3187        // Save to multiple projections in bulk
3188        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3189
3190        for proj in &projections {
3191            for i in 0..5 {
3192                cache.insert(
3193                    format!("{proj}:entity-{i}"),
3194                    serde_json::json!({"projection": proj, "id": i}),
3195                );
3196            }
3197        }
3198
3199        // Verify total count (3 projections * 5 entities)
3200        assert_eq!(cache.len(), 15);
3201
3202        // Verify each projection
3203        for proj in &projections {
3204            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3205            assert_eq!(state["projection"], *proj);
3206        }
3207    }
3208
3209    // -------------------------------------------------------------------------
3210    // Cache-fallback tests for the projection-state read handlers (v0.19.1).
3211    //
3212    // SDK-managed projections write state via save_projection_state /
3213    // bulk_save_projection_states without registering in projection_manager.
3214    // These tests verify the read handlers fall back to the cache instead of
3215    // 404-ing. Registered projection wins where both exist; cache fills the
3216    // gap when not.
3217    // -------------------------------------------------------------------------
3218
3219    #[tokio::test]
3220    async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3221        let store = create_test_store();
3222        store.projection_state_cache().insert(
3223            "assets:BTC".to_string(),
3224            serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3225        );
3226
3227        let resp = get_projection_state(
3228            State(Arc::clone(&store)),
3229            Path(("assets".to_string(), "BTC".to_string())),
3230        )
3231        .await
3232        .expect("should not error when projection is not registered");
3233
3234        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3235        assert_eq!(resp.0["state"]["symbol"], "BTC");
3236        assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3237    }
3238
3239    #[tokio::test]
3240    async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3241        let store = create_test_store();
3242
3243        let resp = get_projection_state(
3244            State(Arc::clone(&store)),
3245            Path(("assets".to_string(), "UNKNOWN".to_string())),
3246        )
3247        .await
3248        .unwrap();
3249
3250        assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3251        assert_eq!(resp.0["state"], serde_json::Value::Null);
3252    }
3253
3254    #[tokio::test]
3255    async fn get_projection_state_registered_wins_over_cache() {
3256        let store = create_test_store();
3257
3258        // Ingest an event so entity_snapshots (a registered projection) has state.
3259        let event = create_test_event("user-777", "user.created");
3260        store.ingest(&event).unwrap();
3261
3262        // Plant a conflicting cache entry for the same (projection, entity).
3263        store.projection_state_cache().insert(
3264            "entity_snapshots:user-777".to_string(),
3265            serde_json::json!({"stolen": "value"}),
3266        );
3267
3268        let resp = get_projection_state(
3269            State(Arc::clone(&store)),
3270            Path(("entity_snapshots".to_string(), "user-777".to_string())),
3271        )
3272        .await
3273        .unwrap();
3274
3275        // Registered projection wins — cache fallback is only consulted when
3276        // the registered projection has no state for this entity.
3277        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3278        assert!(
3279            resp.0["state"].get("stolen").is_none(),
3280            "cache entry must not shadow registered projection state: got {:?}",
3281            resp.0["state"]
3282        );
3283    }
3284
3285    #[tokio::test]
3286    async fn get_projection_state_summary_returns_cache_without_registration() {
3287        let store = create_test_store();
3288        let cache = store.projection_state_cache();
3289        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3290        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3291        // Different projection name — must not appear in the summary.
3292        cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3293
3294        let resp =
3295            get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3296                .await
3297                .unwrap();
3298
3299        assert_eq!(resp.0["total"], 2);
3300        let states = resp.0["states"].as_array().unwrap();
3301        let entity_ids: Vec<&str> = states
3302            .iter()
3303            .map(|s| s["entity_id"].as_str().unwrap())
3304            .collect();
3305        assert!(entity_ids.contains(&"BTC"));
3306        assert!(entity_ids.contains(&"ETH"));
3307    }
3308
3309    #[tokio::test]
3310    async fn bulk_get_projection_states_falls_back_to_cache() {
3311        let store = create_test_store();
3312        let cache = store.projection_state_cache();
3313        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3314        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3315
3316        let req = BulkGetStateRequest {
3317            entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3318        };
3319
3320        let resp = bulk_get_projection_states(
3321            State(Arc::clone(&store)),
3322            Path("assets".to_string()),
3323            Json(req),
3324        )
3325        .await
3326        .unwrap();
3327
3328        assert_eq!(resp.0["total"], 3);
3329        let states = resp.0["states"].as_array().unwrap();
3330        let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3331            .iter()
3332            .map(|s| (s["entity_id"].as_str().unwrap(), s))
3333            .collect();
3334
3335        assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3336        assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3337        assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3338        assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3339    }
3340}