Skip to main content

allsource_core/infrastructure/web/
api.rs

1use crate::{
2    application::{
3        dto::{
4            EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
5            IngestEventsBatchResponse, QueryEventsRequest, QueryEventsResponse,
6        },
7        services::{
8            analytics::{
9                AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
10                EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
11            },
12            pipeline::{PipelineConfig, PipelineStats},
13            replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
14            schema::{
15                CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
16                ValidateEventRequest, ValidateEventResponse,
17            },
18        },
19    },
20    domain::entities::Event,
21    error::Result,
22    infrastructure::{
23        persistence::{
24            compaction::CompactionResult,
25            snapshot::{
26                CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
27                ListSnapshotsResponse, SnapshotInfo,
28            },
29        },
30        replication::ReplicationMode,
31        web::api_v1::AppState,
32    },
33    store::{EventStore, EventTypeInfo, StreamInfo},
34};
35use axum::{
36    Json, Router,
37    extract::{Path, Query, State, WebSocketUpgrade},
38    response::{IntoResponse, Response},
39    routing::{get, post, put},
40};
41use serde::Deserialize;
42use std::sync::Arc;
43use tower_http::{
44    cors::{Any, CorsLayer},
45    trace::TraceLayer,
46};
47
48type SharedStore = Arc<EventStore>;
49
50/// Wait for follower ACK(s) in semi-sync/sync replication modes.
51///
52/// In async mode (default), returns immediately. In semi-sync mode, waits for
53/// at least 1 follower to ACK the current WAL offset. In sync mode, waits for
54/// all followers. If the timeout expires, logs a warning and continues (degraded mode).
55async fn await_replication_ack(state: &AppState) {
56    let shipper_guard = state.wal_shipper.read().await;
57    if let Some(ref shipper) = *shipper_guard {
58        let mode = shipper.replication_mode();
59        if mode == ReplicationMode::Async {
60            return;
61        }
62
63        let target_offset = shipper.current_leader_offset();
64        if target_offset == 0 {
65            return;
66        }
67
68        let shipper = Arc::clone(shipper);
69        // Drop the read guard before the async wait to avoid holding it across await
70        drop(shipper_guard);
71
72        let timer = state
73            .store
74            .metrics()
75            .replication_ack_wait_seconds
76            .start_timer();
77        let acked = shipper.wait_for_ack(target_offset).await;
78        timer.observe_duration();
79
80        if !acked {
81            tracing::warn!(
82                "Replication ACK timeout in {} mode (offset {}). \
83                 Write succeeded locally but follower confirmation pending.",
84                mode,
85                target_offset,
86            );
87        }
88    }
89}
90
91pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
92    let app = Router::new()
93        .route("/health", get(health))
94        .route("/metrics", get(prometheus_metrics)) // v0.6: Prometheus metrics endpoint
95        .route("/api/v1/events", post(ingest_event))
96        .route("/api/v1/events/batch", post(ingest_events_batch))
97        .route("/api/v1/events/query", get(query_events))
98        .route("/api/v1/events/stream", get(events_websocket)) // v0.2: WebSocket streaming
99        // v0.10: Stream and event type discovery endpoints
100        .route("/api/v1/streams", get(list_streams))
101        .route("/api/v1/event-types", get(list_event_types))
102        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
103        .route(
104            "/api/v1/entities/{entity_id}/snapshot",
105            get(get_entity_snapshot),
106        )
107        .route("/api/v1/stats", get(get_stats))
108        // v0.2: Advanced analytics endpoints
109        .route("/api/v1/analytics/frequency", get(analytics_frequency))
110        .route("/api/v1/analytics/summary", get(analytics_summary))
111        .route("/api/v1/analytics/correlation", get(analytics_correlation))
112        // v0.2: Snapshot management endpoints
113        .route("/api/v1/snapshots", post(create_snapshot))
114        .route("/api/v1/snapshots", get(list_snapshots))
115        .route(
116            "/api/v1/snapshots/{entity_id}/latest",
117            get(get_latest_snapshot),
118        )
119        // v0.2: Compaction endpoints
120        .route("/api/v1/compaction/trigger", post(trigger_compaction))
121        .route("/api/v1/compaction/stats", get(compaction_stats))
122        // v0.5: Schema registry endpoints
123        .route("/api/v1/schemas", post(register_schema))
124        .route("/api/v1/schemas", get(list_subjects))
125        .route("/api/v1/schemas/{subject}", get(get_schema))
126        .route(
127            "/api/v1/schemas/{subject}/versions",
128            get(list_schema_versions),
129        )
130        .route("/api/v1/schemas/validate", post(validate_event_schema))
131        .route(
132            "/api/v1/schemas/{subject}/compatibility",
133            put(set_compatibility_mode),
134        )
135        // v0.5: Replay and projection rebuild endpoints
136        .route("/api/v1/replay", post(start_replay))
137        .route("/api/v1/replay", get(list_replays))
138        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
139        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
140        .route(
141            "/api/v1/replay/{replay_id}",
142            axum::routing::delete(delete_replay),
143        )
144        // v0.5: Stream processing pipeline endpoints
145        .route("/api/v1/pipelines", post(register_pipeline))
146        .route("/api/v1/pipelines", get(list_pipelines))
147        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
148        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
149        .route(
150            "/api/v1/pipelines/{pipeline_id}",
151            axum::routing::delete(remove_pipeline),
152        )
153        .route(
154            "/api/v1/pipelines/{pipeline_id}/stats",
155            get(get_pipeline_stats),
156        )
157        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
158        // v0.7: Projection State API for Query Service integration
159        .route("/api/v1/projections", get(list_projections))
160        .route("/api/v1/projections/{name}", get(get_projection))
161        .route(
162            "/api/v1/projections/{name}/{entity_id}/state",
163            get(get_projection_state),
164        )
165        .route(
166            "/api/v1/projections/{name}/{entity_id}/state",
167            post(save_projection_state),
168        )
169        .route(
170            "/api/v1/projections/{name}/{entity_id}/state",
171            put(save_projection_state),
172        )
173        .route(
174            "/api/v1/projections/{name}/bulk",
175            post(bulk_get_projection_states),
176        )
177        .route(
178            "/api/v1/projections/{name}/bulk/save",
179            post(bulk_save_projection_states),
180        )
181        .layer(
182            CorsLayer::new()
183                .allow_origin(Any)
184                .allow_methods(Any)
185                .allow_headers(Any),
186        )
187        .layer(TraceLayer::new_for_http())
188        .with_state(store);
189
190    let listener = tokio::net::TcpListener::bind(addr).await?;
191    axum::serve(listener, app).await?;
192
193    Ok(())
194}
195
196pub async fn health() -> impl IntoResponse {
197    Json(serde_json::json!({
198        "status": "healthy",
199        "service": "allsource-core",
200        "version": env!("CARGO_PKG_VERSION")
201    }))
202}
203
204// v0.6: Prometheus metrics endpoint
205pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
206    let metrics = store.metrics();
207
208    match metrics.encode() {
209        Ok(encoded) => Response::builder()
210            .status(200)
211            .header("Content-Type", "text/plain; version=0.0.4")
212            .body(encoded)
213            .unwrap()
214            .into_response(),
215        Err(e) => Response::builder()
216            .status(500)
217            .body(format!("Error encoding metrics: {e}"))
218            .unwrap()
219            .into_response(),
220    }
221}
222
223pub async fn ingest_event(
224    State(store): State<SharedStore>,
225    Json(req): Json<IngestEventRequest>,
226) -> Result<Json<IngestEventResponse>> {
227    // Create event using from_strings with default tenant
228    let event = Event::from_strings(
229        req.event_type,
230        req.entity_id,
231        "default".to_string(),
232        req.payload,
233        req.metadata,
234    )?;
235
236    let event_id = event.id;
237    let timestamp = event.timestamp;
238
239    store.ingest(event)?;
240
241    tracing::info!("Event ingested: {}", event_id);
242
243    Ok(Json(IngestEventResponse {
244        event_id,
245        timestamp,
246    }))
247}
248
249/// Ingest a single event with semi-sync/sync replication ACK waiting.
250///
251/// Used by the v1 API (with auth and replication support).
252pub async fn ingest_event_v1(
253    State(state): State<AppState>,
254    Json(req): Json<IngestEventRequest>,
255) -> Result<Json<IngestEventResponse>> {
256    let event = Event::from_strings(
257        req.event_type,
258        req.entity_id,
259        "default".to_string(),
260        req.payload,
261        req.metadata,
262    )?;
263
264    let event_id = event.id;
265    let timestamp = event.timestamp;
266
267    state.store.ingest(event)?;
268
269    // Semi-sync/sync: wait for follower ACK(s) before returning
270    await_replication_ack(&state).await;
271
272    tracing::info!("Event ingested: {}", event_id);
273
274    Ok(Json(IngestEventResponse {
275        event_id,
276        timestamp,
277    }))
278}
279
280/// Batch ingest multiple events in a single request
281///
282/// This endpoint allows ingesting multiple events atomically, which is more
283/// efficient than making individual requests for each event.
284pub async fn ingest_events_batch(
285    State(store): State<SharedStore>,
286    Json(req): Json<IngestEventsBatchRequest>,
287) -> Result<Json<IngestEventsBatchResponse>> {
288    let total = req.events.len();
289    let mut ingested_events = Vec::with_capacity(total);
290
291    for event_req in req.events {
292        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
293
294        let event = Event::from_strings(
295            event_req.event_type,
296            event_req.entity_id,
297            tenant_id,
298            event_req.payload,
299            event_req.metadata,
300        )?;
301
302        let event_id = event.id;
303        let timestamp = event.timestamp;
304
305        store.ingest(event)?;
306
307        ingested_events.push(IngestEventResponse {
308            event_id,
309            timestamp,
310        });
311    }
312
313    let ingested = ingested_events.len();
314    tracing::info!("Batch ingested {} events", ingested);
315
316    Ok(Json(IngestEventsBatchResponse {
317        total,
318        ingested,
319        events: ingested_events,
320    }))
321}
322
323/// Batch ingest with semi-sync/sync replication ACK waiting.
324///
325/// Used by the v1 API (with auth and replication support).
326pub async fn ingest_events_batch_v1(
327    State(state): State<AppState>,
328    Json(req): Json<IngestEventsBatchRequest>,
329) -> Result<Json<IngestEventsBatchResponse>> {
330    let total = req.events.len();
331    let mut ingested_events = Vec::with_capacity(total);
332
333    for event_req in req.events {
334        let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
335
336        let event = Event::from_strings(
337            event_req.event_type,
338            event_req.entity_id,
339            tenant_id,
340            event_req.payload,
341            event_req.metadata,
342        )?;
343
344        let event_id = event.id;
345        let timestamp = event.timestamp;
346
347        state.store.ingest(event)?;
348
349        ingested_events.push(IngestEventResponse {
350            event_id,
351            timestamp,
352        });
353    }
354
355    // Semi-sync/sync: wait for follower ACK(s) after all events are ingested
356    await_replication_ack(&state).await;
357
358    let ingested = ingested_events.len();
359    tracing::info!("Batch ingested {} events", ingested);
360
361    Ok(Json(IngestEventsBatchResponse {
362        total,
363        ingested,
364        events: ingested_events,
365    }))
366}
367
368pub async fn query_events(
369    State(store): State<SharedStore>,
370    Query(req): Query<QueryEventsRequest>,
371) -> Result<Json<QueryEventsResponse>> {
372    let domain_events = store.query(req)?;
373    let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
374    let count = events.len();
375
376    tracing::debug!("Query returned {} events", count);
377
378    Ok(Json(QueryEventsResponse { events, count }))
379}
380
381#[derive(Deserialize)]
382pub struct EntityStateParams {
383    as_of: Option<chrono::DateTime<chrono::Utc>>,
384}
385
386pub async fn get_entity_state(
387    State(store): State<SharedStore>,
388    Path(entity_id): Path<String>,
389    Query(params): Query<EntityStateParams>,
390) -> Result<Json<serde_json::Value>> {
391    let state = store.reconstruct_state(&entity_id, params.as_of)?;
392
393    tracing::info!("State reconstructed for entity: {}", entity_id);
394
395    Ok(Json(state))
396}
397
398pub async fn get_entity_snapshot(
399    State(store): State<SharedStore>,
400    Path(entity_id): Path<String>,
401) -> Result<Json<serde_json::Value>> {
402    let snapshot = store.get_snapshot(&entity_id)?;
403
404    tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
405
406    Ok(Json(snapshot))
407}
408
409pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
410    let stats = store.stats();
411    Json(stats)
412}
413
414// v0.10: List all streams (entity_ids) in the event store
415/// Query parameters for listing streams
416#[derive(Debug, Deserialize)]
417pub struct ListStreamsParams {
418    /// Optional limit on number of streams to return
419    pub limit: Option<usize>,
420    /// Optional offset for pagination
421    pub offset: Option<usize>,
422}
423
424/// Response for listing streams
425#[derive(Debug, serde::Serialize)]
426pub struct ListStreamsResponse {
427    pub streams: Vec<StreamInfo>,
428    pub total: usize,
429}
430
431pub async fn list_streams(
432    State(store): State<SharedStore>,
433    Query(params): Query<ListStreamsParams>,
434) -> Json<ListStreamsResponse> {
435    let mut streams = store.list_streams();
436    let total = streams.len();
437
438    // Sort by last_event_at descending (most recent first)
439    streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
440
441    // Apply pagination
442    if let Some(offset) = params.offset {
443        if offset < streams.len() {
444            streams = streams[offset..].to_vec();
445        } else {
446            streams = vec![];
447        }
448    }
449
450    if let Some(limit) = params.limit {
451        streams.truncate(limit);
452    }
453
454    tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
455
456    Json(ListStreamsResponse { streams, total })
457}
458
459// v0.10: List all event types in the event store
460/// Query parameters for listing event types
461#[derive(Debug, Deserialize)]
462pub struct ListEventTypesParams {
463    /// Optional limit on number of event types to return
464    pub limit: Option<usize>,
465    /// Optional offset for pagination
466    pub offset: Option<usize>,
467}
468
469/// Response for listing event types
470#[derive(Debug, serde::Serialize)]
471pub struct ListEventTypesResponse {
472    pub event_types: Vec<EventTypeInfo>,
473    pub total: usize,
474}
475
476pub async fn list_event_types(
477    State(store): State<SharedStore>,
478    Query(params): Query<ListEventTypesParams>,
479) -> Json<ListEventTypesResponse> {
480    let mut event_types = store.list_event_types();
481    let total = event_types.len();
482
483    // Sort by event_count descending (most used first)
484    event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
485
486    // Apply pagination
487    if let Some(offset) = params.offset {
488        if offset < event_types.len() {
489            event_types = event_types[offset..].to_vec();
490        } else {
491            event_types = vec![];
492        }
493    }
494
495    if let Some(limit) = params.limit {
496        event_types.truncate(limit);
497    }
498
499    tracing::debug!(
500        "Listed {} event types (total: {})",
501        event_types.len(),
502        total
503    );
504
505    Json(ListEventTypesResponse { event_types, total })
506}
507
508// v0.2: WebSocket endpoint for real-time event streaming
509pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
510    let websocket_manager = store.websocket_manager();
511
512    ws.on_upgrade(move |socket| async move {
513        websocket_manager.handle_socket(socket).await;
514    })
515}
516
517// v0.2: Event frequency analytics endpoint
518pub async fn analytics_frequency(
519    State(store): State<SharedStore>,
520    Query(req): Query<EventFrequencyRequest>,
521) -> Result<Json<EventFrequencyResponse>> {
522    let response = AnalyticsEngine::event_frequency(&store, req)?;
523
524    tracing::debug!(
525        "Frequency analysis returned {} buckets",
526        response.buckets.len()
527    );
528
529    Ok(Json(response))
530}
531
532// v0.2: Statistical summary endpoint
533pub async fn analytics_summary(
534    State(store): State<SharedStore>,
535    Query(req): Query<StatsSummaryRequest>,
536) -> Result<Json<StatsSummaryResponse>> {
537    let response = AnalyticsEngine::stats_summary(&store, req)?;
538
539    tracing::debug!(
540        "Stats summary: {} events across {} entities",
541        response.total_events,
542        response.unique_entities
543    );
544
545    Ok(Json(response))
546}
547
548// v0.2: Event correlation analysis endpoint
549pub async fn analytics_correlation(
550    State(store): State<SharedStore>,
551    Query(req): Query<CorrelationRequest>,
552) -> Result<Json<CorrelationResponse>> {
553    let response = AnalyticsEngine::analyze_correlation(&store, req)?;
554
555    tracing::debug!(
556        "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
557        response.correlated_pairs,
558        response.total_a,
559        response.correlation_percentage
560    );
561
562    Ok(Json(response))
563}
564
565// v0.2: Create a snapshot for an entity
566pub async fn create_snapshot(
567    State(store): State<SharedStore>,
568    Json(req): Json<CreateSnapshotRequest>,
569) -> Result<Json<CreateSnapshotResponse>> {
570    store.create_snapshot(&req.entity_id)?;
571
572    let snapshot_manager = store.snapshot_manager();
573    let snapshot = snapshot_manager
574        .get_latest_snapshot(&req.entity_id)
575        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
576
577    tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
578
579    Ok(Json(CreateSnapshotResponse {
580        snapshot_id: snapshot.id,
581        entity_id: snapshot.entity_id,
582        created_at: snapshot.created_at,
583        event_count: snapshot.event_count,
584        size_bytes: snapshot.metadata.size_bytes,
585    }))
586}
587
588// v0.2: List snapshots
589pub async fn list_snapshots(
590    State(store): State<SharedStore>,
591    Query(req): Query<ListSnapshotsRequest>,
592) -> Result<Json<ListSnapshotsResponse>> {
593    let snapshot_manager = store.snapshot_manager();
594
595    let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
596        snapshot_manager
597            .get_all_snapshots(&entity_id)
598            .into_iter()
599            .map(SnapshotInfo::from)
600            .collect()
601    } else {
602        // List all entities with snapshots
603        let entities = snapshot_manager.list_entities();
604        entities
605            .iter()
606            .flat_map(|entity_id| {
607                snapshot_manager
608                    .get_all_snapshots(entity_id)
609                    .into_iter()
610                    .map(SnapshotInfo::from)
611            })
612            .collect()
613    };
614
615    let total = snapshots.len();
616
617    tracing::debug!("Listed {} snapshots", total);
618
619    Ok(Json(ListSnapshotsResponse { snapshots, total }))
620}
621
622// v0.2: Get latest snapshot for an entity
623pub async fn get_latest_snapshot(
624    State(store): State<SharedStore>,
625    Path(entity_id): Path<String>,
626) -> Result<Json<serde_json::Value>> {
627    let snapshot_manager = store.snapshot_manager();
628
629    let snapshot = snapshot_manager
630        .get_latest_snapshot(&entity_id)
631        .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
632
633    tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
634
635    Ok(Json(serde_json::json!({
636        "snapshot_id": snapshot.id,
637        "entity_id": snapshot.entity_id,
638        "created_at": snapshot.created_at,
639        "as_of": snapshot.as_of,
640        "event_count": snapshot.event_count,
641        "size_bytes": snapshot.metadata.size_bytes,
642        "snapshot_type": snapshot.metadata.snapshot_type,
643        "state": snapshot.state
644    })))
645}
646
647// v0.2: Trigger manual compaction
648pub async fn trigger_compaction(
649    State(store): State<SharedStore>,
650) -> Result<Json<CompactionResult>> {
651    let compaction_manager = store.compaction_manager().ok_or_else(|| {
652        crate::error::AllSourceError::InternalError(
653            "Compaction not enabled (no Parquet storage)".to_string(),
654        )
655    })?;
656
657    tracing::info!("📦 Manual compaction triggered via API");
658
659    let result = compaction_manager.compact_now()?;
660
661    Ok(Json(result))
662}
663
664// v0.2: Get compaction statistics
665pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
666    let compaction_manager = store.compaction_manager().ok_or_else(|| {
667        crate::error::AllSourceError::InternalError(
668            "Compaction not enabled (no Parquet storage)".to_string(),
669        )
670    })?;
671
672    let stats = compaction_manager.stats();
673    let config = compaction_manager.config();
674
675    Ok(Json(serde_json::json!({
676        "stats": stats,
677        "config": {
678            "min_files_to_compact": config.min_files_to_compact,
679            "target_file_size": config.target_file_size,
680            "max_file_size": config.max_file_size,
681            "small_file_threshold": config.small_file_threshold,
682            "compaction_interval_seconds": config.compaction_interval_seconds,
683            "auto_compact": config.auto_compact,
684            "strategy": config.strategy
685        }
686    })))
687}
688
689// v0.5: Register a new schema
690pub async fn register_schema(
691    State(store): State<SharedStore>,
692    Json(req): Json<RegisterSchemaRequest>,
693) -> Result<Json<RegisterSchemaResponse>> {
694    let schema_registry = store.schema_registry();
695
696    let response =
697        schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
698
699    tracing::info!(
700        "📋 Schema registered: v{} for '{}'",
701        response.version,
702        response.subject
703    );
704
705    Ok(Json(response))
706}
707
708// v0.5: Get a schema by subject and optional version
709#[derive(Deserialize)]
710pub struct GetSchemaParams {
711    version: Option<u32>,
712}
713
714pub async fn get_schema(
715    State(store): State<SharedStore>,
716    Path(subject): Path<String>,
717    Query(params): Query<GetSchemaParams>,
718) -> Result<Json<serde_json::Value>> {
719    let schema_registry = store.schema_registry();
720
721    let schema = schema_registry.get_schema(&subject, params.version)?;
722
723    tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
724
725    Ok(Json(serde_json::json!({
726        "id": schema.id,
727        "subject": schema.subject,
728        "version": schema.version,
729        "schema": schema.schema,
730        "created_at": schema.created_at,
731        "description": schema.description,
732        "tags": schema.tags
733    })))
734}
735
736// v0.5: List all versions of a schema subject
737pub async fn list_schema_versions(
738    State(store): State<SharedStore>,
739    Path(subject): Path<String>,
740) -> Result<Json<serde_json::Value>> {
741    let schema_registry = store.schema_registry();
742
743    let versions = schema_registry.list_versions(&subject)?;
744
745    Ok(Json(serde_json::json!({
746        "subject": subject,
747        "versions": versions
748    })))
749}
750
751// v0.5: List all schema subjects
752pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
753    let schema_registry = store.schema_registry();
754
755    let subjects = schema_registry.list_subjects();
756
757    Json(serde_json::json!({
758        "subjects": subjects,
759        "total": subjects.len()
760    }))
761}
762
763// v0.5: Validate an event against a schema
764pub async fn validate_event_schema(
765    State(store): State<SharedStore>,
766    Json(req): Json<ValidateEventRequest>,
767) -> Result<Json<ValidateEventResponse>> {
768    let schema_registry = store.schema_registry();
769
770    let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
771
772    if response.valid {
773        tracing::debug!(
774            "✅ Event validated against schema '{}' v{}",
775            req.subject,
776            response.schema_version
777        );
778    } else {
779        tracing::warn!(
780            "❌ Event validation failed for '{}': {:?}",
781            req.subject,
782            response.errors
783        );
784    }
785
786    Ok(Json(response))
787}
788
789// v0.5: Set compatibility mode for a subject
790#[derive(Deserialize)]
791pub struct SetCompatibilityRequest {
792    compatibility: CompatibilityMode,
793}
794
795pub async fn set_compatibility_mode(
796    State(store): State<SharedStore>,
797    Path(subject): Path<String>,
798    Json(req): Json<SetCompatibilityRequest>,
799) -> Json<serde_json::Value> {
800    let schema_registry = store.schema_registry();
801
802    schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
803
804    tracing::info!(
805        "🔧 Set compatibility mode for '{}' to {:?}",
806        subject,
807        req.compatibility
808    );
809
810    Json(serde_json::json!({
811        "subject": subject,
812        "compatibility": req.compatibility
813    }))
814}
815
816// v0.5: Start a replay operation
817pub async fn start_replay(
818    State(store): State<SharedStore>,
819    Json(req): Json<StartReplayRequest>,
820) -> Result<Json<StartReplayResponse>> {
821    let replay_manager = store.replay_manager();
822
823    let response = replay_manager.start_replay(store, req)?;
824
825    tracing::info!(
826        "🔄 Started replay {} with {} events",
827        response.replay_id,
828        response.total_events
829    );
830
831    Ok(Json(response))
832}
833
834// v0.5: Get replay progress
835pub async fn get_replay_progress(
836    State(store): State<SharedStore>,
837    Path(replay_id): Path<uuid::Uuid>,
838) -> Result<Json<ReplayProgress>> {
839    let replay_manager = store.replay_manager();
840
841    let progress = replay_manager.get_progress(replay_id)?;
842
843    Ok(Json(progress))
844}
845
846// v0.5: List all replay operations
847pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
848    let replay_manager = store.replay_manager();
849
850    let replays = replay_manager.list_replays();
851
852    Json(serde_json::json!({
853        "replays": replays,
854        "total": replays.len()
855    }))
856}
857
858// v0.5: Cancel a running replay
859pub async fn cancel_replay(
860    State(store): State<SharedStore>,
861    Path(replay_id): Path<uuid::Uuid>,
862) -> Result<Json<serde_json::Value>> {
863    let replay_manager = store.replay_manager();
864
865    replay_manager.cancel_replay(replay_id)?;
866
867    tracing::info!("🛑 Cancelled replay {}", replay_id);
868
869    Ok(Json(serde_json::json!({
870        "replay_id": replay_id,
871        "status": "cancelled"
872    })))
873}
874
875// v0.5: Delete a completed replay
876pub async fn delete_replay(
877    State(store): State<SharedStore>,
878    Path(replay_id): Path<uuid::Uuid>,
879) -> Result<Json<serde_json::Value>> {
880    let replay_manager = store.replay_manager();
881
882    let deleted = replay_manager.delete_replay(replay_id)?;
883
884    if deleted {
885        tracing::info!("🗑️  Deleted replay {}", replay_id);
886    }
887
888    Ok(Json(serde_json::json!({
889        "replay_id": replay_id,
890        "deleted": deleted
891    })))
892}
893
894// v0.5: Register a new pipeline
895pub async fn register_pipeline(
896    State(store): State<SharedStore>,
897    Json(config): Json<PipelineConfig>,
898) -> Result<Json<serde_json::Value>> {
899    let pipeline_manager = store.pipeline_manager();
900
901    let pipeline_id = pipeline_manager.register(config.clone());
902
903    tracing::info!(
904        "🔀 Pipeline registered: {} (name: {})",
905        pipeline_id,
906        config.name
907    );
908
909    Ok(Json(serde_json::json!({
910        "pipeline_id": pipeline_id,
911        "name": config.name,
912        "enabled": config.enabled
913    })))
914}
915
916// v0.5: List all pipelines
917pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
918    let pipeline_manager = store.pipeline_manager();
919
920    let pipelines = pipeline_manager.list();
921
922    tracing::debug!("Listed {} pipelines", pipelines.len());
923
924    Json(serde_json::json!({
925        "pipelines": pipelines,
926        "total": pipelines.len()
927    }))
928}
929
930// v0.5: Get a specific pipeline
931pub async fn get_pipeline(
932    State(store): State<SharedStore>,
933    Path(pipeline_id): Path<uuid::Uuid>,
934) -> Result<Json<PipelineConfig>> {
935    let pipeline_manager = store.pipeline_manager();
936
937    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
938        crate::error::AllSourceError::ValidationError(format!(
939            "Pipeline not found: {}",
940            pipeline_id
941        ))
942    })?;
943
944    Ok(Json(pipeline.config().clone()))
945}
946
947// v0.5: Remove a pipeline
948pub async fn remove_pipeline(
949    State(store): State<SharedStore>,
950    Path(pipeline_id): Path<uuid::Uuid>,
951) -> Result<Json<serde_json::Value>> {
952    let pipeline_manager = store.pipeline_manager();
953
954    let removed = pipeline_manager.remove(pipeline_id);
955
956    if removed {
957        tracing::info!("🗑️  Removed pipeline {}", pipeline_id);
958    }
959
960    Ok(Json(serde_json::json!({
961        "pipeline_id": pipeline_id,
962        "removed": removed
963    })))
964}
965
966// v0.5: Get statistics for all pipelines
967pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
968    let pipeline_manager = store.pipeline_manager();
969
970    let stats = pipeline_manager.all_stats();
971
972    Json(serde_json::json!({
973        "stats": stats,
974        "total": stats.len()
975    }))
976}
977
978// v0.5: Get statistics for a specific pipeline
979pub async fn get_pipeline_stats(
980    State(store): State<SharedStore>,
981    Path(pipeline_id): Path<uuid::Uuid>,
982) -> Result<Json<PipelineStats>> {
983    let pipeline_manager = store.pipeline_manager();
984
985    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
986        crate::error::AllSourceError::ValidationError(format!(
987            "Pipeline not found: {}",
988            pipeline_id
989        ))
990    })?;
991
992    Ok(Json(pipeline.stats()))
993}
994
995// v0.5: Reset a pipeline's state
996pub async fn reset_pipeline(
997    State(store): State<SharedStore>,
998    Path(pipeline_id): Path<uuid::Uuid>,
999) -> Result<Json<serde_json::Value>> {
1000    let pipeline_manager = store.pipeline_manager();
1001
1002    let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1003        crate::error::AllSourceError::ValidationError(format!(
1004            "Pipeline not found: {}",
1005            pipeline_id
1006        ))
1007    })?;
1008
1009    pipeline.reset();
1010
1011    tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1012
1013    Ok(Json(serde_json::json!({
1014        "pipeline_id": pipeline_id,
1015        "reset": true
1016    })))
1017}
1018
1019// =============================================================================
1020// v0.7: Projection State API for Query Service Integration
1021// =============================================================================
1022
1023/// List all registered projections
1024pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1025    let projection_manager = store.projection_manager();
1026
1027    let projections: Vec<serde_json::Value> = projection_manager
1028        .list_projections()
1029        .iter()
1030        .map(|(name, projection)| {
1031            serde_json::json!({
1032                "name": name,
1033                "type": format!("{:?}", projection.name()),
1034            })
1035        })
1036        .collect();
1037
1038    tracing::debug!("Listed {} projections", projections.len());
1039
1040    Json(serde_json::json!({
1041        "projections": projections,
1042        "total": projections.len()
1043    }))
1044}
1045
1046/// Get projection metadata by name
1047pub async fn get_projection(
1048    State(store): State<SharedStore>,
1049    Path(name): Path<String>,
1050) -> Result<Json<serde_json::Value>> {
1051    let projection_manager = store.projection_manager();
1052
1053    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1054        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1055    })?;
1056
1057    Ok(Json(serde_json::json!({
1058        "name": projection.name(),
1059        "found": true
1060    })))
1061}
1062
1063/// Get projection state for a specific entity
1064///
1065/// This endpoint allows the Elixir Query Service to fetch projection state
1066/// from the Rust Core for synchronization.
1067pub async fn get_projection_state(
1068    State(store): State<SharedStore>,
1069    Path((name, entity_id)): Path<(String, String)>,
1070) -> Result<Json<serde_json::Value>> {
1071    let projection_manager = store.projection_manager();
1072
1073    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1074        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1075    })?;
1076
1077    let state = projection.get_state(&entity_id);
1078
1079    tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1080
1081    Ok(Json(serde_json::json!({
1082        "projection": name,
1083        "entity_id": entity_id,
1084        "state": state,
1085        "found": state.is_some()
1086    })))
1087}
1088
1089/// Request body for saving projection state
1090#[derive(Debug, Deserialize)]
1091pub struct SaveProjectionStateRequest {
1092    pub state: serde_json::Value,
1093}
1094
1095/// Save/update projection state for an entity
1096///
1097/// This endpoint allows external services (like Elixir Query Service) to
1098/// store computed projection state back to the Core for persistence.
1099pub async fn save_projection_state(
1100    State(store): State<SharedStore>,
1101    Path((name, entity_id)): Path<(String, String)>,
1102    Json(req): Json<SaveProjectionStateRequest>,
1103) -> Result<Json<serde_json::Value>> {
1104    let projection_cache = store.projection_state_cache();
1105
1106    // Store in the projection state cache
1107    projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1108
1109    tracing::info!("Projection state saved: {} / {}", name, entity_id);
1110
1111    Ok(Json(serde_json::json!({
1112        "projection": name,
1113        "entity_id": entity_id,
1114        "saved": true
1115    })))
1116}
1117
1118/// Bulk get projection states for multiple entities
1119///
1120/// Efficient endpoint for fetching multiple entity states in a single request.
1121#[derive(Debug, Deserialize)]
1122pub struct BulkGetStateRequest {
1123    pub entity_ids: Vec<String>,
1124}
1125
1126/// Bulk save projection states for multiple entities
1127///
1128/// Efficient endpoint for saving multiple entity states in a single request.
1129#[derive(Debug, Deserialize)]
1130pub struct BulkSaveStateRequest {
1131    pub states: Vec<BulkSaveStateItem>,
1132}
1133
1134#[derive(Debug, Deserialize)]
1135pub struct BulkSaveStateItem {
1136    pub entity_id: String,
1137    pub state: serde_json::Value,
1138}
1139
1140pub async fn bulk_get_projection_states(
1141    State(store): State<SharedStore>,
1142    Path(name): Path<String>,
1143    Json(req): Json<BulkGetStateRequest>,
1144) -> Result<Json<serde_json::Value>> {
1145    let projection_manager = store.projection_manager();
1146
1147    let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1148        crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1149    })?;
1150
1151    let states: Vec<serde_json::Value> = req
1152        .entity_ids
1153        .iter()
1154        .map(|entity_id| {
1155            let state = projection.get_state(entity_id);
1156            serde_json::json!({
1157                "entity_id": entity_id,
1158                "state": state,
1159                "found": state.is_some()
1160            })
1161        })
1162        .collect();
1163
1164    tracing::debug!(
1165        "Bulk projection state retrieved: {} entities from {}",
1166        states.len(),
1167        name
1168    );
1169
1170    Ok(Json(serde_json::json!({
1171        "projection": name,
1172        "states": states,
1173        "total": states.len()
1174    })))
1175}
1176
1177/// Bulk save projection states for multiple entities
1178///
1179/// This endpoint allows efficient batch saving of projection states,
1180/// critical for high-throughput event processing pipelines.
1181pub async fn bulk_save_projection_states(
1182    State(store): State<SharedStore>,
1183    Path(name): Path<String>,
1184    Json(req): Json<BulkSaveStateRequest>,
1185) -> Result<Json<serde_json::Value>> {
1186    let projection_cache = store.projection_state_cache();
1187
1188    let mut saved_count = 0;
1189    for item in &req.states {
1190        projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1191        saved_count += 1;
1192    }
1193
1194    tracing::info!(
1195        "Bulk projection state saved: {} entities for {}",
1196        saved_count,
1197        name
1198    );
1199
1200    Ok(Json(serde_json::json!({
1201        "projection": name,
1202        "saved": saved_count,
1203        "total": req.states.len()
1204    })))
1205}
1206
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{domain::entities::Event, store::EventStore};

    /// Builds a fresh, isolated `EventStore` for a single test.
    fn create_test_store() -> Arc<EventStore> {
        Arc::new(EventStore::new())
    }

    /// Builds an event of `event_type` for `entity_id` with a fixed payload
    /// (`{"name": "Test", "value": 42}`) and `"test-stream"` as the third
    /// `Event::from_strings` argument.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            serde_json::json!({
                "name": "Test",
                "value": 42
            }),
            None,
        )
        .unwrap()
    }

    /// Runs the real `bulk_save_projection_states` handler against `store`
    /// and returns the JSON body of its response.
    async fn bulk_save(
        store: &Arc<EventStore>,
        projection: &str,
        states: Vec<BulkSaveStateItem>,
    ) -> serde_json::Value {
        let Json(body) = bulk_save_projection_states(
            State(Arc::clone(store)),
            Path(projection.to_string()),
            Json(BulkSaveStateRequest { states }),
        )
        .await
        .expect("bulk save handler should not fail");
        body
    }

    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();

        // Test cache insertion
        let cache = store.projection_state_cache();
        cache.insert(
            "entity_snapshots:user-123".to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        // Test cache retrieval
        let state = cache.get("entity_snapshots:user-123");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state["name"], "Test User");
        assert_eq!(state["age"], 30);
    }

    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();

        // List projections (built-in projections should be available)
        let projection_manager = store.projection_manager();
        let projections = projection_manager.list_projections();

        // Should have entity_snapshots and event_counters
        assert!(projections.len() >= 2);

        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
        assert!(names.contains(&"entity_snapshots"));
        assert!(names.contains(&"event_counters"));
    }

    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        // Ingest an event
        let event = create_test_event("user-456", "user.created");
        store.ingest(event).unwrap();

        // Get projection state
        let projection_manager = store.projection_manager();
        let snapshot_projection = projection_manager
            .get_projection("entity_snapshots")
            .unwrap();

        let state = snapshot_projection.get_state("user-456");
        assert!(state.is_some());
        let state = state.unwrap();
        assert_eq!(state["name"], "Test");
        assert_eq!(state["value"], 42);
    }

    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert multiple entities
        for i in 0..10 {
            cache.insert(
                format!("entity_snapshots:entity-{i}"),
                serde_json::json!({"id": i, "status": "active"}),
            );
        }

        // Verify all insertions
        assert_eq!(cache.len(), 10);

        // Verify each entity
        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{i}");
            let state = cache.get(&key);
            assert!(state.is_some());
            assert_eq!(state.unwrap()["id"], i);
        }
    }

    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Initial state
        cache.insert(
            "entity_snapshots:user-789".to_string(),
            serde_json::json!({"balance": 100}),
        );

        // Update state
        cache.insert(
            "entity_snapshots:user-789".to_string(),
            serde_json::json!({"balance": 150}),
        );

        // Verify update
        let state = cache.get("entity_snapshots:user-789").unwrap();
        assert_eq!(state["balance"], 150);
    }

    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        // Ingest events of different types
        store
            .ingest(create_test_event("user-1", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("user-2", "user.created"))
            .unwrap();
        store
            .ingest(create_test_event("user-1", "user.updated"))
            .unwrap();

        // Get event counter projection
        let projection_manager = store.projection_manager();
        let counter_projection = projection_manager.get_projection("event_counters").unwrap();

        // Check counts
        let created_state = counter_projection.get_state("user.created");
        assert!(created_state.is_some());
        assert_eq!(created_state.unwrap()["count"], 2);

        let updated_state = counter_projection.get_state("user.updated");
        assert!(updated_state.is_some());
        assert_eq!(updated_state.unwrap()["count"], 1);
    }

    #[tokio::test]
    async fn test_projection_state_cache_key_format() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Test standard key format: {projection_name}:{entity_id}
        let key = "orders:order-12345".to_string();
        cache.insert(key.clone(), serde_json::json!({"total": 99.99}));

        let state = cache.get(&key).unwrap();
        assert_eq!(state["total"], 99.99);
    }

    #[tokio::test]
    async fn test_projection_state_cache_removal() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert and then remove
        cache.insert(
            "test:entity-1".to_string(),
            serde_json::json!({"data": "value"}),
        );
        assert_eq!(cache.len(), 1);

        cache.remove("test:entity-1");
        assert_eq!(cache.len(), 0);
        assert!(cache.get("test:entity-1").is_none());
    }

    #[tokio::test]
    async fn test_get_nonexistent_projection() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Requesting a non-existent projection should return None
        let projection = projection_manager.get_projection("nonexistent_projection");
        assert!(projection.is_none());
    }

    #[tokio::test]
    async fn test_get_nonexistent_entity_state() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get state for non-existent entity
        let snapshot_projection = projection_manager
            .get_projection("entity_snapshots")
            .unwrap();
        let state = snapshot_projection.get_state("nonexistent-entity-xyz");
        assert!(state.is_none());
    }

    #[tokio::test]
    async fn test_projection_state_cache_concurrent_access() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Write from ten separately spawned tasks sharing the same cache
        let handles: Vec<_> = (0..10)
            .map(|i| {
                let cache_clone = cache.clone();
                tokio::spawn(async move {
                    cache_clone.insert(
                        format!("concurrent:entity-{i}"),
                        serde_json::json!({"thread": i}),
                    );
                })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        // All 10 entries should be present
        assert_eq!(cache.len(), 10);
    }

    #[tokio::test]
    async fn test_projection_state_large_payload() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Create a large JSON payload: 1000 small objects, on the order of
        // tens of kilobytes once serialized.
        let large_array: Vec<serde_json::Value> = (0..1000)
            .map(|i| {
                serde_json::json!({
                    "item": i,
                    "description": "test item with some padding data to increase size"
                })
            })
            .collect();

        cache.insert(
            "large:entity-1".to_string(),
            serde_json::json!({"items": large_array}),
        );

        let state = cache.get("large:entity-1").unwrap();
        let items = state["items"].as_array().unwrap();
        assert_eq!(items.len(), 1000);
    }

    #[tokio::test]
    async fn test_projection_state_complex_json() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Complex nested JSON structure
        let complex_state = serde_json::json!({
            "user": {
                "id": "user-123",
                "profile": {
                    "name": "John Doe",
                    "email": "john@example.com",
                    "settings": {
                        "theme": "dark",
                        "notifications": true
                    }
                },
                "roles": ["admin", "user"],
                "metadata": {
                    "created_at": "2025-01-01T00:00:00Z",
                    "last_login": null
                }
            }
        });

        cache.insert("complex:user-123".to_string(), complex_state);

        let state = cache.get("complex:user-123").unwrap();
        assert_eq!(state["user"]["profile"]["name"], "John Doe");
        assert_eq!(state["user"]["roles"][0], "admin");
        assert!(state["user"]["metadata"]["last_login"].is_null());
    }

    #[tokio::test]
    async fn test_projection_state_cache_iteration() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert entries
        for i in 0..5 {
            cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
        }

        // Iterate over all entries
        let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
        assert_eq!(entries.len(), 5);
    }

    #[tokio::test]
    async fn test_projection_manager_get_entity_snapshots() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get entity_snapshots projection specifically
        let projection = projection_manager.get_projection("entity_snapshots");
        assert!(projection.is_some());
        assert_eq!(projection.unwrap().name(), "entity_snapshots");
    }

    #[tokio::test]
    async fn test_projection_manager_get_event_counters() {
        let store = create_test_store();
        let projection_manager = store.projection_manager();

        // Get event_counters projection specifically
        let projection = projection_manager.get_projection("event_counters");
        assert!(projection.is_some());
        assert_eq!(projection.unwrap().name(), "event_counters");
    }

    #[tokio::test]
    async fn test_projection_state_cache_overwrite() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Initial value
        cache.insert(
            "overwrite:entity-1".to_string(),
            serde_json::json!({"version": 1}),
        );

        // Overwrite with new value
        cache.insert(
            "overwrite:entity-1".to_string(),
            serde_json::json!({"version": 2}),
        );

        // Overwrite again
        cache.insert(
            "overwrite:entity-1".to_string(),
            serde_json::json!({"version": 3}),
        );

        let state = cache.get("overwrite:entity-1").unwrap();
        assert_eq!(state["version"], 3);

        // Should still be only 1 entry
        assert_eq!(cache.len(), 1);
    }

    #[tokio::test]
    async fn test_projection_state_multiple_projections() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Store states for different projections
        cache.insert(
            "entity_snapshots:user-1".to_string(),
            serde_json::json!({"name": "Alice"}),
        );
        cache.insert(
            "event_counters:user.created".to_string(),
            serde_json::json!({"count": 5}),
        );
        cache.insert(
            "custom_projection:order-1".to_string(),
            serde_json::json!({"total": 150.0}),
        );

        // Verify each projection's state
        assert_eq!(
            cache.get("entity_snapshots:user-1").unwrap()["name"],
            "Alice"
        );
        assert_eq!(
            cache.get("event_counters:user.created").unwrap()["count"],
            5
        );
        assert_eq!(
            cache.get("custom_projection:order-1").unwrap()["total"],
            150.0
        );
    }

    #[tokio::test]
    async fn test_bulk_projection_state_access() {
        let store = create_test_store();

        // Ingest multiple events for different entities
        for i in 0..5 {
            let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
            store.ingest(event).unwrap();
        }

        // Get projection and verify bulk access
        let projection_manager = store.projection_manager();
        let snapshot_projection = projection_manager
            .get_projection("entity_snapshots")
            .unwrap();

        // Verify we can access all entities
        for i in 0..5 {
            let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
            assert!(state.is_some(), "Entity bulk-user-{i} should have state");
        }
    }

    #[tokio::test]
    async fn test_bulk_save_projection_states() {
        let store = create_test_store();

        let states = vec![
            BulkSaveStateItem {
                entity_id: "bulk-entity-1".to_string(),
                state: serde_json::json!({"name": "Entity 1", "value": 100}),
            },
            BulkSaveStateItem {
                entity_id: "bulk-entity-2".to_string(),
                state: serde_json::json!({"name": "Entity 2", "value": 200}),
            },
            BulkSaveStateItem {
                entity_id: "bulk-entity-3".to_string(),
                state: serde_json::json!({"name": "Entity 3", "value": 300}),
            },
        ];

        // Go through the real handler rather than re-implementing its loop
        let body = bulk_save(&store, "test_projection", states).await;
        assert_eq!(body["projection"], "test_projection");
        assert_eq!(body["saved"], 3);
        assert_eq!(body["total"], 3);

        // Verify all states were saved under `{projection}:{entity_id}` keys
        let cache = store.projection_state_cache();
        assert_eq!(cache.len(), 3);

        let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
        assert_eq!(state1["name"], "Entity 1");
        assert_eq!(state1["value"], 100);

        let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
        assert_eq!(state2["name"], "Entity 2");
        assert_eq!(state2["value"], 200);

        let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
        assert_eq!(state3["name"], "Entity 3");
        assert_eq!(state3["value"], 300);
    }

    #[tokio::test]
    async fn test_bulk_save_empty_states() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Clear cache
        cache.clear();

        // An empty batch must be accepted and report zero saved items
        let body = bulk_save(&store, "test_projection", Vec::new()).await;
        assert_eq!(body["saved"], 0);
        assert_eq!(body["total"], 0);

        // Cache should remain empty
        assert_eq!(cache.len(), 0);
    }

    #[tokio::test]
    async fn test_bulk_save_overwrites_existing() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        // Insert initial state
        cache.insert(
            "test:entity-1".to_string(),
            serde_json::json!({"version": 1, "data": "initial"}),
        );

        // Bulk save with updated state for the same key
        let body = bulk_save(
            &store,
            "test",
            vec![BulkSaveStateItem {
                entity_id: "entity-1".to_string(),
                state: serde_json::json!({"version": 2, "data": "updated"}),
            }],
        )
        .await;
        assert_eq!(body["saved"], 1);

        // Verify overwrite
        let state = cache.get("test:entity-1").unwrap();
        assert_eq!(state["version"], 2);
        assert_eq!(state["data"], "updated");

        // The key was replaced, not duplicated
        assert_eq!(cache.len(), 1);
    }

    #[tokio::test]
    async fn test_bulk_save_high_volume() {
        let store = create_test_store();

        // High volume save (1000 entities) in a single request
        let states: Vec<BulkSaveStateItem> = (0..1000)
            .map(|i| BulkSaveStateItem {
                entity_id: format!("entity-{i}"),
                state: serde_json::json!({"index": i, "status": "active"}),
            })
            .collect();

        let body = bulk_save(&store, "volume_test", states).await;
        assert_eq!(body["saved"], 1000);
        assert_eq!(body["total"], 1000);

        // Verify count
        let cache = store.projection_state_cache();
        assert_eq!(cache.len(), 1000);

        // Spot check some entries
        assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
        assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
        assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
    }

    #[tokio::test]
    async fn test_bulk_save_different_projections() {
        let store = create_test_store();

        // Save to multiple projections in bulk
        let projections = ["entity_snapshots", "event_counters", "custom_analytics"];

        for proj in &projections {
            let states: Vec<BulkSaveStateItem> = (0..5)
                .map(|i| BulkSaveStateItem {
                    entity_id: format!("entity-{i}"),
                    state: serde_json::json!({"projection": proj, "id": i}),
                })
                .collect();

            let body = bulk_save(&store, proj, states).await;
            assert_eq!(body["saved"], 5);
        }

        // Verify total count (3 projections * 5 entities)
        let cache = store.projection_state_cache();
        assert_eq!(cache.len(), 15);

        // Verify each projection
        for proj in &projections {
            let state = cache.get(&format!("{proj}:entity-0")).unwrap();
            assert_eq!(state["projection"], *proj);
        }
    }
}