Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        security::middleware::OptionalAuth,
41        web::api_v1::AppState,
42    },
43    store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46    Json, Router,
47    extract::{Path, Query, State, WebSocketUpgrade},
48    response::{IntoResponse, Response},
49    routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57
/// Reference-counted handle to the [`EventStore`]; this is the state type the
/// router built by `serve` hands to handlers via `State<SharedStore>`.
type SharedStore = Arc<EventStore>;
59
60/// Wait for follower ACK(s) in semi-sync/sync replication modes.
61///
62/// In async mode (default), returns immediately. In semi-sync mode, waits for
63/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
64/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
65#[cfg(feature = "replication")]
66async fn await_replication_ack(state: &AppState) {
67    let shipper_guard = state.wal_shipper.read().await;
68    if let Some(ref shipper) = *shipper_guard {
69        let mode = shipper.replication_mode();
70        if mode == ReplicationMode::Async {
71            return;
72        }
73
74        let target_offset = shipper.current_leader_offset();
75        if target_offset == 0 {
76            return;
77        }
78
79        let shipper = Arc::clone(shipper);
80        // Drop the read guard before the async wait to avoid holding it across await
81        drop(shipper_guard);
82
83        let timer = state
84            .store
85            .metrics()
86            .replication_ack_wait_seconds
87            .start_timer();
88        let acked = shipper.wait_for_ack(target_offset).await;
89        timer.observe_duration();
90
91        if !acked {
92            tracing::warn!(
93                "Replication ACK timeout in {} mode (offset {}). \
94                 Write succeeded locally but follower confirmation pending.",
95                mode,
96                target_offset,
97            );
98        }
99    }
100}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103    // No-op in community edition
104}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107    let app = Router::new()
108        .route("/health", get(health))
109        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
110        .route("/api/v1/events", post(ingest_event))
111        .route("/api/v1/events/batch", post(ingest_events_batch))
112        .route("/api/v1/events/query", get(query_events))
113        .route("/api/v1/events/{event_id}", get(get_event_by_id))
114        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
115        // v0.10: Stream and event type discovery endpoints
116        .route("/api/v1/streams", get(list_streams))
117        .route("/api/v1/event-types", get(list_event_types))
118        .route("/api/v1/entities/duplicates", get(detect_duplicates))
119        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120        .route(
121            "/api/v1/entities/{entity_id}/snapshot",
122            get(get_entity_snapshot),
123        )
124        .route("/api/v1/stats", get(get_stats))
125        // v0.2: Advanced analytics endpoints
126        .route("/api/v1/analytics/frequency", get(analytics_frequency))
127        .route("/api/v1/analytics/summary", get(analytics_summary))
128        .route("/api/v1/analytics/correlation", get(analytics_correlation))
129        // v0.2: Snapshot management endpoints
130        .route("/api/v1/snapshots", post(create_snapshot))
131        .route("/api/v1/snapshots", get(list_snapshots))
132        .route(
133            "/api/v1/snapshots/{entity_id}/latest",
134            get(get_latest_snapshot),
135        )
136        // v0.2: Compaction endpoints
137        .route("/api/v1/compaction/trigger", post(trigger_compaction))
138        .route("/api/v1/compaction/stats", get(compaction_stats))
139        // v0.5: Schema registry endpoints
140        .route("/api/v1/schemas", post(register_schema))
141        .route("/api/v1/schemas", get(list_subjects))
142        .route("/api/v1/schemas/{subject}", get(get_schema))
143        .route(
144            "/api/v1/schemas/{subject}/versions",
145            get(list_schema_versions),
146        )
147        .route("/api/v1/schemas/validate", post(validate_event_schema))
148        .route(
149            "/api/v1/schemas/{subject}/compatibility",
150            put(set_compatibility_mode),
151        )
152        // v0.5: Replay and projection rebuild endpoints
153        .route("/api/v1/replay", post(start_replay))
154        .route("/api/v1/replay", get(list_replays))
155        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157        .route(
158            "/api/v1/replay/{replay_id}",
159            axum::routing::delete(delete_replay),
160        )
161        // v0.5: Stream processing pipeline endpoints
162        .route("/api/v1/pipelines", post(register_pipeline))
163        .route("/api/v1/pipelines", get(list_pipelines))
164        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166        .route(
167            "/api/v1/pipelines/{pipeline_id}",
168            axum::routing::delete(remove_pipeline),
169        )
170        .route(
171            "/api/v1/pipelines/{pipeline_id}/stats",
172            get(get_pipeline_stats),
173        )
174        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175        // v0.7: Projection State API for Query Service integration
176        .route("/api/v1/projections", get(list_projections))
177        .route("/api/v1/projections/{name}", get(get_projection))
178        .route(
179            "/api/v1/projections/{name}",
180            axum::routing::delete(delete_projection),
181        )
182        .route(
183            "/api/v1/projections/{name}/state",
184            get(get_projection_state_summary),
185        )
186        .route("/api/v1/projections/{name}/reset", post(reset_projection))
187        .route(
188            "/api/v1/projections/{name}/{entity_id}/state",
189            get(get_projection_state),
190        )
191        .route(
192            "/api/v1/projections/{name}/{entity_id}/state",
193            post(save_projection_state),
194        )
195        .route(
196            "/api/v1/projections/{name}/{entity_id}/state",
197            put(save_projection_state),
198        )
199        .route(
200            "/api/v1/projections/{name}/bulk",
201            post(bulk_get_projection_states),
202        )
203        .route(
204            "/api/v1/projections/{name}/bulk/save",
205            post(bulk_save_projection_states),
206        )
207        // v0.11: Webhook management endpoints
208        .route("/api/v1/webhooks", post(register_webhook))
209        .route("/api/v1/webhooks", get(list_webhooks))
210        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212        .route(
213            "/api/v1/webhooks/{webhook_id}",
214            axum::routing::delete(delete_webhook),
215        )
216        .route(
217            "/api/v1/webhooks/{webhook_id}/deliveries",
218            get(list_webhook_deliveries),
219        )
220        // v2.0: Advanced query features
221        .route("/api/v1/graphql", post(graphql_query))
222        .route("/api/v1/geospatial/query", post(geo_query))
223        .route("/api/v1/geospatial/stats", get(geo_stats))
224        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225        .route(
226            "/api/v1/schema-evolution/history/{event_type}",
227            get(schema_evolution_history),
228        )
229        .route(
230            "/api/v1/schema-evolution/schema/{event_type}",
231            get(schema_evolution_schema),
232        )
233        .route(
234            "/api/v1/schema-evolution/stats",
235            get(schema_evolution_stats),
236        )
237        .layer(
238            CorsLayer::new()
239                .allow_origin(Any)
240                .allow_methods(Any)
241                .allow_headers(Any),
242        )
243        .layer(TraceLayer::new_for_http())
244        .with_state(store);
245
246    let listener = tokio::net::TcpListener::bind(addr).await?;
247    axum::serve(listener, app).await?;
248
249    Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253    Json(serde_json::json!({
254        "status": "healthy",
255        "service": "allsource-core",
256        "version": env!("CARGO_PKG_VERSION")
257    }))
258}
259
260// v0.6: Prometheus metrics endpoint
261pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262    let metrics = store.metrics();
263
264    match metrics.encode() {
265        Ok(encoded) => Response::builder()
266            .status(200)
267            .header("Content-Type", "text/plain; version=0.0.4")
268            .body(encoded)
269            .unwrap()
270            .into_response(),
271        Err(e) => Response::builder()
272            .status(500)
273            .body(format!("Error encoding metrics: {e}"))
274            .unwrap()
275            .into_response(),
276    }
277}
278
279pub async fn ingest_event(
280    State(store): State<SharedStore>,
281    Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283    let expected_version = req.expected_version;
284
285    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286    let event = Event::from_strings(
287        req.event_type,
288        req.entity_id,
289        tenant_id,
290        req.payload,
291        req.metadata,
292    )?;
293
294    let event_id = event.id;
295    let timestamp = event.timestamp;
296
297    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299    tracing::info!("Event ingested: {}", event_id);
300
301    Ok(Json(IngestEventResponse {
302        event_id,
303        timestamp,
304        version: Some(new_version),
305    }))
306}
307
308/// Ingest a single event with semi-sync/sync replication ACK waiting.
309///
310/// Used by the v1 API. Tenant comes from `req.tenant_id` — the Control Plane
311/// delegation layer sets it from the authenticated caller before forwarding.
312/// Core is internal-only and does not authenticate public traffic.
313pub async fn ingest_event_v1(
314    State(state): State<AppState>,
315    Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317    let expected_version = req.expected_version;
318
319    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321    let event = Event::from_strings(
322        req.event_type,
323        req.entity_id,
324        tenant_id,
325        req.payload,
326        req.metadata,
327    )?;
328
329    let event_id = event.id;
330    let timestamp = event.timestamp;
331
332    let new_version = state
333        .store
334        .ingest_with_expected_version(&event, expected_version)?;
335
336    // Semi-sync/sync: wait for follower ACK(s) before returning
337    await_replication_ack(&state).await;
338
339    tracing::info!("Event ingested: {}", event_id);
340
341    Ok(Json(IngestEventResponse {
342        event_id,
343        timestamp,
344        version: Some(new_version),
345    }))
346}
347
348/// Batch ingest multiple events in a single request
349///
350/// This endpoint allows ingesting multiple events atomically, which is more
351/// efficient than making individual requests for each event.
352pub async fn ingest_events_batch(
353    State(store): State<SharedStore>,
354    Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356    let total = req.events.len();
357    let mut ingested_events = Vec::with_capacity(total);
358
359    for event_req in req.events {
360        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361        let expected_version = event_req.expected_version;
362
363        let event = Event::from_strings(
364            event_req.event_type,
365            event_req.entity_id,
366            tenant_id,
367            event_req.payload,
368            event_req.metadata,
369        )?;
370
371        let event_id = event.id;
372        let timestamp = event.timestamp;
373
374        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376        ingested_events.push(IngestEventResponse {
377            event_id,
378            timestamp,
379            version: Some(new_version),
380        });
381    }
382
383    let ingested = ingested_events.len();
384    tracing::info!("Batch ingested {} events", ingested);
385
386    Ok(Json(IngestEventsBatchResponse {
387        total,
388        ingested,
389        events: ingested_events,
390    }))
391}
392
393/// Batch ingest with semi-sync/sync replication ACK waiting.
394///
395/// Used by the v1 API. Per-event tenant comes from `event_req.tenant_id` — the
396/// Control Plane delegation layer sets it from the authenticated caller before
397/// forwarding. Core is internal-only and does not authenticate public traffic.
398pub async fn ingest_events_batch_v1(
399    State(state): State<AppState>,
400    Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402    let total = req.events.len();
403    let mut ingested_events = Vec::with_capacity(total);
404
405    for event_req in req.events {
406        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407        let expected_version = event_req.expected_version;
408
409        let event = Event::from_strings(
410            event_req.event_type,
411            event_req.entity_id,
412            tenant_id,
413            event_req.payload,
414            event_req.metadata,
415        )?;
416
417        let event_id = event.id;
418        let timestamp = event.timestamp;
419
420        let new_version = state
421            .store
422            .ingest_with_expected_version(&event, expected_version)?;
423
424        ingested_events.push(IngestEventResponse {
425            event_id,
426            timestamp,
427            version: Some(new_version),
428        });
429    }
430
431    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
432    await_replication_ack(&state).await;
433
434    let ingested = ingested_events.len();
435    tracing::info!("Batch ingested {} events", ingested);
436
437    Ok(Json(IngestEventsBatchResponse {
438        total,
439        ingested,
440        events: ingested_events,
441    }))
442}
443
/// Sort-order query parameter for `GET /api/v1/events/query`. Kept separate
/// from `QueryEventsRequest` so ordering is purely an HTTP-layer concern and
/// the internal query DTO (used by ~50 call sites) stays untouched.
///
/// Extracted from the same query string as `QueryEventsRequest` by a second
/// `Query` extractor in `query_events`.
#[derive(Debug, Deserialize)]
pub struct EventOrderParam {
    /// `asc` (oldest first, the default) or `desc` (newest first).
    ///
    /// `query_events` compares the value ASCII case-insensitively and rejects
    /// anything else with an `InvalidInput` error.
    pub order: Option<String>,
}
452
453pub async fn query_events(
454    OptionalAuth(auth): OptionalAuth,
455    Query(req): Query<QueryEventsRequest>,
456    Query(order_param): Query<EventOrderParam>,
457    State(store): State<SharedStore>,
458) -> Result<Json<QueryEventsResponse>> {
459    let requested_limit = req.limit;
460    let queried_entity_id = req.entity_id.clone();
461
462    // Sort order. Default is ascending (oldest first) to preserve replay
463    // semantics for existing consumers. `order=desc` returns newest first,
464    // so `?entity_id=<id>&limit=1&order=desc` reliably yields the latest
465    // event for an entity (issue #177).
466    let descending = match order_param.order.as_deref() {
467        None => false,
468        Some(o) if o.eq_ignore_ascii_case("asc") => false,
469        Some(o) if o.eq_ignore_ascii_case("desc") => true,
470        Some(other) => {
471            return Err(crate::error::AllSourceError::InvalidInput(format!(
472                "invalid 'order' value '{other}': expected 'asc' or 'desc'"
473            )));
474        }
475    };
476
477    // Tenant resolution. Since Core is internal-only (bead t-0ff8), the only
478    // callers are Control Plane's delegation layer and other internal Fly
479    // services. Request param wins — that's what the gateway sets authoritatively
480    // from the authenticated caller's identity. Auth-context fallback is kept
481    // as a defense-in-depth for any internal caller that forgets to pass
482    // tenant_id but is authenticated (legacy; audit and remove once all
483    // internal callers are confirmed to set tenant_id explicitly).
484    let enforced_tenant = req
485        .tenant_id
486        .clone()
487        .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
488
489    // Query without limit to get total count
490    let unlimited_req = QueryEventsRequest {
491        entity_id: req.entity_id,
492        event_type: req.event_type,
493        tenant_id: enforced_tenant,
494        as_of: req.as_of,
495        since: req.since,
496        until: req.until,
497        limit: None,
498        event_type_prefix: req.event_type_prefix,
499        payload_filter: req.payload_filter,
500    };
501    let mut all_events = store.query(&unlimited_req)?;
502    let total_count = all_events.len();
503
504    // store.query() returns events ascending by (timestamp, version).
505    // Reverse for `order=desc` so the limit below takes the newest events.
506    if descending {
507        all_events.reverse();
508    }
509
510    // Apply limit
511    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
512        all_events.into_iter().take(limit).collect()
513    } else {
514        all_events
515    };
516
517    let count = limited_events.len();
518    let has_more = count < total_count;
519    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
520
521    // Include entity_version only when filtering by a single entity_id
522    let entity_version = queried_entity_id
523        .as_deref()
524        .map(|eid| store.get_entity_version(eid));
525
526    tracing::debug!("Query returned {} events (total: {})", count, total_count);
527
528    Ok(Json(QueryEventsResponse {
529        events,
530        count,
531        total_count,
532        has_more,
533        entity_version,
534    }))
535}
536
537pub async fn list_entities(
538    State(store): State<SharedStore>,
539    Query(req): Query<ListEntitiesRequest>,
540) -> Result<Json<ListEntitiesResponse>> {
541    use std::collections::HashMap;
542
543    // Get all events matching the filters
544    let query_req = QueryEventsRequest {
545        entity_id: None,
546        event_type: None,
547        tenant_id: None,
548        as_of: None,
549        since: None,
550        until: None,
551        limit: None,
552        event_type_prefix: req.event_type_prefix,
553        payload_filter: req.payload_filter,
554    };
555    let events = store.query(&query_req)?;
556
557    // Group by entity_id
558    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
559    for event in &events {
560        entity_map
561            .entry(event.entity_id().to_string())
562            .or_default()
563            .push(event);
564    }
565
566    // Build entity summaries sorted by last event time (descending)
567    let mut summaries: Vec<EntitySummary> = entity_map
568        .into_iter()
569        .map(|(entity_id, events)| {
570            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
571            EntitySummary {
572                entity_id,
573                event_count: events.len(),
574                last_event_type: last.event_type_str().to_string(),
575                last_event_at: last.timestamp(),
576            }
577        })
578        .collect();
579    summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
580
581    let total = summaries.len();
582
583    // Apply offset and limit
584    let offset = req.offset.unwrap_or(0);
585    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
586    let summaries = if let Some(limit) = req.limit {
587        let has_more = summaries.len() > limit;
588        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
589        return Ok(Json(ListEntitiesResponse {
590            entities: truncated,
591            total,
592            has_more,
593        }));
594    } else {
595        summaries
596    };
597
598    Ok(Json(ListEntitiesResponse {
599        entities: summaries,
600        total,
601        has_more: false,
602    }))
603}
604
605pub async fn detect_duplicates(
606    State(store): State<SharedStore>,
607    Query(req): Query<DetectDuplicatesRequest>,
608) -> Result<Json<DetectDuplicatesResponse>> {
609    use std::collections::HashMap;
610
611    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
612
613    // Query events scoped by the required prefix
614    let query_req = QueryEventsRequest {
615        entity_id: None,
616        event_type: None,
617        tenant_id: None,
618        as_of: None,
619        since: None,
620        until: None,
621        limit: None,
622        event_type_prefix: Some(req.event_type_prefix),
623        payload_filter: None,
624    };
625    let events = store.query(&query_req)?;
626
627    // For each entity, extract the latest event's payload fields specified by group_by
628    // Then group entities by those field values
629    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
630    for event in &events {
631        let eid = event.entity_id().to_string();
632        entity_latest
633            .entry(eid)
634            .and_modify(|existing| {
635                if event.timestamp() > existing.timestamp() {
636                    *existing = event;
637                }
638            })
639            .or_insert(event);
640    }
641
642    // Group entities by their payload field values
643    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
644    for (entity_id, event) in &entity_latest {
645        let payload = event.payload();
646        let mut key_parts = serde_json::Map::new();
647        for field in &group_by_fields {
648            let value = payload
649                .get(*field)
650                .cloned()
651                .unwrap_or(serde_json::Value::Null);
652            key_parts.insert((*field).to_string(), value);
653        }
654        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
655        groups.entry(key_str).or_default().push(entity_id.clone());
656    }
657
658    // Filter to groups with count > 1 (actual duplicates)
659    let mut duplicate_groups: Vec<DuplicateGroup> = groups
660        .into_iter()
661        .filter(|(_, ids)| ids.len() > 1)
662        .map(|(key_str, mut ids)| {
663            ids.sort();
664            let key: serde_json::Value =
665                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
666            let count = ids.len();
667            DuplicateGroup {
668                key,
669                entity_ids: ids,
670                count,
671            }
672        })
673        .collect();
674
675    // Sort by count descending for consistent output
676    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
677
678    let total = duplicate_groups.len();
679
680    // Apply offset and limit
681    let offset = req.offset.unwrap_or(0);
682    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
683
684    if let Some(limit) = req.limit {
685        let has_more = duplicate_groups.len() > limit;
686        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
687        return Ok(Json(DetectDuplicatesResponse {
688            duplicates: truncated,
689            total,
690            has_more,
691        }));
692    }
693
694    Ok(Json(DetectDuplicatesResponse {
695        duplicates: duplicate_groups,
696        total,
697        has_more: false,
698    }))
699}
700
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC instant forwarded as-is to the store's
    /// `reconstruct_state`.
    // NOTE(review): presumably selects the entity state as of this instant —
    // confirm against `EventStore::reconstruct_state`.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
