1use crate::{
2 application::{
3 dto::{
4 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
5 IngestEventsBatchResponse, QueryEventsRequest, QueryEventsResponse,
6 },
7 services::{
8 analytics::{
9 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
10 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
11 },
12 pipeline::{PipelineConfig, PipelineStats},
13 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
14 schema::{
15 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
16 ValidateEventRequest, ValidateEventResponse,
17 },
18 },
19 },
20 domain::entities::Event,
21 error::Result,
22 infrastructure::{
23 persistence::{
24 compaction::CompactionResult,
25 snapshot::{
26 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
27 ListSnapshotsResponse, SnapshotInfo,
28 },
29 },
30 replication::ReplicationMode,
31 web::api_v1::AppState,
32 },
33 store::{EventStore, EventTypeInfo, StreamInfo},
34};
35use axum::{
36 Json, Router,
37 extract::{Path, Query, State, WebSocketUpgrade},
38 response::{IntoResponse, Response},
39 routing::{get, post, put},
40};
41use serde::Deserialize;
42use std::sync::Arc;
43use tower_http::{
44 cors::{Any, CorsLayer},
45 trace::TraceLayer,
46};
47
48type SharedStore = Arc<EventStore>;
49
50async fn await_replication_ack(state: &AppState) {
56 let shipper_guard = state.wal_shipper.read().await;
57 if let Some(ref shipper) = *shipper_guard {
58 let mode = shipper.replication_mode();
59 if mode == ReplicationMode::Async {
60 return;
61 }
62
63 let target_offset = shipper.current_leader_offset();
64 if target_offset == 0 {
65 return;
66 }
67
68 let shipper = Arc::clone(shipper);
69 drop(shipper_guard);
71
72 let timer = state
73 .store
74 .metrics()
75 .replication_ack_wait_seconds
76 .start_timer();
77 let acked = shipper.wait_for_ack(target_offset).await;
78 timer.observe_duration();
79
80 if !acked {
81 tracing::warn!(
82 "Replication ACK timeout in {} mode (offset {}). \
83 Write succeeded locally but follower confirmation pending.",
84 mode,
85 target_offset,
86 );
87 }
88 }
89}
90
91pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
92 let app = Router::new()
93 .route("/health", get(health))
94 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
96 .route("/api/v1/events/batch", post(ingest_events_batch))
97 .route("/api/v1/events/query", get(query_events))
98 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
101 .route("/api/v1/event-types", get(list_event_types))
102 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
103 .route(
104 "/api/v1/entities/{entity_id}/snapshot",
105 get(get_entity_snapshot),
106 )
107 .route("/api/v1/stats", get(get_stats))
108 .route("/api/v1/analytics/frequency", get(analytics_frequency))
110 .route("/api/v1/analytics/summary", get(analytics_summary))
111 .route("/api/v1/analytics/correlation", get(analytics_correlation))
112 .route("/api/v1/snapshots", post(create_snapshot))
114 .route("/api/v1/snapshots", get(list_snapshots))
115 .route(
116 "/api/v1/snapshots/{entity_id}/latest",
117 get(get_latest_snapshot),
118 )
119 .route("/api/v1/compaction/trigger", post(trigger_compaction))
121 .route("/api/v1/compaction/stats", get(compaction_stats))
122 .route("/api/v1/schemas", post(register_schema))
124 .route("/api/v1/schemas", get(list_subjects))
125 .route("/api/v1/schemas/{subject}", get(get_schema))
126 .route(
127 "/api/v1/schemas/{subject}/versions",
128 get(list_schema_versions),
129 )
130 .route("/api/v1/schemas/validate", post(validate_event_schema))
131 .route(
132 "/api/v1/schemas/{subject}/compatibility",
133 put(set_compatibility_mode),
134 )
135 .route("/api/v1/replay", post(start_replay))
137 .route("/api/v1/replay", get(list_replays))
138 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
139 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
140 .route(
141 "/api/v1/replay/{replay_id}",
142 axum::routing::delete(delete_replay),
143 )
144 .route("/api/v1/pipelines", post(register_pipeline))
146 .route("/api/v1/pipelines", get(list_pipelines))
147 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
148 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
149 .route(
150 "/api/v1/pipelines/{pipeline_id}",
151 axum::routing::delete(remove_pipeline),
152 )
153 .route(
154 "/api/v1/pipelines/{pipeline_id}/stats",
155 get(get_pipeline_stats),
156 )
157 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
158 .route("/api/v1/projections", get(list_projections))
160 .route("/api/v1/projections/{name}", get(get_projection))
161 .route(
162 "/api/v1/projections/{name}/{entity_id}/state",
163 get(get_projection_state),
164 )
165 .route(
166 "/api/v1/projections/{name}/{entity_id}/state",
167 post(save_projection_state),
168 )
169 .route(
170 "/api/v1/projections/{name}/{entity_id}/state",
171 put(save_projection_state),
172 )
173 .route(
174 "/api/v1/projections/{name}/bulk",
175 post(bulk_get_projection_states),
176 )
177 .route(
178 "/api/v1/projections/{name}/bulk/save",
179 post(bulk_save_projection_states),
180 )
181 .layer(
182 CorsLayer::new()
183 .allow_origin(Any)
184 .allow_methods(Any)
185 .allow_headers(Any),
186 )
187 .layer(TraceLayer::new_for_http())
188 .with_state(store);
189
190 let listener = tokio::net::TcpListener::bind(addr).await?;
191 axum::serve(listener, app).await?;
192
193 Ok(())
194}
195
196pub async fn health() -> impl IntoResponse {
197 Json(serde_json::json!({
198 "status": "healthy",
199 "service": "allsource-core",
200 "version": env!("CARGO_PKG_VERSION")
201 }))
202}
203
204pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
206 let metrics = store.metrics();
207
208 match metrics.encode() {
209 Ok(encoded) => Response::builder()
210 .status(200)
211 .header("Content-Type", "text/plain; version=0.0.4")
212 .body(encoded)
213 .unwrap()
214 .into_response(),
215 Err(e) => Response::builder()
216 .status(500)
217 .body(format!("Error encoding metrics: {e}"))
218 .unwrap()
219 .into_response(),
220 }
221}
222
223pub async fn ingest_event(
224 State(store): State<SharedStore>,
225 Json(req): Json<IngestEventRequest>,
226) -> Result<Json<IngestEventResponse>> {
227 let event = Event::from_strings(
229 req.event_type,
230 req.entity_id,
231 "default".to_string(),
232 req.payload,
233 req.metadata,
234 )?;
235
236 let event_id = event.id;
237 let timestamp = event.timestamp;
238
239 store.ingest(event)?;
240
241 tracing::info!("Event ingested: {}", event_id);
242
243 Ok(Json(IngestEventResponse {
244 event_id,
245 timestamp,
246 }))
247}
248
249pub async fn ingest_event_v1(
253 State(state): State<AppState>,
254 Json(req): Json<IngestEventRequest>,
255) -> Result<Json<IngestEventResponse>> {
256 let event = Event::from_strings(
257 req.event_type,
258 req.entity_id,
259 "default".to_string(),
260 req.payload,
261 req.metadata,
262 )?;
263
264 let event_id = event.id;
265 let timestamp = event.timestamp;
266
267 state.store.ingest(event)?;
268
269 await_replication_ack(&state).await;
271
272 tracing::info!("Event ingested: {}", event_id);
273
274 Ok(Json(IngestEventResponse {
275 event_id,
276 timestamp,
277 }))
278}
279
280pub async fn ingest_events_batch(
285 State(store): State<SharedStore>,
286 Json(req): Json<IngestEventsBatchRequest>,
287) -> Result<Json<IngestEventsBatchResponse>> {
288 let total = req.events.len();
289 let mut ingested_events = Vec::with_capacity(total);
290
291 for event_req in req.events {
292 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
293
294 let event = Event::from_strings(
295 event_req.event_type,
296 event_req.entity_id,
297 tenant_id,
298 event_req.payload,
299 event_req.metadata,
300 )?;
301
302 let event_id = event.id;
303 let timestamp = event.timestamp;
304
305 store.ingest(event)?;
306
307 ingested_events.push(IngestEventResponse {
308 event_id,
309 timestamp,
310 });
311 }
312
313 let ingested = ingested_events.len();
314 tracing::info!("Batch ingested {} events", ingested);
315
316 Ok(Json(IngestEventsBatchResponse {
317 total,
318 ingested,
319 events: ingested_events,
320 }))
321}
322
323pub async fn ingest_events_batch_v1(
327 State(state): State<AppState>,
328 Json(req): Json<IngestEventsBatchRequest>,
329) -> Result<Json<IngestEventsBatchResponse>> {
330 let total = req.events.len();
331 let mut ingested_events = Vec::with_capacity(total);
332
333 for event_req in req.events {
334 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
335
336 let event = Event::from_strings(
337 event_req.event_type,
338 event_req.entity_id,
339 tenant_id,
340 event_req.payload,
341 event_req.metadata,
342 )?;
343
344 let event_id = event.id;
345 let timestamp = event.timestamp;
346
347 state.store.ingest(event)?;
348
349 ingested_events.push(IngestEventResponse {
350 event_id,
351 timestamp,
352 });
353 }
354
355 await_replication_ack(&state).await;
357
358 let ingested = ingested_events.len();
359 tracing::info!("Batch ingested {} events", ingested);
360
361 Ok(Json(IngestEventsBatchResponse {
362 total,
363 ingested,
364 events: ingested_events,
365 }))
366}
367
368pub async fn query_events(
369 State(store): State<SharedStore>,
370 Query(req): Query<QueryEventsRequest>,
371) -> Result<Json<QueryEventsResponse>> {
372 let domain_events = store.query(req)?;
373 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
374 let count = events.len();
375
376 tracing::debug!("Query returned {} events", count);
377
378 Ok(Json(QueryEventsResponse { events, count }))
379}
380
381#[derive(Deserialize)]
382pub struct EntityStateParams {
383 as_of: Option<chrono::DateTime<chrono::Utc>>,
384}
385
386pub async fn get_entity_state(
387 State(store): State<SharedStore>,
388 Path(entity_id): Path<String>,
389 Query(params): Query<EntityStateParams>,
390) -> Result<Json<serde_json::Value>> {
391 let state = store.reconstruct_state(&entity_id, params.as_of)?;
392
393 tracing::info!("State reconstructed for entity: {}", entity_id);
394
395 Ok(Json(state))
396}
397
398pub async fn get_entity_snapshot(
399 State(store): State<SharedStore>,
400 Path(entity_id): Path<String>,
401) -> Result<Json<serde_json::Value>> {
402 let snapshot = store.get_snapshot(&entity_id)?;
403
404 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
405
406 Ok(Json(snapshot))
407}
408
409pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
410 let stats = store.stats();
411 Json(stats)
412}
413
414#[derive(Debug, Deserialize)]
417pub struct ListStreamsParams {
418 pub limit: Option<usize>,
420 pub offset: Option<usize>,
422}
423
424#[derive(Debug, serde::Serialize)]
426pub struct ListStreamsResponse {
427 pub streams: Vec<StreamInfo>,
428 pub total: usize,
429}
430
431pub async fn list_streams(
432 State(store): State<SharedStore>,
433 Query(params): Query<ListStreamsParams>,
434) -> Json<ListStreamsResponse> {
435 let mut streams = store.list_streams();
436 let total = streams.len();
437
438 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
440
441 if let Some(offset) = params.offset {
443 if offset < streams.len() {
444 streams = streams[offset..].to_vec();
445 } else {
446 streams = vec![];
447 }
448 }
449
450 if let Some(limit) = params.limit {
451 streams.truncate(limit);
452 }
453
454 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
455
456 Json(ListStreamsResponse { streams, total })
457}
458
459#[derive(Debug, Deserialize)]
462pub struct ListEventTypesParams {
463 pub limit: Option<usize>,
465 pub offset: Option<usize>,
467}
468
469#[derive(Debug, serde::Serialize)]
471pub struct ListEventTypesResponse {
472 pub event_types: Vec<EventTypeInfo>,
473 pub total: usize,
474}
475
476pub async fn list_event_types(
477 State(store): State<SharedStore>,
478 Query(params): Query<ListEventTypesParams>,
479) -> Json<ListEventTypesResponse> {
480 let mut event_types = store.list_event_types();
481 let total = event_types.len();
482
483 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
485
486 if let Some(offset) = params.offset {
488 if offset < event_types.len() {
489 event_types = event_types[offset..].to_vec();
490 } else {
491 event_types = vec![];
492 }
493 }
494
495 if let Some(limit) = params.limit {
496 event_types.truncate(limit);
497 }
498
499 tracing::debug!(
500 "Listed {} event types (total: {})",
501 event_types.len(),
502 total
503 );
504
505 Json(ListEventTypesResponse { event_types, total })
506}
507
508pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
510 let websocket_manager = store.websocket_manager();
511
512 ws.on_upgrade(move |socket| async move {
513 websocket_manager.handle_socket(socket).await;
514 })
515}
516
517pub async fn analytics_frequency(
519 State(store): State<SharedStore>,
520 Query(req): Query<EventFrequencyRequest>,
521) -> Result<Json<EventFrequencyResponse>> {
522 let response = AnalyticsEngine::event_frequency(&store, req)?;
523
524 tracing::debug!(
525 "Frequency analysis returned {} buckets",
526 response.buckets.len()
527 );
528
529 Ok(Json(response))
530}
531
532pub async fn analytics_summary(
534 State(store): State<SharedStore>,
535 Query(req): Query<StatsSummaryRequest>,
536) -> Result<Json<StatsSummaryResponse>> {
537 let response = AnalyticsEngine::stats_summary(&store, req)?;
538
539 tracing::debug!(
540 "Stats summary: {} events across {} entities",
541 response.total_events,
542 response.unique_entities
543 );
544
545 Ok(Json(response))
546}
547
548pub async fn analytics_correlation(
550 State(store): State<SharedStore>,
551 Query(req): Query<CorrelationRequest>,
552) -> Result<Json<CorrelationResponse>> {
553 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
554
555 tracing::debug!(
556 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
557 response.correlated_pairs,
558 response.total_a,
559 response.correlation_percentage
560 );
561
562 Ok(Json(response))
563}
564
565pub async fn create_snapshot(
567 State(store): State<SharedStore>,
568 Json(req): Json<CreateSnapshotRequest>,
569) -> Result<Json<CreateSnapshotResponse>> {
570 store.create_snapshot(&req.entity_id)?;
571
572 let snapshot_manager = store.snapshot_manager();
573 let snapshot = snapshot_manager
574 .get_latest_snapshot(&req.entity_id)
575 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
576
577 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
578
579 Ok(Json(CreateSnapshotResponse {
580 snapshot_id: snapshot.id,
581 entity_id: snapshot.entity_id,
582 created_at: snapshot.created_at,
583 event_count: snapshot.event_count,
584 size_bytes: snapshot.metadata.size_bytes,
585 }))
586}
587
588pub async fn list_snapshots(
590 State(store): State<SharedStore>,
591 Query(req): Query<ListSnapshotsRequest>,
592) -> Result<Json<ListSnapshotsResponse>> {
593 let snapshot_manager = store.snapshot_manager();
594
595 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
596 snapshot_manager
597 .get_all_snapshots(&entity_id)
598 .into_iter()
599 .map(SnapshotInfo::from)
600 .collect()
601 } else {
602 let entities = snapshot_manager.list_entities();
604 entities
605 .iter()
606 .flat_map(|entity_id| {
607 snapshot_manager
608 .get_all_snapshots(entity_id)
609 .into_iter()
610 .map(SnapshotInfo::from)
611 })
612 .collect()
613 };
614
615 let total = snapshots.len();
616
617 tracing::debug!("Listed {} snapshots", total);
618
619 Ok(Json(ListSnapshotsResponse { snapshots, total }))
620}
621
622pub async fn get_latest_snapshot(
624 State(store): State<SharedStore>,
625 Path(entity_id): Path<String>,
626) -> Result<Json<serde_json::Value>> {
627 let snapshot_manager = store.snapshot_manager();
628
629 let snapshot = snapshot_manager
630 .get_latest_snapshot(&entity_id)
631 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
632
633 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
634
635 Ok(Json(serde_json::json!({
636 "snapshot_id": snapshot.id,
637 "entity_id": snapshot.entity_id,
638 "created_at": snapshot.created_at,
639 "as_of": snapshot.as_of,
640 "event_count": snapshot.event_count,
641 "size_bytes": snapshot.metadata.size_bytes,
642 "snapshot_type": snapshot.metadata.snapshot_type,
643 "state": snapshot.state
644 })))
645}
646
647pub async fn trigger_compaction(
649 State(store): State<SharedStore>,
650) -> Result<Json<CompactionResult>> {
651 let compaction_manager = store.compaction_manager().ok_or_else(|| {
652 crate::error::AllSourceError::InternalError(
653 "Compaction not enabled (no Parquet storage)".to_string(),
654 )
655 })?;
656
657 tracing::info!("📦 Manual compaction triggered via API");
658
659 let result = compaction_manager.compact_now()?;
660
661 Ok(Json(result))
662}
663
664pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
666 let compaction_manager = store.compaction_manager().ok_or_else(|| {
667 crate::error::AllSourceError::InternalError(
668 "Compaction not enabled (no Parquet storage)".to_string(),
669 )
670 })?;
671
672 let stats = compaction_manager.stats();
673 let config = compaction_manager.config();
674
675 Ok(Json(serde_json::json!({
676 "stats": stats,
677 "config": {
678 "min_files_to_compact": config.min_files_to_compact,
679 "target_file_size": config.target_file_size,
680 "max_file_size": config.max_file_size,
681 "small_file_threshold": config.small_file_threshold,
682 "compaction_interval_seconds": config.compaction_interval_seconds,
683 "auto_compact": config.auto_compact,
684 "strategy": config.strategy
685 }
686 })))
687}
688
689pub async fn register_schema(
691 State(store): State<SharedStore>,
692 Json(req): Json<RegisterSchemaRequest>,
693) -> Result<Json<RegisterSchemaResponse>> {
694 let schema_registry = store.schema_registry();
695
696 let response =
697 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
698
699 tracing::info!(
700 "📋 Schema registered: v{} for '{}'",
701 response.version,
702 response.subject
703 );
704
705 Ok(Json(response))
706}
707
708#[derive(Deserialize)]
710pub struct GetSchemaParams {
711 version: Option<u32>,
712}
713
714pub async fn get_schema(
715 State(store): State<SharedStore>,
716 Path(subject): Path<String>,
717 Query(params): Query<GetSchemaParams>,
718) -> Result<Json<serde_json::Value>> {
719 let schema_registry = store.schema_registry();
720
721 let schema = schema_registry.get_schema(&subject, params.version)?;
722
723 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
724
725 Ok(Json(serde_json::json!({
726 "id": schema.id,
727 "subject": schema.subject,
728 "version": schema.version,
729 "schema": schema.schema,
730 "created_at": schema.created_at,
731 "description": schema.description,
732 "tags": schema.tags
733 })))
734}
735
736pub async fn list_schema_versions(
738 State(store): State<SharedStore>,
739 Path(subject): Path<String>,
740) -> Result<Json<serde_json::Value>> {
741 let schema_registry = store.schema_registry();
742
743 let versions = schema_registry.list_versions(&subject)?;
744
745 Ok(Json(serde_json::json!({
746 "subject": subject,
747 "versions": versions
748 })))
749}
750
751pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
753 let schema_registry = store.schema_registry();
754
755 let subjects = schema_registry.list_subjects();
756
757 Json(serde_json::json!({
758 "subjects": subjects,
759 "total": subjects.len()
760 }))
761}
762
763pub async fn validate_event_schema(
765 State(store): State<SharedStore>,
766 Json(req): Json<ValidateEventRequest>,
767) -> Result<Json<ValidateEventResponse>> {
768 let schema_registry = store.schema_registry();
769
770 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
771
772 if response.valid {
773 tracing::debug!(
774 "✅ Event validated against schema '{}' v{}",
775 req.subject,
776 response.schema_version
777 );
778 } else {
779 tracing::warn!(
780 "❌ Event validation failed for '{}': {:?}",
781 req.subject,
782 response.errors
783 );
784 }
785
786 Ok(Json(response))
787}
788
789#[derive(Deserialize)]
791pub struct SetCompatibilityRequest {
792 compatibility: CompatibilityMode,
793}
794
795pub async fn set_compatibility_mode(
796 State(store): State<SharedStore>,
797 Path(subject): Path<String>,
798 Json(req): Json<SetCompatibilityRequest>,
799) -> Json<serde_json::Value> {
800 let schema_registry = store.schema_registry();
801
802 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
803
804 tracing::info!(
805 "🔧 Set compatibility mode for '{}' to {:?}",
806 subject,
807 req.compatibility
808 );
809
810 Json(serde_json::json!({
811 "subject": subject,
812 "compatibility": req.compatibility
813 }))
814}
815
816pub async fn start_replay(
818 State(store): State<SharedStore>,
819 Json(req): Json<StartReplayRequest>,
820) -> Result<Json<StartReplayResponse>> {
821 let replay_manager = store.replay_manager();
822
823 let response = replay_manager.start_replay(store, req)?;
824
825 tracing::info!(
826 "🔄 Started replay {} with {} events",
827 response.replay_id,
828 response.total_events
829 );
830
831 Ok(Json(response))
832}
833
834pub async fn get_replay_progress(
836 State(store): State<SharedStore>,
837 Path(replay_id): Path<uuid::Uuid>,
838) -> Result<Json<ReplayProgress>> {
839 let replay_manager = store.replay_manager();
840
841 let progress = replay_manager.get_progress(replay_id)?;
842
843 Ok(Json(progress))
844}
845
846pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
848 let replay_manager = store.replay_manager();
849
850 let replays = replay_manager.list_replays();
851
852 Json(serde_json::json!({
853 "replays": replays,
854 "total": replays.len()
855 }))
856}
857
858pub async fn cancel_replay(
860 State(store): State<SharedStore>,
861 Path(replay_id): Path<uuid::Uuid>,
862) -> Result<Json<serde_json::Value>> {
863 let replay_manager = store.replay_manager();
864
865 replay_manager.cancel_replay(replay_id)?;
866
867 tracing::info!("🛑 Cancelled replay {}", replay_id);
868
869 Ok(Json(serde_json::json!({
870 "replay_id": replay_id,
871 "status": "cancelled"
872 })))
873}
874
875pub async fn delete_replay(
877 State(store): State<SharedStore>,
878 Path(replay_id): Path<uuid::Uuid>,
879) -> Result<Json<serde_json::Value>> {
880 let replay_manager = store.replay_manager();
881
882 let deleted = replay_manager.delete_replay(replay_id)?;
883
884 if deleted {
885 tracing::info!("🗑️ Deleted replay {}", replay_id);
886 }
887
888 Ok(Json(serde_json::json!({
889 "replay_id": replay_id,
890 "deleted": deleted
891 })))
892}
893
894pub async fn register_pipeline(
896 State(store): State<SharedStore>,
897 Json(config): Json<PipelineConfig>,
898) -> Result<Json<serde_json::Value>> {
899 let pipeline_manager = store.pipeline_manager();
900
901 let pipeline_id = pipeline_manager.register(config.clone());
902
903 tracing::info!(
904 "🔀 Pipeline registered: {} (name: {})",
905 pipeline_id,
906 config.name
907 );
908
909 Ok(Json(serde_json::json!({
910 "pipeline_id": pipeline_id,
911 "name": config.name,
912 "enabled": config.enabled
913 })))
914}
915
916pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
918 let pipeline_manager = store.pipeline_manager();
919
920 let pipelines = pipeline_manager.list();
921
922 tracing::debug!("Listed {} pipelines", pipelines.len());
923
924 Json(serde_json::json!({
925 "pipelines": pipelines,
926 "total": pipelines.len()
927 }))
928}
929
930pub async fn get_pipeline(
932 State(store): State<SharedStore>,
933 Path(pipeline_id): Path<uuid::Uuid>,
934) -> Result<Json<PipelineConfig>> {
935 let pipeline_manager = store.pipeline_manager();
936
937 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
938 crate::error::AllSourceError::ValidationError(format!(
939 "Pipeline not found: {}",
940 pipeline_id
941 ))
942 })?;
943
944 Ok(Json(pipeline.config().clone()))
945}
946
947pub async fn remove_pipeline(
949 State(store): State<SharedStore>,
950 Path(pipeline_id): Path<uuid::Uuid>,
951) -> Result<Json<serde_json::Value>> {
952 let pipeline_manager = store.pipeline_manager();
953
954 let removed = pipeline_manager.remove(pipeline_id);
955
956 if removed {
957 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
958 }
959
960 Ok(Json(serde_json::json!({
961 "pipeline_id": pipeline_id,
962 "removed": removed
963 })))
964}
965
966pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
968 let pipeline_manager = store.pipeline_manager();
969
970 let stats = pipeline_manager.all_stats();
971
972 Json(serde_json::json!({
973 "stats": stats,
974 "total": stats.len()
975 }))
976}
977
978pub async fn get_pipeline_stats(
980 State(store): State<SharedStore>,
981 Path(pipeline_id): Path<uuid::Uuid>,
982) -> Result<Json<PipelineStats>> {
983 let pipeline_manager = store.pipeline_manager();
984
985 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
986 crate::error::AllSourceError::ValidationError(format!(
987 "Pipeline not found: {}",
988 pipeline_id
989 ))
990 })?;
991
992 Ok(Json(pipeline.stats()))
993}
994
995pub async fn reset_pipeline(
997 State(store): State<SharedStore>,
998 Path(pipeline_id): Path<uuid::Uuid>,
999) -> Result<Json<serde_json::Value>> {
1000 let pipeline_manager = store.pipeline_manager();
1001
1002 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1003 crate::error::AllSourceError::ValidationError(format!(
1004 "Pipeline not found: {}",
1005 pipeline_id
1006 ))
1007 })?;
1008
1009 pipeline.reset();
1010
1011 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1012
1013 Ok(Json(serde_json::json!({
1014 "pipeline_id": pipeline_id,
1015 "reset": true
1016 })))
1017}
1018
1019pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1025 let projection_manager = store.projection_manager();
1026
1027 let projections: Vec<serde_json::Value> = projection_manager
1028 .list_projections()
1029 .iter()
1030 .map(|(name, projection)| {
1031 serde_json::json!({
1032 "name": name,
1033 "type": format!("{:?}", projection.name()),
1034 })
1035 })
1036 .collect();
1037
1038 tracing::debug!("Listed {} projections", projections.len());
1039
1040 Json(serde_json::json!({
1041 "projections": projections,
1042 "total": projections.len()
1043 }))
1044}
1045
1046pub async fn get_projection(
1048 State(store): State<SharedStore>,
1049 Path(name): Path<String>,
1050) -> Result<Json<serde_json::Value>> {
1051 let projection_manager = store.projection_manager();
1052
1053 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1054 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1055 })?;
1056
1057 Ok(Json(serde_json::json!({
1058 "name": projection.name(),
1059 "found": true
1060 })))
1061}
1062
1063pub async fn get_projection_state(
1068 State(store): State<SharedStore>,
1069 Path((name, entity_id)): Path<(String, String)>,
1070) -> Result<Json<serde_json::Value>> {
1071 let projection_manager = store.projection_manager();
1072
1073 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1074 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1075 })?;
1076
1077 let state = projection.get_state(&entity_id);
1078
1079 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1080
1081 Ok(Json(serde_json::json!({
1082 "projection": name,
1083 "entity_id": entity_id,
1084 "state": state,
1085 "found": state.is_some()
1086 })))
1087}
1088
1089#[derive(Debug, Deserialize)]
1091pub struct SaveProjectionStateRequest {
1092 pub state: serde_json::Value,
1093}
1094
1095pub async fn save_projection_state(
1100 State(store): State<SharedStore>,
1101 Path((name, entity_id)): Path<(String, String)>,
1102 Json(req): Json<SaveProjectionStateRequest>,
1103) -> Result<Json<serde_json::Value>> {
1104 let projection_cache = store.projection_state_cache();
1105
1106 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1108
1109 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1110
1111 Ok(Json(serde_json::json!({
1112 "projection": name,
1113 "entity_id": entity_id,
1114 "saved": true
1115 })))
1116}
1117
1118#[derive(Debug, Deserialize)]
1122pub struct BulkGetStateRequest {
1123 pub entity_ids: Vec<String>,
1124}
1125
1126#[derive(Debug, Deserialize)]
1130pub struct BulkSaveStateRequest {
1131 pub states: Vec<BulkSaveStateItem>,
1132}
1133
1134#[derive(Debug, Deserialize)]
1135pub struct BulkSaveStateItem {
1136 pub entity_id: String,
1137 pub state: serde_json::Value,
1138}
1139
1140pub async fn bulk_get_projection_states(
1141 State(store): State<SharedStore>,
1142 Path(name): Path<String>,
1143 Json(req): Json<BulkGetStateRequest>,
1144) -> Result<Json<serde_json::Value>> {
1145 let projection_manager = store.projection_manager();
1146
1147 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1148 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1149 })?;
1150
1151 let states: Vec<serde_json::Value> = req
1152 .entity_ids
1153 .iter()
1154 .map(|entity_id| {
1155 let state = projection.get_state(entity_id);
1156 serde_json::json!({
1157 "entity_id": entity_id,
1158 "state": state,
1159 "found": state.is_some()
1160 })
1161 })
1162 .collect();
1163
1164 tracing::debug!(
1165 "Bulk projection state retrieved: {} entities from {}",
1166 states.len(),
1167 name
1168 );
1169
1170 Ok(Json(serde_json::json!({
1171 "projection": name,
1172 "states": states,
1173 "total": states.len()
1174 })))
1175}
1176
1177pub async fn bulk_save_projection_states(
1182 State(store): State<SharedStore>,
1183 Path(name): Path<String>,
1184 Json(req): Json<BulkSaveStateRequest>,
1185) -> Result<Json<serde_json::Value>> {
1186 let projection_cache = store.projection_state_cache();
1187
1188 let mut saved_count = 0;
1189 for item in &req.states {
1190 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1191 saved_count += 1;
1192 }
1193
1194 tracing::info!(
1195 "Bulk projection state saved: {} entities for {}",
1196 saved_count,
1197 name
1198 );
1199
1200 Ok(Json(serde_json::json!({
1201 "projection": name,
1202 "saved": saved_count,
1203 "total": req.states.len()
1204 })))
1205}
1206
1207#[cfg(test)]
1208mod tests {
1209 use super::*;
1210 use crate::{domain::entities::Event, store::EventStore};
1211
1212 fn create_test_store() -> Arc<EventStore> {
1213 Arc::new(EventStore::new())
1214 }
1215
1216 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1217 Event::from_strings(
1218 event_type.to_string(),
1219 entity_id.to_string(),
1220 "test-stream".to_string(),
1221 serde_json::json!({
1222 "name": "Test",
1223 "value": 42
1224 }),
1225 None,
1226 )
1227 .unwrap()
1228 }
1229
1230 #[tokio::test]
1231 async fn test_projection_state_cache() {
1232 let store = create_test_store();
1233
1234 let cache = store.projection_state_cache();
1236 cache.insert(
1237 "entity_snapshots:user-123".to_string(),
1238 serde_json::json!({"name": "Test User", "age": 30}),
1239 );
1240
1241 let state = cache.get("entity_snapshots:user-123");
1243 assert!(state.is_some());
1244 let state = state.unwrap();
1245 assert_eq!(state["name"], "Test User");
1246 assert_eq!(state["age"], 30);
1247 }
1248
1249 #[tokio::test]
1250 async fn test_projection_manager_list_projections() {
1251 let store = create_test_store();
1252
1253 let projection_manager = store.projection_manager();
1255 let projections = projection_manager.list_projections();
1256
1257 assert!(projections.len() >= 2);
1259
1260 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
1261 assert!(names.contains(&"entity_snapshots"));
1262 assert!(names.contains(&"event_counters"));
1263 }
1264
1265 #[tokio::test]
1266 async fn test_projection_state_after_event_ingestion() {
1267 let store = create_test_store();
1268
1269 let event = create_test_event("user-456", "user.created");
1271 store.ingest(event).unwrap();
1272
1273 let projection_manager = store.projection_manager();
1275 let snapshot_projection = projection_manager
1276 .get_projection("entity_snapshots")
1277 .unwrap();
1278
1279 let state = snapshot_projection.get_state("user-456");
1280 assert!(state.is_some());
1281 let state = state.unwrap();
1282 assert_eq!(state["name"], "Test");
1283 assert_eq!(state["value"], 42);
1284 }
1285
1286 #[tokio::test]
1287 async fn test_projection_state_cache_multiple_entities() {
1288 let store = create_test_store();
1289 let cache = store.projection_state_cache();
1290
1291 for i in 0..10 {
1293 cache.insert(
1294 format!("entity_snapshots:entity-{}", i),
1295 serde_json::json!({"id": i, "status": "active"}),
1296 );
1297 }
1298
1299 assert_eq!(cache.len(), 10);
1301
1302 for i in 0..10 {
1304 let key = format!("entity_snapshots:entity-{}", i);
1305 let state = cache.get(&key);
1306 assert!(state.is_some());
1307 assert_eq!(state.unwrap()["id"], i);
1308 }
1309 }
1310
1311 #[tokio::test]
1312 async fn test_projection_state_update() {
1313 let store = create_test_store();
1314 let cache = store.projection_state_cache();
1315
1316 cache.insert(
1318 "entity_snapshots:user-789".to_string(),
1319 serde_json::json!({"balance": 100}),
1320 );
1321
1322 cache.insert(
1324 "entity_snapshots:user-789".to_string(),
1325 serde_json::json!({"balance": 150}),
1326 );
1327
1328 let state = cache.get("entity_snapshots:user-789").unwrap();
1330 assert_eq!(state["balance"], 150);
1331 }
1332
1333 #[tokio::test]
1334 async fn test_event_counter_projection() {
1335 let store = create_test_store();
1336
1337 store
1339 .ingest(create_test_event("user-1", "user.created"))
1340 .unwrap();
1341 store
1342 .ingest(create_test_event("user-2", "user.created"))
1343 .unwrap();
1344 store
1345 .ingest(create_test_event("user-1", "user.updated"))
1346 .unwrap();
1347
1348 let projection_manager = store.projection_manager();
1350 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
1351
1352 let created_state = counter_projection.get_state("user.created");
1354 assert!(created_state.is_some());
1355 assert_eq!(created_state.unwrap()["count"], 2);
1356
1357 let updated_state = counter_projection.get_state("user.updated");
1358 assert!(updated_state.is_some());
1359 assert_eq!(updated_state.unwrap()["count"], 1);
1360 }
1361
1362 #[tokio::test]
1363 async fn test_projection_state_cache_key_format() {
1364 let store = create_test_store();
1365 let cache = store.projection_state_cache();
1366
1367 let key = "orders:order-12345".to_string();
1369 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
1370
1371 let state = cache.get(&key).unwrap();
1372 assert_eq!(state["total"], 99.99);
1373 }
1374
1375 #[tokio::test]
1376 async fn test_projection_state_cache_removal() {
1377 let store = create_test_store();
1378 let cache = store.projection_state_cache();
1379
1380 cache.insert(
1382 "test:entity-1".to_string(),
1383 serde_json::json!({"data": "value"}),
1384 );
1385 assert_eq!(cache.len(), 1);
1386
1387 cache.remove("test:entity-1");
1388 assert_eq!(cache.len(), 0);
1389 assert!(cache.get("test:entity-1").is_none());
1390 }
1391
1392 #[tokio::test]
1393 async fn test_get_nonexistent_projection() {
1394 let store = create_test_store();
1395 let projection_manager = store.projection_manager();
1396
1397 let projection = projection_manager.get_projection("nonexistent_projection");
1399 assert!(projection.is_none());
1400 }
1401
1402 #[tokio::test]
1403 async fn test_get_nonexistent_entity_state() {
1404 let store = create_test_store();
1405 let projection_manager = store.projection_manager();
1406
1407 let snapshot_projection = projection_manager
1409 .get_projection("entity_snapshots")
1410 .unwrap();
1411 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
1412 assert!(state.is_none());
1413 }
1414
1415 #[tokio::test]
1416 async fn test_projection_state_cache_concurrent_access() {
1417 let store = create_test_store();
1418 let cache = store.projection_state_cache();
1419
1420 let handles: Vec<_> = (0..10)
1422 .map(|i| {
1423 let cache_clone = cache.clone();
1424 tokio::spawn(async move {
1425 cache_clone.insert(
1426 format!("concurrent:entity-{}", i),
1427 serde_json::json!({"thread": i}),
1428 );
1429 })
1430 })
1431 .collect();
1432
1433 for handle in handles {
1434 handle.await.unwrap();
1435 }
1436
1437 assert_eq!(cache.len(), 10);
1439 }
1440
1441 #[tokio::test]
1442 async fn test_projection_state_large_payload() {
1443 let store = create_test_store();
1444 let cache = store.projection_state_cache();
1445
1446 let large_array: Vec<serde_json::Value> = (0..1000)
1448 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
1449 .collect();
1450
1451 cache.insert(
1452 "large:entity-1".to_string(),
1453 serde_json::json!({"items": large_array}),
1454 );
1455
1456 let state = cache.get("large:entity-1").unwrap();
1457 let items = state["items"].as_array().unwrap();
1458 assert_eq!(items.len(), 1000);
1459 }
1460
1461 #[tokio::test]
1462 async fn test_projection_state_complex_json() {
1463 let store = create_test_store();
1464 let cache = store.projection_state_cache();
1465
1466 let complex_state = serde_json::json!({
1468 "user": {
1469 "id": "user-123",
1470 "profile": {
1471 "name": "John Doe",
1472 "email": "john@example.com",
1473 "settings": {
1474 "theme": "dark",
1475 "notifications": true
1476 }
1477 },
1478 "roles": ["admin", "user"],
1479 "metadata": {
1480 "created_at": "2025-01-01T00:00:00Z",
1481 "last_login": null
1482 }
1483 }
1484 });
1485
1486 cache.insert("complex:user-123".to_string(), complex_state);
1487
1488 let state = cache.get("complex:user-123").unwrap();
1489 assert_eq!(state["user"]["profile"]["name"], "John Doe");
1490 assert_eq!(state["user"]["roles"][0], "admin");
1491 assert!(state["user"]["metadata"]["last_login"].is_null());
1492 }
1493
1494 #[tokio::test]
1495 async fn test_projection_state_cache_iteration() {
1496 let store = create_test_store();
1497 let cache = store.projection_state_cache();
1498
1499 for i in 0..5 {
1501 cache.insert(
1502 format!("iter:entity-{}", i),
1503 serde_json::json!({"index": i}),
1504 );
1505 }
1506
1507 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
1509 assert_eq!(entries.len(), 5);
1510 }
1511
1512 #[tokio::test]
1513 async fn test_projection_manager_get_entity_snapshots() {
1514 let store = create_test_store();
1515 let projection_manager = store.projection_manager();
1516
1517 let projection = projection_manager.get_projection("entity_snapshots");
1519 assert!(projection.is_some());
1520 assert_eq!(projection.unwrap().name(), "entity_snapshots");
1521 }
1522
1523 #[tokio::test]
1524 async fn test_projection_manager_get_event_counters() {
1525 let store = create_test_store();
1526 let projection_manager = store.projection_manager();
1527
1528 let projection = projection_manager.get_projection("event_counters");
1530 assert!(projection.is_some());
1531 assert_eq!(projection.unwrap().name(), "event_counters");
1532 }
1533
1534 #[tokio::test]
1535 async fn test_projection_state_cache_overwrite() {
1536 let store = create_test_store();
1537 let cache = store.projection_state_cache();
1538
1539 cache.insert(
1541 "overwrite:entity-1".to_string(),
1542 serde_json::json!({"version": 1}),
1543 );
1544
1545 cache.insert(
1547 "overwrite:entity-1".to_string(),
1548 serde_json::json!({"version": 2}),
1549 );
1550
1551 cache.insert(
1553 "overwrite:entity-1".to_string(),
1554 serde_json::json!({"version": 3}),
1555 );
1556
1557 let state = cache.get("overwrite:entity-1").unwrap();
1558 assert_eq!(state["version"], 3);
1559
1560 assert_eq!(cache.len(), 1);
1562 }
1563
1564 #[tokio::test]
1565 async fn test_projection_state_multiple_projections() {
1566 let store = create_test_store();
1567 let cache = store.projection_state_cache();
1568
1569 cache.insert(
1571 "entity_snapshots:user-1".to_string(),
1572 serde_json::json!({"name": "Alice"}),
1573 );
1574 cache.insert(
1575 "event_counters:user.created".to_string(),
1576 serde_json::json!({"count": 5}),
1577 );
1578 cache.insert(
1579 "custom_projection:order-1".to_string(),
1580 serde_json::json!({"total": 150.0}),
1581 );
1582
1583 assert_eq!(
1585 cache.get("entity_snapshots:user-1").unwrap()["name"],
1586 "Alice"
1587 );
1588 assert_eq!(
1589 cache.get("event_counters:user.created").unwrap()["count"],
1590 5
1591 );
1592 assert_eq!(
1593 cache.get("custom_projection:order-1").unwrap()["total"],
1594 150.0
1595 );
1596 }
1597
1598 #[tokio::test]
1599 async fn test_bulk_projection_state_access() {
1600 let store = create_test_store();
1601
1602 for i in 0..5 {
1604 let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
1605 store.ingest(event).unwrap();
1606 }
1607
1608 let projection_manager = store.projection_manager();
1610 let snapshot_projection = projection_manager
1611 .get_projection("entity_snapshots")
1612 .unwrap();
1613
1614 for i in 0..5 {
1616 let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
1617 assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
1618 }
1619 }
1620
1621 #[tokio::test]
1622 async fn test_bulk_save_projection_states() {
1623 let store = create_test_store();
1624 let cache = store.projection_state_cache();
1625
1626 let states = vec![
1628 BulkSaveStateItem {
1629 entity_id: "bulk-entity-1".to_string(),
1630 state: serde_json::json!({"name": "Entity 1", "value": 100}),
1631 },
1632 BulkSaveStateItem {
1633 entity_id: "bulk-entity-2".to_string(),
1634 state: serde_json::json!({"name": "Entity 2", "value": 200}),
1635 },
1636 BulkSaveStateItem {
1637 entity_id: "bulk-entity-3".to_string(),
1638 state: serde_json::json!({"name": "Entity 3", "value": 300}),
1639 },
1640 ];
1641
1642 let projection_name = "test_projection";
1643
1644 for item in &states {
1646 cache.insert(
1647 format!("{projection_name}:{}", item.entity_id),
1648 item.state.clone(),
1649 );
1650 }
1651
1652 assert_eq!(cache.len(), 3);
1654
1655 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
1656 assert_eq!(state1["name"], "Entity 1");
1657 assert_eq!(state1["value"], 100);
1658
1659 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
1660 assert_eq!(state2["name"], "Entity 2");
1661 assert_eq!(state2["value"], 200);
1662
1663 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
1664 assert_eq!(state3["name"], "Entity 3");
1665 assert_eq!(state3["value"], 300);
1666 }
1667
1668 #[tokio::test]
1669 async fn test_bulk_save_empty_states() {
1670 let store = create_test_store();
1671 let cache = store.projection_state_cache();
1672
1673 cache.clear();
1675
1676 let states: Vec<BulkSaveStateItem> = vec![];
1678 assert_eq!(states.len(), 0);
1679
1680 assert_eq!(cache.len(), 0);
1682 }
1683
1684 #[tokio::test]
1685 async fn test_bulk_save_overwrites_existing() {
1686 let store = create_test_store();
1687 let cache = store.projection_state_cache();
1688
1689 cache.insert(
1691 "test:entity-1".to_string(),
1692 serde_json::json!({"version": 1, "data": "initial"}),
1693 );
1694
1695 let new_state = serde_json::json!({"version": 2, "data": "updated"});
1697 cache.insert("test:entity-1".to_string(), new_state);
1698
1699 let state = cache.get("test:entity-1").unwrap();
1701 assert_eq!(state["version"], 2);
1702 assert_eq!(state["data"], "updated");
1703 }
1704
1705 #[tokio::test]
1706 async fn test_bulk_save_high_volume() {
1707 let store = create_test_store();
1708 let cache = store.projection_state_cache();
1709
1710 for i in 0..1000 {
1712 cache.insert(
1713 format!("volume_test:entity-{}", i),
1714 serde_json::json!({"index": i, "status": "active"}),
1715 );
1716 }
1717
1718 assert_eq!(cache.len(), 1000);
1720
1721 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
1723 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
1724 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
1725 }
1726
1727 #[tokio::test]
1728 async fn test_bulk_save_different_projections() {
1729 let store = create_test_store();
1730 let cache = store.projection_state_cache();
1731
1732 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
1734
1735 for proj in projections.iter() {
1736 for i in 0..5 {
1737 cache.insert(
1738 format!("{proj}:entity-{i}"),
1739 serde_json::json!({"projection": proj, "id": i}),
1740 );
1741 }
1742 }
1743
1744 assert_eq!(cache.len(), 15);
1746
1747 for proj in projections.iter() {
1749 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
1750 assert_eq!(state["projection"], *proj);
1751 }
1752 }
1753}