Skip to main content

allsource_core/infrastructure/web/
api.rs

1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4    application::{
5        dto::{
6            AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7            DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9            IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10            QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11        },
12        services::{
13            analytics::{
14                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16            },
17            pipeline::{PipelineConfig, PipelineStats},
18            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19            schema::{
20                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21                ValidateEventRequest, ValidateEventResponse,
22            },
23            webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24        },
25    },
26    domain::entities::Event,
27    error::Result,
28    infrastructure::{
29        persistence::{
30            compaction::CompactionResult,
31            snapshot::{
32                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33                ListSnapshotsResponse, SnapshotInfo,
34            },
35        },
36        query::{
37            geospatial::GeoQueryRequest,
38            graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39        },
40        security::middleware::OptionalAuth,
41        web::api_v1::AppState,
42    },
43    store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46    Json, Router,
47    extract::{Path, Query, State, WebSocketUpgrade},
48    response::{IntoResponse, Response},
49    routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54    cors::{Any, CorsLayer},
55    trace::TraceLayer,
56};
57
58type SharedStore = Arc<EventStore>;
59
60/// Wait for follower ACK(s) in semi-sync/sync replication modes.
61///
62/// In async mode (default), returns immediately. In semi-sync mode, waits for
63/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
64/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
65#[cfg(feature = "replication")]
66async fn await_replication_ack(state: &AppState) {
67    let shipper_guard = state.wal_shipper.read().await;
68    if let Some(ref shipper) = *shipper_guard {
69        let mode = shipper.replication_mode();
70        if mode == ReplicationMode::Async {
71            return;
72        }
73
74        let target_offset = shipper.current_leader_offset();
75        if target_offset == 0 {
76            return;
77        }
78
79        let shipper = Arc::clone(shipper);
80        // Drop the read guard before the async wait to avoid holding it across await
81        drop(shipper_guard);
82
83        let timer = state
84            .store
85            .metrics()
86            .replication_ack_wait_seconds
87            .start_timer();
88        let acked = shipper.wait_for_ack(target_offset).await;
89        timer.observe_duration();
90
91        if !acked {
92            tracing::warn!(
93                "Replication ACK timeout in {} mode (offset {}). \
94                 Write succeeded locally but follower confirmation pending.",
95                mode,
96                target_offset,
97            );
98        }
99    }
100}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103    // No-op in community edition
104}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107    let app = Router::new()
108        .route("/health", get(health))
109        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
110        .route("/api/v1/events", post(ingest_event))
111        .route("/api/v1/events/batch", post(ingest_events_batch))
112        .route("/api/v1/events/query", get(query_events))
113        .route("/api/v1/events/{event_id}", get(get_event_by_id))
114        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
115        // v0.10: Stream and event type discovery endpoints
116        .route("/api/v1/streams", get(list_streams))
117        .route("/api/v1/event-types", get(list_event_types))
118        .route("/api/v1/entities/duplicates", get(detect_duplicates))
119        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120        .route(
121            "/api/v1/entities/{entity_id}/snapshot",
122            get(get_entity_snapshot),
123        )
124        .route("/api/v1/stats", get(get_stats))
125        // v0.2: Advanced analytics endpoints
126        .route("/api/v1/analytics/frequency", get(analytics_frequency))
127        .route("/api/v1/analytics/summary", get(analytics_summary))
128        .route("/api/v1/analytics/correlation", get(analytics_correlation))
129        // v0.2: Snapshot management endpoints
130        .route("/api/v1/snapshots", post(create_snapshot))
131        .route("/api/v1/snapshots", get(list_snapshots))
132        .route(
133            "/api/v1/snapshots/{entity_id}/latest",
134            get(get_latest_snapshot),
135        )
136        // v0.2: Compaction endpoints
137        .route("/api/v1/compaction/trigger", post(trigger_compaction))
138        .route("/api/v1/compaction/stats", get(compaction_stats))
139        // v0.5: Schema registry endpoints
140        .route("/api/v1/schemas", post(register_schema))
141        .route("/api/v1/schemas", get(list_subjects))
142        .route("/api/v1/schemas/{subject}", get(get_schema))
143        .route(
144            "/api/v1/schemas/{subject}/versions",
145            get(list_schema_versions),
146        )
147        .route("/api/v1/schemas/validate", post(validate_event_schema))
148        .route(
149            "/api/v1/schemas/{subject}/compatibility",
150            put(set_compatibility_mode),
151        )
152        // v0.5: Replay and projection rebuild endpoints
153        .route("/api/v1/replay", post(start_replay))
154        .route("/api/v1/replay", get(list_replays))
155        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157        .route(
158            "/api/v1/replay/{replay_id}",
159            axum::routing::delete(delete_replay),
160        )
161        // v0.5: Stream processing pipeline endpoints
162        .route("/api/v1/pipelines", post(register_pipeline))
163        .route("/api/v1/pipelines", get(list_pipelines))
164        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166        .route(
167            "/api/v1/pipelines/{pipeline_id}",
168            axum::routing::delete(remove_pipeline),
169        )
170        .route(
171            "/api/v1/pipelines/{pipeline_id}/stats",
172            get(get_pipeline_stats),
173        )
174        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175        // v0.7: Projection State API for Query Service integration
176        .route("/api/v1/projections", get(list_projections))
177        .route("/api/v1/projections/{name}", get(get_projection))
178        .route(
179            "/api/v1/projections/{name}",
180            axum::routing::delete(delete_projection),
181        )
182        .route(
183            "/api/v1/projections/{name}/state",
184            get(get_projection_state_summary),
185        )
186        .route("/api/v1/projections/{name}/reset", post(reset_projection))
187        .route(
188            "/api/v1/projections/{name}/{entity_id}/state",
189            get(get_projection_state),
190        )
191        .route(
192            "/api/v1/projections/{name}/{entity_id}/state",
193            post(save_projection_state),
194        )
195        .route(
196            "/api/v1/projections/{name}/{entity_id}/state",
197            put(save_projection_state),
198        )
199        .route(
200            "/api/v1/projections/{name}/bulk",
201            post(bulk_get_projection_states),
202        )
203        .route(
204            "/api/v1/projections/{name}/bulk/save",
205            post(bulk_save_projection_states),
206        )
207        // v0.11: Webhook management endpoints
208        .route("/api/v1/webhooks", post(register_webhook))
209        .route("/api/v1/webhooks", get(list_webhooks))
210        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212        .route(
213            "/api/v1/webhooks/{webhook_id}",
214            axum::routing::delete(delete_webhook),
215        )
216        .route(
217            "/api/v1/webhooks/{webhook_id}/deliveries",
218            get(list_webhook_deliveries),
219        )
220        // v2.0: Advanced query features
221        .route("/api/v1/graphql", post(graphql_query))
222        .route("/api/v1/geospatial/query", post(geo_query))
223        .route("/api/v1/geospatial/stats", get(geo_stats))
224        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225        .route(
226            "/api/v1/schema-evolution/history/{event_type}",
227            get(schema_evolution_history),
228        )
229        .route(
230            "/api/v1/schema-evolution/schema/{event_type}",
231            get(schema_evolution_schema),
232        )
233        .route(
234            "/api/v1/schema-evolution/stats",
235            get(schema_evolution_stats),
236        )
237        .layer(
238            CorsLayer::new()
239                .allow_origin(Any)
240                .allow_methods(Any)
241                .allow_headers(Any),
242        )
243        .layer(TraceLayer::new_for_http())
244        .with_state(store);
245
246    let listener = tokio::net::TcpListener::bind(addr).await?;
247    axum::serve(listener, app).await?;
248
249    Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253    Json(serde_json::json!({
254        "status": "healthy",
255        "service": "allsource-core",
256        "version": env!("CARGO_PKG_VERSION")
257    }))
258}
259
260// v0.6: Prometheus metrics endpoint
261pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262    let metrics = store.metrics();
263
264    match metrics.encode() {
265        Ok(encoded) => Response::builder()
266            .status(200)
267            .header("Content-Type", "text/plain; version=0.0.4")
268            .body(encoded)
269            .unwrap()
270            .into_response(),
271        Err(e) => Response::builder()
272            .status(500)
273            .body(format!("Error encoding metrics: {e}"))
274            .unwrap()
275            .into_response(),
276    }
277}
278
279pub async fn ingest_event(
280    State(store): State<SharedStore>,
281    Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283    let expected_version = req.expected_version;
284
285    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286    let event = Event::from_strings(
287        req.event_type,
288        req.entity_id,
289        tenant_id,
290        req.payload,
291        req.metadata,
292    )?;
293
294    let event_id = event.id;
295    let timestamp = event.timestamp;
296
297    let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299    tracing::info!("Event ingested: {}", event_id);
300
301    Ok(Json(IngestEventResponse {
302        event_id,
303        timestamp,
304        version: Some(new_version),
305    }))
306}
307
308/// Ingest a single event with semi-sync/sync replication ACK waiting.
309///
310/// Used by the v1 API. Tenant comes from `req.tenant_id` — the Control Plane
311/// delegation layer sets it from the authenticated caller before forwarding.
312/// Core is internal-only and does not authenticate public traffic.
313pub async fn ingest_event_v1(
314    State(state): State<AppState>,
315    Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317    let expected_version = req.expected_version;
318
319    let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321    let event = Event::from_strings(
322        req.event_type,
323        req.entity_id,
324        tenant_id,
325        req.payload,
326        req.metadata,
327    )?;
328
329    let event_id = event.id;
330    let timestamp = event.timestamp;
331
332    let new_version = state
333        .store
334        .ingest_with_expected_version(&event, expected_version)?;
335
336    // Semi-sync/sync: wait for follower ACK(s) before returning
337    await_replication_ack(&state).await;
338
339    tracing::info!("Event ingested: {}", event_id);
340
341    Ok(Json(IngestEventResponse {
342        event_id,
343        timestamp,
344        version: Some(new_version),
345    }))
346}
347
348/// Batch ingest multiple events in a single request
349///
350/// This endpoint allows ingesting multiple events atomically, which is more
351/// efficient than making individual requests for each event.
352pub async fn ingest_events_batch(
353    State(store): State<SharedStore>,
354    Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356    let total = req.events.len();
357    let mut ingested_events = Vec::with_capacity(total);
358
359    for event_req in req.events {
360        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361        let expected_version = event_req.expected_version;
362
363        let event = Event::from_strings(
364            event_req.event_type,
365            event_req.entity_id,
366            tenant_id,
367            event_req.payload,
368            event_req.metadata,
369        )?;
370
371        let event_id = event.id;
372        let timestamp = event.timestamp;
373
374        let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376        ingested_events.push(IngestEventResponse {
377            event_id,
378            timestamp,
379            version: Some(new_version),
380        });
381    }
382
383    let ingested = ingested_events.len();
384    tracing::info!("Batch ingested {} events", ingested);
385
386    Ok(Json(IngestEventsBatchResponse {
387        total,
388        ingested,
389        events: ingested_events,
390    }))
391}
392
393/// Batch ingest with semi-sync/sync replication ACK waiting.
394///
395/// Used by the v1 API. Per-event tenant comes from `event_req.tenant_id` — the
396/// Control Plane delegation layer sets it from the authenticated caller before
397/// forwarding. Core is internal-only and does not authenticate public traffic.
398pub async fn ingest_events_batch_v1(
399    State(state): State<AppState>,
400    Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402    let total = req.events.len();
403    let mut ingested_events = Vec::with_capacity(total);
404
405    for event_req in req.events {
406        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407        let expected_version = event_req.expected_version;
408
409        let event = Event::from_strings(
410            event_req.event_type,
411            event_req.entity_id,
412            tenant_id,
413            event_req.payload,
414            event_req.metadata,
415        )?;
416
417        let event_id = event.id;
418        let timestamp = event.timestamp;
419
420        let new_version = state
421            .store
422            .ingest_with_expected_version(&event, expected_version)?;
423
424        ingested_events.push(IngestEventResponse {
425            event_id,
426            timestamp,
427            version: Some(new_version),
428        });
429    }
430
431    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
432    await_replication_ack(&state).await;
433
434    let ingested = ingested_events.len();
435    tracing::info!("Batch ingested {} events", ingested);
436
437    Ok(Json(IngestEventsBatchResponse {
438        total,
439        ingested,
440        events: ingested_events,
441    }))
442}
443
444/// Sort-order query parameter for `GET /api/v1/events/query`. Kept separate
445/// from `QueryEventsRequest` so ordering is purely an HTTP-layer concern and
446/// the internal query DTO (used by ~50 call sites) stays untouched.
447#[derive(Debug, Deserialize)]
448pub struct EventOrderParam {
449    /// `asc` (oldest first, the default) or `desc` (newest first).
450    pub order: Option<String>,
451}
452
453pub async fn query_events(
454    OptionalAuth(auth): OptionalAuth,
455    Query(req): Query<QueryEventsRequest>,
456    Query(order_param): Query<EventOrderParam>,
457    State(store): State<SharedStore>,
458) -> Result<Json<QueryEventsResponse>> {
459    let requested_limit = req.limit;
460    let queried_entity_id = req.entity_id.clone();
461
462    // Sort order. Default is ascending (oldest first) to preserve replay
463    // semantics for existing consumers. `order=desc` returns newest first,
464    // so `?entity_id=<id>&limit=1&order=desc` reliably yields the latest
465    // event for an entity (issue #177).
466    let descending = match order_param.order.as_deref() {
467        None => false,
468        Some(o) if o.eq_ignore_ascii_case("asc") => false,
469        Some(o) if o.eq_ignore_ascii_case("desc") => true,
470        Some(other) => {
471            return Err(crate::error::AllSourceError::InvalidInput(format!(
472                "invalid 'order' value '{other}': expected 'asc' or 'desc'"
473            )));
474        }
475    };
476
477    // Tenant resolution. Since Core is internal-only (bead t-0ff8), the only
478    // callers are Control Plane's delegation layer and other internal Fly
479    // services. Request param wins — that's what the gateway sets authoritatively
480    // from the authenticated caller's identity. Auth-context fallback is kept
481    // as a defense-in-depth for any internal caller that forgets to pass
482    // tenant_id but is authenticated (legacy; audit and remove once all
483    // internal callers are confirmed to set tenant_id explicitly).
484    let enforced_tenant = req
485        .tenant_id
486        .clone()
487        .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
488
489    // Query without limit to get total count
490    let unlimited_req = QueryEventsRequest {
491        entity_id: req.entity_id,
492        event_type: req.event_type,
493        tenant_id: enforced_tenant,
494        as_of: req.as_of,
495        since: req.since,
496        until: req.until,
497        limit: None,
498        event_type_prefix: req.event_type_prefix,
499        payload_filter: req.payload_filter,
500    };
501    let mut all_events = store.query(&unlimited_req)?;
502    let total_count = all_events.len();
503
504    // store.query() returns events ascending by (timestamp, version).
505    // Reverse for `order=desc` so the limit below takes the newest events.
506    if descending {
507        all_events.reverse();
508    }
509
510    // Apply limit
511    let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
512        all_events.into_iter().take(limit).collect()
513    } else {
514        all_events
515    };
516
517    let count = limited_events.len();
518    let has_more = count < total_count;
519    let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
520
521    // Include entity_version only when filtering by a single entity_id
522    let entity_version = queried_entity_id
523        .as_deref()
524        .map(|eid| store.get_entity_version(eid));
525
526    tracing::debug!("Query returned {} events (total: {})", count, total_count);
527
528    Ok(Json(QueryEventsResponse {
529        events,
530        count,
531        total_count,
532        has_more,
533        entity_version,
534    }))
535}
536
537pub async fn list_entities(
538    State(store): State<SharedStore>,
539    Query(req): Query<ListEntitiesRequest>,
540) -> Result<Json<ListEntitiesResponse>> {
541    use std::collections::HashMap;
542
543    // Get all events matching the filters
544    let query_req = QueryEventsRequest {
545        entity_id: None,
546        event_type: None,
547        tenant_id: None,
548        as_of: None,
549        since: None,
550        until: None,
551        limit: None,
552        event_type_prefix: req.event_type_prefix,
553        payload_filter: req.payload_filter,
554    };
555    let events = store.query(&query_req)?;
556
557    // Group by entity_id
558    let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
559    for event in &events {
560        entity_map
561            .entry(event.entity_id().to_string())
562            .or_default()
563            .push(event);
564    }
565
566    // Sort direction by last-event time. Default is `desc` (newest activity
567    // first) to preserve the endpoint's long-standing behavior; `order=asc`
568    // returns oldest activity first (issue #178).
569    let ascending = match req.order.as_deref() {
570        None => false,
571        Some(o) if o.eq_ignore_ascii_case("desc") => false,
572        Some(o) if o.eq_ignore_ascii_case("asc") => true,
573        Some(other) => {
574            return Err(crate::error::AllSourceError::InvalidInput(format!(
575                "invalid 'order' value '{other}': expected 'asc' or 'desc'"
576            )));
577        }
578    };
579
580    // Build entity summaries, then sort by last-event time in the requested
581    // direction. Ties are broken by `entity_id` (always ascending) so the
582    // total order is deterministic — required for stable offset pagination.
583    let mut summaries: Vec<EntitySummary> = entity_map
584        .into_iter()
585        .map(|(entity_id, events)| {
586            let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
587            EntitySummary {
588                entity_id,
589                event_count: events.len(),
590                last_event_type: last.event_type_str().to_string(),
591                last_event_at: last.timestamp(),
592            }
593        })
594        .collect();
595    summaries.sort_by(|a, b| {
596        let by_time = a.last_event_at.cmp(&b.last_event_at);
597        let by_time = if ascending {
598            by_time
599        } else {
600            by_time.reverse()
601        };
602        by_time.then_with(|| a.entity_id.cmp(&b.entity_id))
603    });
604
605    let total = summaries.len();
606
607    // Apply offset and limit
608    let offset = req.offset.unwrap_or(0);
609    let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
610    let summaries = if let Some(limit) = req.limit {
611        let has_more = summaries.len() > limit;
612        let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
613        return Ok(Json(ListEntitiesResponse {
614            entities: truncated,
615            total,
616            has_more,
617        }));
618    } else {
619        summaries
620    };
621
622    Ok(Json(ListEntitiesResponse {
623        entities: summaries,
624        total,
625        has_more: false,
626    }))
627}
628
629pub async fn detect_duplicates(
630    State(store): State<SharedStore>,
631    Query(req): Query<DetectDuplicatesRequest>,
632) -> Result<Json<DetectDuplicatesResponse>> {
633    use std::collections::HashMap;
634
635    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
636
637    // Query events scoped by the required prefix
638    let query_req = QueryEventsRequest {
639        entity_id: None,
640        event_type: None,
641        tenant_id: None,
642        as_of: None,
643        since: None,
644        until: None,
645        limit: None,
646        event_type_prefix: Some(req.event_type_prefix),
647        payload_filter: None,
648    };
649    let events = store.query(&query_req)?;
650
651    // For each entity, extract the latest event's payload fields specified by group_by
652    // Then group entities by those field values
653    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
654    for event in &events {
655        let eid = event.entity_id().to_string();
656        entity_latest
657            .entry(eid)
658            .and_modify(|existing| {
659                if event.timestamp() > existing.timestamp() {
660                    *existing = event;
661                }
662            })
663            .or_insert(event);
664    }
665
666    // Group entities by their payload field values
667    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
668    for (entity_id, event) in &entity_latest {
669        let payload = event.payload();
670        let mut key_parts = serde_json::Map::new();
671        for field in &group_by_fields {
672            let value = payload
673                .get(*field)
674                .cloned()
675                .unwrap_or(serde_json::Value::Null);
676            key_parts.insert((*field).to_string(), value);
677        }
678        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
679        groups.entry(key_str).or_default().push(entity_id.clone());
680    }
681
682    // Filter to groups with count > 1 (actual duplicates)
683    let mut duplicate_groups: Vec<DuplicateGroup> = groups
684        .into_iter()
685        .filter(|(_, ids)| ids.len() > 1)
686        .map(|(key_str, mut ids)| {
687            ids.sort();
688            let key: serde_json::Value =
689                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
690            let count = ids.len();
691            DuplicateGroup {
692                key,
693                entity_ids: ids,
694                count,
695            }
696        })
697        .collect();
698
699    // Sort by count descending for consistent output
700    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
701
702    let total = duplicate_groups.len();
703
704    // Apply offset and limit
705    let offset = req.offset.unwrap_or(0);
706    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
707
708    if let Some(limit) = req.limit {
709        let has_more = duplicate_groups.len() > limit;
710        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
711        return Ok(Json(DetectDuplicatesResponse {
712            duplicates: truncated,
713            total,
714            has_more,
715        }));
716    }
717
718    Ok(Json(DetectDuplicatesResponse {
719        duplicates: duplicate_groups,
720        total,
721        has_more: false,
722    }))
723}
724
725#[derive(Deserialize)]
726pub struct EntityStateParams {
727    as_of: Option<chrono::DateTime<chrono::Utc>>,
728}
729
730pub async fn get_entity_state(
731    State(store): State<SharedStore>,
732    Path(entity_id): Path<String>,
733    Query(params): Query<EntityStateParams>,
734) -> Result<Json<serde_json::Value>> {
735    let state = store.reconstruct_state(&entity_id, params.as_of)?;
736
737    tracing::info!("State reconstructed for entity: {}", entity_id);
738
739    Ok(Json(state))
740}
741
742pub async fn get_entity_snapshot(
743    State(store): State<SharedStore>,
744    Path(entity_id): Path<String>,
745) -> Result<Json<serde_json::Value>> {
746    let snapshot = store.get_snapshot(&entity_id)?;
747
748    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
749
750    Ok(Json(snapshot))
751}
752
753pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
754    let stats = store.stats();
755    Json(stats)
756}
757
758// v0.10: List all streams (entity_ids) in the event store
759/// Query parameters for listing streams
760#[derive(Debug, Deserialize)]
761pub struct ListStreamsParams {
762    /// Optional limit on number of streams to return
763    pub limit: Option<usize>,
764    /// Optional offset for pagination
765    pub offset: Option<usize>,
766}
767
768/// Response for listing streams
769#[derive(Debug, serde::Serialize)]
770pub struct ListStreamsResponse {
771    pub streams: Vec<StreamInfo>,
772    pub total: usize,
773}
774
775pub async fn list_streams(
776    State(store): State<SharedStore>,
777    Query(params): Query<ListStreamsParams>,
778) -> Json<ListStreamsResponse> {
779    let mut streams = store.list_streams();
780    let total = streams.len();
781
782    // Sort by last_event_at descending (most recent first)
783    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
784
785    // Apply pagination
786    if let Some(offset) = params.offset {
787        if offset < streams.len() {
788            streams = streams[offset..].to_vec();
789        } else {
790            streams = vec![];
791        }
792    }
793
794    if let Some(limit) = params.limit {
795        streams.truncate(limit);
796    }
797
798    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
799
800    Json(ListStreamsResponse { streams, total })
801}
802
803// v0.10: List all event types in the event store
804/// Query parameters for listing event types
805#[derive(Debug, Deserialize)]
806pub struct ListEventTypesParams {
807    /// Optional limit on number of event types to return
808    pub limit: Option<usize>,
809    /// Optional offset for pagination
810    pub offset: Option<usize>,
811}
812
813/// Response for listing event types
814#[derive(Debug, serde::Serialize)]
815pub struct ListEventTypesResponse {
816    pub event_types: Vec<EventTypeInfo>,
817    pub total: usize,
818}
819
820pub async fn list_event_types(
821    State(store): State<SharedStore>,
822    Query(params): Query<ListEventTypesParams>,
823) -> Json<ListEventTypesResponse> {
824    let mut event_types = store.list_event_types();
825    let total = event_types.len();
826
827    // Sort by event_count descending (most used first)
828    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
829
830    // Apply pagination
831    if let Some(offset) = params.offset {
832        if offset < event_types.len() {
833            event_types = event_types[offset..].to_vec();
834        } else {
835            event_types = vec![];
836        }
837    }
838
839    if let Some(limit) = params.limit {
840        event_types.truncate(limit);
841    }
842
843    tracing::debug!(
844        "Listed {} event types (total: {})",
845        event_types.len(),
846        total
847    );
848
849    Json(ListEventTypesResponse { event_types, total })
850}
851
852// v0.2: WebSocket endpoint for real-time event streaming
853#[derive(Debug, Deserialize)]
854pub struct WebSocketParams {
855    pub consumer_id: Option<String>,
856}
857
858pub async fn events_websocket(
859    ws: WebSocketUpgrade,
860    State(store): State<SharedStore>,
861    Query(params): Query<WebSocketParams>,
862) -> Response {
863    let websocket_manager = store.websocket_manager();
864
865    ws.on_upgrade(move |socket| async move {
866        if let Some(consumer_id) = params.consumer_id {
867            websocket_manager
868                .handle_socket_with_consumer(socket, consumer_id, store)
869                .await;
870        } else {
871            websocket_manager.handle_socket(socket).await;
872        }
873    })
874}
875
876// v0.2: Event frequency analytics endpoint
877pub async fn analytics_frequency(
878    State(store): State<SharedStore>,
879    Query(req): Query<EventFrequencyRequest>,
880) -> Result<Json<EventFrequencyResponse>> {
881    let response = AnalyticsEngine::event_frequency(&store, &req)?;
882
883    tracing::debug!(
884        "Frequency analysis returned {} buckets",
885        response.buckets.len()
886    );
887
888    Ok(Json(response))
889}
890
891// v0.2: Statistical summary endpoint
892pub async fn analytics_summary(
893    State(store): State<SharedStore>,
894    Query(req): Query<StatsSummaryRequest>,
895) -> Result<Json<StatsSummaryResponse>> {
896    let response = AnalyticsEngine::stats_summary(&store, &req)?;
897
898    tracing::debug!(
899        "Stats summary: {} events across {} entities",
900        response.total_events,
901        response.unique_entities
902    );
903
904    Ok(Json(response))
905}
906
907// v0.2: Event correlation analysis endpoint
908pub async fn analytics_correlation(
909    State(store): State<SharedStore>,
910    Query(req): Query<CorrelationRequest>,
911) -> Result<Json<CorrelationResponse>> {
912    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
913
914    tracing::debug!(
915        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
916        response.correlated_pairs,
917        response.total_a,
918        response.correlation_percentage
919    );
920
921    Ok(Json(response))
922}
923
924// v0.2: Create a snapshot for an entity
925pub async fn create_snapshot(
926    State(store): State<SharedStore>,
927    Json(req): Json<CreateSnapshotRequest>,
928) -> Result<Json<CreateSnapshotResponse>> {
929    store.create_snapshot(&req.entity_id)?;
930
931    let snapshot_manager = store.snapshot_manager();
932    let snapshot = snapshot_manager
933        .get_latest_snapshot(&req.entity_id)
934        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
935
936    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
937
938    Ok(Json(CreateSnapshotResponse {
939        snapshot_id: snapshot.id,
940        entity_id: snapshot.entity_id,
941        created_at: snapshot.created_at,
942        event_count: snapshot.event_count,
943        size_bytes: snapshot.metadata.size_bytes,
944    }))
945}
946
947// v0.2: List snapshots
948pub async fn list_snapshots(
949    State(store): State<SharedStore>,
950    Query(req): Query<ListSnapshotsRequest>,
951) -> Result<Json<ListSnapshotsResponse>> {
952    let snapshot_manager = store.snapshot_manager();
953
954    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
955        snapshot_manager
956            .get_all_snapshots(&entity_id)
957            .into_iter()
958            .map(SnapshotInfo::from)
959            .collect()
960    } else {
961        // List all entities with snapshots
962        let entities = snapshot_manager.list_entities();
963        entities
964            .iter()
965            .flat_map(|entity_id| {
966                snapshot_manager
967                    .get_all_snapshots(entity_id)
968                    .into_iter()
969                    .map(SnapshotInfo::from)
970            })
971            .collect()
972    };
973
974    let total = snapshots.len();
975
976    tracing::debug!("Listed {} snapshots", total);
977
978    Ok(Json(ListSnapshotsResponse { snapshots, total }))
979}
980
981// v0.2: Get latest snapshot for an entity
982pub async fn get_latest_snapshot(
983    State(store): State<SharedStore>,
984    Path(entity_id): Path<String>,
985) -> Result<Json<serde_json::Value>> {
986    let snapshot_manager = store.snapshot_manager();
987
988    let snapshot = snapshot_manager
989        .get_latest_snapshot(&entity_id)
990        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
991
992    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
993
994    Ok(Json(serde_json::json!({
995        "snapshot_id": snapshot.id,
996        "entity_id": snapshot.entity_id,
997        "created_at": snapshot.created_at,
998        "as_of": snapshot.as_of,
999        "event_count": snapshot.event_count,
1000        "size_bytes": snapshot.metadata.size_bytes,
1001        "snapshot_type": snapshot.metadata.snapshot_type,
1002        "state": snapshot.state
1003    })))
1004}
1005
1006// v0.2: Trigger manual compaction
1007pub async fn trigger_compaction(
1008    State(store): State<SharedStore>,
1009) -> Result<Json<CompactionResult>> {
1010    let compaction_manager = store.compaction_manager().ok_or_else(|| {
1011        crate::error::AllSourceError::InternalError(
1012            "Compaction not enabled (no Parquet storage)".to_string(),
1013        )
1014    })?;
1015
1016    tracing::info!("📦 Manual compaction triggered via API");
1017
1018    let result = compaction_manager.compact_now()?;
1019
1020    Ok(Json(result))
1021}
1022
1023// v0.2: Get compaction statistics
1024pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
1025    let compaction_manager = store.compaction_manager().ok_or_else(|| {
1026        crate::error::AllSourceError::InternalError(
1027            "Compaction not enabled (no Parquet storage)".to_string(),
1028        )
1029    })?;
1030
1031    let stats = compaction_manager.stats();
1032    let config = compaction_manager.config();
1033
1034    Ok(Json(serde_json::json!({
1035        "stats": stats,
1036        "config": {
1037            "min_files_to_compact": config.min_files_to_compact,
1038            "target_file_size": config.target_file_size,
1039            "max_file_size": config.max_file_size,
1040            "small_file_threshold": config.small_file_threshold,
1041            "compaction_interval_seconds": config.compaction_interval_seconds,
1042            "auto_compact": config.auto_compact,
1043            "strategy": config.strategy
1044        }
1045    })))
1046}
1047
1048// v0.5: Register a new schema
1049pub async fn register_schema(
1050    State(store): State<SharedStore>,
1051    Json(req): Json<RegisterSchemaRequest>,
1052) -> Result<Json<RegisterSchemaResponse>> {
1053    let schema_registry = store.schema_registry();
1054
1055    let response =
1056        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1057
1058    tracing::info!(
1059        "📋 Schema registered: v{} for '{}'",
1060        response.version,
1061        response.subject
1062    );
1063
1064    Ok(Json(response))
1065}
1066
1067// v0.5: Get a schema by subject and optional version
1068#[derive(Deserialize)]
1069pub struct GetSchemaParams {
1070    version: Option<u32>,
1071}
1072
1073pub async fn get_schema(
1074    State(store): State<SharedStore>,
1075    Path(subject): Path<String>,
1076    Query(params): Query<GetSchemaParams>,
1077) -> Result<Json<serde_json::Value>> {
1078    let schema_registry = store.schema_registry();
1079
1080    let schema = schema_registry.get_schema(&subject, params.version)?;
1081
1082    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1083
1084    Ok(Json(serde_json::json!({
1085        "id": schema.id,
1086        "subject": schema.subject,
1087        "version": schema.version,
1088        "schema": schema.schema,
1089        "created_at": schema.created_at,
1090        "description": schema.description,
1091        "tags": schema.tags
1092    })))
1093}
1094
1095// v0.5: List all versions of a schema subject
1096pub async fn list_schema_versions(
1097    State(store): State<SharedStore>,
1098    Path(subject): Path<String>,
1099) -> Result<Json<serde_json::Value>> {
1100    let schema_registry = store.schema_registry();
1101
1102    let versions = schema_registry.list_versions(&subject)?;
1103
1104    Ok(Json(serde_json::json!({
1105        "subject": subject,
1106        "versions": versions
1107    })))
1108}
1109
1110// v0.5: List all schema subjects
1111pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1112    let schema_registry = store.schema_registry();
1113
1114    let subjects = schema_registry.list_subjects();
1115
1116    Json(serde_json::json!({
1117        "subjects": subjects,
1118        "total": subjects.len()
1119    }))
1120}
1121
1122// v0.5: Validate an event against a schema
1123pub async fn validate_event_schema(
1124    State(store): State<SharedStore>,
1125    Json(req): Json<ValidateEventRequest>,
1126) -> Result<Json<ValidateEventResponse>> {
1127    let schema_registry = store.schema_registry();
1128
1129    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1130
1131    if response.valid {
1132        tracing::debug!(
1133            "✅ Event validated against schema '{}' v{}",
1134            req.subject,
1135            response.schema_version
1136        );
1137    } else {
1138        tracing::warn!(
1139            "❌ Event validation failed for '{}': {:?}",
1140            req.subject,
1141            response.errors
1142        );
1143    }
1144
1145    Ok(Json(response))
1146}
1147
1148// v0.5: Set compatibility mode for a subject
1149#[derive(Deserialize)]
1150pub struct SetCompatibilityRequest {
1151    compatibility: CompatibilityMode,
1152}
1153
1154pub async fn set_compatibility_mode(
1155    State(store): State<SharedStore>,
1156    Path(subject): Path<String>,
1157    Json(req): Json<SetCompatibilityRequest>,
1158) -> Json<serde_json::Value> {
1159    let schema_registry = store.schema_registry();
1160
1161    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1162
1163    tracing::info!(
1164        "🔧 Set compatibility mode for '{}' to {:?}",
1165        subject,
1166        req.compatibility
1167    );
1168
1169    Json(serde_json::json!({
1170        "subject": subject,
1171        "compatibility": req.compatibility
1172    }))
1173}
1174
1175// v0.5: Start a replay operation
1176pub async fn start_replay(
1177    State(store): State<SharedStore>,
1178    Json(req): Json<StartReplayRequest>,
1179) -> Result<Json<StartReplayResponse>> {
1180    let replay_manager = store.replay_manager();
1181
1182    let response = replay_manager.start_replay(store, req)?;
1183
1184    tracing::info!(
1185        "🔄 Started replay {} with {} events",
1186        response.replay_id,
1187        response.total_events
1188    );
1189
1190    Ok(Json(response))
1191}
1192
1193// v0.5: Get replay progress
1194pub async fn get_replay_progress(
1195    State(store): State<SharedStore>,
1196    Path(replay_id): Path<uuid::Uuid>,
1197) -> Result<Json<ReplayProgress>> {
1198    let replay_manager = store.replay_manager();
1199
1200    let progress = replay_manager.get_progress(replay_id)?;
1201
1202    Ok(Json(progress))
1203}
1204
1205// v0.5: List all replay operations
1206pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1207    let replay_manager = store.replay_manager();
1208
1209    let replays = replay_manager.list_replays();
1210
1211    Json(serde_json::json!({
1212        "replays": replays,
1213        "total": replays.len()
1214    }))
1215}
1216
1217// v0.5: Cancel a running replay
1218pub async fn cancel_replay(
1219    State(store): State<SharedStore>,
1220    Path(replay_id): Path<uuid::Uuid>,
1221) -> Result<Json<serde_json::Value>> {
1222    let replay_manager = store.replay_manager();
1223
1224    replay_manager.cancel_replay(replay_id)?;
1225
1226    tracing::info!("🛑 Cancelled replay {}", replay_id);
1227
1228    Ok(Json(serde_json::json!({
1229        "replay_id": replay_id,
1230        "status": "cancelled"
1231    })))
1232}
1233
1234// v0.5: Delete a completed replay
1235pub async fn delete_replay(
1236    State(store): State<SharedStore>,
1237    Path(replay_id): Path<uuid::Uuid>,
1238) -> Result<Json<serde_json::Value>> {
1239    let replay_manager = store.replay_manager();
1240
1241    let deleted = replay_manager.delete_replay(replay_id)?;
1242
1243    if deleted {
1244        tracing::info!("🗑️  Deleted replay {}", replay_id);
1245    }
1246
1247    Ok(Json(serde_json::json!({
1248        "replay_id": replay_id,
1249        "deleted": deleted
1250    })))
1251}
1252
1253// v0.5: Register a new pipeline
1254pub async fn register_pipeline(
1255    State(store): State<SharedStore>,
1256    Json(config): Json<PipelineConfig>,
1257) -> Result<Json<serde_json::Value>> {
1258    let pipeline_manager = store.pipeline_manager();
1259
1260    let pipeline_id = pipeline_manager.register(config.clone());
1261
1262    tracing::info!(
1263        "🔀 Pipeline registered: {} (name: {})",
1264        pipeline_id,
1265        config.name
1266    );
1267
1268    Ok(Json(serde_json::json!({
1269        "pipeline_id": pipeline_id,
1270        "name": config.name,
1271        "enabled": config.enabled
1272    })))
1273}
1274
1275// v0.5: List all pipelines
1276pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1277    let pipeline_manager = store.pipeline_manager();
1278
1279    let pipelines = pipeline_manager.list();
1280
1281    tracing::debug!("Listed {} pipelines", pipelines.len());
1282
1283    Json(serde_json::json!({
1284        "pipelines": pipelines,
1285        "total": pipelines.len()
1286    }))
1287}
1288
1289// v0.5: Get a specific pipeline
1290pub async fn get_pipeline(
1291    State(store): State<SharedStore>,
1292    Path(pipeline_id): Path<uuid::Uuid>,
1293) -> Result<Json<PipelineConfig>> {
1294    let pipeline_manager = store.pipeline_manager();
1295
1296    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1297        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1298    })?;
1299
1300    Ok(Json(pipeline.config().clone()))
1301}
1302
1303// v0.5: Remove a pipeline
1304pub async fn remove_pipeline(
1305    State(store): State<SharedStore>,
1306    Path(pipeline_id): Path<uuid::Uuid>,
1307) -> Result<Json<serde_json::Value>> {
1308    let pipeline_manager = store.pipeline_manager();
1309
1310    let removed = pipeline_manager.remove(pipeline_id);
1311
1312    if removed {
1313        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
1314    }
1315
1316    Ok(Json(serde_json::json!({
1317        "pipeline_id": pipeline_id,
1318        "removed": removed
1319    })))
1320}
1321
1322// v0.5: Get statistics for all pipelines
1323pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1324    let pipeline_manager = store.pipeline_manager();
1325
1326    let stats = pipeline_manager.all_stats();
1327
1328    Json(serde_json::json!({
1329        "stats": stats,
1330        "total": stats.len()
1331    }))
1332}
1333
1334// v0.5: Get statistics for a specific pipeline
1335pub async fn get_pipeline_stats(
1336    State(store): State<SharedStore>,
1337    Path(pipeline_id): Path<uuid::Uuid>,
1338) -> Result<Json<PipelineStats>> {
1339    let pipeline_manager = store.pipeline_manager();
1340
1341    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1342        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1343    })?;
1344
1345    Ok(Json(pipeline.stats()))
1346}
1347
1348// v0.5: Reset a pipeline's state
1349pub async fn reset_pipeline(
1350    State(store): State<SharedStore>,
1351    Path(pipeline_id): Path<uuid::Uuid>,
1352) -> Result<Json<serde_json::Value>> {
1353    let pipeline_manager = store.pipeline_manager();
1354
1355    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1356        crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1357    })?;
1358
1359    pipeline.reset();
1360
1361    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1362
1363    Ok(Json(serde_json::json!({
1364        "pipeline_id": pipeline_id,
1365        "reset": true
1366    })))
1367}
1368
1369// =============================================================================
1370// v0.11: Single Event Lookup by ID
1371// =============================================================================
1372
1373/// Get a single event by UUID
1374pub async fn get_event_by_id(
1375    State(store): State<SharedStore>,
1376    Path(event_id): Path<uuid::Uuid>,
1377) -> Result<Json<serde_json::Value>> {
1378    let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1379        crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1380    })?;
1381
1382    let dto = EventDto::from(&event);
1383
1384    tracing::debug!("Event retrieved by ID: {}", event_id);
1385
1386    Ok(Json(serde_json::json!({
1387        "event": dto,
1388        "found": true
1389    })))
1390}
1391
1392// =============================================================================
1393// v0.7: Projection State API for Query Service Integration
1394// =============================================================================
1395
1396/// List all registered projections
1397pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1398    let projection_manager = store.projection_manager();
1399    let status_map = store.projection_status();
1400
1401    let projections: Vec<serde_json::Value> = projection_manager
1402        .list_projections()
1403        .iter()
1404        .map(|(name, projection)| {
1405            let status = status_map
1406                .get(name)
1407                .map_or_else(|| "running".to_string(), |s| s.value().clone());
1408            serde_json::json!({
1409                "name": name,
1410                "type": format!("{:?}", projection.name()),
1411                "status": status,
1412            })
1413        })
1414        .collect();
1415
1416    tracing::debug!("Listed {} projections", projections.len());
1417
1418    Json(serde_json::json!({
1419        "projections": projections,
1420        "total": projections.len()
1421    }))
1422}
1423
1424/// Get projection metadata by name
1425pub async fn get_projection(
1426    State(store): State<SharedStore>,
1427    Path(name): Path<String>,
1428) -> Result<Json<serde_json::Value>> {
1429    let projection_manager = store.projection_manager();
1430
1431    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1432        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1433    })?;
1434
1435    Ok(Json(serde_json::json!({
1436        "name": projection.name(),
1437        "found": true
1438    })))
1439}
1440
1441/// Get projection state for a specific entity.
1442///
1443/// Resolution order:
1444/// 1. **Registered projection** — if `name` is registered with the projection
1445///    manager, return the projection's own `get_state(entity_id)` output.
1446/// 2. **Projection state cache** — otherwise fall back to whatever was written
1447///    via `save_projection_state` / `bulk_save_projection_states`. This supports
1448///    SDK-managed projections (e.g. the Rust SDK's `ProjectionWorker`) that
1449///    compute state client-side and push it back without registering a
1450///    projection in Core's manager.
1451///
1452/// Returns `found: false` with `state: null` when neither source has state.
1453pub async fn get_projection_state(
1454    State(store): State<SharedStore>,
1455    Path((name, entity_id)): Path<(String, String)>,
1456) -> Result<Json<serde_json::Value>> {
1457    let state = store
1458        .projection_manager()
1459        .get_projection(&name)
1460        .and_then(|p| p.get_state(&entity_id))
1461        .or_else(|| {
1462            store
1463                .projection_state_cache()
1464                .get(&format!("{name}:{entity_id}"))
1465                .map(|entry| entry.value().clone())
1466        });
1467
1468    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1469
1470    Ok(Json(serde_json::json!({
1471        "projection": name,
1472        "entity_id": entity_id,
1473        "state": state,
1474        "found": state.is_some()
1475    })))
1476}
1477
1478/// Delete (clear) a projection by name
1479///
1480/// Removes all state from the projection. The projection definition remains
1481/// registered but its accumulated state is cleared.
1482pub async fn delete_projection(
1483    State(store): State<SharedStore>,
1484    Path(name): Path<String>,
1485) -> Result<Json<serde_json::Value>> {
1486    let projection_manager = store.projection_manager();
1487
1488    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1489        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1490    })?;
1491
1492    projection.clear();
1493
1494    // Also clear any cached state for this projection
1495    let cache = store.projection_state_cache();
1496    let prefix = format!("{name}:");
1497    let keys_to_remove: Vec<String> = cache
1498        .iter()
1499        .filter(|entry| entry.key().starts_with(&prefix))
1500        .map(|entry| entry.key().clone())
1501        .collect();
1502    for key in keys_to_remove {
1503        cache.remove(&key);
1504    }
1505
1506    tracing::info!("Projection deleted (cleared): {}", name);
1507
1508    Ok(Json(serde_json::json!({
1509        "projection": name,
1510        "deleted": true
1511    })))
1512}
1513
1514/// Get aggregate projection state (all entities).
1515///
1516/// Returns the cached states written via `save_projection_state` /
1517/// `bulk_save_projection_states`. The projection does NOT need to be
1518/// registered with the projection manager — this supports SDK-managed
1519/// projections that push state without server-side registration.
1520///
1521/// Returns an empty list when no state has been written.
1522pub async fn get_projection_state_summary(
1523    State(store): State<SharedStore>,
1524    Path(name): Path<String>,
1525) -> Result<Json<serde_json::Value>> {
1526    let cache = store.projection_state_cache();
1527    let prefix = format!("{name}:");
1528    let states: Vec<serde_json::Value> = cache
1529        .iter()
1530        .filter(|entry| entry.key().starts_with(&prefix))
1531        .map(|entry| {
1532            let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1533            serde_json::json!({
1534                "entity_id": entity_id,
1535                "state": entry.value().clone()
1536            })
1537        })
1538        .collect();
1539
1540    let total = states.len();
1541
1542    tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1543
1544    Ok(Json(serde_json::json!({
1545        "projection": name,
1546        "states": states,
1547        "total": total
1548    })))
1549}
1550
1551/// Reset a projection to its initial state
1552///
1553/// Clears all accumulated state and reprocesses events from the beginning.
1554pub async fn reset_projection(
1555    State(store): State<SharedStore>,
1556    Path(name): Path<String>,
1557) -> Result<Json<serde_json::Value>> {
1558    let reprocessed = store.reset_projection(&name)?;
1559
1560    tracing::info!(
1561        "Projection reset: {} ({} events reprocessed)",
1562        name,
1563        reprocessed
1564    );
1565
1566    Ok(Json(serde_json::json!({
1567        "projection": name,
1568        "reset": true,
1569        "events_reprocessed": reprocessed
1570    })))
1571}
1572
1573/// Pause a projection
1574///
1575/// Sets the projection status to "paused" so it stops processing new events.
1576pub async fn pause_projection(
1577    State(store): State<SharedStore>,
1578    Path(name): Path<String>,
1579) -> Result<Json<serde_json::Value>> {
1580    let projection_manager = store.projection_manager();
1581
1582    // Verify projection exists
1583    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1584        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1585    })?;
1586
1587    store
1588        .projection_status()
1589        .insert(name.clone(), "paused".to_string());
1590
1591    tracing::info!("Projection paused: {}", name);
1592
1593    Ok(Json(serde_json::json!({
1594        "projection": name,
1595        "status": "paused"
1596    })))
1597}
1598
1599/// Start (resume) a projection
1600///
1601/// Sets the projection status to "running" so it resumes processing events.
1602pub async fn start_projection(
1603    State(store): State<SharedStore>,
1604    Path(name): Path<String>,
1605) -> Result<Json<serde_json::Value>> {
1606    let projection_manager = store.projection_manager();
1607
1608    // Verify projection exists
1609    let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1610        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1611    })?;
1612
1613    store
1614        .projection_status()
1615        .insert(name.clone(), "running".to_string());
1616
1617    tracing::info!("Projection started: {}", name);
1618
1619    Ok(Json(serde_json::json!({
1620        "projection": name,
1621        "status": "running"
1622    })))
1623}
1624
1625/// Request body for saving projection state
1626#[derive(Debug, Deserialize)]
1627pub struct SaveProjectionStateRequest {
1628    pub state: serde_json::Value,
1629}
1630
1631/// Save/update projection state for an entity
1632///
1633/// This endpoint allows external services (like Elixir Query Service) to
1634/// store computed projection state back to the Core for persistence.
1635pub async fn save_projection_state(
1636    State(store): State<SharedStore>,
1637    Path((name, entity_id)): Path<(String, String)>,
1638    Json(req): Json<SaveProjectionStateRequest>,
1639) -> Result<Json<serde_json::Value>> {
1640    let projection_cache = store.projection_state_cache();
1641
1642    // Store in the projection state cache
1643    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1644
1645    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1646
1647    Ok(Json(serde_json::json!({
1648        "projection": name,
1649        "entity_id": entity_id,
1650        "saved": true
1651    })))
1652}
1653
1654/// Bulk get projection states for multiple entities
1655///
1656/// Efficient endpoint for fetching multiple entity states in a single request.
1657#[derive(Debug, Deserialize)]
1658pub struct BulkGetStateRequest {
1659    pub entity_ids: Vec<String>,
1660}
1661
1662/// Bulk save projection states for multiple entities
1663///
1664/// Efficient endpoint for saving multiple entity states in a single request.
1665#[derive(Debug, Deserialize)]
1666pub struct BulkSaveStateRequest {
1667    pub states: Vec<BulkSaveStateItem>,
1668}
1669
1670#[derive(Debug, Deserialize)]
1671pub struct BulkSaveStateItem {
1672    pub entity_id: String,
1673    pub state: serde_json::Value,
1674}
1675
1676pub async fn bulk_get_projection_states(
1677    State(store): State<SharedStore>,
1678    Path(name): Path<String>,
1679    Json(req): Json<BulkGetStateRequest>,
1680) -> Result<Json<serde_json::Value>> {
1681    // Same fallback rule as `get_projection_state`: registered projection
1682    // wins, cache is the fallback. This lets SDK-managed projections read
1683    // their pushed-back state without registering in Core's projection manager.
1684    let projection = store.projection_manager().get_projection(&name);
1685    let cache = store.projection_state_cache();
1686
1687    let states: Vec<serde_json::Value> = req
1688        .entity_ids
1689        .iter()
1690        .map(|entity_id| {
1691            let state = projection
1692                .as_ref()
1693                .and_then(|p| p.get_state(entity_id))
1694                .or_else(|| {
1695                    cache
1696                        .get(&format!("{name}:{entity_id}"))
1697                        .map(|entry| entry.value().clone())
1698                });
1699            serde_json::json!({
1700                "entity_id": entity_id,
1701                "state": state,
1702                "found": state.is_some()
1703            })
1704        })
1705        .collect();
1706
1707    tracing::debug!(
1708        "Bulk projection state retrieved: {} entities from {}",
1709        states.len(),
1710        name
1711    );
1712
1713    Ok(Json(serde_json::json!({
1714        "projection": name,
1715        "states": states,
1716        "total": states.len()
1717    })))
1718}
1719
1720/// Bulk save projection states for multiple entities
1721///
1722/// This endpoint allows efficient batch saving of projection states,
1723/// critical for high-throughput event processing pipelines.
1724pub async fn bulk_save_projection_states(
1725    State(store): State<SharedStore>,
1726    Path(name): Path<String>,
1727    Json(req): Json<BulkSaveStateRequest>,
1728) -> Result<Json<serde_json::Value>> {
1729    let projection_cache = store.projection_state_cache();
1730
1731    let mut saved_count = 0;
1732    for item in &req.states {
1733        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1734        saved_count += 1;
1735    }
1736
1737    tracing::info!(
1738        "Bulk projection state saved: {} entities for {}",
1739        saved_count,
1740        name
1741    );
1742
1743    Ok(Json(serde_json::json!({
1744        "projection": name,
1745        "saved": saved_count,
1746        "total": req.states.len()
1747    })))
1748}
1749
1750// =============================================================================
1751// v0.11: Webhook Management API
1752// =============================================================================
1753
1754/// Query parameters for listing webhooks
1755#[derive(Debug, Deserialize)]
1756pub struct ListWebhooksParams {
1757    pub tenant_id: Option<String>,
1758}
1759
1760/// Register a new webhook subscription
1761pub async fn register_webhook(
1762    State(store): State<SharedStore>,
1763    Json(req): Json<RegisterWebhookRequest>,
1764) -> Json<serde_json::Value> {
1765    let registry = store.webhook_registry();
1766    let webhook = registry.register(req);
1767
1768    tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1769
1770    Json(serde_json::json!({
1771        "webhook": webhook,
1772        "created": true
1773    }))
1774}
1775
1776/// List webhooks, optionally filtered by tenant_id
1777pub async fn list_webhooks(
1778    State(store): State<SharedStore>,
1779    Query(params): Query<ListWebhooksParams>,
1780) -> Json<serde_json::Value> {
1781    let registry = store.webhook_registry();
1782
1783    let webhooks = if let Some(tenant_id) = params.tenant_id {
1784        registry.list_by_tenant(&tenant_id)
1785    } else {
1786        // Without tenant filter, return empty (tenants should always filter)
1787        vec![]
1788    };
1789
1790    let total = webhooks.len();
1791
1792    Json(serde_json::json!({
1793        "webhooks": webhooks,
1794        "total": total
1795    }))
1796}
1797
1798/// Get a specific webhook by ID
1799pub async fn get_webhook(
1800    State(store): State<SharedStore>,
1801    Path(webhook_id): Path<uuid::Uuid>,
1802) -> Result<Json<serde_json::Value>> {
1803    let registry = store.webhook_registry();
1804
1805    let webhook = registry.get(webhook_id).ok_or_else(|| {
1806        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1807    })?;
1808
1809    Ok(Json(serde_json::json!({
1810        "webhook": webhook,
1811        "found": true
1812    })))
1813}
1814
1815/// Update a webhook subscription
1816pub async fn update_webhook(
1817    State(store): State<SharedStore>,
1818    Path(webhook_id): Path<uuid::Uuid>,
1819    Json(req): Json<UpdateWebhookRequest>,
1820) -> Result<Json<serde_json::Value>> {
1821    let registry = store.webhook_registry();
1822
1823    let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1824        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1825    })?;
1826
1827    tracing::info!("Webhook updated: {}", webhook_id);
1828
1829    Ok(Json(serde_json::json!({
1830        "webhook": webhook,
1831        "updated": true
1832    })))
1833}
1834
1835/// Delete a webhook subscription
1836pub async fn delete_webhook(
1837    State(store): State<SharedStore>,
1838    Path(webhook_id): Path<uuid::Uuid>,
1839) -> Result<Json<serde_json::Value>> {
1840    let registry = store.webhook_registry();
1841
1842    let webhook = registry.delete(webhook_id).ok_or_else(|| {
1843        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1844    })?;
1845
1846    tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1847
1848    Ok(Json(serde_json::json!({
1849        "webhook_id": webhook_id,
1850        "deleted": true
1851    })))
1852}
1853
1854/// Query parameters for listing webhook deliveries
1855#[derive(Debug, Deserialize)]
1856pub struct ListDeliveriesParams {
1857    pub limit: Option<usize>,
1858}
1859
1860/// List delivery history for a webhook
1861pub async fn list_webhook_deliveries(
1862    State(store): State<SharedStore>,
1863    Path(webhook_id): Path<uuid::Uuid>,
1864    Query(params): Query<ListDeliveriesParams>,
1865) -> Result<Json<serde_json::Value>> {
1866    let registry = store.webhook_registry();
1867
1868    // Verify webhook exists
1869    registry.get(webhook_id).ok_or_else(|| {
1870        crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1871    })?;
1872
1873    let limit = params.limit.unwrap_or(50);
1874    let deliveries = registry.get_deliveries(webhook_id, limit);
1875    let total = deliveries.len();
1876
1877    Ok(Json(serde_json::json!({
1878        "webhook_id": webhook_id,
1879        "deliveries": deliveries,
1880        "total": total
1881    })))
1882}
1883
1884// =============================================================================
1885// v2.0: Advanced Query Features
1886// =============================================================================
1887
1888/// EventQL: Execute SQL queries over events using DataFusion
1889#[cfg(feature = "analytics")]
1890pub async fn eventql_query(
1891    State(store): State<SharedStore>,
1892    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
1893) -> Result<Json<serde_json::Value>> {
1894    let events = store.snapshot_events();
1895    match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1896        Ok(response) => Ok(Json(serde_json::json!({
1897            "columns": response.columns,
1898            "rows": response.rows,
1899            "row_count": response.row_count,
1900        }))),
1901        Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1902    }
1903}
1904
1905/// GraphQL: Execute GraphQL queries
1906pub async fn graphql_query(
1907    State(store): State<SharedStore>,
1908    Json(req): Json<GraphQLRequest>,
1909) -> Json<serde_json::Value> {
1910    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1911        Ok(f) => f,
1912        Err(e) => {
1913            return Json(
1914                serde_json::to_value(GraphQLResponse {
1915                    data: None,
1916                    errors: vec![GraphQLError { message: e }],
1917                })
1918                .unwrap(),
1919            );
1920        }
1921    };
1922
1923    let mut data = serde_json::Map::new();
1924    let mut errors = Vec::new();
1925
1926    for field in &fields {
1927        match field.name.as_str() {
1928            "events" => {
1929                let request = crate::application::dto::QueryEventsRequest {
1930                    entity_id: field.arguments.get("entity_id").cloned(),
1931                    event_type: field.arguments.get("event_type").cloned(),
1932                    tenant_id: field.arguments.get("tenant_id").cloned(),
1933                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1934                    as_of: None,
1935                    since: None,
1936                    until: None,
1937                    event_type_prefix: None,
1938                    payload_filter: None,
1939                };
1940                match store.query(&request) {
1941                    Ok(events) => {
1942                        let json_events: Vec<serde_json::Value> = events
1943                            .iter()
1944                            .map(|e| {
1945                                crate::infrastructure::query::graphql::event_to_json(
1946                                    e,
1947                                    &field.fields,
1948                                )
1949                            })
1950                            .collect();
1951                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
1952                    }
1953                    Err(e) => errors.push(GraphQLError {
1954                        message: format!("events query failed: {e}"),
1955                    }),
1956                }
1957            }
1958            "event" => {
1959                if let Some(id_str) = field.arguments.get("id") {
1960                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1961                        match store.get_event_by_id(&id) {
1962                            Ok(Some(event)) => {
1963                                data.insert(
1964                                    "event".to_string(),
1965                                    crate::infrastructure::query::graphql::event_to_json(
1966                                        &event,
1967                                        &field.fields,
1968                                    ),
1969                                );
1970                            }
1971                            Ok(None) => {
1972                                data.insert("event".to_string(), serde_json::Value::Null);
1973                            }
1974                            Err(e) => errors.push(GraphQLError {
1975                                message: format!("event lookup failed: {e}"),
1976                            }),
1977                        }
1978                    } else {
1979                        errors.push(GraphQLError {
1980                            message: format!("Invalid UUID: {id_str}"),
1981                        });
1982                    }
1983                } else {
1984                    errors.push(GraphQLError {
1985                        message: "event query requires 'id' argument".to_string(),
1986                    });
1987                }
1988            }
1989            "projections" => {
1990                let pm = store.projection_manager();
1991                let names: Vec<serde_json::Value> = pm
1992                    .list_projections()
1993                    .iter()
1994                    .map(|(name, _)| serde_json::Value::String(name.clone()))
1995                    .collect();
1996                data.insert("projections".to_string(), serde_json::Value::Array(names));
1997            }
1998            "stats" => {
1999                let stats = store.stats();
2000                data.insert(
2001                    "stats".to_string(),
2002                    serde_json::json!({
2003                        "total_events": stats.total_events,
2004                        "total_entities": stats.total_entities,
2005                        "total_event_types": stats.total_event_types,
2006                    }),
2007                );
2008            }
2009            "__schema" => {
2010                data.insert(
2011                    "__schema".to_string(),
2012                    crate::infrastructure::query::graphql::introspection_schema(),
2013                );
2014            }
2015            other => {
2016                errors.push(GraphQLError {
2017                    message: format!("Unknown field: {other}"),
2018                });
2019            }
2020        }
2021    }
2022
2023    Json(
2024        serde_json::to_value(GraphQLResponse {
2025            data: Some(serde_json::Value::Object(data)),
2026            errors,
2027        })
2028        .unwrap(),
2029    )
2030}
2031
2032/// Geospatial: Query events by location
2033pub async fn geo_query(
2034    State(store): State<SharedStore>,
2035    Json(req): Json<GeoQueryRequest>,
2036) -> Json<serde_json::Value> {
2037    let events = store.snapshot_events();
2038    let geo_index = store.geo_index();
2039    let results =
2040        crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
2041    let total = results.len();
2042    Json(serde_json::json!({
2043        "results": results,
2044        "total": total,
2045    }))
2046}
2047
2048/// Geospatial index stats
2049pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2050    let stats = store.geo_index().stats();
2051    Json(serde_json::json!(stats))
2052}
2053
2054/// Exactly-once processing stats
2055pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2056    let stats = store.exactly_once().stats();
2057    Json(serde_json::json!(stats))
2058}
2059
2060/// Schema evolution history for an event type
2061pub async fn schema_evolution_history(
2062    State(store): State<SharedStore>,
2063    Path(event_type): Path<String>,
2064) -> Json<serde_json::Value> {
2065    let mgr = store.schema_evolution();
2066    let history = mgr.get_history(&event_type);
2067    let version = mgr.get_version(&event_type);
2068    Json(serde_json::json!({
2069        "event_type": event_type,
2070        "current_version": version,
2071        "history": history,
2072    }))
2073}
2074
2075/// Current inferred schema for an event type
2076pub async fn schema_evolution_schema(
2077    State(store): State<SharedStore>,
2078    Path(event_type): Path<String>,
2079) -> Json<serde_json::Value> {
2080    let mgr = store.schema_evolution();
2081    if let Some(schema) = mgr.get_schema(&event_type) {
2082        let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2083        Json(serde_json::json!({
2084            "event_type": event_type,
2085            "version": mgr.get_version(&event_type),
2086            "inferred_schema": schema,
2087            "json_schema": json_schema,
2088        }))
2089    } else {
2090        Json(serde_json::json!({
2091            "event_type": event_type,
2092            "error": "No schema inferred for this event type"
2093        }))
2094    }
2095}
2096
2097/// Schema evolution stats
2098pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2099    let stats = store.schema_evolution().stats();
2100    let event_types = store.schema_evolution().list_event_types();
2101    Json(serde_json::json!({
2102        "stats": stats,
2103        "tracked_event_types": event_types,
2104    }))
2105}
2106
2107// =============================================================================
2108// Sync Protocol Endpoints (v0.11: embedded↔server bidirectional sync)
2109// =============================================================================
2110
2111/// POST /api/v1/sync/pull — Client sends version vector, server returns delta events.
2112#[cfg(feature = "embedded-sync")]
2113pub async fn sync_pull_handler(
2114    State(state): State<AppState>,
2115    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
2116) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
2117    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};
2118
2119    let store = &state.store;
2120
2121    // Compute "since" threshold from the client's version vector
2122    // We return all events the client hasn't seen yet
2123    let since = request
2124        .version_vector
2125        .values()
2126        .map(|ts| ts.physical_ms)
2127        .min()
2128        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));
2129
2130    let events = store.query(&crate::application::dto::QueryEventsRequest {
2131        entity_id: None,
2132        event_type: None,
2133        tenant_id: None,
2134        as_of: None,
2135        since,
2136        until: None,
2137        limit: None,
2138        event_type_prefix: None,
2139        payload_filter: None,
2140    })?;
2141
2142    // Convert domain events to ReplicatedEvent wire format
2143    let mut replicated = Vec::with_capacity(events.len());
2144    let mut last_ms = 0u64;
2145    let mut logical = 0u32;
2146
2147    for event in &events {
2148        let event_ms = event.timestamp().timestamp_millis() as u64;
2149        if event_ms == last_ms {
2150            logical += 1;
2151        } else {
2152            last_ms = event_ms;
2153            logical = 0;
2154        }
2155
2156        replicated.push(ReplicatedEvent {
2157            event_id: event.id().to_string(),
2158            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
2159            origin_region: "server".to_string(),
2160            event_data: serde_json::json!({
2161                "event_type": event.event_type_str(),
2162                "entity_id": event.entity_id_str(),
2163                "tenant_id": event.tenant_id_str(),
2164                "payload": event.payload,
2165                "metadata": event.metadata,
2166            }),
2167        });
2168    }
2169
2170    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
2171        events: replicated,
2172        version_vector: std::collections::BTreeMap::new(),
2173    }))
2174}
2175
2176/// POST /api/v1/sync/push — Client pushes events, server applies CRDT resolution.
2177#[cfg(feature = "embedded-sync")]
2178pub async fn sync_push_handler(
2179    State(state): State<AppState>,
2180    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
2181) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
2182    let store = &state.store;
2183
2184    let mut accepted = 0usize;
2185    let mut skipped = 0usize;
2186
2187    for rep_event in &request.events {
2188        let event_data = &rep_event.event_data;
2189        let event_type = event_data
2190            .get("event_type")
2191            .and_then(|v| v.as_str())
2192            .unwrap_or("unknown")
2193            .to_string();
2194        let entity_id = event_data
2195            .get("entity_id")
2196            .and_then(|v| v.as_str())
2197            .unwrap_or("unknown")
2198            .to_string();
2199        let tenant_id = event_data
2200            .get("tenant_id")
2201            .and_then(|v| v.as_str())
2202            .unwrap_or("default")
2203            .to_string();
2204        let payload = event_data
2205            .get("payload")
2206            .cloned()
2207            .unwrap_or(serde_json::json!({}));
2208        let metadata = event_data.get("metadata").cloned();
2209
2210        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
2211            Ok(domain_event) => {
2212                store.ingest(&domain_event)?;
2213                accepted += 1;
2214            }
2215            Err(_) => {
2216                skipped += 1;
2217            }
2218        }
2219    }
2220
2221    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
2222        accepted,
2223        skipped,
2224        version_vector: std::collections::BTreeMap::new(),
2225    }))
2226}
2227
2228// =============================================================================
2229// Consumer endpoints for durable subscriptions (v0.14)
2230// =============================================================================
2231
2232/// POST /api/v1/consumers — Register a durable consumer
2233pub async fn register_consumer(
2234    State(store): State<SharedStore>,
2235    Json(req): Json<RegisterConsumerRequest>,
2236) -> Result<Json<ConsumerResponse>> {
2237    let consumer = store
2238        .consumer_registry()
2239        .register(&req.consumer_id, &req.event_type_filters);
2240
2241    Ok(Json(ConsumerResponse {
2242        consumer_id: consumer.consumer_id,
2243        event_type_filters: consumer.event_type_filters,
2244        cursor_position: consumer.cursor_position,
2245    }))
2246}
2247
2248/// GET /api/v1/consumers/{consumer_id} — Get consumer metadata and cursor position
2249pub async fn get_consumer(
2250    State(store): State<SharedStore>,
2251    Path(consumer_id): Path<String>,
2252) -> Result<Json<ConsumerResponse>> {
2253    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2254
2255    Ok(Json(ConsumerResponse {
2256        consumer_id: consumer.consumer_id,
2257        event_type_filters: consumer.event_type_filters,
2258        cursor_position: consumer.cursor_position,
2259    }))
2260}
2261
2262/// GET /api/v1/consumers/{consumer_id}/events — Poll for events since last ack
2263#[derive(Debug, Deserialize)]
2264pub struct ConsumerPollQuery {
2265    pub limit: Option<usize>,
2266}
2267
2268pub async fn poll_consumer_events(
2269    State(store): State<SharedStore>,
2270    Path(consumer_id): Path<String>,
2271    Query(query): Query<ConsumerPollQuery>,
2272) -> Result<Json<ConsumerEventsResponse>> {
2273    let consumer = store.consumer_registry().get_or_create(&consumer_id);
2274    let offset = consumer.cursor_position.unwrap_or(0);
2275    let limit = query.limit.unwrap_or(100);
2276
2277    let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2278    let count = events.len();
2279
2280    let consumer_events: Vec<ConsumerEventDto> = events
2281        .into_iter()
2282        .map(|(position, event)| ConsumerEventDto {
2283            position,
2284            event: EventDto::from(&event),
2285        })
2286        .collect();
2287
2288    Ok(Json(ConsumerEventsResponse {
2289        events: consumer_events,
2290        count,
2291    }))
2292}
2293
2294/// POST /api/v1/consumers/{consumer_id}/ack — Acknowledge processed events
2295pub async fn ack_consumer(
2296    State(store): State<SharedStore>,
2297    Path(consumer_id): Path<String>,
2298    Json(req): Json<AckRequest>,
2299) -> Result<Json<serde_json::Value>> {
2300    let max_offset = store.total_events() as u64;
2301
2302    store
2303        .consumer_registry()
2304        .ack(&consumer_id, req.position, max_offset)
2305        .map_err(crate::error::AllSourceError::InvalidInput)?;
2306
2307    Ok(Json(serde_json::json!({
2308        "status": "ok",
2309        "consumer_id": consumer_id,
2310        "position": req.position,
2311    })))
2312}
2313
2314#[cfg(test)]
2315mod tests {
2316    use super::*;
2317    use crate::{domain::entities::Event, store::EventStore};
2318
2319    fn create_test_store() -> Arc<EventStore> {
2320        Arc::new(EventStore::new())
2321    }
2322
2323    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2324        Event::from_strings(
2325            event_type.to_string(),
2326            entity_id.to_string(),
2327            "test-stream".to_string(),
2328            serde_json::json!({
2329                "name": "Test",
2330                "value": 42
2331            }),
2332            None,
2333        )
2334        .unwrap()
2335    }
2336
2337    #[tokio::test]
2338    async fn test_query_events_has_more_and_total_count() {
2339        let store = create_test_store();
2340
2341        // Ingest 50 events
2342        for i in 0..50 {
2343            store
2344                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2345                .unwrap();
2346        }
2347
2348        // Query with limit=10 — should get has_more=true, total_count=50
2349        let req = QueryEventsRequest {
2350            entity_id: None,
2351            event_type: None,
2352            tenant_id: None,
2353            as_of: None,
2354            since: None,
2355            until: None,
2356            limit: Some(10),
2357            event_type_prefix: None,
2358            payload_filter: None,
2359        };
2360
2361        let requested_limit = req.limit;
2362        let unlimited_req = QueryEventsRequest {
2363            limit: None,
2364            ..QueryEventsRequest {
2365                entity_id: req.entity_id,
2366                event_type: req.event_type,
2367                tenant_id: req.tenant_id,
2368                as_of: req.as_of,
2369                since: req.since,
2370                until: req.until,
2371                limit: None,
2372                event_type_prefix: req.event_type_prefix,
2373                payload_filter: req.payload_filter,
2374            }
2375        };
2376        let all_events = store.query(&unlimited_req).unwrap();
2377        let total_count = all_events.len();
2378        let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2379            all_events.into_iter().take(limit).collect()
2380        } else {
2381            all_events
2382        };
2383        let count = limited_events.len();
2384        let has_more = count < total_count;
2385
2386        assert_eq!(count, 10);
2387        assert_eq!(total_count, 50);
2388        assert!(has_more);
2389    }
2390
2391    #[tokio::test]
2392    async fn test_query_events_no_more_results() {
2393        let store = create_test_store();
2394
2395        // Ingest 5 events
2396        for i in 0..5 {
2397            store
2398                .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2399                .unwrap();
2400        }
2401
2402        // Query with limit=100 — should get has_more=false, total_count=5
2403        let all_events = store
2404            .query(&QueryEventsRequest {
2405                entity_id: None,
2406                event_type: None,
2407                tenant_id: None,
2408                as_of: None,
2409                since: None,
2410                until: None,
2411                limit: None,
2412                event_type_prefix: None,
2413                payload_filter: None,
2414            })
2415            .unwrap();
2416        let total_count = all_events.len();
2417        let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2418        let count = limited_events.len();
2419        let has_more = count < total_count;
2420
2421        assert_eq!(count, 5);
2422        assert_eq!(total_count, 5);
2423        assert!(!has_more);
2424    }
2425
2426    // Regression test for issue #177: `order=desc` + `limit=1` must return
2427    // the NEWEST event for an entity, not the oldest. Mirrors the ordering
2428    // logic in `query_events` (store.query → reverse-if-desc → take(limit)).
2429    #[tokio::test]
2430    async fn test_query_events_order_desc_returns_latest() {
2431        let store = create_test_store();
2432
2433        // Three events for the same entity with strictly increasing
2434        // timestamps — mimics a backfill appending a corrected event.
2435        let base = chrono::Utc::now();
2436        for i in 0..3i64 {
2437            let mut event = create_test_event("org-1", "auth.org.updated");
2438            event.timestamp = base + chrono::Duration::seconds(i);
2439            event.version = i + 1;
2440            store.ingest(&event).unwrap();
2441        }
2442
2443        let req = QueryEventsRequest {
2444            entity_id: Some("org-1".to_string()),
2445            ..Default::default()
2446        };
2447        let ascending = store.query(&req).unwrap();
2448        assert_eq!(ascending.len(), 3);
2449        // Ascending order: oldest timestamp first.
2450        assert!(ascending[0].timestamp < ascending[1].timestamp);
2451        assert!(ascending[1].timestamp < ascending[2].timestamp);
2452        let oldest_ts = ascending[0].timestamp;
2453        let newest_ts = ascending[2].timestamp;
2454
2455        // `order=desc`: handler reverses, then `limit=1` takes the front.
2456        let mut descending = ascending.clone();
2457        descending.reverse();
2458        let latest: Vec<Event> = descending.into_iter().take(1).collect();
2459        assert_eq!(latest.len(), 1);
2460        assert_eq!(
2461            latest[0].timestamp, newest_ts,
2462            "order=desc&limit=1 must yield the newest event"
2463        );
2464
2465        // Default (ascending) + limit=1 still yields the oldest event.
2466        let oldest: Vec<Event> = ascending.into_iter().take(1).collect();
2467        assert_eq!(oldest[0].timestamp, oldest_ts);
2468    }
2469
2470    #[tokio::test]
2471    async fn test_list_entities_by_type_prefix() {
2472        let store = create_test_store();
2473
2474        // 3 index entities
2475        store
2476            .ingest(&create_test_event("idx-1", "index.created"))
2477            .unwrap();
2478        store
2479            .ingest(&create_test_event("idx-1", "index.updated"))
2480            .unwrap();
2481        store
2482            .ingest(&create_test_event("idx-2", "index.created"))
2483            .unwrap();
2484        store
2485            .ingest(&create_test_event("idx-3", "index.created"))
2486            .unwrap();
2487        // 2 trade entities
2488        store
2489            .ingest(&create_test_event("trade-1", "trade.created"))
2490            .unwrap();
2491        store
2492            .ingest(&create_test_event("trade-2", "trade.created"))
2493            .unwrap();
2494
2495        // List entities for index.*
2496        let req = ListEntitiesRequest {
2497            event_type_prefix: Some("index.".to_string()),
2498            ..Default::default()
2499        };
2500        let query_req = QueryEventsRequest {
2501            entity_id: None,
2502            event_type: None,
2503            tenant_id: None,
2504            as_of: None,
2505            since: None,
2506            until: None,
2507            limit: None,
2508            event_type_prefix: req.event_type_prefix,
2509            payload_filter: req.payload_filter,
2510        };
2511        let events = store.query(&query_req).unwrap();
2512
2513        // Group and verify
2514        let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2515            std::collections::HashMap::new();
2516        for event in &events {
2517            entity_map
2518                .entry(event.entity_id().to_string())
2519                .or_default()
2520                .push(event);
2521        }
2522
2523        assert_eq!(entity_map.len(), 3); // idx-1, idx-2, idx-3
2524        assert_eq!(entity_map["idx-1"].len(), 2); // 2 events for idx-1
2525        assert_eq!(entity_map["idx-2"].len(), 1);
2526        assert_eq!(entity_map["idx-3"].len(), 1);
2527    }
2528
2529    // Issue #178: `list_entities` accepts an `order` param and pages
2530    // deterministically over the resulting sort.
2531    #[tokio::test]
2532    async fn test_list_entities_order_and_pagination() {
2533        let store = create_test_store();
2534
2535        // Three entities with strictly increasing last-event times.
2536        let base = chrono::Utc::now();
2537        for (i, eid) in ["org-a", "org-b", "org-c"].iter().enumerate() {
2538            let mut event = create_test_event(eid, "auth.org.created");
2539            event.timestamp = base + chrono::Duration::seconds(i as i64);
2540            store.ingest(&event).unwrap();
2541        }
2542        let prefix = || Some("auth.org.".to_string());
2543
2544        // Default: newest activity first (desc) — preserves prior behavior.
2545        let desc = list_entities(
2546            State(store.clone()),
2547            Query(ListEntitiesRequest {
2548                event_type_prefix: prefix(),
2549                ..Default::default()
2550            }),
2551        )
2552        .await
2553        .unwrap();
2554        let desc_ids: Vec<&str> = desc
2555            .0
2556            .entities
2557            .iter()
2558            .map(|e| e.entity_id.as_str())
2559            .collect();
2560        assert_eq!(desc_ids, ["org-c", "org-b", "org-a"]);
2561
2562        // order=asc: oldest activity first.
2563        let asc = list_entities(
2564            State(store.clone()),
2565            Query(ListEntitiesRequest {
2566                event_type_prefix: prefix(),
2567                order: Some("asc".to_string()),
2568                ..Default::default()
2569            }),
2570        )
2571        .await
2572        .unwrap();
2573        let asc_ids: Vec<&str> = asc
2574            .0
2575            .entities
2576            .iter()
2577            .map(|e| e.entity_id.as_str())
2578            .collect();
2579        assert_eq!(asc_ids, ["org-a", "org-b", "org-c"]);
2580
2581        // Offset pagination over the deterministic asc order: page 2, size 1.
2582        let page2 = list_entities(
2583            State(store.clone()),
2584            Query(ListEntitiesRequest {
2585                event_type_prefix: prefix(),
2586                order: Some("asc".to_string()),
2587                limit: Some(1),
2588                offset: Some(1),
2589                ..Default::default()
2590            }),
2591        )
2592        .await
2593        .unwrap();
2594        assert_eq!(page2.0.entities.len(), 1);
2595        assert_eq!(page2.0.entities[0].entity_id, "org-b");
2596        assert_eq!(page2.0.total, 3);
2597        assert!(page2.0.has_more);
2598
2599        // Invalid order value is rejected.
2600        let err = list_entities(
2601            State(store.clone()),
2602            Query(ListEntitiesRequest {
2603                event_type_prefix: prefix(),
2604                order: Some("sideways".to_string()),
2605                ..Default::default()
2606            }),
2607        )
2608        .await;
2609        assert!(err.is_err(), "invalid order value must be rejected");
2610    }
2611
2612    fn create_test_event_with_payload(
2613        entity_id: &str,
2614        event_type: &str,
2615        payload: serde_json::Value,
2616    ) -> Event {
2617        Event::from_strings(
2618            event_type.to_string(),
2619            entity_id.to_string(),
2620            "test-stream".to_string(),
2621            payload,
2622            None,
2623        )
2624        .unwrap()
2625    }
2626
2627    #[tokio::test]
2628    async fn test_detect_duplicates_by_payload_fields() {
2629        let store = create_test_store();
2630
2631        // Create entities with duplicate "name" field values
2632        store
2633            .ingest(&create_test_event_with_payload(
2634                "idx-1",
2635                "index.created",
2636                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2637            ))
2638            .unwrap();
2639        store
2640            .ingest(&create_test_event_with_payload(
2641                "idx-2",
2642                "index.created",
2643                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2644            ))
2645            .unwrap();
2646        store
2647            .ingest(&create_test_event_with_payload(
2648                "idx-3",
2649                "index.created",
2650                serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2651            ))
2652            .unwrap();
2653        store
2654            .ingest(&create_test_event_with_payload(
2655                "idx-4",
2656                "index.created",
2657                serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2658            ))
2659            .unwrap();
2660        store
2661            .ingest(&create_test_event_with_payload(
2662                "idx-5",
2663                "index.created",
2664                serde_json::json!({"name": "DAX", "user_id": "dave"}),
2665            ))
2666            .unwrap();
2667
2668        // Group by name — should find 2 groups: "S&P 500" (idx-1, idx-2) and "NASDAQ" (idx-3, idx-4)
2669        let query_req = QueryEventsRequest {
2670            entity_id: None,
2671            event_type: None,
2672            tenant_id: None,
2673            as_of: None,
2674            since: None,
2675            until: None,
2676            limit: None,
2677            event_type_prefix: Some("index.".to_string()),
2678            payload_filter: None,
2679        };
2680        let events = store.query(&query_req).unwrap();
2681
2682        // Manually replicate the handler logic for testing
2683        let group_by_fields = vec!["name"];
2684        let mut entity_latest: std::collections::HashMap<String, &Event> =
2685            std::collections::HashMap::new();
2686        for event in &events {
2687            let eid = event.entity_id().to_string();
2688            entity_latest
2689                .entry(eid)
2690                .and_modify(|existing| {
2691                    if event.timestamp() > existing.timestamp() {
2692                        *existing = event;
2693                    }
2694                })
2695                .or_insert(event);
2696        }
2697
2698        let mut groups: std::collections::HashMap<String, Vec<String>> =
2699            std::collections::HashMap::new();
2700        for (entity_id, event) in &entity_latest {
2701            let payload = event.payload();
2702            let mut key_parts = serde_json::Map::new();
2703            for field in &group_by_fields {
2704                let value = payload
2705                    .get(*field)
2706                    .cloned()
2707                    .unwrap_or(serde_json::Value::Null);
2708                key_parts.insert((*field).to_string(), value);
2709            }
2710            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2711            groups.entry(key_str).or_default().push(entity_id.clone());
2712        }
2713
2714        let duplicate_groups: Vec<_> = groups
2715            .into_iter()
2716            .filter(|(_, ids)| ids.len() > 1)
2717            .collect();
2718
2719        assert_eq!(duplicate_groups.len(), 2); // S&P 500 and NASDAQ groups
2720        for (_, ids) in &duplicate_groups {
2721            assert_eq!(ids.len(), 2);
2722        }
2723    }
2724
2725    #[tokio::test]
2726    async fn test_detect_duplicates_no_duplicates() {
2727        let store = create_test_store();
2728
2729        // All unique names
2730        store
2731            .ingest(&create_test_event_with_payload(
2732                "idx-1",
2733                "index.created",
2734                serde_json::json!({"name": "A"}),
2735            ))
2736            .unwrap();
2737        store
2738            .ingest(&create_test_event_with_payload(
2739                "idx-2",
2740                "index.created",
2741                serde_json::json!({"name": "B"}),
2742            ))
2743            .unwrap();
2744
2745        let query_req = QueryEventsRequest {
2746            entity_id: None,
2747            event_type: None,
2748            tenant_id: None,
2749            as_of: None,
2750            since: None,
2751            until: None,
2752            limit: None,
2753            event_type_prefix: Some("index.".to_string()),
2754            payload_filter: None,
2755        };
2756        let events = store.query(&query_req).unwrap();
2757
2758        let mut entity_latest: std::collections::HashMap<String, &Event> =
2759            std::collections::HashMap::new();
2760        for event in &events {
2761            entity_latest
2762                .entry(event.entity_id().to_string())
2763                .or_insert(event);
2764        }
2765
2766        let mut groups: std::collections::HashMap<String, Vec<String>> =
2767            std::collections::HashMap::new();
2768        for (entity_id, event) in &entity_latest {
2769            let key_str =
2770                serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2771                    .unwrap();
2772            groups.entry(key_str).or_default().push(entity_id.clone());
2773        }
2774
2775        let duplicate_groups: Vec<_> = groups
2776            .into_iter()
2777            .filter(|(_, ids)| ids.len() > 1)
2778            .collect();
2779
2780        assert_eq!(duplicate_groups.len(), 0); // No duplicates
2781    }
2782
2783    #[tokio::test]
2784    async fn test_detect_duplicates_multi_field_group_by() {
2785        let store = create_test_store();
2786
2787        // Two entities with same name AND user_id = true duplicate
2788        store
2789            .ingest(&create_test_event_with_payload(
2790                "idx-1",
2791                "index.created",
2792                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2793            ))
2794            .unwrap();
2795        store
2796            .ingest(&create_test_event_with_payload(
2797                "idx-2",
2798                "index.created",
2799                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2800            ))
2801            .unwrap();
2802        // Same name but different user_id = NOT a duplicate in multi-field group
2803        store
2804            .ingest(&create_test_event_with_payload(
2805                "idx-3",
2806                "index.created",
2807                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2808            ))
2809            .unwrap();
2810
2811        let query_req = QueryEventsRequest {
2812            entity_id: None,
2813            event_type: None,
2814            tenant_id: None,
2815            as_of: None,
2816            since: None,
2817            until: None,
2818            limit: None,
2819            event_type_prefix: Some("index.".to_string()),
2820            payload_filter: None,
2821        };
2822        let events = store.query(&query_req).unwrap();
2823
2824        let group_by_fields = vec!["name", "user_id"];
2825        let mut entity_latest: std::collections::HashMap<String, &Event> =
2826            std::collections::HashMap::new();
2827        for event in &events {
2828            entity_latest
2829                .entry(event.entity_id().to_string())
2830                .and_modify(|existing| {
2831                    if event.timestamp() > existing.timestamp() {
2832                        *existing = event;
2833                    }
2834                })
2835                .or_insert(event);
2836        }
2837
2838        let mut groups: std::collections::HashMap<String, Vec<String>> =
2839            std::collections::HashMap::new();
2840        for (entity_id, event) in &entity_latest {
2841            let payload = event.payload();
2842            let mut key_parts = serde_json::Map::new();
2843            for field in &group_by_fields {
2844                let value = payload
2845                    .get(*field)
2846                    .cloned()
2847                    .unwrap_or(serde_json::Value::Null);
2848                key_parts.insert((*field).to_string(), value);
2849            }
2850            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2851            groups.entry(key_str).or_default().push(entity_id.clone());
2852        }
2853
2854        let duplicate_groups: Vec<_> = groups
2855            .into_iter()
2856            .filter(|(_, ids)| ids.len() > 1)
2857            .collect();
2858
2859        // Only 1 duplicate group: name=S&P 500, user_id=alice (idx-1, idx-2)
2860        assert_eq!(duplicate_groups.len(), 1);
2861        let (_, ref ids) = duplicate_groups[0];
2862        assert_eq!(ids.len(), 2);
2863        let mut sorted_ids = ids.clone();
2864        sorted_ids.sort();
2865        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2866    }
2867
2868    #[tokio::test]
2869    async fn test_projection_state_cache() {
2870        let store = create_test_store();
2871
2872        // Test cache insertion
2873        let cache = store.projection_state_cache();
2874        cache.insert(
2875            "entity_snapshots:user-123".to_string(),
2876            serde_json::json!({"name": "Test User", "age": 30}),
2877        );
2878
2879        // Test cache retrieval
2880        let state = cache.get("entity_snapshots:user-123");
2881        assert!(state.is_some());
2882        let state = state.unwrap();
2883        assert_eq!(state["name"], "Test User");
2884        assert_eq!(state["age"], 30);
2885    }
2886
2887    #[tokio::test]
2888    async fn test_projection_manager_list_projections() {
2889        let store = create_test_store();
2890
2891        // List projections (built-in projections should be available)
2892        let projection_manager = store.projection_manager();
2893        let projections = projection_manager.list_projections();
2894
2895        // Should have entity_snapshots and event_counters
2896        assert!(projections.len() >= 2);
2897
2898        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2899        assert!(names.contains(&"entity_snapshots"));
2900        assert!(names.contains(&"event_counters"));
2901    }
2902
2903    #[tokio::test]
2904    async fn test_projection_state_after_event_ingestion() {
2905        let store = create_test_store();
2906
2907        // Ingest an event
2908        let event = create_test_event("user-456", "user.created");
2909        store.ingest(&event).unwrap();
2910
2911        // Get projection state
2912        let projection_manager = store.projection_manager();
2913        let snapshot_projection = projection_manager
2914            .get_projection("entity_snapshots")
2915            .unwrap();
2916
2917        let state = snapshot_projection.get_state("user-456");
2918        assert!(state.is_some());
2919        let state = state.unwrap();
2920        assert_eq!(state["name"], "Test");
2921        assert_eq!(state["value"], 42);
2922    }
2923
2924    #[tokio::test]
2925    async fn test_projection_state_cache_multiple_entities() {
2926        let store = create_test_store();
2927        let cache = store.projection_state_cache();
2928
2929        // Insert multiple entities
2930        for i in 0..10 {
2931            cache.insert(
2932                format!("entity_snapshots:entity-{i}"),
2933                serde_json::json!({"id": i, "status": "active"}),
2934            );
2935        }
2936
2937        // Verify all insertions
2938        assert_eq!(cache.len(), 10);
2939
2940        // Verify each entity
2941        for i in 0..10 {
2942            let key = format!("entity_snapshots:entity-{i}");
2943            let state = cache.get(&key);
2944            assert!(state.is_some());
2945            assert_eq!(state.unwrap()["id"], i);
2946        }
2947    }
2948
2949    #[tokio::test]
2950    async fn test_projection_state_update() {
2951        let store = create_test_store();
2952        let cache = store.projection_state_cache();
2953
2954        // Initial state
2955        cache.insert(
2956            "entity_snapshots:user-789".to_string(),
2957            serde_json::json!({"balance": 100}),
2958        );
2959
2960        // Update state
2961        cache.insert(
2962            "entity_snapshots:user-789".to_string(),
2963            serde_json::json!({"balance": 150}),
2964        );
2965
2966        // Verify update
2967        let state = cache.get("entity_snapshots:user-789").unwrap();
2968        assert_eq!(state["balance"], 150);
2969    }
2970
2971    #[tokio::test]
2972    async fn test_event_counter_projection() {
2973        let store = create_test_store();
2974
2975        // Ingest events of different types
2976        store
2977            .ingest(&create_test_event("user-1", "user.created"))
2978            .unwrap();
2979        store
2980            .ingest(&create_test_event("user-2", "user.created"))
2981            .unwrap();
2982        store
2983            .ingest(&create_test_event("user-1", "user.updated"))
2984            .unwrap();
2985
2986        // Get event counter projection
2987        let projection_manager = store.projection_manager();
2988        let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2989
2990        // Check counts
2991        let created_state = counter_projection.get_state("user.created");
2992        assert!(created_state.is_some());
2993        assert_eq!(created_state.unwrap()["count"], 2);
2994
2995        let updated_state = counter_projection.get_state("user.updated");
2996        assert!(updated_state.is_some());
2997        assert_eq!(updated_state.unwrap()["count"], 1);
2998    }
2999
3000    #[tokio::test]
3001    async fn test_projection_state_cache_key_format() {
3002        let store = create_test_store();
3003        let cache = store.projection_state_cache();
3004
3005        // Test standard key format: {projection_name}:{entity_id}
3006        let key = "orders:order-12345".to_string();
3007        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
3008
3009        let state = cache.get(&key).unwrap();
3010        assert_eq!(state["total"], 99.99);
3011    }
3012
3013    #[tokio::test]
3014    async fn test_projection_state_cache_removal() {
3015        let store = create_test_store();
3016        let cache = store.projection_state_cache();
3017
3018        // Insert and then remove
3019        cache.insert(
3020            "test:entity-1".to_string(),
3021            serde_json::json!({"data": "value"}),
3022        );
3023        assert_eq!(cache.len(), 1);
3024
3025        cache.remove("test:entity-1");
3026        assert_eq!(cache.len(), 0);
3027        assert!(cache.get("test:entity-1").is_none());
3028    }
3029
3030    #[tokio::test]
3031    async fn test_get_nonexistent_projection() {
3032        let store = create_test_store();
3033        let projection_manager = store.projection_manager();
3034
3035        // Requesting a non-existent projection should return None
3036        let projection = projection_manager.get_projection("nonexistent_projection");
3037        assert!(projection.is_none());
3038    }
3039
3040    #[tokio::test]
3041    async fn test_get_nonexistent_entity_state() {
3042        let store = create_test_store();
3043        let projection_manager = store.projection_manager();
3044
3045        // Get state for non-existent entity
3046        let snapshot_projection = projection_manager
3047            .get_projection("entity_snapshots")
3048            .unwrap();
3049        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
3050        assert!(state.is_none());
3051    }
3052
3053    #[tokio::test]
3054    async fn test_projection_state_cache_concurrent_access() {
3055        let store = create_test_store();
3056        let cache = store.projection_state_cache();
3057
3058        // Simulate concurrent writes
3059        let handles: Vec<_> = (0..10)
3060            .map(|i| {
3061                let cache_clone = cache.clone();
3062                tokio::spawn(async move {
3063                    cache_clone.insert(
3064                        format!("concurrent:entity-{i}"),
3065                        serde_json::json!({"thread": i}),
3066                    );
3067                })
3068            })
3069            .collect();
3070
3071        for handle in handles {
3072            handle.await.unwrap();
3073        }
3074
3075        // All 10 entries should be present
3076        assert_eq!(cache.len(), 10);
3077    }
3078
3079    #[tokio::test]
3080    async fn test_projection_state_large_payload() {
3081        let store = create_test_store();
3082        let cache = store.projection_state_cache();
3083
3084        // Create a large JSON payload (~10KB)
3085        let large_array: Vec<serde_json::Value> = (0..1000)
3086            .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
3087            .collect();
3088
3089        cache.insert(
3090            "large:entity-1".to_string(),
3091            serde_json::json!({"items": large_array}),
3092        );
3093
3094        let state = cache.get("large:entity-1").unwrap();
3095        let items = state["items"].as_array().unwrap();
3096        assert_eq!(items.len(), 1000);
3097    }
3098
3099    #[tokio::test]
3100    async fn test_projection_state_complex_json() {
3101        let store = create_test_store();
3102        let cache = store.projection_state_cache();
3103
3104        // Complex nested JSON structure
3105        let complex_state = serde_json::json!({
3106            "user": {
3107                "id": "user-123",
3108                "profile": {
3109                    "name": "John Doe",
3110                    "email": "john@example.com",
3111                    "settings": {
3112                        "theme": "dark",
3113                        "notifications": true
3114                    }
3115                },
3116                "roles": ["admin", "user"],
3117                "metadata": {
3118                    "created_at": "2025-01-01T00:00:00Z",
3119                    "last_login": null
3120                }
3121            }
3122        });
3123
3124        cache.insert("complex:user-123".to_string(), complex_state);
3125
3126        let state = cache.get("complex:user-123").unwrap();
3127        assert_eq!(state["user"]["profile"]["name"], "John Doe");
3128        assert_eq!(state["user"]["roles"][0], "admin");
3129        assert!(state["user"]["metadata"]["last_login"].is_null());
3130    }
3131
3132    #[tokio::test]
3133    async fn test_projection_state_cache_iteration() {
3134        let store = create_test_store();
3135        let cache = store.projection_state_cache();
3136
3137        // Insert entries
3138        for i in 0..5 {
3139            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
3140        }
3141
3142        // Iterate over all entries
3143        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
3144        assert_eq!(entries.len(), 5);
3145    }
3146
3147    #[tokio::test]
3148    async fn test_projection_manager_get_entity_snapshots() {
3149        let store = create_test_store();
3150        let projection_manager = store.projection_manager();
3151
3152        // Get entity_snapshots projection specifically
3153        let projection = projection_manager.get_projection("entity_snapshots");
3154        assert!(projection.is_some());
3155        assert_eq!(projection.unwrap().name(), "entity_snapshots");
3156    }
3157
3158    #[tokio::test]
3159    async fn test_projection_manager_get_event_counters() {
3160        let store = create_test_store();
3161        let projection_manager = store.projection_manager();
3162
3163        // Get event_counters projection specifically
3164        let projection = projection_manager.get_projection("event_counters");
3165        assert!(projection.is_some());
3166        assert_eq!(projection.unwrap().name(), "event_counters");
3167    }
3168
3169    #[tokio::test]
3170    async fn test_projection_state_cache_overwrite() {
3171        let store = create_test_store();
3172        let cache = store.projection_state_cache();
3173
3174        // Initial value
3175        cache.insert(
3176            "overwrite:entity-1".to_string(),
3177            serde_json::json!({"version": 1}),
3178        );
3179
3180        // Overwrite with new value
3181        cache.insert(
3182            "overwrite:entity-1".to_string(),
3183            serde_json::json!({"version": 2}),
3184        );
3185
3186        // Overwrite again
3187        cache.insert(
3188            "overwrite:entity-1".to_string(),
3189            serde_json::json!({"version": 3}),
3190        );
3191
3192        let state = cache.get("overwrite:entity-1").unwrap();
3193        assert_eq!(state["version"], 3);
3194
3195        // Should still be only 1 entry
3196        assert_eq!(cache.len(), 1);
3197    }
3198
3199    #[tokio::test]
3200    async fn test_projection_state_multiple_projections() {
3201        let store = create_test_store();
3202        let cache = store.projection_state_cache();
3203
3204        // Store states for different projections
3205        cache.insert(
3206            "entity_snapshots:user-1".to_string(),
3207            serde_json::json!({"name": "Alice"}),
3208        );
3209        cache.insert(
3210            "event_counters:user.created".to_string(),
3211            serde_json::json!({"count": 5}),
3212        );
3213        cache.insert(
3214            "custom_projection:order-1".to_string(),
3215            serde_json::json!({"total": 150.0}),
3216        );
3217
3218        // Verify each projection's state
3219        assert_eq!(
3220            cache.get("entity_snapshots:user-1").unwrap()["name"],
3221            "Alice"
3222        );
3223        assert_eq!(
3224            cache.get("event_counters:user.created").unwrap()["count"],
3225            5
3226        );
3227        assert_eq!(
3228            cache.get("custom_projection:order-1").unwrap()["total"],
3229            150.0
3230        );
3231    }
3232
3233    #[tokio::test]
3234    async fn test_bulk_projection_state_access() {
3235        let store = create_test_store();
3236
3237        // Ingest multiple events for different entities
3238        for i in 0..5 {
3239            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3240            store.ingest(&event).unwrap();
3241        }
3242
3243        // Get projection and verify bulk access
3244        let projection_manager = store.projection_manager();
3245        let snapshot_projection = projection_manager
3246            .get_projection("entity_snapshots")
3247            .unwrap();
3248
3249        // Verify we can access all entities
3250        for i in 0..5 {
3251            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3252            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3253        }
3254    }
3255
3256    #[tokio::test]
3257    async fn test_bulk_save_projection_states() {
3258        let store = create_test_store();
3259        let cache = store.projection_state_cache();
3260
3261        // Simulate bulk save request
3262        let states = vec![
3263            BulkSaveStateItem {
3264                entity_id: "bulk-entity-1".to_string(),
3265                state: serde_json::json!({"name": "Entity 1", "value": 100}),
3266            },
3267            BulkSaveStateItem {
3268                entity_id: "bulk-entity-2".to_string(),
3269                state: serde_json::json!({"name": "Entity 2", "value": 200}),
3270            },
3271            BulkSaveStateItem {
3272                entity_id: "bulk-entity-3".to_string(),
3273                state: serde_json::json!({"name": "Entity 3", "value": 300}),
3274            },
3275        ];
3276
3277        let projection_name = "test_projection";
3278
3279        // Save states to cache (simulating bulk_save_projection_states handler)
3280        for item in &states {
3281            cache.insert(
3282                format!("{projection_name}:{}", item.entity_id),
3283                item.state.clone(),
3284            );
3285        }
3286
3287        // Verify all states were saved
3288        assert_eq!(cache.len(), 3);
3289
3290        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3291        assert_eq!(state1["name"], "Entity 1");
3292        assert_eq!(state1["value"], 100);
3293
3294        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3295        assert_eq!(state2["name"], "Entity 2");
3296        assert_eq!(state2["value"], 200);
3297
3298        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3299        assert_eq!(state3["name"], "Entity 3");
3300        assert_eq!(state3["value"], 300);
3301    }
3302
3303    #[tokio::test]
3304    async fn test_bulk_save_empty_states() {
3305        let store = create_test_store();
3306        let cache = store.projection_state_cache();
3307
3308        // Clear cache
3309        cache.clear();
3310
3311        // Empty states should work fine
3312        let states: Vec<BulkSaveStateItem> = vec![];
3313        assert_eq!(states.len(), 0);
3314
3315        // Cache should remain empty
3316        assert_eq!(cache.len(), 0);
3317    }
3318
3319    #[tokio::test]
3320    async fn test_bulk_save_overwrites_existing() {
3321        let store = create_test_store();
3322        let cache = store.projection_state_cache();
3323
3324        // Insert initial state
3325        cache.insert(
3326            "test:entity-1".to_string(),
3327            serde_json::json!({"version": 1, "data": "initial"}),
3328        );
3329
3330        // Bulk save with updated state
3331        let new_state = serde_json::json!({"version": 2, "data": "updated"});
3332        cache.insert("test:entity-1".to_string(), new_state);
3333
3334        // Verify overwrite
3335        let state = cache.get("test:entity-1").unwrap();
3336        assert_eq!(state["version"], 2);
3337        assert_eq!(state["data"], "updated");
3338    }
3339
3340    #[tokio::test]
3341    async fn test_bulk_save_high_volume() {
3342        let store = create_test_store();
3343        let cache = store.projection_state_cache();
3344
3345        // Simulate high volume save (1000 entities)
3346        for i in 0..1000 {
3347            cache.insert(
3348                format!("volume_test:entity-{i}"),
3349                serde_json::json!({"index": i, "status": "active"}),
3350            );
3351        }
3352
3353        // Verify count
3354        assert_eq!(cache.len(), 1000);
3355
3356        // Spot check some entries
3357        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3358        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3359        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3360    }
3361
3362    #[tokio::test]
3363    async fn test_bulk_save_different_projections() {
3364        let store = create_test_store();
3365        let cache = store.projection_state_cache();
3366
3367        // Save to multiple projections in bulk
3368        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3369
3370        for proj in &projections {
3371            for i in 0..5 {
3372                cache.insert(
3373                    format!("{proj}:entity-{i}"),
3374                    serde_json::json!({"projection": proj, "id": i}),
3375                );
3376            }
3377        }
3378
3379        // Verify total count (3 projections * 5 entities)
3380        assert_eq!(cache.len(), 15);
3381
3382        // Verify each projection
3383        for proj in &projections {
3384            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3385            assert_eq!(state["projection"], *proj);
3386        }
3387    }
3388
3389    // -------------------------------------------------------------------------
3390    // Cache-fallback tests for the projection-state read handlers (v0.19.1).
3391    //
3392    // SDK-managed projections write state via save_projection_state /
3393    // bulk_save_projection_states without registering in projection_manager.
3394    // These tests verify the read handlers fall back to the cache instead of
3395    // 404-ing. Registered projection wins where both exist; cache fills the
3396    // gap when not.
3397    // -------------------------------------------------------------------------
3398
3399    #[tokio::test]
3400    async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3401        let store = create_test_store();
3402        store.projection_state_cache().insert(
3403            "assets:BTC".to_string(),
3404            serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3405        );
3406
3407        let resp = get_projection_state(
3408            State(Arc::clone(&store)),
3409            Path(("assets".to_string(), "BTC".to_string())),
3410        )
3411        .await
3412        .expect("should not error when projection is not registered");
3413
3414        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3415        assert_eq!(resp.0["state"]["symbol"], "BTC");
3416        assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3417    }
3418
3419    #[tokio::test]
3420    async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3421        let store = create_test_store();
3422
3423        let resp = get_projection_state(
3424            State(Arc::clone(&store)),
3425            Path(("assets".to_string(), "UNKNOWN".to_string())),
3426        )
3427        .await
3428        .unwrap();
3429
3430        assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3431        assert_eq!(resp.0["state"], serde_json::Value::Null);
3432    }
3433
3434    #[tokio::test]
3435    async fn get_projection_state_registered_wins_over_cache() {
3436        let store = create_test_store();
3437
3438        // Ingest an event so entity_snapshots (a registered projection) has state.
3439        let event = create_test_event("user-777", "user.created");
3440        store.ingest(&event).unwrap();
3441
3442        // Plant a conflicting cache entry for the same (projection, entity).
3443        store.projection_state_cache().insert(
3444            "entity_snapshots:user-777".to_string(),
3445            serde_json::json!({"stolen": "value"}),
3446        );
3447
3448        let resp = get_projection_state(
3449            State(Arc::clone(&store)),
3450            Path(("entity_snapshots".to_string(), "user-777".to_string())),
3451        )
3452        .await
3453        .unwrap();
3454
3455        // Registered projection wins — cache fallback is only consulted when
3456        // the registered projection has no state for this entity.
3457        assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3458        assert!(
3459            resp.0["state"].get("stolen").is_none(),
3460            "cache entry must not shadow registered projection state: got {:?}",
3461            resp.0["state"]
3462        );
3463    }
3464
3465    #[tokio::test]
3466    async fn get_projection_state_summary_returns_cache_without_registration() {
3467        let store = create_test_store();
3468        let cache = store.projection_state_cache();
3469        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3470        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3471        // Different projection name — must not appear in the summary.
3472        cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3473
3474        let resp =
3475            get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3476                .await
3477                .unwrap();
3478
3479        assert_eq!(resp.0["total"], 2);
3480        let states = resp.0["states"].as_array().unwrap();
3481        let entity_ids: Vec<&str> = states
3482            .iter()
3483            .map(|s| s["entity_id"].as_str().unwrap())
3484            .collect();
3485        assert!(entity_ids.contains(&"BTC"));
3486        assert!(entity_ids.contains(&"ETH"));
3487    }
3488
3489    #[tokio::test]
3490    async fn bulk_get_projection_states_falls_back_to_cache() {
3491        let store = create_test_store();
3492        let cache = store.projection_state_cache();
3493        cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3494        cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3495
3496        let req = BulkGetStateRequest {
3497            entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3498        };
3499
3500        let resp = bulk_get_projection_states(
3501            State(Arc::clone(&store)),
3502            Path("assets".to_string()),
3503            Json(req),
3504        )
3505        .await
3506        .unwrap();
3507
3508        assert_eq!(resp.0["total"], 3);
3509        let states = resp.0["states"].as_array().unwrap();
3510        let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3511            .iter()
3512            .map(|s| (s["entity_id"].as_str().unwrap(), s))
3513            .collect();
3514
3515        assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3516        assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3517        assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3518        assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3519    }
3520}