705
706pub async fn get_entity_state(
707    State(store): State<SharedStore>,
708    Path(entity_id): Path<String>,
709    Query(params): Query<EntityStateParams>,
710) -> Result<Json<serde_json::Value>> {
711    let state = store.reconstruct_state(&entity_id, params.as_of)?;
712
713    tracing::info!("State reconstructed for entity: {}", entity_id);
714
715    Ok(Json(state))
716}
717
718pub async fn get_entity_snapshot(
719    State(store): State<SharedStore>,
720    Path(entity_id): Path<String>,
721) -> Result<Json<serde_json::Value>> {
722    let snapshot = store.get_snapshot(&entity_id)?;
723
724    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
725
726    Ok(Json(snapshot))
727}
728
729pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
730    let stats = store.stats();
731    Json(stats)
732}
733
// v0.10: List all streams (entity_ids) in the event store
/// Query parameters for listing streams.
///
/// `list_streams` applies `offset` first and `limit` second, after sorting
/// the streams by `last_event_at` descending.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Optional limit on number of streams to return
    pub limit: Option<usize>,
    /// Optional offset for pagination; an offset at or past the end yields
    /// an empty page
    pub offset: Option<usize>,
}
743
/// Response for listing streams
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams, most recent `last_event_at` first.
    pub streams: Vec<StreamInfo>,
    /// Number of streams in the store before `offset`/`limit` were applied.
    pub total: usize,
}
750
751pub async fn list_streams(
752    State(store): State<SharedStore>,
753    Query(params): Query<ListStreamsParams>,
754) -> Json<ListStreamsResponse> {
755    let mut streams = store.list_streams();
756    let total = streams.len();
757
758    // Sort by last_event_at descending (most recent first)
759    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
760
761    // Apply pagination
762    if let Some(offset) = params.offset {
763        if offset < streams.len() {
764            streams = streams[offset..].to_vec();
765        } else {
766            streams = vec![];
767        }
768    }
769
770    if let Some(limit) = params.limit {
771        streams.truncate(limit);
772    }
773
774    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
775
776    Json(ListStreamsResponse { streams, total })
777}
778
// v0.10: List all event types in the event store
/// Query parameters for listing event types.
///
/// `list_event_types` applies `offset` first and `limit` second, after
/// sorting the event types by `event_count` descending.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Optional limit on number of event types to return
    pub limit: Option<usize>,
    /// Optional offset for pagination; an offset at or past the end yields
    /// an empty page
    pub offset: Option<usize>,
}
788
/// Response for listing event types
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types, highest `event_count` first.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types in the store before `offset`/`limit` were applied.
    pub total: usize,
}
795
796pub async fn list_event_types(
797    State(store): State<SharedStore>,
798    Query(params): Query<ListEventTypesParams>,
799) -> Json<ListEventTypesResponse> {
800    let mut event_types = store.list_event_types();
801    let total = event_types.len();
802
803    // Sort by event_count descending (most used first)
804    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
805
806    // Apply pagination
807    if let Some(offset) = params.offset {
808        if offset < event_types.len() {
809            event_types = event_types[offset..].to_vec();
810        } else {
811            event_types = vec![];
812        }
813    }
814
815    if let Some(limit) = params.limit {
816        event_types.truncate(limit);
817    }
818
819    tracing::debug!(
820        "Listed {} event types (total: {})",
821        event_types.len(),
822        total
823    );
824
825    Json(ListEventTypesResponse { event_types, total })
826}
827
// v0.2: WebSocket endpoint for real-time event streaming
/// Query-string parameters accepted by the `events_websocket` upgrade endpoint.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handed to the WebSocket manager's
    /// `handle_socket_with_consumer` together with this id and the store;
    /// when absent, plain `handle_socket` is used.
    pub consumer_id: Option<String>,
}
833
834pub async fn events_websocket(
835    ws: WebSocketUpgrade,
836    State(store): State<SharedStore>,
837    Query(params): Query<WebSocketParams>,
838) -> Response {
839    let websocket_manager = store.websocket_manager();
840
841    ws.on_upgrade(move |socket| async move {
842        if let Some(consumer_id) = params.consumer_id {
843            websocket_manager
844                .handle_socket_with_consumer(socket, consumer_id, store)
845                .await;
846        } else {
847            websocket_manager.handle_socket(socket).await;
848        }
849    })
850}
851
852// v0.2: Event frequency analytics endpoint
853pub async fn analytics_frequency(
854    State(store): State<SharedStore>,
855    Query(req): Query<EventFrequencyRequest>,
856) -> Result<Json<EventFrequencyResponse>> {
857    let response = AnalyticsEngine::event_frequency(&store, &req)?;
858
859    tracing::debug!(
860        "Frequency analysis returned {} buckets",
861        response.buckets.len()
862    );
863
864    Ok(Json(response))
865}
866
867// v0.2: Statistical summary endpoint
868pub async fn analytics_summary(
869    State(store): State<SharedStore>,
870    Query(req): Query<StatsSummaryRequest>,
871) -> Result<Json<StatsSummaryResponse>> {
872    let response = AnalyticsEngine::stats_summary(&store, &req)?;
873
874    tracing::debug!(
875        "Stats summary: {} events across {} entities",
876        response.total_events,
877        response.unique_entities
878    );
879
880    Ok(Json(response))
881}
882
883// v0.2: Event correlation analysis endpoint
884pub async fn analytics_correlation(
885    State(store): State<SharedStore>,
886    Query(req): Query<CorrelationRequest>,
887) -> Result<Json<CorrelationResponse>> {
888    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
889
890    tracing::debug!(
891        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
892        response.correlated_pairs,
893        response.total_a,
894        response.correlation_percentage
895    );
896
897    Ok(Json(response))
898}
899
900// v0.2: Create a snapshot for an entity
901pub async fn create_snapshot(
902    State(store): State<SharedStore>,
903    Json(req): Json<CreateSnapshotRequest>,
904) -> Result<Json<CreateSnapshotResponse>> {
905    store.create_snapshot(&req.entity_id)?;
906
907    let snapshot_manager = store.snapshot_manager();
908    let snapshot = snapshot_manager
909        .get_latest_snapshot(&req.entity_id)
910        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
911
912    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
913
914    Ok(Json(CreateSnapshotResponse {
915        snapshot_id: snapshot.id,
916        entity_id: snapshot.entity_id,
917        created_at: snapshot.created_at,
918        event_count: snapshot.event_count,
919        size_bytes: snapshot.metadata.size_bytes,
920    }))
921}
922
923// v0.2: List snapshots
924pub async fn list_snapshots(
925    State(store): State<SharedStore>,
926    Query(req): Query<ListSnapshotsRequest>,
927) -> Result<Json<ListSnapshotsResponse>> {
928    let snapshot_manager = store.snapshot_manager();
929
930    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
931        snapshot_manager
932            .get_all_snapshots(&entity_id)
933            .into_iter()
934            .map(SnapshotInfo::from)
935            .collect()
936    } else {
937        // List all entities with snapshots
938        let entities = snapshot_manager.list_entities();
939        entities
940            .iter()
941            .flat_map(|entity_id| {
942                snapshot_manager
943                    .get_all_snapshots(entity_id)
944                    .into_iter()
945                    .map(SnapshotInfo::from)
946            })
947            .collect()
948    };
949
950    let total = snapshots.len();
951
952    tracing::debug!("Listed {} snapshots", total);
953
954    Ok(Json(ListSnapshotsResponse { snapshots, total }))
955}
956
957// v0.2: Get latest snapshot for an entity
958pub async fn get_latest_snapshot(
959    State(store): State<SharedStore>,
960    Path(entity_id): Path<String>,
961) -> Result<Json<serde_json::Value>> {
962    let snapshot_manager = store.snapshot_manager();
963
964    let snapshot = snapshot_manager
965        .get_latest_snapshot(&entity_id)
966        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
967
968    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
969
970    Ok(Json(serde_json::json!({
971        "snapshot_id": snapshot.id,
972        "entity_id": snapshot.entity_id,
973        "created_at": snapshot.created_at,
974        "as_of": snapshot.as_of,
975        "event_count": snapshot.event_count,
976        "size_bytes": snapshot.metadata.size_bytes,
977        "snapshot_type": snapshot.metadata.snapshot_type,
978        "state": snapshot.state
979    })))
980}
981
982// v0.2: Trigger manual compaction
983pub async fn trigger_compaction(
984    State(store): State<SharedStore>,
985) -> Result<Json<CompactionResult>> {
986    let compaction_manager = store.compaction_manager().ok_or_else(|| {
987        crate::error::AllSourceError::InternalError(
988            "Compaction not enabled (no Parquet storage)".to_string(),
989        )
990    })?;
991
992    tracing::info!("📦 Manual compaction triggered via API");
993
994    let result = compaction_manager.compact_now()?;
995
996    Ok(Json(result))
997}
998
999// v0.2: Get compaction statistics
1000pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
1001    let compaction_manager = store.compaction_manager().ok_or_else(|| {
1002        crate::error::AllSourceError::InternalError(
1003            "Compaction not enabled (no Parquet storage)".to_string(),
1004        )
1005    })?;
1006
1007    let stats = compaction_manager.stats();
1008    let config = compaction_manager.config();
1009
1010    Ok(Json(serde_json::json!({
1011        "stats": stats,
1012        "config": {
1013            "min_files_to_compact": config.min_files_to_compact,
1014            "target_file_size": config.target_file_size,
1015            "max_file_size": config.max_file_size,
1016            "small_file_threshold": config.small_file_threshold,
1017            "compaction_interval_seconds": config.compaction_interval_seconds,
1018            "auto_compact": config.auto_compact,
1019            "strategy": config.strategy
1020        }
1021    })))
1022}
1023
1024// v0.5: Register a new schema
1025pub async fn register_schema(
1026    State(store): State<SharedStore>,
1027    Json(req): Json<RegisterSchemaRequest>,
1028) -> Result<Json<RegisterSchemaResponse>> {
1029    let schema_registry = store.schema_registry();
1030
1031    let response =
1032        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1033
1034    tracing::info!(
1035        "📋 Schema registered: v{} for '{}'",
1036        response.version,
1037        response.subject
1038    );
1039
1040    Ok(Json(response))
1041}
1042
1043// v0.5: Get a schema by subject and optional version
1044#[derive(Deserialize)]
1045pub struct GetSchemaParams {
1046    version: Option<u32>,
1047}
1048
1049pub async fn get_schema(
1050    State(store): State<SharedStore>,
1051    Path(subject): Path<String>,
1052    Query(params): Query<GetSchemaParams>,
1053) -> Result<Json<serde_json::Value>> {
1054    let schema_registry = store.schema_registry();
1055
1056    let schema = schema_registry.get_schema(&subject, params.version)?;
1057
1058    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1059
1060    Ok(Json(serde_json::json!({
1061        "id": schema.id,
1062        "subject": schema.subject,
1063        "version": schema.version,
1064        "schema": schema.schema,
1065        "created_at": schema.created_at,
1066        "description": schema.description,
1067        "tags": schema.tags
1068    })))
1069}
1070
1071// v0.5: List all versions of a schema subject
1072pub async fn list_schema_versions(
1073    State(store): State<SharedStore>,
1074    Path(subject): Path<String>,
1075) -> Result<Json<serde_json::Value>> {
1076    let schema_registry = store.schema_registry();
1077
1078    let versions = schema_registry.list_versions(&subject)?;
1079
1080    Ok(Json(serde_json::json!({
1081        "subject": subject,
1082        "versions": versions
1083    })))
1084}
1085
1086// v0.5: List all schema subjects
1087pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1088    let schema_registry = store.schema_registry();
1089
1090    let subjects = schema_registry.list_subjects();
1091
1092    Json(serde_json::json!({
1093        "subjects": subjects,
1094        "total": subjects.len()
1095    }))
1096}
1097
1098// v0.5: Validate an event against a schema
1099pub async fn validate_event_schema(
1100    State(store): State<SharedStore>,
1101    Json(req): Json<ValidateEventRequest>,
1102) -> Result<Json<ValidateEventResponse>> {
1103    let schema_registry = store.schema_registry();
1104
1105    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1106
1107    if response.valid {
1108        tracing::debug!(
1109            "✅ Event validated against schema '{}' v{}",
1110            req.subject,
1111            response.schema_version
1112        );
1113    } else {
1114        tracing::warn!(
1115            "❌ Event validation failed for '{}': {:?}",
1116            req.subject,
1117            response.errors
1118        );
1119    }
1120
1121    Ok(Json(response))
1122}
1123
1124// v0.5: Set compatibility mode for a subject
1125#[derive(Deserialize)]
1126pub struct SetCompatibilityRequest {
1127    compatibility: CompatibilityMode,
1128}
1129
1130pub async fn set_compatibility_mode(
1131    State(store): State<SharedStore>,
1132    Path(subject): Path<String>,
1133    Json(req): Json<SetCompatibilityRequest>,
1134) -> Json<serde_json::Value> {
1135    let schema_registry = store.schema_registry();
1136
1137    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1138
1139    tracing::info!(
1140        "🔧 Set compatibility mode for '{}' to {:?}",
1141        subject,
1142        req.compatibility
1143    );
1144
1145    Json(serde_json::json!({
1146        "subject": subject,
1147        "compatibility": req.compatibility
1148    }))
1149}
1150
1151// v0.5: Start a replay operation
1152pub async fn start_replay(
1153    State(store): State<SharedStore>,
1154    Json(req): Json<StartReplayRequest>,
1155) -> Result<Json<StartReplayResponse>> {
1156    let replay_manager = store.replay_manager();
1157
1158    let response = replay_manager.start_replay(store, req)?;
1159
1160    tracing::info!(
1161        "🔄 Started replay {} with {} events",
1162        response.replay_id,
1163        response.total_events
1164    );
1165
1166    Ok(Json(response))
1167}
1168
1169// v0.5: Get replay progress
1170pub async fn get_replay_progress(
1171    State(store): State<SharedStore>,
1172    Path(replay_id): Path<uuid::Uuid>,
1173) -> Result<Json<ReplayProgress>> {
1174    let replay_manager = store.replay_manager();
1175
1176    let progress = replay_manager.get_progress(replay_id)?;
1177
1178    Ok(Json(progress))
1179}
1180
1181// v0.5: List all replay operations
1182pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1183    let replay_manager = store.replay_manager();
1184
1185    let replays = replay_manager.list_replays();
1186
1187    Json(serde_json::json!({
1188        "replays": replays,
1189        "total": replays.len()
1190    }))
1191}
1192
1193// v0.5: Cancel a running replay
1194pub async fn cancel_replay(
1195    State(store): State<SharedStore>,
1196    Path(replay_id): Path<uuid::Uuid>,
1197) -> Result<Json<serde_json::Value>> {
1198    let replay_manager = store.replay_manager();
1199
1200    replay_manager.cancel_replay(replay_id)?;
1201
1202    tracing::info!("🛑 Cancelled replay {}", replay_id);
1203
1204    Ok(Json(serde_json::json!({
1205        "replay_id": replay_id,
1206        "status": "cancelled"
1207    })))
1208}
1209
1210// v0.5: Delete a completed replay
1211pub async fn delete_replay(
1212    State(store): State<SharedStore>,
1213    Path(replay_id): Path<uuid::Uuid>,
1214) -> Result<Json<serde_json::Value>> {
1215    let replay_manager = store.replay_manager();
1216
1217    let deleted = replay_manager.delete_replay(replay_id)?;
1218
1219    if deleted {
1220        tracing::info!("🗑️  Deleted replay {}", replay_id);
1221    }
1222
1223    Ok(Json(serde_json::json!({
1224        "replay_id": replay_id,
1225        "deleted": deleted
1226    })))
1227}
1228
1229// v0.5: Register a new pipeline
1230pub async fn register_pipeline(
1231    State(store): State<SharedStore>,
1232    Json(config): Json<PipelineConfig>,
1233) -> Result<Json<serde_json::Value>> {
1234    let pipeline_manager = store.pipeline_manager();
1235
1236    let pipeline_id = pipeline_manager.register(config.clone());
1237
1238    tracing::info!(
1239        "🔀 Pipeline registered: {} (name: {})",
1240        pipeline_id,
1241        config.name
1242    );
1243
1244    Ok(Json(serde_json::json!({
1245        "pipeline_id": pipeline_id,
1246        "name": config.name,
1247        "enabled": config.enabled
1248    })))
1249}
1250
1251// v0.5: List all pipelines
1252pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1253    let pipeline_manager = store.pipeline_manager();
1254
1255    let pipelines = pipeline_manager.list();
1256
1257    tracing::debug!("Listed {} pipelines", pipelines.len());
1258
1259    Json(serde_json::json!({
1260        "pipelines": pipelines,
1261        "total": pipelines.len()
1262    }))
1263}
1264
1265// v0.5: Get a specific pipeline
1266pub async fn get_pipeline(
1267    State(store): State<SharedStore>,
1268    Path(pipeline_id): Path<uuid::Uuid>,
1269) -> Result<Json<PipelineConfig>> {
1270    let pipeline_manager = store.pipeline_manager();
1271
1272    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1273        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1274    })?;
1275
1276    Ok(Json(pipeline.config().clone()))
1277}
1278
1279// v0.5: Remove a pipeline
1280pub async fn remove_pipeline(
1281    State(store): State<SharedStore>,
1282    Path(pipeline_id): Path<uuid::Uuid>,
1283) -> Result<Json<serde_json::Value>> {
1284    let pipeline_manager = store.pipeline_manager();
1285
1286    let removed = pipeline_manager.remove(pipeline_id);
1287
1288    if removed {
1289        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1290    }
1291
1292    Ok(Json(serde_json::json!({
1293        "pipeline_id": pipeline_id,
1294        "removed": removed
1295    })))
1296}
1297
1298// v0.5: Get statistics for all pipelines
1299pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1300    let pipeline_manager = store.pipeline_manager();
1301
1302    let stats = pipeline_manager.all_stats();
1303
1304    Json(serde_json::json!({
1305        "stats": stats,
1306        "total": stats.len()
1307    }))
1308}
1309
1310// v0.5: Get statistics for a specific pipeline
1311pub async fn get_pipeline_stats(
1312    State(store): State<SharedStore>,
1313    Path(pipeline_id): Path<uuid::Uuid>,
1314) -> Result<Json<PipelineStats>> {
1315    let pipeline_manager = store.pipeline_manager();
1316
1317    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1318        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1319    })?;
1320
1321    Ok(Json(pipeline.stats()))
1322}
1323
1324// v0.5: Reset a pipeline's state
1325pub async fn reset_pipeline(
1326    State(store): State<SharedStore>,
1327    Path(pipeline_id): Path<uuid::Uuid>,
1328) -> Result<Json<serde_json::Value>> {
1329    let pipeline_manager = store.pipeline_manager();
1330
1331    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1332        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1333    })?;
1334
1335    pipeline.reset();
1336
1337    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1338
1339    Ok(Json(serde_json::json!({
1340        "pipeline_id": pipeline_id,
1341        "reset": true
1342    })))
1343}
1344
1345// =============================================================================
1346// v0.11: Single Event Lookup by ID
1347// =============================================================================
1348
1349/// Get a single event by UUID
1350pub async fn get_event_by_id(
1351    State(store): State<SharedStore>,
1352    Path(event_id): Path<uuid::Uuid>,
1353) -> Result<Json<serde_json::Value>> {
1354    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1355        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1356    })?;
1357
1358    let dto = EventDto::from(&event);
1359
1360    tracing::debug!("Event retrieved by ID: {}", event_id);
1361
1362    Ok(Json(serde_json::json!({
1363        "event": dto,
1364        "found": true
1365    })))
1366}
1367
1368// =============================================================================
1369// v0.7: Projection State API for Query Service Integration
1370// =============================================================================
1371
1372/// List all registered projections
1373pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1374    let projection_manager = store.projection_manager();
1375    let status_map = store.projection_status();
1376
1377    let projections: Vec<serde_json::Value> = projection_manager
1378        .list_projections()
1379        .iter()
1380        .map(|(name, projection)| {
1381            let status = status_map
1382                .get(name)
1383                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1384            serde_json::json!({
1385                "name": name,
1386                "type": format!("{:?}", projection.name()),
1387                "status": status,
1388            })
1389        })
1390        .collect();
1391
1392    tracing::debug!("Listed {} projections", projections.len());
1393
1394    Json(serde_json::json!({
1395        "projections": projections,
1396        "total": projections.len()
1397    }))
1398}
1399
1400/// Get projection metadata by name
1401pub async fn get_projection(
1402    State(store): State<SharedStore>,
1403    Path(name): Path<String>,
1404) -> Result<Json<serde_json::Value>> {
1405    let projection_manager = store.projection_manager();
1406
1407    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1408        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1409    })?;
1410
1411    Ok(Json(serde_json::json!({
1412        "name": projection.name(),
1413        "found": true
1414    })))
1415}
1416
1417/// Get projection state for a specific entity.
1418///
1419/// Resolution order:
1420/// 1. **Registered projection** — if `name` is registered with the projection
1421///    manager, return the projection's own `get_state(entity_id)` output.
1422/// 2. **Projection state cache** — otherwise fall back to whatever was written
1423///    via `save_projection_state` / `bulk_save_projection_states`. This supports
1424///    SDK-managed projections (e.g. the Rust SDK's `ProjectionWorker`) that
1425///    compute state client-side and push it back without registering a
1426///    projection in Core's manager.
1427///
1428/// Returns `found: false` with `state: null` when neither source has state.
1429pub async fn get_projection_state(
1430    State(store): State<SharedStore>,
1431    Path((name, entity_id)): Path<(String, String)>,
1432) -> Result<Json<serde_json::Value>> {
1433    let state = store
1434        .projection_manager()
1435        .get_projection(&name)
1436        .and_then(|p| p.get_state(&entity_id))
1437        .or_else(|| {
1438            store
1439                .projection_state_cache()
1440                .get(&format!("{name}:{entity_id}"))
1441                .map(|entry| entry.value().clone())
1442        });
1443
1444    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1445
1446    Ok(Json(serde_json::json!({
1447        "projection": name,
1448        "entity_id": entity_id,
1449        "state": state,
1450        "found": state.is_some()
1451    })))
1452}
1453
1454/// Delete (clear) a projection by name
1455///
1456/// Removes all state from the projection. The projection definition remains
1457/// registered but its accumulated state is cleared.
1458pub async fn delete_projection(
1459    State(store): State<SharedStore>,
1460    Path(name): Path<String>,
1461) -> Result<Json<serde_json::Value>> {
1462    let projection_manager = store.projection_manager();
1463
1464    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1465        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1466    })?;
1467
1468    projection.clear();
1469
1470    // Also clear any cached state for this projection
1471    let cache = store.projection_state_cache();
1472    let prefix = format!("{name}:");
1473    let keys_to_remove: Vec<String> = cache
1474        .iter()
1475        .filter(|entry| entry.key().starts_with(&prefix))
1476        .map(|entry| entry.key().clone())
1477        .collect();
1478    for key in keys_to_remove {
1479        cache.remove(&key);
1480    }
1481
1482    tracing::info!("Projection deleted (cleared): {}", name);
1483
1484    Ok(Json(serde_json::json!({
1485        "projection": name,
1486        "deleted": true
1487    })))
1488}
1489
1490/// Get aggregate projection state (all entities).
1491///
1492/// Returns the cached states written via `save_projection_state` /
1493/// `bulk_save_projection_states`. The projection does NOT need to be
1494/// registered with the projection manager — this supports SDK-managed
1495/// projections that push state without server-side registration.
1496///
1497/// Returns an empty list when no state has been written.
1498pub async fn get_projection_state_summary(
1499    State(store): State<SharedStore>,
1500    Path(name): Path<String>,
1501) -> Result<Json<serde_json::Value>> {
1502    let cache = store.projection_state_cache();
1503    let prefix = format!("{name}:");
1504    let states: Vec<serde_json::Value> = cache
1505        .iter()
1506        .filter(|entry| entry.key().starts_with(&prefix))
1507        .map(|entry| {
1508            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1509            serde_json::json!({
1510                "entity_id": entity_id,
1511                "state": entry.value().clone()
1512            })
1513        })
1514        .collect();
1515
1516    let total = states.len();
1517
1518    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1519
1520    Ok(Json(serde_json::json!({
1521        "projection": name,
1522        "states": states,
1523        "total": total
1524    })))
1525}
1526
1527/// Reset a projection to its initial state
1528///
1529/// Clears all accumulated state and reprocesses events from the beginning.
1530pub async fn reset_projection(
1531    State(store): State<SharedStore>,
1532    Path(name): Path<String>,
1533) -> Result<Json<serde_json::Value>> {
1534    let reprocessed = store.reset_projection(&name)?;
1535
1536    tracing::info!(
1537        "Projection reset: {} ({} events reprocessed)",
1538        name,
1539        reprocessed
1540    );
1541
1542    Ok(Json(serde_json::json!({
1543        "projection": name,
1544        "reset": true,
1545        "events_reprocessed": reprocessed
1546    })))
1547}
1548
1549/// Pause a projection
1550///
1551/// Sets the projection status to "paused" so it stops processing new events.
1552pub async fn pause_projection(
1553    State(store): State<SharedStore>,
1554    Path(name): Path<String>,
1555) -> Result<Json<serde_json::Value>> {
1556    let projection_manager = store.projection_manager();
1557
1558    // Verify projection exists
1559    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1560        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1561    })?;
1562
1563    store
1564        .projection_status()
1565        .insert(name.clone(), "paused".to_string());
1566
1567    tracing::info!("Projection paused: {}", name);
1568
1569    Ok(Json(serde_json::json!({
1570        "projection": name,
1571        "status": "paused"
1572    })))
1573}
1574
1575/// Start (resume) a projection
1576///
1577/// Sets the projection status to "running" so it resumes processing events.
1578pub async fn start_projection(
1579    State(store): State<SharedStore>,
1580    Path(name): Path<String>,
1581) -> Result<Json<serde_json::Value>> {
1582    let projection_manager = store.projection_manager();
1583
1584    // Verify projection exists
1585    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1586        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1587    })?;
1588
1589    store
1590        .projection_status()
1591        .insert(name.clone(), "running".to_string());
1592
1593    tracing::info!("Projection started: {}", name);
1594
1595    Ok(Json(serde_json::json!({
1596        "projection": name,
1597        "status": "running"
1598    })))
1599}
1600
/// Request body for saving projection state (see `save_projection_state`).
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state; stored in the projection state cache as received.
    pub state: serde_json::Value,
}
1606
1607/// Save/update projection state for an entity
1608///
1609/// This endpoint allows external services (like Elixir Query Service) to
1610/// store computed projection state back to the Core for persistence.
1611pub async fn save_projection_state(
1612    State(store): State<SharedStore>,
1613    Path((name, entity_id)): Path<(String, String)>,
1614    Json(req): Json<SaveProjectionStateRequest>,
1615) -> Result<Json<serde_json::Value>> {
1616    let projection_cache = store.projection_state_cache();
1617
1618    // Store in the projection state cache
1619    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1620
1621    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1622
1623    Ok(Json(serde_json::json!({
1624        "projection": name,
1625        "entity_id": entity_id,
1626        "saved": true
1627    })))
1628}
1629
/// Request body for `bulk_get_projection_states`.
///
/// Lets a client fetch the state of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entities to look up; the handler emits one result entry per id, in
    /// this order.
    pub entity_ids: Vec<String>,
}
1637
/// Request body for `bulk_save_projection_states`.
///
/// Lets a client save the state of several entities in a single request.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// One item per entity whose state should be written to the cache.
    pub states: Vec<BulkSaveStateItem>,
}
1645
/// A single entity/state pair inside a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity the state belongs to; forms the cache key together with the
    /// projection name.
    pub entity_id: String,
    /// Arbitrary JSON state to store for the entity.
    pub state: serde_json::Value,
}
1651
1652pub async fn bulk_get_projection_states(
1653    State(store): State<SharedStore>,
1654    Path(name): Path<String>,
1655    Json(req): Json<BulkGetStateRequest>,
1656) -> Result<Json<serde_json::Value>> {
1657    // Same fallback rule as `get_projection_state`: registered projection
1658    // wins, cache is the fallback. This lets SDK-managed projections read
1659    // their pushed-back state without registering in Core's projection manager.
1660    let projection = store.projection_manager().get_projection(&name);
1661    let cache = store.projection_state_cache();
1662
1663    let states: Vec<serde_json::Value> = req
1664        .entity_ids
1665        .iter()
1666        .map(|entity_id| {
1667            let state = projection
1668                .as_ref()
1669                .and_then(|p| p.get_state(entity_id))
1670                .or_else(|| {
1671                    cache
1672                        .get(&format!("{name}:{entity_id}"))
1673                        .map(|entry| entry.value().clone())
1674                });
1675            serde_json::json!({
1676                "entity_id": entity_id,
1677                "state": state,
1678                "found": state.is_some()
1679            })
1680        })
1681        .collect();
1682
1683    tracing::debug!(
1684        "Bulk projection state retrieved: {} entities from {}",
1685        states.len(),
1686        name
1687    );
1688
1689    Ok(Json(serde_json::json!({
1690        "projection": name,
1691        "states": states,
1692        "total": states.len()
1693    })))
1694}
1695
1696/// Bulk save projection states for multiple entities
1697///
1698/// This endpoint allows efficient batch saving of projection states,
1699/// critical for high-throughput event processing pipelines.
1700pub async fn bulk_save_projection_states(
1701    State(store): State<SharedStore>,
1702    Path(name): Path<String>,
1703    Json(req): Json<BulkSaveStateRequest>,
1704) -> Result<Json<serde_json::Value>> {
1705    let projection_cache = store.projection_state_cache();
1706
1707    let mut saved_count = 0;
1708    for item in &req.states {
1709        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1710        saved_count += 1;
1711    }
1712
1713    tracing::info!(
1714        "Bulk projection state saved: {} entities for {}",
1715        saved_count,
1716        name
1717    );
1718
1719    Ok(Json(serde_json::json!({
1720        "projection": name,
1721        "saved": saved_count,
1722        "total": req.states.len()
1723    })))
1724}
1725
1726// =============================================================================
1727// v0.11: Webhook Management API
1728// =============================================================================
1729
/// Query parameters for listing webhooks (see `list_webhooks`).
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When omitted, `list_webhooks`
    /// returns an empty list rather than all webhooks.
    pub tenant_id: Option<String>,
}
1735
1736/// Register a new webhook subscription
1737pub async fn register_webhook(
1738    State(store): State<SharedStore>,
1739    Json(req): Json<RegisterWebhookRequest>,
1740) -> Json<serde_json::Value> {
1741    let registry = store.webhook_registry();
1742    let webhook = registry.register(req);
1743
1744    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1745
1746    Json(serde_json::json!({
1747        "webhook": webhook,
1748        "created": true
1749    }))
1750}
1751
1752/// List webhooks, optionally filtered by tenant_id
1753pub async fn list_webhooks(
1754    State(store): State<SharedStore>,
1755    Query(params): Query<ListWebhooksParams>,
1756) -> Json<serde_json::Value> {
1757    let registry = store.webhook_registry();
1758
1759    let webhooks = if let Some(tenant_id) = params.tenant_id {
1760        registry.list_by_tenant(&tenant_id)
1761    } else {
1762        // Without tenant filter, return empty (tenants should always filter)
1763        vec![]
1764    };
1765
1766    let total = webhooks.len();
1767
1768    Json(serde_json::json!({
1769        "webhooks": webhooks,
1770        "total": total
1771    }))
1772}
1773
1774/// Get a specific webhook by ID
1775pub async fn get_webhook(
1776    State(store): State<SharedStore>,
1777    Path(webhook_id): Path<uuid::Uuid>,
1778) -> Result<Json<serde_json::Value>> {
1779    let registry = store.webhook_registry();
1780
1781    let webhook = registry.get(webhook_id).ok_or_else(|| {
1782        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1783    })?;
1784
1785    Ok(Json(serde_json::json!({
1786        "webhook": webhook,
1787        "found": true
1788    })))
1789}
1790
1791/// Update a webhook subscription
1792pub async fn update_webhook(
1793    State(store): State<SharedStore>,
1794    Path(webhook_id): Path<uuid::Uuid>,
1795    Json(req): Json<UpdateWebhookRequest>,
1796) -> Result<Json<serde_json::Value>> {
1797    let registry = store.webhook_registry();
1798
1799    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1800        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1801    })?;
1802
1803    tracing::info!("Webhook updated: {}", webhook_id);
1804
1805    Ok(Json(serde_json::json!({
1806        "webhook": webhook,
1807        "updated": true
1808    })))
1809}
1810
1811/// Delete a webhook subscription
1812pub async fn delete_webhook(
1813    State(store): State<SharedStore>,
1814    Path(webhook_id): Path<uuid::Uuid>,
1815) -> Result<Json<serde_json::Value>> {
1816    let registry = store.webhook_registry();
1817
1818    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1819        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1820    })?;
1821
1822    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1823
1824    Ok(Json(serde_json::json!({
1825        "webhook_id": webhook_id,
1826        "deleted": true
1827    })))
1828}
1829
/// Query parameters for listing webhook deliveries (see
/// `list_webhook_deliveries`).
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Limit passed to the registry's `get_deliveries`; the handler
    /// substitutes 50 when it is omitted.
    pub limit: Option<usize>,
}
1835
1836/// List delivery history for a webhook
1837pub async fn list_webhook_deliveries(
1838    State(store): State<SharedStore>,
1839    Path(webhook_id): Path<uuid::Uuid>,
1840    Query(params): Query<ListDeliveriesParams>,
1841) -> Result<Json<serde_json::Value>> {
1842    let registry = store.webhook_registry();
1843
1844    // Verify webhook exists
1845    registry.get(webhook_id).ok_or_else(|| {
1846        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1847    })?;
1848
1849    let limit = params.limit.unwrap_or(50);
1850    let deliveries = registry.get_deliveries(webhook_id, limit);
1851    let total = deliveries.len();
1852
1853    Ok(Json(serde_json::json!({
1854        "webhook_id": webhook_id,
1855        "deliveries": deliveries,
1856        "total": total
1857    })))
1858}
1859
1860// =============================================================================
1861// v2.0: Advanced Query Features
1862// =============================================================================
1863
/// EventQL: run a SQL query (DataFusion-backed) over a snapshot of the stored events.
///
/// The response body carries the `columns`, `rows` and `row_count` fields of
/// the EventQL result.
///
/// # Errors
///
/// Any failure reported by `execute_eventql` is surfaced as
/// `AllSourceError::InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    use crate::infrastructure::query::eventql;

    let snapshot = store.snapshot_events();
    let response = eventql::execute_eventql(&snapshot, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1880
1881/// GraphQL: Execute GraphQL queries
1882pub async fn graphql_query(
1883    State(store): State<SharedStore>,
1884    Json(req): Json<GraphQLRequest>,
1885) -> Json<serde_json::Value> {
1886    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1887        Ok(f) => f,
1888        Err(e) => {
1889            return Json(
1890                serde_json::to_value(GraphQLResponse {
1891                    data: None,
1892                    errors: vec![GraphQLError { message: e }],
1893                })
1894                .unwrap(),
1895            );
1896        }
1897    };
1898
1899    let mut data = serde_json::Map::new();
1900    let mut errors = Vec::new();
1901
1902    for field in &fields {
1903        match field.name.as_str() {
1904            "events" => {
1905                let request = crate::application::dto::QueryEventsRequest {
1906                    entity_id: field.arguments.get("entity_id").cloned(),
1907                    event_type: field.arguments.get("event_type").cloned(),
1908                    tenant_id: field.arguments.get("tenant_id").cloned(),
1909                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1910                    as_of: None,
1911                    since: None,
1912                    until: None,
1913                    event_type_prefix: None,
1914                    payload_filter: None,
1915                };
1916                match store.query(&request) {
1917                    Ok(events) => {
1918                        let json_events: Vec<serde_json::Value> = events
1919                            .iter()
1920                            .map(|e| {
1921                                crate::infrastructure::query::graphql::event_to_json(
1922                                    e,
1923                                    &field.fields,
1924                                )
1925                            })
1926                            .collect();
1927                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1928                    }
1929                    Err(e) => errors.push(GraphQLError {
1930                        message: format!("events query failed: {e}"),
1931                    }),
1932                }
1933            }
1934            "event" => {
1935                if let Some(id_str) = field.arguments.get("id") {
1936                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1937                        match store.get_event_by_id(&id) {
1938                            Ok(Some(event)) => {
1939                                data.insert(
1940                                    "event".to_string(),
1941                                    crate::infrastructure::query::graphql::event_to_json(
1942                                        &event,
1943                                        &field.fields,
1944                                    ),
1945                                );
1946                            }
1947                            Ok(None) => {
1948                                data.insert("event".to_string(), serde_json::Value::Null);
1949                            }
1950                            Err(e) => errors.push(GraphQLError {
1951                                message: format!("event lookup failed: {e}"),
1952                            }),
1953                        }
1954                    } else {
1955                        errors.push(GraphQLError {
1956                            message: format!("Invalid UUID: {id_str}"),
1957                        });
1958                    }
1959                } else {
1960                    errors.push(GraphQLError {
1961                        message: "event query requires 'id' argument".to_string(),
1962                    });
1963                }
1964            }
1965            "projections" => {
1966                let pm = store.projection_manager();
1967                let names: Vec<serde_json::Value> = pm
1968                    .list_projections()
1969                    .iter()
1970                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1971                    .collect();
1972                data.insert("projections".to_string(), serde_json::Value::Array(names));
1973            }
1974            "stats" => {
1975                let stats = store.stats();
1976                data.insert(
1977                    "stats".to_string(),
1978                    serde_json::json!({
1979                        "total_events": stats.total_events,
1980                        "total_entities": stats.total_entities,
1981                        "total_event_types": stats.total_event_types,
1982                    }),
1983                );
1984            }
1985            "__schema" => {
1986                data.insert(
1987                    "__schema".to_string(),
1988                    crate::infrastructure::query::graphql::introspection_schema(),
1989                );
1990            }
1991            other => {
1992                errors.push(GraphQLError {
1993                    message: format!("Unknown field: {other}"),
1994                });
1995            }
1996        }
1997    }
1998
1999    Json(
2000        serde_json::to_value(GraphQLResponse {
2001            data: Some(serde_json::Value::Object(data)),
2002            errors,
2003        })
2004        .unwrap(),
2005    )
2006}
2007
2008/// Geospatial: Query events by location
2009pub async fn geo_query(
2010    State(store): State<SharedStore>,
2011    Json(req): Json<GeoQueryRequest>,
2012) -> Json<serde_json::Value> {
2013    let events = store.snapshot_events();
2014    let geo_index = store.geo_index();
2015    let results =
2016        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
2017    let total = results.len();
2018    Json(serde_json::json!({
2019        "results": results,
2020        "total": total,
2021    }))
2022}
2023
2024/// Geospatial index stats
2025pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2026    let stats = store.geo_index().stats();
2027    Json(serde_json::json!(stats))
2028}
2029
2030/// Exactly-once processing stats
2031pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2032    let stats = store.exactly_once().stats();
2033    Json(serde_json::json!(stats))
2034}
2035
2036/// Schema evolution history for an event type
2037pub async fn schema_evolution_history(
2038    State(store): State<SharedStore>,
2039    Path(event_type): Path<String>,
2040) -> Json<serde_json::Value> {
2041    let mgr = store.schema_evolution();
2042    let history = mgr.get_history(&event_type);
2043    let version = mgr.get_version(&event_type);
2044    Json(serde_json::json!({
2045        "event_type": event_type,
2046        "current_version": version,
2047        "history": history,
2048    }))
2049}
2050
2051/// Current inferred schema for an event type
2052pub async fn schema_evolution_schema(
2053    State(store): State<SharedStore>,
2054    Path(event_type): Path<String>,
2055) -> Json<serde_json::Value> {
2056    let mgr = store.schema_evolution();
2057    if let Some(schema) = mgr.get_schema(&event_type) {
2058        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2059        Json(serde_json::json!({
2060            "event_type": event_type,
2061            "version": mgr.get_version(&event_type),
2062            "inferred_schema": schema,
2063            "json_schema": json_schema,
2064        }))
2065    } else {
2066        Json(serde_json::json!({
2067            "event_type": event_type,
2068            "error": "No schema inferred for this event type"
2069        }))
2070    }
2071}
2072
2073/// Schema evolution stats
2074pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2075    let stats = store.schema_evolution().stats();
2076    let event_types = store.schema_evolution().list_event_types();
2077    Json(serde_json::json!({
2078        "stats": stats,
2079        "tracked_event_types": event_types,
2080    }))
2081}
2082
2083// =============================================================================
2084// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2085// =============================================================================
2086
/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
///
/// The smallest `physical_ms` in the client's version vector is used as the
/// `since` bound of a store query; an empty vector means no lower bound. Every
/// matching event is converted to a `ReplicatedEvent` with origin region
/// `"server"`. Events that share a millisecond get increasing logical
/// counters, in the order the store returned them.
///
/// # Errors
///
/// Propagates any error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Compute the "since" threshold from the client's version vector.
    // The conversion to i64 is checked: a value that does not fit yields no
    // lower bound instead of silently wrapping to a negative timestamp.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| i64::try_from(ms).ok())
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Convert domain events to the ReplicatedEvent wire format.
    let mut replicated = Vec::with_capacity(events.len());
    // `None` until the first event is seen, so that a genuine 0 ms timestamp
    // still starts its logical counter at 0.
    let mut last_ms: Option<u64> = None;
    let mut logical = 0u32;

    for event in &events {
        // Pre-epoch (negative) timestamps are clamped to 0 rather than being
        // wrapped into a huge unsigned value by an `as` cast.
        let event_ms = u64::try_from(event.timestamp().timestamp_millis()).unwrap_or(0);
        if last_ms == Some(event_ms) {
            logical += 1;
        } else {
            last_ms = Some(event_ms);
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            // NOTE(review): the third argument is always 0 here; presumably a
            // node/region identifier — confirm against `HlcTimestamp::new`.
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    // NOTE(review): the response version vector is always empty, so the client
    // learns nothing about the server's position from it — confirm intended.
    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2151
/// POST /api/v1/sync/push — Client pushes events, server ingests them.
///
/// Each pushed event's `event_data` is read for `event_type`, `entity_id`,
/// `tenant_id`, `payload` and `metadata`. Missing or non-string identifiers
/// fall back to `"unknown"` / `"unknown"` / `"default"`, and a missing payload
/// becomes `{}`. Events that `Event::from_strings` rejects are counted as
/// `skipped` and logged at warn level; the rest are ingested and counted as
/// `accepted`.
///
/// NOTE(review): only `event_data` is consulted here. No conflict resolution
/// or de-duplication is visible in this handler — confirm whether that happens
/// inside `store.ingest` or is still to be implemented.
///
/// # Errors
///
/// Returns the first error raised by `store.ingest`. Events ingested earlier
/// in the same request are not rolled back by this handler.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;

        // Reads a string field from the event data, using `fallback` when the
        // key is absent or its value is not a JSON string.
        let text_field = |key: &str, fallback: &str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .map_or_else(|| fallback.to_owned(), str::to_owned)
        };

        let event_type = text_field("event_type", "unknown");
        let entity_id = text_field("entity_id", "unknown");
        let tenant_id = text_field("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                store.ingest(&domain_event)?;
                accepted += 1;
            }
            Err(err) => {
                // Still best-effort, but leave a trace so that rejected events
                // can be diagnosed instead of vanishing silently.
                tracing::warn!("Sync push skipped invalid event: {:?}", err);
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2203
2204// =============================================================================
2205// Consumer endpoints for durable subscriptions (v0.14)
2206// =============================================================================
2207
2208/// POST /api/v1/consumers — Register a durable consumer
2209pub async fn register_consumer(
2210    State(store): State<SharedStore>,
2211    Json(req): Json<RegisterConsumerRequest>,
2212) -> Result<Json<ConsumerResponse>> {
2213    let consumer = store
2214        .consumer_registry()
2215        .register(&req.consumer_id, &req.event_type_filters);
2216
2217    Ok(Json(ConsumerResponse {
2218        consumer_id: consumer.consumer_id,
2219        event_type_filters: consumer.event_type_filters,
2220        cursor_position: consumer.cursor_position,
2221    }))
2222}
2223
2224/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2225pub async fn get_consumer(
2226    State(store): State<SharedStore>,
2227    Path(consumer_id): Path<String>,
2228) -> Result<Json<ConsumerResponse>> {
2229    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2230
2231    Ok(Json(ConsumerResponse {
2232        consumer_id: consumer.consumer_id,
2233        event_type_filters: consumer.event_type_filters,
2234        cursor_position: consumer.cursor_position,
2235    }))
2236}
2237
/// Query-string parameters for `poll_consumer_events`
/// (GET /api/v1/consumers/{consumer_id}/events).
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    /// Maximum number of events to return in one poll; the handler uses 100
    /// when this is omitted.
    pub limit: Option<usize>,
}
2243
2244pub async fn poll_consumer_events(
2245    State(store): State<SharedStore>,
2246    Path(consumer_id): Path<String>,
2247    Query(query): Query<ConsumerPollQuery>,
2248) -> Result<Json<ConsumerEventsResponse>> {
2249    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2250    let offset = consumer.cursor_position.unwrap_or(0);
2251    let limit = query.limit.unwrap_or(100);
2252
2253    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2254    let count = events.len();
2255
2256    let consumer_events: Vec<ConsumerEventDto> = events
2257        .into_iter()
2258        .map(|(position, event)| ConsumerEventDto {
2259            position,
2260            event: EventDto::from(&event),
2261        })
2262        .collect();
2263
2264    Ok(Json(ConsumerEventsResponse {
2265        events: consumer_events,
2266        count,
2267    }))
2268}
2269
2270/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2271pub async fn ack_consumer(
2272    State(store): State<SharedStore>,
2273    Path(consumer_id): Path<String>,
2274    Json(req): Json<AckRequest>,
2275) -> Result<Json<serde_json::Value>> {
2276    let max_offset = store.total_events() as u64;
2277
2278    store
2279        .consumer_registry()
2280        .ack(&consumer_id, req.position, max_offset)
2281        .map_err(crate::error::AllSourceError::InvalidInput)?;
2282
2283    Ok(Json(serde_json::json!({
2284        "status": "ok",
2285        "consumer_id": consumer_id,
2286        "position": req.position,
2287    })))
2288}
2289
2290#[cfg(test)]
2291mod tests {
2292    use super::*;
2293    use crate::{domain::entities::Event, store::EventStore};
2294
2295    fn create_test_store() -> Arc<EventStore> {
2296        Arc::new(EventStore::new())
2297    }
2298
2299    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2300        Event::from_strings(
2301            event_type.to_string(),
2302            entity_id.to_string(),
2303            "test-stream".to_string(),
2304            serde_json::json!({
2305                "name": "Test",
2306                "value": 42
2307            }),
2308            None,
2309        )
2310        .unwrap()
2311    }
2312
2313    #[tokio::test]
2314    async fn test_query_events_has_more_and_total_count() {
2315        let store = create_test_store();
2316
2317        // Ingest 50 events
2318        for i in 0..50 {
2319            store
2320                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2321                .unwrap();
2322        }
2323
2324        // Query with limit=10 — should get has_more=true, total_count=50
2325        let req = QueryEventsRequest {
2326            entity_id: None,
2327            event_type: None,
2328            tenant_id: None,
2329            as_of: None,
2330            since: None,
2331            until: None,
2332            limit: Some(10),
2333            event_type_prefix: None,
2334            payload_filter: None,
2335        };
2336
2337        let requested_limit = req.limit;
2338        let unlimited_req = QueryEventsRequest {
2339            limit: None,
2340            ..QueryEventsRequest {
2341                entity_id: req.entity_id,
2342                event_type: req.event_type,
2343                tenant_id: req.tenant_id,
2344                as_of: req.as_of,
2345                since: req.since,
2346                until: req.until,
2347                limit: None,
2348                event_type_prefix: req.event_type_prefix,
2349                payload_filter: req.payload_filter,
2350            }
2351        };
2352        let all_events = store.query(&unlimited_req).unwrap();
2353        let total_count = all_events.len();
2354        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2355            all_events.into_iter().take(limit).collect()
2356        } else {
2357            all_events
2358        };
2359        let count = limited_events.len();
2360        let has_more = count < total_count;
2361
2362        assert_eq!(count, 10);
2363        assert_eq!(total_count, 50);
2364        assert!(has_more);
2365    }
2366
2367    #[tokio::test]
2368    async fn test_query_events_no_more_results() {
2369        let store = create_test_store();
2370
2371        // Ingest 5 events
2372        for i in 0..5 {
2373            store
2374                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2375                .unwrap();
2376        }
2377
2378        // Query with limit=100 — should get has_more=false, total_count=5
2379        let all_events = store
2380            .query(&QueryEventsRequest {
2381                entity_id: None,
2382                event_type: None,
2383                tenant_id: None,
2384                as_of: None,
2385                since: None,
2386                until: None,
2387                limit: None,
2388                event_type_prefix: None,
2389                payload_filter: None,
2390            })
2391            .unwrap();
2392        let total_count = all_events.len();
2393        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2394        let count = limited_events.len();
2395        let has_more = count < total_count;
2396
2397        assert_eq!(count, 5);
2398        assert_eq!(total_count, 5);
2399        assert!(!has_more);
2400    }
2401
2402    // Regression test for issue #177: `order=desc` + `limit=1` must return
2403    // the NEWEST event for an entity, not the oldest. Mirrors the ordering
2404    // logic in `query_events` (store.query → reverse-if-desc → take(limit)).
2405    #[tokio::test]
2406    async fn test_query_events_order_desc_returns_latest() {
2407        let store = create_test_store();
2408
2409        // Three events for the same entity with strictly increasing
2410        // timestamps — mimics a backfill appending a corrected event.
2411        let base = chrono::Utc::now();
2412        for i in 0..3i64 {
2413            let mut event = create_test_event("org-1", "auth.org.updated");
2414            event.timestamp = base + chrono::Duration::seconds(i);
2415            event.version = i + 1;
2416            store.ingest(&event).unwrap();
2417        }
2418
2419        let req = QueryEventsRequest {
2420            entity_id: Some("org-1".to_string()),
2421            ..Default::default()
2422        };
2423        let ascending = store.query(&req).unwrap();
2424        assert_eq!(ascending.len(), 3);
2425        // Ascending order: oldest timestamp first.
2426        assert!(ascending[0].timestamp < ascending[1].timestamp);
2427        assert!(ascending[1].timestamp < ascending[2].timestamp);
2428        let oldest_ts = ascending[0].timestamp;
2429        let newest_ts = ascending[2].timestamp;
2430
2431        // `order=desc`: handler reverses, then `limit=1` takes the front.
2432        let mut descending = ascending.clone();
2433        descending.reverse();
2434        let latest: Vec<Event> = descending.into_iter().take(1).collect();
2435        assert_eq!(latest.len(), 1);
2436        assert_eq!(
2437            latest[0].timestamp, newest_ts,
2438            "order=desc&limit=1 must yield the newest event"
2439        );
2440
2441        // Default (ascending) + limit=1 still yields the oldest event.
2442        let oldest: Vec<Event> = ascending.into_iter().take(1).collect();
2443        assert_eq!(oldest[0].timestamp, oldest_ts);
2444    }
2445
2446    #[tokio::test]
2447    async fn test_list_entities_by_type_prefix() {
2448        let store = create_test_store();
2449
2450        // 3 index entities
2451        store
2452            .ingest(&create_test_event("idx-1", "index.created"))
2453            .unwrap();
2454        store
2455            .ingest(&create_test_event("idx-1", "index.updated"))
2456            .unwrap();
2457        store
2458            .ingest(&create_test_event("idx-2", "index.created"))
2459            .unwrap();
2460        store
2461            .ingest(&create_test_event("idx-3", "index.created"))
2462            .unwrap();
2463        // 2 trade entities
2464        store
2465            .ingest(&create_test_event("trade-1", "trade.created"))
2466            .unwrap();
2467        store
2468            .ingest(&create_test_event("trade-2", "trade.created"))
2469            .unwrap();
2470
2471        // List entities for index.*
2472        let req = ListEntitiesRequest {
2473            event_type_prefix: Some("index.".to_string()),
2474            payload_filter: None,
2475            limit: None,
2476            offset: None,
2477        };
2478        let query_req = QueryEventsRequest {
2479            entity_id: None,
2480            event_type: None,
2481            tenant_id: None,
2482            as_of: None,
2483            since: None,
2484            until: None,
2485            limit: None,
2486            event_type_prefix: req.event_type_prefix,
2487            payload_filter: req.payload_filter,
2488        };
2489        let events = store.query(&query_req).unwrap();
2490
2491        // Group and verify
2492        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2493            std::collections::HashMap::new();
2494        for event in &events {
2495            entity_map
2496                .entry(event.entity_id().to_string())
2497                .or_default()
2498                .push(event);
2499        }
2500
2501        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2502        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2503        assert_eq!(entity_map["idx-2"].len(), 1);
2504        assert_eq!(entity_map["idx-3"].len(), 1);
2505    }
2506
2507    fn create_test_event_with_payload(
2508        entity_id: &str,
2509        event_type: &str,
2510        payload: serde_json::Value,
2511    ) -> Event {
2512        Event::from_strings(
2513            event_type.to_string(),
2514            entity_id.to_string(),
2515            "test-stream".to_string(),
2516            payload,
2517            None,
2518        )
2519        .unwrap()
2520    }
2521
2522    #[tokio::test]
2523    async fn test_detect_duplicates_by_payload_fields() {
2524        let store = create_test_store();
2525
2526        // Create entities with duplicate "name" field values
2527        store
2528            .ingest(&create_test_event_with_payload(
2529                "idx-1",
2530                "index.created",
2531                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2532            ))
2533            .unwrap();
2534        store
2535            .ingest(&create_test_event_with_payload(
2536                "idx-2",
2537                "index.created",
2538                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2539            ))
2540            .unwrap();
2541        store
2542            .ingest(&create_test_event_with_payload(
2543                "idx-3",
2544                "index.created",
2545                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2546            ))
2547            .unwrap();
2548        store
2549            .ingest(&create_test_event_with_payload(
2550                "idx-4",
2551                "index.created",
2552                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2553            ))
2554            .unwrap();
2555        store
2556            .ingest(&create_test_event_with_payload(
2557                "idx-5",
2558                "index.created",
2559                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2560            ))
2561            .unwrap();
2562
2563        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2564        let query_req = QueryEventsRequest {
2565            entity_id: None,
2566            event_type: None,
2567            tenant_id: None,
2568            as_of: None,
2569            since: None,
2570            until: None,
2571            limit: None,
2572            event_type_prefix: Some("index.".to_string()),
2573            payload_filter: None,
2574        };
2575        let events = store.query(&query_req).unwrap();
2576
2577        // Manually replicate the handler logic for testing
2578        let group_by_fields = vec!["name"];
2579        let mut entity_latest: std::collections::HashMap<String, &Event> =
2580            std::collections::HashMap::new();
2581        for event in &events {
2582            let eid = event.entity_id().to_string();
2583            entity_latest
2584                .entry(eid)
2585                .and_modify(|existing| {
2586                    if event.timestamp() > existing.timestamp() {
2587                        *existing = event;
2588                    }
2589                })
2590                .or_insert(event);
2591        }
2592
2593        let mut groups: std::collections::HashMap<String, Vec<String>> =
2594            std::collections::HashMap::new();
2595        for (entity_id, event) in &entity_latest {
2596            let payload = event.payload();
2597            let mut key_parts = serde_json::Map::new();
2598            for field in &group_by_fields {
2599                let value = payload
2600                    .get(*field)
2601                    .cloned()
2602                    .unwrap_or(serde_json::Value::Null);
2603                key_parts.insert((*field).to_string(), value);
2604            }
2605            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2606            groups.entry(key_str).or_default().push(entity_id.clone());
2607        }
2608
2609        let duplicate_groups: Vec<_> = groups
2610            .into_iter()
2611            .filter(|(_, ids)| ids.len() > 1)
2612            .collect();
2613
2614        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2615        for (_, ids) in &duplicate_groups {
2616            assert_eq!(ids.len(), 2);
2617        }
2618    }
2619
2620    #[tokio::test]
2621    async fn test_detect_duplicates_no_duplicates() {
2622        let store = create_test_store();
2623
2624        // All unique names
2625        store
2626            .ingest(&create_test_event_with_payload(
2627                "idx-1",
2628                "index.created",
2629                serde_json::json!({"name": "A"}),
2630            ))
2631            .unwrap();
2632        store
2633            .ingest(&create_test_event_with_payload(
2634                "idx-2",
2635                "index.created",
2636                serde_json::json!({"name": "B"}),
2637            ))
2638            .unwrap();
2639
2640        let query_req = QueryEventsRequest {
2641            entity_id: None,
2642            event_type: None,
2643            tenant_id: None,
2644            as_of: None,
2645            since: None,
2646            until: None,
2647            limit: None,
2648            event_type_prefix: Some("index.".to_string()),
2649            payload_filter: None,
2650        };
2651        let events = store.query(&query_req).unwrap();
2652
2653        let mut entity_latest: std::collections::HashMap<String, &Event> =
2654            std::collections::HashMap::new();
2655        for event in &events {
2656            entity_latest
2657                .entry(event.entity_id().to_string())
2658                .or_insert(event);
2659        }
2660
2661        let mut groups: std::collections::HashMap<String, Vec<String>> =
2662            std::collections::HashMap::new();
2663        for (entity_id, event) in &entity_latest {
2664            let key_str =
2665                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2666                    .unwrap();
2667            groups.entry(key_str).or_default().push(entity_id.clone());
2668        }
2669
2670        let duplicate_groups: Vec<_> = groups
2671            .into_iter()
2672            .filter(|(_, ids)| ids.len() > 1)
2673            .collect();
2674
2675        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2676    }
2677
2678    #[tokio::test]
2679    async fn test_detect_duplicates_multi_field_group_by() {
2680        let store = create_test_store();
2681
2682        // Two entities with same name AND user_id = true duplicate
2683        store
2684            .ingest(&create_test_event_with_payload(
2685                "idx-1",
2686                "index.created",
2687                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2688            ))
2689            .unwrap();
2690        store
2691            .ingest(&create_test_event_with_payload(
2692                "idx-2",
2693                "index.created",
2694                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2695            ))
2696            .unwrap();
2697        // Same name but different user_id = NOT a duplicate in multi-field group
2698        store
2699            .ingest(&create_test_event_with_payload(
2700                "idx-3",
2701                "index.created",
2702                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2703            ))
2704            .unwrap();
2705
2706        let query_req = QueryEventsRequest {
2707            entity_id: None,
2708            event_type: None,
2709            tenant_id: None,
2710            as_of: None,
2711            since: None,
2712            until: None,
2713            limit: None,
2714            event_type_prefix: Some("index.".to_string()),
2715            payload_filter: None,
2716        };
2717        let events = store.query(&query_req).unwrap();
2718
2719        let group_by_fields = vec!["name", "user_id"];
2720        let mut entity_latest: std::collections::HashMap<String, &Event> =
2721            std::collections::HashMap::new();
2722        for event in &events {
2723            entity_latest
2724                .entry(event.entity_id().to_string())
2725                .and_modify(|existing| {
2726                    if event.timestamp() > existing.timestamp() {
2727                        *existing = event;
2728                    }
2729                })
2730                .or_insert(event);
2731        }
2732
2733        let mut groups: std::collections::HashMap<String, Vec<String>> =
2734            std::collections::HashMap::new();
2735        for (entity_id, event) in &entity_latest {
2736            let payload = event.payload();
2737            let mut key_parts = serde_json::Map::new();
2738            for field in &group_by_fields {
2739                let value = payload
2740                    .get(*field)
2741                    .cloned()
2742                    .unwrap_or(serde_json::Value::Null);
2743                key_parts.insert((*field).to_string(), value);
2744            }
2745            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2746            groups.entry(key_str).or_default().push(entity_id.clone());
2747        }
2748
2749        let duplicate_groups: Vec<_> = groups
2750            .into_iter()
2751            .filter(|(_, ids)| ids.len() > 1)
2752            .collect();
2753
2754        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2755        assert_eq!(duplicate_groups.len(), 1);
2756        let (_, ref ids) = duplicate_groups[0];
2757        assert_eq!(ids.len(), 2);
2758        let mut sorted_ids = ids.clone();
2759        sorted_ids.sort();
2760        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2761    }
2762
2763    #[tokio::test]
2764    async fn test_projection_state_cache() {
2765        let store = create_test_store();
2766
2767        // Test cache insertion
2768        let cache = store.projection_state_cache();
2769        cache.insert(
2770            "entity_snapshots:user-123".to_string(),
2771            serde_json::json!({"name": "Test User", "age": 30}),
2772        );
2773
2774        // Test cache retrieval
2775        let state = cache.get("entity_snapshots:user-123");
2776        assert!(state.is_some());
2777        let state = state.unwrap();
2778        assert_eq!(state["name"], "Test User");
2779        assert_eq!(state["age"], 30);
2780    }
2781
2782    #[tokio::test]
2783    async fn test_projection_manager_list_projections() {
2784        let store = create_test_store();
2785
2786        // List projections (built-in projections should be available)
2787        let projection_manager = store.projection_manager();
2788        let projections = projection_manager.list_projections();
2789
2790        // Should have entity_snapshots and event_counters
2791        assert!(projections.len() >= 2);
2792
2793        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2794        assert!(names.contains(&"entity_snapshots"));
2795        assert!(names.contains(&"event_counters"));
2796    }
2797
2798    #[tokio::test]
2799    async fn test_projection_state_after_event_ingestion() {
2800        let store = create_test_store();
2801
2802        // Ingest an event
2803        let event = create_test_event("user-456", "user.created");
2804        store.ingest(&event).unwrap();
2805
2806        // Get projection state
2807        let projection_manager = store.projection_manager();
2808        let snapshot_projection = projection_manager
2809            .get_projection("entity_snapshots")
2810            .unwrap();
2811
2812        let state = snapshot_projection.get_state("user-456");
2813        assert!(state.is_some());
2814        let state = state.unwrap();
2815        assert_eq!(state["name"], "Test");
2816        assert_eq!(state["value"], 42);
2817    }
2818
2819    #[tokio::test]
2820    async fn test_projection_state_cache_multiple_entities() {
2821        let store = create_test_store();
2822        let cache = store.projection_state_cache();
2823
2824        // Insert multiple entities
2825        for i in 0..10 {
2826            cache.insert(
2827                format!("entity_snapshots:entity-{i}"),
2828                serde_json::json!({"id": i, "status": "active"}),
2829            );
2830        }
2831
2832        // Verify all insertions
2833        assert_eq!(cache.len(), 10);
2834
2835        // Verify each entity
2836        for i in 0..10 {
2837            let key = format!("entity_snapshots:entity-{i}");
2838            let state = cache.get(&key);
2839            assert!(state.is_some());
2840            assert_eq!(state.unwrap()["id"], i);
2841        }
2842    }
2843
2844    #[tokio::test]
2845    async fn test_projection_state_update() {
2846        let store = create_test_store();
2847        let cache = store.projection_state_cache();
2848
2849        // Initial state
2850        cache.insert(
2851            "entity_snapshots:user-789".to_string(),
2852            serde_json::json!({"balance": 100}),
2853        );
2854
2855        // Update state
2856        cache.insert(
2857            "entity_snapshots:user-789".to_string(),
2858            serde_json::json!({"balance": 150}),
2859        );
2860
2861        // Verify update
2862        let state = cache.get("entity_snapshots:user-789").unwrap();
2863        assert_eq!(state["balance"], 150);
2864    }
2865
2866    #[tokio::test]
2867    async fn test_event_counter_projection() {
2868        let store = create_test_store();
2869
2870        // Ingest events of different types
2871        store
2872            .ingest(&create_test_event("user-1", "user.created"))
2873            .unwrap();
2874        store
2875            .ingest(&create_test_event("user-2", "user.created"))
2876            .unwrap();
2877        store
2878            .ingest(&create_test_event("user-1", "user.updated"))
2879            .unwrap();
2880
2881        // Get event counter projection
2882        let projection_manager = store.projection_manager();
2883        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2884
2885        // Check counts
2886        let created_state = counter_projection.get_state("user.created");
2887        assert!(created_state.is_some());
2888        assert_eq!(created_state.unwrap()["count"], 2);
2889
2890        let updated_state = counter_projection.get_state("user.updated");
2891        assert!(updated_state.is_some());
2892        assert_eq!(updated_state.unwrap()["count"], 1);
2893    }
2894
2895    #[tokio::test]
2896    async fn test_projection_state_cache_key_format() {
2897        let store = create_test_store();
2898        let cache = store.projection_state_cache();
2899
2900        // Test standard key format: {projection_name}:{entity_id}
2901        let key = "orders:order-12345".to_string();
2902        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2903
2904        let state = cache.get(&key).unwrap();
2905        assert_eq!(state["total"], 99.99);
2906    }
2907
2908    #[tokio::test]
2909    async fn test_projection_state_cache_removal() {
2910        let store = create_test_store();
2911        let cache = store.projection_state_cache();
2912
2913        // Insert and then remove
2914        cache.insert(
2915            "test:entity-1".to_string(),
2916            serde_json::json!({"data": "value"}),
2917        );
2918        assert_eq!(cache.len(), 1);
2919
2920        cache.remove("test:entity-1");
2921        assert_eq!(cache.len(), 0);
2922        assert!(cache.get("test:entity-1").is_none());
2923    }
2924
2925    #[tokio::test]
2926    async fn test_get_nonexistent_projection() {
2927        let store = create_test_store();
2928        let projection_manager = store.projection_manager();
2929
2930        // Requesting a non-existent projection should return None
2931        let projection = projection_manager.get_projection("nonexistent_projection");
2932        assert!(projection.is_none());
2933    }
2934
2935    #[tokio::test]
2936    async fn test_get_nonexistent_entity_state() {
2937        let store = create_test_store();
2938        let projection_manager = store.projection_manager();
2939
2940        // Get state for non-existent entity
2941        let snapshot_projection = projection_manager
2942            .get_projection("entity_snapshots")
2943            .unwrap();
2944        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2945        assert!(state.is_none());
2946    }
2947
2948    #[tokio::test]
2949    async fn test_projection_state_cache_concurrent_access() {
2950        let store = create_test_store();
2951        let cache = store.projection_state_cache();
2952
2953        // Simulate concurrent writes
2954        let handles: Vec<_> = (0..10)
2955            .map(|i| {
2956                let cache_clone = cache.clone();
2957                tokio::spawn(async move {
2958                    cache_clone.insert(
2959                        format!("concurrent:entity-{i}"),
2960                        serde_json::json!({"thread": i}),
2961                    );
2962                })
2963            })
2964            .collect();
2965
2966        for handle in handles {
2967            handle.await.unwrap();
2968        }
2969
2970        // All 10 entries should be present
2971        assert_eq!(cache.len(), 10);
2972    }
2973
2974    #[tokio::test]
2975    async fn test_projection_state_large_payload() {
2976        let store = create_test_store();
2977        let cache = store.projection_state_cache();
2978
2979        // Create a large JSON payload (~10KB)
2980        let large_array: Vec<serde_json::Value> = (0..1000)
2981            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2982            .collect();
2983
2984        cache.insert(
2985            "large:entity-1".to_string(),
2986            serde_json::json!({"items": large_array}),
2987        );
2988
2989        let state = cache.get("large:entity-1").unwrap();
2990        let items = state["items"].as_array().unwrap();
2991        assert_eq!(items.len(), 1000);
2992    }
2993
2994    #[tokio::test]
2995    async fn test_projection_state_complex_json() {
2996        let store = create_test_store();
2997        let cache = store.projection_state_cache();
2998
2999        // Complex nested JSON structure
3000        let complex_state = serde_json::json!({
3001            "user": {
3002                "id": "user-123",
3003                "profile": {
3004                    "name": "John Doe",
3005                    "email": "john@example.com",
3006                    "settings": {
3007                        "theme": "dark",
3008                        "notifications": true
3009                    }
3010                },
3011                "roles": ["admin", "user"],
3012                "metadata": {
3013                    "created_at": "2025-01-01T00:00:00Z",
3014                    "last_login": null
3015                }
3016            }
3017        });
3018
3019        cache.insert("complex:user-123".to_string(), complex_state);
3020
3021        let state = cache.get("complex:user-123").unwrap();
3022        assert_eq!(state["user"]["profile"]["name"], "John Doe");
3023        assert_eq!(state["user"]["roles"][0], "admin");
3024        assert!(state["user"]["metadata"]["last_login"].is_null());
3025    }
3026
3027    #[tokio::test]
3028    async fn test_projection_state_cache_iteration() {
3029        let store = create_test_store();
3030        let cache = store.projection_state_cache();
3031
3032        // Insert entries
3033        for i in 0..5 {
3034            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
3035        }
3036
3037        // Iterate over all entries
3038        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
3039        assert_eq!(entries.len(), 5);
3040    }
3041
3042    #[tokio::test]
3043    async fn test_projection_manager_get_entity_snapshots() {
3044        let store = create_test_store();
3045        let projection_manager = store.projection_manager();
3046
3047        // Get entity_snapshots projection specifically
3048        let projection = projection_manager.get_projection("entity_snapshots");
3049        assert!(projection.is_some());
3050        assert_eq!(projection.unwrap().name(), "entity_snapshots");
3051    }
3052
3053    #[tokio::test]
3054    async fn test_projection_manager_get_event_counters() {
3055        let store = create_test_store();
3056        let projection_manager = store.projection_manager();
3057
3058        // Get event_counters projection specifically
3059        let projection = projection_manager.get_projection("event_counters");
3060        assert!(projection.is_some());
3061        assert_eq!(projection.unwrap().name(), "event_counters");
3062    }
3063
3064    #[tokio::test]
3065    async fn test_projection_state_cache_overwrite() {
3066        let store = create_test_store();
3067        let cache = store.projection_state_cache();
3068
3069        // Initial value
3070        cache.insert(
3071            "overwrite:entity-1".to_string(),
3072            serde_json::json!({"version": 1}),
3073        );
3074
3075        // Overwrite with new value
3076        cache.insert(
3077            "overwrite:entity-1".to_string(),
3078            serde_json::json!({"version": 2}),
3079        );
3080
3081        // Overwrite again
3082        cache.insert(
3083            "overwrite:entity-1".to_string(),
3084            serde_json::json!({"version": 3}),
3085        );
3086
3087        let state = cache.get("overwrite:entity-1").unwrap();
3088        assert_eq!(state["version"], 3);
3089
3090        // Should still be only 1 entry
3091        assert_eq!(cache.len(), 1);
3092    }
3093
3094    #[tokio::test]
3095    async fn test_projection_state_multiple_projections() {
3096        let store = create_test_store();
3097        let cache = store.projection_state_cache();
3098
3099        // Store states for different projections
3100        cache.insert(
3101            "entity_snapshots:user-1".to_string(),
3102            serde_json::json!({"name": "Alice"}),
3103        );
3104        cache.insert(
3105            "event_counters:user.created".to_string(),
3106            serde_json::json!({"count": 5}),
3107        );
3108        cache.insert(
3109            "custom_projection:order-1".to_string(),
3110            serde_json::json!({"total": 150.0}),
3111        );
3112
3113        // Verify each projection's state
3114        assert_eq!(
3115            cache.get("entity_snapshots:user-1").unwrap()["name"],
3116            "Alice"
3117        );
3118        assert_eq!(
3119            cache.get("event_counters:user.created").unwrap()["count"],
3120            5
3121        );
3122        assert_eq!(
3123            cache.get("custom_projection:order-1").unwrap()["total"],
3124            150.0
3125        );
3126    }
3127
3128    #[tokio::test]
3129    async fn test_bulk_projection_state_access() {
3130        let store = create_test_store();
3131
3132        // Ingest multiple events for different entities
3133        for i in 0..5 {
3134            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3135            store.ingest(&event).unwrap();
3136        }
3137
3138        // Get projection and verify bulk access
3139        let projection_manager = store.projection_manager();
3140        let snapshot_projection = projection_manager
3141            .get_projection("entity_snapshots")
3142            .unwrap();
3143
3144        // Verify we can access all entities
3145        for i in 0..5 {
3146            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3147            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3148        }
3149    }
3150
3151    #[tokio::test]
3152    async fn test_bulk_save_projection_states() {
3153        let store = create_test_store();
3154        let cache = store.projection_state_cache();
3155
3156        // Simulate bulk save request
3157        let states = vec![
3158            BulkSaveStateItem {
3159                entity_id: "bulk-entity-1".to_string(),
3160                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3161            },
3162            BulkSaveStateItem {
3163                entity_id: "bulk-entity-2".to_string(),
3164                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3165            },
3166            BulkSaveStateItem {
3167                entity_id: "bulk-entity-3".to_string(),
3168                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3169            },
3170        ];
3171
3172        let projection_name = "test_projection";
3173
3174        // Save states to cache (simulating bulk_save_projection_states handler)
3175        for item in &states {
3176            cache.insert(
3177                format!("{projection_name}:{}", item.entity_id),
3178                item.state.clone(),
3179            );
3180        }
3181
3182        // Verify all states were saved
3183        assert_eq!(cache.len(), 3);
3184
3185        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3186        assert_eq!(state1["name"], "Entity 1");
3187        assert_eq!(state1["value"], 100);
3188
3189        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3190        assert_eq!(state2["name"], "Entity 2");
3191        assert_eq!(state2["value"], 200);
3192
3193        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3194        assert_eq!(state3["name"], "Entity 3");
3195        assert_eq!(state3["value"], 300);
3196    }
3197
3198    #[tokio::test]
3199    async fn test_bulk_save_empty_states() {
3200        let store = create_test_store();
3201        let cache = store.projection_state_cache();
3202
3203        // Clear cache
3204        cache.clear();
3205
3206        // Empty states should work fine
3207        let states: Vec<BulkSaveStateItem> = vec![];
3208        assert_eq!(states.len(), 0);
3209
3210        // Cache should remain empty
3211        assert_eq!(cache.len(), 0);
3212    }
3213
3214    #[tokio::test]
3215    async fn test_bulk_save_overwrites_existing() {
3216        let store = create_test_store();
3217        let cache = store.projection_state_cache();
3218
3219        // Insert initial state
3220        cache.insert(
3221            "test:entity-1".to_string(),
3222            serde_json::json!({"version": 1, "data": "initial"}),
3223        );
3224
3225        // Bulk save with updated state
3226        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3227        cache.insert("test:entity-1".to_string(), new_state);
3228
3229        // Verify overwrite
3230        let state = cache.get("test:entity-1").unwrap();
3231        assert_eq!(state["version"], 2);
3232        assert_eq!(state["data"], "updated");
3233    }
3234
3235    #[tokio::test]
3236    async fn test_bulk_save_high_volume() {
3237        let store = create_test_store();
3238        let cache = store.projection_state_cache();
3239
3240        // Simulate high volume save (1000 entities)
3241        for i in 0..1000 {
3242            cache.insert(
3243                format!("volume_test:entity-{i}"),
3244                serde_json::json!({"index": i, "status": "active"}),
3245            );
3246        }
3247
3248        // Verify count
3249        assert_eq!(cache.len(), 1000);
3250
3251        // Spot check some entries
3252        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3253        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3254        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3255    }
3256
3257    #[tokio::test]
3258    async fn test_bulk_save_different_projections() {
3259        let store = create_test_store();
3260        let cache = store.projection_state_cache();
3261
3262        // Save to multiple projections in bulk
3263        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3264
3265        for proj in &projections {
3266            for i in 0..5 {
3267                cache.insert(
3268                    format!("{proj}:entity-{i}"),
3269                    serde_json::json!({"projection": proj, "id": i}),
3270                );
3271            }
3272        }
3273
3274        // Verify total count (3 projections * 5 entities)
3275        assert_eq!(cache.len(), 15);
3276
3277        // Verify each projection
3278        for proj in &projections {
3279            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3280            assert_eq!(state["projection"], *proj);
3281        }
3282    }
3283
3284    // -------------------------------------------------------------------------
3285    // Cache-fallback tests for the projection-state read handlers (v0.19.1).
3286    //
3287    // SDK-managed projections write state via save_projection_state /
3288    // bulk_save_projection_states without registering in projection_manager.
3289    // These tests verify the read handlers fall back to the cache instead of
3290    // 404-ing. Registered projection wins where both exist; cache fills the
3291    // gap when not.
3292    // -------------------------------------------------------------------------
3293
3294    #[tokio::test]
3295    async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3296        let store = create_test_store();
3297        store.projection_state_cache().insert(
3298            "assets:BTC".to_string(),
3299            serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3300        );
3301
3302        let resp = get_projection_state(
3303            State(Arc::clone(&store)),
3304            Path(("assets".to_string(), "BTC".to_string())),
3305        )
3306        .await
3307        .expect("should not error when projection is not registered");
3308
3309        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3310        assert_eq!(resp.0["state"]["symbol"], "BTC");
3311        assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3312    }
3313
3314    #[tokio::test]
3315    async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3316        let store = create_test_store();
3317
3318        let resp = get_projection_state(
3319            State(Arc::clone(&store)),
3320            Path(("assets".to_string(), "UNKNOWN".to_string())),
3321        )
3322        .await
3323        .unwrap();
3324
3325        assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3326        assert_eq!(resp.0["state"], serde_json::Value::Null);
3327    }
3328
3329    #[tokio::test]
3330    async fn get_projection_state_registered_wins_over_cache() {
3331        let store = create_test_store();
3332
3333        // Ingest an event so entity_snapshots (a registered projection) has state.
3334        let event = create_test_event("user-777", "user.created");
3335        store.ingest(&event).unwrap();
3336
3337        // Plant a conflicting cache entry for the same (projection, entity).
3338        store.projection_state_cache().insert(
3339            "entity_snapshots:user-777".to_string(),
3340            serde_json::json!({"stolen": "value"}),
3341        );
3342
3343        let resp = get_projection_state(
3344            State(Arc::clone(&store)),
3345            Path(("entity_snapshots".to_string(), "user-777".to_string())),
3346        )
3347        .await
3348        .unwrap();
3349
3350        // Registered projection wins — cache fallback is only consulted when
3351        // the registered projection has no state for this entity.
3352        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3353        assert!(
3354            resp.0["state"].get("stolen").is_none(),
3355            "cache entry must not shadow registered projection state: got {:?}",
3356            resp.0["state"]
3357        );
3358    }
3359
3360    #[tokio::test]
3361    async fn get_projection_state_summary_returns_cache_without_registration() {
3362        let store = create_test_store();
3363        let cache = store.projection_state_cache();
3364        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3365        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3366        // Different projection name — must not appear in the summary.
3367        cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3368
3369        let resp =
3370            get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3371                .await
3372                .unwrap();
3373
3374        assert_eq!(resp.0["total"], 2);
3375        let states = resp.0["states"].as_array().unwrap();
3376        let entity_ids: Vec<&str> = states
3377            .iter()
3378            .map(|s| s["entity_id"].as_str().unwrap())
3379            .collect();
3380        assert!(entity_ids.contains(&"BTC"));
3381        assert!(entity_ids.contains(&"ETH"));
3382    }
3383
3384    #[tokio::test]
3385    async fn bulk_get_projection_states_falls_back_to_cache() {
3386        let store = create_test_store();
3387        let cache = store.projection_state_cache();
3388        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3389        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3390
3391        let req = BulkGetStateRequest {
3392            entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3393        };
3394
3395        let resp = bulk_get_projection_states(
3396            State(Arc::clone(&store)),
3397            Path("assets".to_string()),
3398            Json(req),
3399        )
3400        .await
3401        .unwrap();
3402
3403        assert_eq!(resp.0["total"], 3);
3404        let states = resp.0["states"].as_array().unwrap();
3405        let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3406            .iter()
3407            .map(|s| (s["entity_id"].as_str().unwrap(), s))
3408            .collect();
3409
3410        assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3411        assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3412        assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3413        assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3414    }
3415}