1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 security::middleware::OptionalAuth,
41 web::api_v1::AppState,
42 },
43 store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46 Json, Router,
47 extract::{Path, Query, State, WebSocketUpgrade},
48 response::{IntoResponse, Response},
49 routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57
/// Reference-counted handle to the [`EventStore`]; this is the state type the
/// router built in [`serve`] hands to every handler.
type SharedStore = Arc<EventStore>;
59
/// Waits until the WAL shipper reports an acknowledgement for the leader's
/// current offset.
///
/// Returns immediately when no shipper is configured, when the shipper runs in
/// [`ReplicationMode::Async`], or when the leader offset is still `0`. The time
/// spent waiting is recorded in the `replication_ack_wait_seconds` metric. A
/// missing acknowledgement is only logged; the caller is not told about it.
#[cfg(feature = "replication")]
async fn await_replication_ack(state: &AppState) {
    let guard = state.wal_shipper.read().await;
    let Some(shipper) = &*guard else {
        return;
    };

    let mode = shipper.replication_mode();
    if mode == ReplicationMode::Async {
        return;
    }

    let target_offset = shipper.current_leader_offset();
    if target_offset == 0 {
        return;
    }

    // Take our own handle so the read lock is released before we start waiting.
    let shipper = Arc::clone(shipper);
    drop(guard);

    let ack_timer = state
        .store
        .metrics()
        .replication_ack_wait_seconds
        .start_timer();
    let acked = shipper.wait_for_ack(target_offset).await;
    ack_timer.observe_duration();

    if acked {
        return;
    }

    tracing::warn!(
        "Replication ACK timeout in {} mode (offset {}). \
         Write succeeded locally but follower confirmation pending.",
        mode,
        target_offset,
    );
}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103 }
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107 let app = Router::new()
108 .route("/health", get(health))
109 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
111 .route("/api/v1/events/batch", post(ingest_events_batch))
112 .route("/api/v1/events/query", get(query_events))
113 .route("/api/v1/events/{event_id}", get(get_event_by_id))
114 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
117 .route("/api/v1/event-types", get(list_event_types))
118 .route("/api/v1/entities/duplicates", get(detect_duplicates))
119 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120 .route(
121 "/api/v1/entities/{entity_id}/snapshot",
122 get(get_entity_snapshot),
123 )
124 .route("/api/v1/stats", get(get_stats))
125 .route("/api/v1/analytics/frequency", get(analytics_frequency))
127 .route("/api/v1/analytics/summary", get(analytics_summary))
128 .route("/api/v1/analytics/correlation", get(analytics_correlation))
129 .route("/api/v1/snapshots", post(create_snapshot))
131 .route("/api/v1/snapshots", get(list_snapshots))
132 .route(
133 "/api/v1/snapshots/{entity_id}/latest",
134 get(get_latest_snapshot),
135 )
136 .route("/api/v1/compaction/trigger", post(trigger_compaction))
138 .route("/api/v1/compaction/stats", get(compaction_stats))
139 .route("/api/v1/schemas", post(register_schema))
141 .route("/api/v1/schemas", get(list_subjects))
142 .route("/api/v1/schemas/{subject}", get(get_schema))
143 .route(
144 "/api/v1/schemas/{subject}/versions",
145 get(list_schema_versions),
146 )
147 .route("/api/v1/schemas/validate", post(validate_event_schema))
148 .route(
149 "/api/v1/schemas/{subject}/compatibility",
150 put(set_compatibility_mode),
151 )
152 .route("/api/v1/replay", post(start_replay))
154 .route("/api/v1/replay", get(list_replays))
155 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157 .route(
158 "/api/v1/replay/{replay_id}",
159 axum::routing::delete(delete_replay),
160 )
161 .route("/api/v1/pipelines", post(register_pipeline))
163 .route("/api/v1/pipelines", get(list_pipelines))
164 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166 .route(
167 "/api/v1/pipelines/{pipeline_id}",
168 axum::routing::delete(remove_pipeline),
169 )
170 .route(
171 "/api/v1/pipelines/{pipeline_id}/stats",
172 get(get_pipeline_stats),
173 )
174 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175 .route("/api/v1/projections", get(list_projections))
177 .route("/api/v1/projections/{name}", get(get_projection))
178 .route(
179 "/api/v1/projections/{name}",
180 axum::routing::delete(delete_projection),
181 )
182 .route(
183 "/api/v1/projections/{name}/state",
184 get(get_projection_state_summary),
185 )
186 .route("/api/v1/projections/{name}/reset", post(reset_projection))
187 .route(
188 "/api/v1/projections/{name}/{entity_id}/state",
189 get(get_projection_state),
190 )
191 .route(
192 "/api/v1/projections/{name}/{entity_id}/state",
193 post(save_projection_state),
194 )
195 .route(
196 "/api/v1/projections/{name}/{entity_id}/state",
197 put(save_projection_state),
198 )
199 .route(
200 "/api/v1/projections/{name}/bulk",
201 post(bulk_get_projection_states),
202 )
203 .route(
204 "/api/v1/projections/{name}/bulk/save",
205 post(bulk_save_projection_states),
206 )
207 .route("/api/v1/webhooks", post(register_webhook))
209 .route("/api/v1/webhooks", get(list_webhooks))
210 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212 .route(
213 "/api/v1/webhooks/{webhook_id}",
214 axum::routing::delete(delete_webhook),
215 )
216 .route(
217 "/api/v1/webhooks/{webhook_id}/deliveries",
218 get(list_webhook_deliveries),
219 )
220 .route("/api/v1/graphql", post(graphql_query))
222 .route("/api/v1/geospatial/query", post(geo_query))
223 .route("/api/v1/geospatial/stats", get(geo_stats))
224 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225 .route(
226 "/api/v1/schema-evolution/history/{event_type}",
227 get(schema_evolution_history),
228 )
229 .route(
230 "/api/v1/schema-evolution/schema/{event_type}",
231 get(schema_evolution_schema),
232 )
233 .route(
234 "/api/v1/schema-evolution/stats",
235 get(schema_evolution_stats),
236 )
237 .layer(
238 CorsLayer::new()
239 .allow_origin(Any)
240 .allow_methods(Any)
241 .allow_headers(Any),
242 )
243 .layer(TraceLayer::new_for_http())
244 .with_state(store);
245
246 let listener = tokio::net::TcpListener::bind(addr).await?;
247 axum::serve(listener, app).await?;
248
249 Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253 Json(serde_json::json!({
254 "status": "healthy",
255 "service": "allsource-core",
256 "version": env!("CARGO_PKG_VERSION")
257 }))
258}
259
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262 let metrics = store.metrics();
263
264 match metrics.encode() {
265 Ok(encoded) => Response::builder()
266 .status(200)
267 .header("Content-Type", "text/plain; version=0.0.4")
268 .body(encoded)
269 .unwrap()
270 .into_response(),
271 Err(e) => Response::builder()
272 .status(500)
273 .body(format!("Error encoding metrics: {e}"))
274 .unwrap()
275 .into_response(),
276 }
277}
278
279pub async fn ingest_event(
280 State(store): State<SharedStore>,
281 Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283 let expected_version = req.expected_version;
284
285 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286 let event = Event::from_strings(
287 req.event_type,
288 req.entity_id,
289 tenant_id,
290 req.payload,
291 req.metadata,
292 )?;
293
294 let event_id = event.id;
295 let timestamp = event.timestamp;
296
297 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299 tracing::info!("Event ingested: {}", event_id);
300
301 Ok(Json(IngestEventResponse {
302 event_id,
303 timestamp,
304 version: Some(new_version),
305 }))
306}
307
308pub async fn ingest_event_v1(
314 State(state): State<AppState>,
315 Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317 let expected_version = req.expected_version;
318
319 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321 let event = Event::from_strings(
322 req.event_type,
323 req.entity_id,
324 tenant_id,
325 req.payload,
326 req.metadata,
327 )?;
328
329 let event_id = event.id;
330 let timestamp = event.timestamp;
331
332 let new_version = state
333 .store
334 .ingest_with_expected_version(&event, expected_version)?;
335
336 await_replication_ack(&state).await;
338
339 tracing::info!("Event ingested: {}", event_id);
340
341 Ok(Json(IngestEventResponse {
342 event_id,
343 timestamp,
344 version: Some(new_version),
345 }))
346}
347
348pub async fn ingest_events_batch(
353 State(store): State<SharedStore>,
354 Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356 let total = req.events.len();
357 let mut ingested_events = Vec::with_capacity(total);
358
359 for event_req in req.events {
360 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361 let expected_version = event_req.expected_version;
362
363 let event = Event::from_strings(
364 event_req.event_type,
365 event_req.entity_id,
366 tenant_id,
367 event_req.payload,
368 event_req.metadata,
369 )?;
370
371 let event_id = event.id;
372 let timestamp = event.timestamp;
373
374 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376 ingested_events.push(IngestEventResponse {
377 event_id,
378 timestamp,
379 version: Some(new_version),
380 });
381 }
382
383 let ingested = ingested_events.len();
384 tracing::info!("Batch ingested {} events", ingested);
385
386 Ok(Json(IngestEventsBatchResponse {
387 total,
388 ingested,
389 events: ingested_events,
390 }))
391}
392
393pub async fn ingest_events_batch_v1(
399 State(state): State<AppState>,
400 Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402 let total = req.events.len();
403 let mut ingested_events = Vec::with_capacity(total);
404
405 for event_req in req.events {
406 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407 let expected_version = event_req.expected_version;
408
409 let event = Event::from_strings(
410 event_req.event_type,
411 event_req.entity_id,
412 tenant_id,
413 event_req.payload,
414 event_req.metadata,
415 )?;
416
417 let event_id = event.id;
418 let timestamp = event.timestamp;
419
420 let new_version = state
421 .store
422 .ingest_with_expected_version(&event, expected_version)?;
423
424 ingested_events.push(IngestEventResponse {
425 event_id,
426 timestamp,
427 version: Some(new_version),
428 });
429 }
430
431 await_replication_ack(&state).await;
433
434 let ingested = ingested_events.len();
435 tracing::info!("Batch ingested {} events", ingested);
436
437 Ok(Json(IngestEventsBatchResponse {
438 total,
439 ingested,
440 events: ingested_events,
441 }))
442}
443
444pub async fn query_events(
445 OptionalAuth(auth): OptionalAuth,
446 Query(req): Query<QueryEventsRequest>,
447 State(store): State<SharedStore>,
448) -> Result<Json<QueryEventsResponse>> {
449 let requested_limit = req.limit;
450 let queried_entity_id = req.entity_id.clone();
451
452 let enforced_tenant = auth
455 .as_ref()
456 .map(|a| a.tenant_id().to_string())
457 .or(req.tenant_id.clone());
458
459 let unlimited_req = QueryEventsRequest {
461 entity_id: req.entity_id,
462 event_type: req.event_type,
463 tenant_id: enforced_tenant,
464 as_of: req.as_of,
465 since: req.since,
466 until: req.until,
467 limit: None,
468 event_type_prefix: req.event_type_prefix,
469 payload_filter: req.payload_filter,
470 };
471 let all_events = store.query(&unlimited_req)?;
472 let total_count = all_events.len();
473
474 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
476 all_events.into_iter().take(limit).collect()
477 } else {
478 all_events
479 };
480
481 let count = limited_events.len();
482 let has_more = count < total_count;
483 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
484
485 let entity_version = queried_entity_id
487 .as_deref()
488 .map(|eid| store.get_entity_version(eid));
489
490 tracing::debug!("Query returned {} events (total: {})", count, total_count);
491
492 Ok(Json(QueryEventsResponse {
493 events,
494 count,
495 total_count,
496 has_more,
497 entity_version,
498 }))
499}
500
501pub async fn list_entities(
502 State(store): State<SharedStore>,
503 Query(req): Query<ListEntitiesRequest>,
504) -> Result<Json<ListEntitiesResponse>> {
505 use std::collections::HashMap;
506
507 let query_req = QueryEventsRequest {
509 entity_id: None,
510 event_type: None,
511 tenant_id: None,
512 as_of: None,
513 since: None,
514 until: None,
515 limit: None,
516 event_type_prefix: req.event_type_prefix,
517 payload_filter: req.payload_filter,
518 };
519 let events = store.query(&query_req)?;
520
521 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
523 for event in &events {
524 entity_map
525 .entry(event.entity_id().to_string())
526 .or_default()
527 .push(event);
528 }
529
530 let mut summaries: Vec<EntitySummary> = entity_map
532 .into_iter()
533 .map(|(entity_id, events)| {
534 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
535 EntitySummary {
536 entity_id,
537 event_count: events.len(),
538 last_event_type: last.event_type_str().to_string(),
539 last_event_at: last.timestamp(),
540 }
541 })
542 .collect();
543 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
544
545 let total = summaries.len();
546
547 let offset = req.offset.unwrap_or(0);
549 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
550 let summaries = if let Some(limit) = req.limit {
551 let has_more = summaries.len() > limit;
552 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
553 return Ok(Json(ListEntitiesResponse {
554 entities: truncated,
555 total,
556 has_more,
557 }));
558 } else {
559 summaries
560 };
561
562 Ok(Json(ListEntitiesResponse {
563 entities: summaries,
564 total,
565 has_more: false,
566 }))
567}
568
569pub async fn detect_duplicates(
570 State(store): State<SharedStore>,
571 Query(req): Query<DetectDuplicatesRequest>,
572) -> Result<Json<DetectDuplicatesResponse>> {
573 use std::collections::HashMap;
574
575 let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
576
577 let query_req = QueryEventsRequest {
579 entity_id: None,
580 event_type: None,
581 tenant_id: None,
582 as_of: None,
583 since: None,
584 until: None,
585 limit: None,
586 event_type_prefix: Some(req.event_type_prefix),
587 payload_filter: None,
588 };
589 let events = store.query(&query_req)?;
590
591 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
594 for event in &events {
595 let eid = event.entity_id().to_string();
596 entity_latest
597 .entry(eid)
598 .and_modify(|existing| {
599 if event.timestamp() > existing.timestamp() {
600 *existing = event;
601 }
602 })
603 .or_insert(event);
604 }
605
606 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
608 for (entity_id, event) in &entity_latest {
609 let payload = event.payload();
610 let mut key_parts = serde_json::Map::new();
611 for field in &group_by_fields {
612 let value = payload
613 .get(*field)
614 .cloned()
615 .unwrap_or(serde_json::Value::Null);
616 key_parts.insert((*field).to_string(), value);
617 }
618 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
619 groups.entry(key_str).or_default().push(entity_id.clone());
620 }
621
622 let mut duplicate_groups: Vec<DuplicateGroup> = groups
624 .into_iter()
625 .filter(|(_, ids)| ids.len() > 1)
626 .map(|(key_str, mut ids)| {
627 ids.sort();
628 let key: serde_json::Value =
629 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
630 let count = ids.len();
631 DuplicateGroup {
632 key,
633 entity_ids: ids,
634 count,
635 }
636 })
637 .collect();
638
639 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
641
642 let total = duplicate_groups.len();
643
644 let offset = req.offset.unwrap_or(0);
646 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
647
648 if let Some(limit) = req.limit {
649 let has_more = duplicate_groups.len() > limit;
650 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
651 return Ok(Json(DetectDuplicatesResponse {
652 duplicates: truncated,
653 total,
654 has_more,
655 }));
656 }
657
658 Ok(Json(DetectDuplicatesResponse {
659 duplicates: duplicate_groups,
660 total,
661 has_more: false,
662 }))
663}
664
665#[derive(Deserialize)]
666pub struct EntityStateParams {
667 as_of: Option<chrono::DateTime<chrono::Utc>>,
668}
669
670pub async fn get_entity_state(
671 State(store): State<SharedStore>,
672 Path(entity_id): Path<String>,
673 Query(params): Query<EntityStateParams>,
674) -> Result<Json<serde_json::Value>> {
675 let state = store.reconstruct_state(&entity_id, params.as_of)?;
676
677 tracing::info!("State reconstructed for entity: {}", entity_id);
678
679 Ok(Json(state))
680}
681
682pub async fn get_entity_snapshot(
683 State(store): State<SharedStore>,
684 Path(entity_id): Path<String>,
685) -> Result<Json<serde_json::Value>> {
686 let snapshot = store.get_snapshot(&entity_id)?;
687
688 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
689
690 Ok(Json(snapshot))
691}
692
693pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
694 let stats = store.stats();
695 Json(stats)
696}
697
/// Query-string parameters accepted by [`list_streams`].
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return; when absent the list is not truncated.
    pub limit: Option<usize>,
    /// Number of streams to skip after sorting; when absent nothing is skipped.
    pub offset: Option<usize>,
}
707
/// Response body produced by [`list_streams`].
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams before `offset`/`limit` were applied.
    pub total: usize,
}
714
715pub async fn list_streams(
716 State(store): State<SharedStore>,
717 Query(params): Query<ListStreamsParams>,
718) -> Json<ListStreamsResponse> {
719 let mut streams = store.list_streams();
720 let total = streams.len();
721
722 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
724
725 if let Some(offset) = params.offset {
727 if offset < streams.len() {
728 streams = streams[offset..].to_vec();
729 } else {
730 streams = vec![];
731 }
732 }
733
734 if let Some(limit) = params.limit {
735 streams.truncate(limit);
736 }
737
738 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
739
740 Json(ListStreamsResponse { streams, total })
741}
742
/// Query-string parameters accepted by [`list_event_types`].
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return; when absent the list is not truncated.
    pub limit: Option<usize>,
    /// Number of event types to skip after sorting; when absent nothing is skipped.
    pub offset: Option<usize>,
}
752
/// Response body produced by [`list_event_types`].
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types before `offset`/`limit` were applied.
    pub total: usize,
}
759
760pub async fn list_event_types(
761 State(store): State<SharedStore>,
762 Query(params): Query<ListEventTypesParams>,
763) -> Json<ListEventTypesResponse> {
764 let mut event_types = store.list_event_types();
765 let total = event_types.len();
766
767 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
769
770 if let Some(offset) = params.offset {
772 if offset < event_types.len() {
773 event_types = event_types[offset..].to_vec();
774 } else {
775 event_types = vec![];
776 }
777 }
778
779 if let Some(limit) = params.limit {
780 event_types.truncate(limit);
781 }
782
783 tracing::debug!(
784 "Listed {} event types (total: {})",
785 event_types.len(),
786 total
787 );
788
789 Json(ListEventTypesResponse { event_types, total })
790}
791
/// Query-string parameters accepted by [`events_websocket`].
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handled through the consumer-aware path
    /// with this id; otherwise the plain socket handler is used.
    pub consumer_id: Option<String>,
}
797
798pub async fn events_websocket(
799 ws: WebSocketUpgrade,
800 State(store): State<SharedStore>,
801 Query(params): Query<WebSocketParams>,
802) -> Response {
803 let websocket_manager = store.websocket_manager();
804
805 ws.on_upgrade(move |socket| async move {
806 if let Some(consumer_id) = params.consumer_id {
807 websocket_manager
808 .handle_socket_with_consumer(socket, consumer_id, store)
809 .await;
810 } else {
811 websocket_manager.handle_socket(socket).await;
812 }
813 })
814}
815
816pub async fn analytics_frequency(
818 State(store): State<SharedStore>,
819 Query(req): Query<EventFrequencyRequest>,
820) -> Result<Json<EventFrequencyResponse>> {
821 let response = AnalyticsEngine::event_frequency(&store, &req)?;
822
823 tracing::debug!(
824 "Frequency analysis returned {} buckets",
825 response.buckets.len()
826 );
827
828 Ok(Json(response))
829}
830
831pub async fn analytics_summary(
833 State(store): State<SharedStore>,
834 Query(req): Query<StatsSummaryRequest>,
835) -> Result<Json<StatsSummaryResponse>> {
836 let response = AnalyticsEngine::stats_summary(&store, &req)?;
837
838 tracing::debug!(
839 "Stats summary: {} events across {} entities",
840 response.total_events,
841 response.unique_entities
842 );
843
844 Ok(Json(response))
845}
846
847pub async fn analytics_correlation(
849 State(store): State<SharedStore>,
850 Query(req): Query<CorrelationRequest>,
851) -> Result<Json<CorrelationResponse>> {
852 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
853
854 tracing::debug!(
855 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
856 response.correlated_pairs,
857 response.total_a,
858 response.correlation_percentage
859 );
860
861 Ok(Json(response))
862}
863
864pub async fn create_snapshot(
866 State(store): State<SharedStore>,
867 Json(req): Json<CreateSnapshotRequest>,
868) -> Result<Json<CreateSnapshotResponse>> {
869 store.create_snapshot(&req.entity_id)?;
870
871 let snapshot_manager = store.snapshot_manager();
872 let snapshot = snapshot_manager
873 .get_latest_snapshot(&req.entity_id)
874 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
875
876 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
877
878 Ok(Json(CreateSnapshotResponse {
879 snapshot_id: snapshot.id,
880 entity_id: snapshot.entity_id,
881 created_at: snapshot.created_at,
882 event_count: snapshot.event_count,
883 size_bytes: snapshot.metadata.size_bytes,
884 }))
885}
886
887pub async fn list_snapshots(
889 State(store): State<SharedStore>,
890 Query(req): Query<ListSnapshotsRequest>,
891) -> Result<Json<ListSnapshotsResponse>> {
892 let snapshot_manager = store.snapshot_manager();
893
894 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
895 snapshot_manager
896 .get_all_snapshots(&entity_id)
897 .into_iter()
898 .map(SnapshotInfo::from)
899 .collect()
900 } else {
901 let entities = snapshot_manager.list_entities();
903 entities
904 .iter()
905 .flat_map(|entity_id| {
906 snapshot_manager
907 .get_all_snapshots(entity_id)
908 .into_iter()
909 .map(SnapshotInfo::from)
910 })
911 .collect()
912 };
913
914 let total = snapshots.len();
915
916 tracing::debug!("Listed {} snapshots", total);
917
918 Ok(Json(ListSnapshotsResponse { snapshots, total }))
919}
920
921pub async fn get_latest_snapshot(
923 State(store): State<SharedStore>,
924 Path(entity_id): Path<String>,
925) -> Result<Json<serde_json::Value>> {
926 let snapshot_manager = store.snapshot_manager();
927
928 let snapshot = snapshot_manager
929 .get_latest_snapshot(&entity_id)
930 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
931
932 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
933
934 Ok(Json(serde_json::json!({
935 "snapshot_id": snapshot.id,
936 "entity_id": snapshot.entity_id,
937 "created_at": snapshot.created_at,
938 "as_of": snapshot.as_of,
939 "event_count": snapshot.event_count,
940 "size_bytes": snapshot.metadata.size_bytes,
941 "snapshot_type": snapshot.metadata.snapshot_type,
942 "state": snapshot.state
943 })))
944}
945
946pub async fn trigger_compaction(
948 State(store): State<SharedStore>,
949) -> Result<Json<CompactionResult>> {
950 let compaction_manager = store.compaction_manager().ok_or_else(|| {
951 crate::error::AllSourceError::InternalError(
952 "Compaction not enabled (no Parquet storage)".to_string(),
953 )
954 })?;
955
956 tracing::info!("📦 Manual compaction triggered via API");
957
958 let result = compaction_manager.compact_now()?;
959
960 Ok(Json(result))
961}
962
963pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
965 let compaction_manager = store.compaction_manager().ok_or_else(|| {
966 crate::error::AllSourceError::InternalError(
967 "Compaction not enabled (no Parquet storage)".to_string(),
968 )
969 })?;
970
971 let stats = compaction_manager.stats();
972 let config = compaction_manager.config();
973
974 Ok(Json(serde_json::json!({
975 "stats": stats,
976 "config": {
977 "min_files_to_compact": config.min_files_to_compact,
978 "target_file_size": config.target_file_size,
979 "max_file_size": config.max_file_size,
980 "small_file_threshold": config.small_file_threshold,
981 "compaction_interval_seconds": config.compaction_interval_seconds,
982 "auto_compact": config.auto_compact,
983 "strategy": config.strategy
984 }
985 })))
986}
987
988pub async fn register_schema(
990 State(store): State<SharedStore>,
991 Json(req): Json<RegisterSchemaRequest>,
992) -> Result<Json<RegisterSchemaResponse>> {
993 let schema_registry = store.schema_registry();
994
995 let response =
996 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
997
998 tracing::info!(
999 "📋 Schema registered: v{} for '{}'",
1000 response.version,
1001 response.subject
1002 );
1003
1004 Ok(Json(response))
1005}
1006
1007#[derive(Deserialize)]
1009pub struct GetSchemaParams {
1010 version: Option<u32>,
1011}
1012
1013pub async fn get_schema(
1014 State(store): State<SharedStore>,
1015 Path(subject): Path<String>,
1016 Query(params): Query<GetSchemaParams>,
1017) -> Result<Json<serde_json::Value>> {
1018 let schema_registry = store.schema_registry();
1019
1020 let schema = schema_registry.get_schema(&subject, params.version)?;
1021
1022 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1023
1024 Ok(Json(serde_json::json!({
1025 "id": schema.id,
1026 "subject": schema.subject,
1027 "version": schema.version,
1028 "schema": schema.schema,
1029 "created_at": schema.created_at,
1030 "description": schema.description,
1031 "tags": schema.tags
1032 })))
1033}
1034
1035pub async fn list_schema_versions(
1037 State(store): State<SharedStore>,
1038 Path(subject): Path<String>,
1039) -> Result<Json<serde_json::Value>> {
1040 let schema_registry = store.schema_registry();
1041
1042 let versions = schema_registry.list_versions(&subject)?;
1043
1044 Ok(Json(serde_json::json!({
1045 "subject": subject,
1046 "versions": versions
1047 })))
1048}
1049
1050pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1052 let schema_registry = store.schema_registry();
1053
1054 let subjects = schema_registry.list_subjects();
1055
1056 Json(serde_json::json!({
1057 "subjects": subjects,
1058 "total": subjects.len()
1059 }))
1060}
1061
1062pub async fn validate_event_schema(
1064 State(store): State<SharedStore>,
1065 Json(req): Json<ValidateEventRequest>,
1066) -> Result<Json<ValidateEventResponse>> {
1067 let schema_registry = store.schema_registry();
1068
1069 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1070
1071 if response.valid {
1072 tracing::debug!(
1073 "✅ Event validated against schema '{}' v{}",
1074 req.subject,
1075 response.schema_version
1076 );
1077 } else {
1078 tracing::warn!(
1079 "❌ Event validation failed for '{}': {:?}",
1080 req.subject,
1081 response.errors
1082 );
1083 }
1084
1085 Ok(Json(response))
1086}
1087
1088#[derive(Deserialize)]
1090pub struct SetCompatibilityRequest {
1091 compatibility: CompatibilityMode,
1092}
1093
1094pub async fn set_compatibility_mode(
1095 State(store): State<SharedStore>,
1096 Path(subject): Path<String>,
1097 Json(req): Json<SetCompatibilityRequest>,
1098) -> Json<serde_json::Value> {
1099 let schema_registry = store.schema_registry();
1100
1101 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1102
1103 tracing::info!(
1104 "🔧 Set compatibility mode for '{}' to {:?}",
1105 subject,
1106 req.compatibility
1107 );
1108
1109 Json(serde_json::json!({
1110 "subject": subject,
1111 "compatibility": req.compatibility
1112 }))
1113}
1114
1115pub async fn start_replay(
1117 State(store): State<SharedStore>,
1118 Json(req): Json<StartReplayRequest>,
1119) -> Result<Json<StartReplayResponse>> {
1120 let replay_manager = store.replay_manager();
1121
1122 let response = replay_manager.start_replay(store, req)?;
1123
1124 tracing::info!(
1125 "🔄 Started replay {} with {} events",
1126 response.replay_id,
1127 response.total_events
1128 );
1129
1130 Ok(Json(response))
1131}
1132
1133pub async fn get_replay_progress(
1135 State(store): State<SharedStore>,
1136 Path(replay_id): Path<uuid::Uuid>,
1137) -> Result<Json<ReplayProgress>> {
1138 let replay_manager = store.replay_manager();
1139
1140 let progress = replay_manager.get_progress(replay_id)?;
1141
1142 Ok(Json(progress))
1143}
1144
1145pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1147 let replay_manager = store.replay_manager();
1148
1149 let replays = replay_manager.list_replays();
1150
1151 Json(serde_json::json!({
1152 "replays": replays,
1153 "total": replays.len()
1154 }))
1155}
1156
1157pub async fn cancel_replay(
1159 State(store): State<SharedStore>,
1160 Path(replay_id): Path<uuid::Uuid>,
1161) -> Result<Json<serde_json::Value>> {
1162 let replay_manager = store.replay_manager();
1163
1164 replay_manager.cancel_replay(replay_id)?;
1165
1166 tracing::info!("🛑 Cancelled replay {}", replay_id);
1167
1168 Ok(Json(serde_json::json!({
1169 "replay_id": replay_id,
1170 "status": "cancelled"
1171 })))
1172}
1173
1174pub async fn delete_replay(
1176 State(store): State<SharedStore>,
1177 Path(replay_id): Path<uuid::Uuid>,
1178) -> Result<Json<serde_json::Value>> {
1179 let replay_manager = store.replay_manager();
1180
1181 let deleted = replay_manager.delete_replay(replay_id)?;
1182
1183 if deleted {
1184 tracing::info!("🗑️ Deleted replay {}", replay_id);
1185 }
1186
1187 Ok(Json(serde_json::json!({
1188 "replay_id": replay_id,
1189 "deleted": deleted
1190 })))
1191}
1192
1193pub async fn register_pipeline(
1195 State(store): State<SharedStore>,
1196 Json(config): Json<PipelineConfig>,
1197) -> Result<Json<serde_json::Value>> {
1198 let pipeline_manager = store.pipeline_manager();
1199
1200 let pipeline_id = pipeline_manager.register(config.clone());
1201
1202 tracing::info!(
1203 "🔀 Pipeline registered: {} (name: {})",
1204 pipeline_id,
1205 config.name
1206 );
1207
1208 Ok(Json(serde_json::json!({
1209 "pipeline_id": pipeline_id,
1210 "name": config.name,
1211 "enabled": config.enabled
1212 })))
1213}
1214
1215pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1217 let pipeline_manager = store.pipeline_manager();
1218
1219 let pipelines = pipeline_manager.list();
1220
1221 tracing::debug!("Listed {} pipelines", pipelines.len());
1222
1223 Json(serde_json::json!({
1224 "pipelines": pipelines,
1225 "total": pipelines.len()
1226 }))
1227}
1228
1229pub async fn get_pipeline(
1231 State(store): State<SharedStore>,
1232 Path(pipeline_id): Path<uuid::Uuid>,
1233) -> Result<Json<PipelineConfig>> {
1234 let pipeline_manager = store.pipeline_manager();
1235
1236 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1237 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1238 })?;
1239
1240 Ok(Json(pipeline.config().clone()))
1241}
1242
1243pub async fn remove_pipeline(
1245 State(store): State<SharedStore>,
1246 Path(pipeline_id): Path<uuid::Uuid>,
1247) -> Result<Json<serde_json::Value>> {
1248 let pipeline_manager = store.pipeline_manager();
1249
1250 let removed = pipeline_manager.remove(pipeline_id);
1251
1252 if removed {
1253 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1254 }
1255
1256 Ok(Json(serde_json::json!({
1257 "pipeline_id": pipeline_id,
1258 "removed": removed
1259 })))
1260}
1261
1262pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1264 let pipeline_manager = store.pipeline_manager();
1265
1266 let stats = pipeline_manager.all_stats();
1267
1268 Json(serde_json::json!({
1269 "stats": stats,
1270 "total": stats.len()
1271 }))
1272}
1273
1274pub async fn get_pipeline_stats(
1276 State(store): State<SharedStore>,
1277 Path(pipeline_id): Path<uuid::Uuid>,
1278) -> Result<Json<PipelineStats>> {
1279 let pipeline_manager = store.pipeline_manager();
1280
1281 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1282 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1283 })?;
1284
1285 Ok(Json(pipeline.stats()))
1286}
1287
1288pub async fn reset_pipeline(
1290 State(store): State<SharedStore>,
1291 Path(pipeline_id): Path<uuid::Uuid>,
1292) -> Result<Json<serde_json::Value>> {
1293 let pipeline_manager = store.pipeline_manager();
1294
1295 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1296 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1297 })?;
1298
1299 pipeline.reset();
1300
1301 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1302
1303 Ok(Json(serde_json::json!({
1304 "pipeline_id": pipeline_id,
1305 "reset": true
1306 })))
1307}
1308
1309pub async fn get_event_by_id(
1315 State(store): State<SharedStore>,
1316 Path(event_id): Path<uuid::Uuid>,
1317) -> Result<Json<serde_json::Value>> {
1318 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1319 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1320 })?;
1321
1322 let dto = EventDto::from(&event);
1323
1324 tracing::debug!("Event retrieved by ID: {}", event_id);
1325
1326 Ok(Json(serde_json::json!({
1327 "event": dto,
1328 "found": true
1329 })))
1330}
1331
1332pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1338 let projection_manager = store.projection_manager();
1339 let status_map = store.projection_status();
1340
1341 let projections: Vec<serde_json::Value> = projection_manager
1342 .list_projections()
1343 .iter()
1344 .map(|(name, projection)| {
1345 let status = status_map
1346 .get(name)
1347 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1348 serde_json::json!({
1349 "name": name,
1350 "type": format!("{:?}", projection.name()),
1351 "status": status,
1352 })
1353 })
1354 .collect();
1355
1356 tracing::debug!("Listed {} projections", projections.len());
1357
1358 Json(serde_json::json!({
1359 "projections": projections,
1360 "total": projections.len()
1361 }))
1362}
1363
1364pub async fn get_projection(
1366 State(store): State<SharedStore>,
1367 Path(name): Path<String>,
1368) -> Result<Json<serde_json::Value>> {
1369 let projection_manager = store.projection_manager();
1370
1371 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1372 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1373 })?;
1374
1375 Ok(Json(serde_json::json!({
1376 "name": projection.name(),
1377 "found": true
1378 })))
1379}
1380
1381pub async fn get_projection_state(
1394 State(store): State<SharedStore>,
1395 Path((name, entity_id)): Path<(String, String)>,
1396) -> Result<Json<serde_json::Value>> {
1397 let state = store
1398 .projection_manager()
1399 .get_projection(&name)
1400 .and_then(|p| p.get_state(&entity_id))
1401 .or_else(|| {
1402 store
1403 .projection_state_cache()
1404 .get(&format!("{name}:{entity_id}"))
1405 .map(|entry| entry.value().clone())
1406 });
1407
1408 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1409
1410 Ok(Json(serde_json::json!({
1411 "projection": name,
1412 "entity_id": entity_id,
1413 "state": state,
1414 "found": state.is_some()
1415 })))
1416}
1417
1418pub async fn delete_projection(
1423 State(store): State<SharedStore>,
1424 Path(name): Path<String>,
1425) -> Result<Json<serde_json::Value>> {
1426 let projection_manager = store.projection_manager();
1427
1428 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1429 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1430 })?;
1431
1432 projection.clear();
1433
1434 let cache = store.projection_state_cache();
1436 let prefix = format!("{name}:");
1437 let keys_to_remove: Vec<String> = cache
1438 .iter()
1439 .filter(|entry| entry.key().starts_with(&prefix))
1440 .map(|entry| entry.key().clone())
1441 .collect();
1442 for key in keys_to_remove {
1443 cache.remove(&key);
1444 }
1445
1446 tracing::info!("Projection deleted (cleared): {}", name);
1447
1448 Ok(Json(serde_json::json!({
1449 "projection": name,
1450 "deleted": true
1451 })))
1452}
1453
1454pub async fn get_projection_state_summary(
1463 State(store): State<SharedStore>,
1464 Path(name): Path<String>,
1465) -> Result<Json<serde_json::Value>> {
1466 let cache = store.projection_state_cache();
1467 let prefix = format!("{name}:");
1468 let states: Vec<serde_json::Value> = cache
1469 .iter()
1470 .filter(|entry| entry.key().starts_with(&prefix))
1471 .map(|entry| {
1472 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1473 serde_json::json!({
1474 "entity_id": entity_id,
1475 "state": entry.value().clone()
1476 })
1477 })
1478 .collect();
1479
1480 let total = states.len();
1481
1482 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1483
1484 Ok(Json(serde_json::json!({
1485 "projection": name,
1486 "states": states,
1487 "total": total
1488 })))
1489}
1490
1491pub async fn reset_projection(
1495 State(store): State<SharedStore>,
1496 Path(name): Path<String>,
1497) -> Result<Json<serde_json::Value>> {
1498 let reprocessed = store.reset_projection(&name)?;
1499
1500 tracing::info!(
1501 "Projection reset: {} ({} events reprocessed)",
1502 name,
1503 reprocessed
1504 );
1505
1506 Ok(Json(serde_json::json!({
1507 "projection": name,
1508 "reset": true,
1509 "events_reprocessed": reprocessed
1510 })))
1511}
1512
1513pub async fn pause_projection(
1517 State(store): State<SharedStore>,
1518 Path(name): Path<String>,
1519) -> Result<Json<serde_json::Value>> {
1520 let projection_manager = store.projection_manager();
1521
1522 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1524 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1525 })?;
1526
1527 store
1528 .projection_status()
1529 .insert(name.clone(), "paused".to_string());
1530
1531 tracing::info!("Projection paused: {}", name);
1532
1533 Ok(Json(serde_json::json!({
1534 "projection": name,
1535 "status": "paused"
1536 })))
1537}
1538
1539pub async fn start_projection(
1543 State(store): State<SharedStore>,
1544 Path(name): Path<String>,
1545) -> Result<Json<serde_json::Value>> {
1546 let projection_manager = store.projection_manager();
1547
1548 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1550 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1551 })?;
1552
1553 store
1554 .projection_status()
1555 .insert(name.clone(), "running".to_string());
1556
1557 tracing::info!("Projection started: {}", name);
1558
1559 Ok(Json(serde_json::json!({
1560 "projection": name,
1561 "status": "running"
1562 })))
1563}
1564
1565#[derive(Debug, Deserialize)]
1567pub struct SaveProjectionStateRequest {
1568 pub state: serde_json::Value,
1569}
1570
1571pub async fn save_projection_state(
1576 State(store): State<SharedStore>,
1577 Path((name, entity_id)): Path<(String, String)>,
1578 Json(req): Json<SaveProjectionStateRequest>,
1579) -> Result<Json<serde_json::Value>> {
1580 let projection_cache = store.projection_state_cache();
1581
1582 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1584
1585 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1586
1587 Ok(Json(serde_json::json!({
1588 "projection": name,
1589 "entity_id": entity_id,
1590 "saved": true
1591 })))
1592}
1593
1594#[derive(Debug, Deserialize)]
1598pub struct BulkGetStateRequest {
1599 pub entity_ids: Vec<String>,
1600}
1601
1602#[derive(Debug, Deserialize)]
1606pub struct BulkSaveStateRequest {
1607 pub states: Vec<BulkSaveStateItem>,
1608}
1609
1610#[derive(Debug, Deserialize)]
1611pub struct BulkSaveStateItem {
1612 pub entity_id: String,
1613 pub state: serde_json::Value,
1614}
1615
1616pub async fn bulk_get_projection_states(
1617 State(store): State<SharedStore>,
1618 Path(name): Path<String>,
1619 Json(req): Json<BulkGetStateRequest>,
1620) -> Result<Json<serde_json::Value>> {
1621 let projection = store.projection_manager().get_projection(&name);
1625 let cache = store.projection_state_cache();
1626
1627 let states: Vec<serde_json::Value> = req
1628 .entity_ids
1629 .iter()
1630 .map(|entity_id| {
1631 let state = projection
1632 .as_ref()
1633 .and_then(|p| p.get_state(entity_id))
1634 .or_else(|| {
1635 cache
1636 .get(&format!("{name}:{entity_id}"))
1637 .map(|entry| entry.value().clone())
1638 });
1639 serde_json::json!({
1640 "entity_id": entity_id,
1641 "state": state,
1642 "found": state.is_some()
1643 })
1644 })
1645 .collect();
1646
1647 tracing::debug!(
1648 "Bulk projection state retrieved: {} entities from {}",
1649 states.len(),
1650 name
1651 );
1652
1653 Ok(Json(serde_json::json!({
1654 "projection": name,
1655 "states": states,
1656 "total": states.len()
1657 })))
1658}
1659
1660pub async fn bulk_save_projection_states(
1665 State(store): State<SharedStore>,
1666 Path(name): Path<String>,
1667 Json(req): Json<BulkSaveStateRequest>,
1668) -> Result<Json<serde_json::Value>> {
1669 let projection_cache = store.projection_state_cache();
1670
1671 let mut saved_count = 0;
1672 for item in &req.states {
1673 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1674 saved_count += 1;
1675 }
1676
1677 tracing::info!(
1678 "Bulk projection state saved: {} entities for {}",
1679 saved_count,
1680 name
1681 );
1682
1683 Ok(Json(serde_json::json!({
1684 "projection": name,
1685 "saved": saved_count,
1686 "total": req.states.len()
1687 })))
1688}
1689
1690#[derive(Debug, Deserialize)]
1696pub struct ListWebhooksParams {
1697 pub tenant_id: Option<String>,
1698}
1699
1700pub async fn register_webhook(
1702 State(store): State<SharedStore>,
1703 Json(req): Json<RegisterWebhookRequest>,
1704) -> Json<serde_json::Value> {
1705 let registry = store.webhook_registry();
1706 let webhook = registry.register(req);
1707
1708 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1709
1710 Json(serde_json::json!({
1711 "webhook": webhook,
1712 "created": true
1713 }))
1714}
1715
1716pub async fn list_webhooks(
1718 State(store): State<SharedStore>,
1719 Query(params): Query<ListWebhooksParams>,
1720) -> Json<serde_json::Value> {
1721 let registry = store.webhook_registry();
1722
1723 let webhooks = if let Some(tenant_id) = params.tenant_id {
1724 registry.list_by_tenant(&tenant_id)
1725 } else {
1726 vec![]
1728 };
1729
1730 let total = webhooks.len();
1731
1732 Json(serde_json::json!({
1733 "webhooks": webhooks,
1734 "total": total
1735 }))
1736}
1737
1738pub async fn get_webhook(
1740 State(store): State<SharedStore>,
1741 Path(webhook_id): Path<uuid::Uuid>,
1742) -> Result<Json<serde_json::Value>> {
1743 let registry = store.webhook_registry();
1744
1745 let webhook = registry.get(webhook_id).ok_or_else(|| {
1746 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1747 })?;
1748
1749 Ok(Json(serde_json::json!({
1750 "webhook": webhook,
1751 "found": true
1752 })))
1753}
1754
1755pub async fn update_webhook(
1757 State(store): State<SharedStore>,
1758 Path(webhook_id): Path<uuid::Uuid>,
1759 Json(req): Json<UpdateWebhookRequest>,
1760) -> Result<Json<serde_json::Value>> {
1761 let registry = store.webhook_registry();
1762
1763 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1764 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1765 })?;
1766
1767 tracing::info!("Webhook updated: {}", webhook_id);
1768
1769 Ok(Json(serde_json::json!({
1770 "webhook": webhook,
1771 "updated": true
1772 })))
1773}
1774
1775pub async fn delete_webhook(
1777 State(store): State<SharedStore>,
1778 Path(webhook_id): Path<uuid::Uuid>,
1779) -> Result<Json<serde_json::Value>> {
1780 let registry = store.webhook_registry();
1781
1782 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1783 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1784 })?;
1785
1786 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1787
1788 Ok(Json(serde_json::json!({
1789 "webhook_id": webhook_id,
1790 "deleted": true
1791 })))
1792}
1793
1794#[derive(Debug, Deserialize)]
1796pub struct ListDeliveriesParams {
1797 pub limit: Option<usize>,
1798}
1799
1800pub async fn list_webhook_deliveries(
1802 State(store): State<SharedStore>,
1803 Path(webhook_id): Path<uuid::Uuid>,
1804 Query(params): Query<ListDeliveriesParams>,
1805) -> Result<Json<serde_json::Value>> {
1806 let registry = store.webhook_registry();
1807
1808 registry.get(webhook_id).ok_or_else(|| {
1810 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1811 })?;
1812
1813 let limit = params.limit.unwrap_or(50);
1814 let deliveries = registry.get_deliveries(webhook_id, limit);
1815 let total = deliveries.len();
1816
1817 Ok(Json(serde_json::json!({
1818 "webhook_id": webhook_id,
1819 "deliveries": deliveries,
1820 "total": total
1821 })))
1822}
1823
/// Runs an EventQL request against a snapshot of the store's events.
///
/// Only compiled with the `analytics` feature. On success the response's
/// `columns`, `rows` and `row_count` are returned as a JSON object.
///
/// # Errors
///
/// A failure reported by `execute_eventql` is wrapped in
/// `AllSourceError::InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();

    let response = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1844
1845pub async fn graphql_query(
1847 State(store): State<SharedStore>,
1848 Json(req): Json<GraphQLRequest>,
1849) -> Json<serde_json::Value> {
1850 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1851 Ok(f) => f,
1852 Err(e) => {
1853 return Json(
1854 serde_json::to_value(GraphQLResponse {
1855 data: None,
1856 errors: vec![GraphQLError { message: e }],
1857 })
1858 .unwrap(),
1859 );
1860 }
1861 };
1862
1863 let mut data = serde_json::Map::new();
1864 let mut errors = Vec::new();
1865
1866 for field in &fields {
1867 match field.name.as_str() {
1868 "events" => {
1869 let request = crate::application::dto::QueryEventsRequest {
1870 entity_id: field.arguments.get("entity_id").cloned(),
1871 event_type: field.arguments.get("event_type").cloned(),
1872 tenant_id: field.arguments.get("tenant_id").cloned(),
1873 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1874 as_of: None,
1875 since: None,
1876 until: None,
1877 event_type_prefix: None,
1878 payload_filter: None,
1879 };
1880 match store.query(&request) {
1881 Ok(events) => {
1882 let json_events: Vec<serde_json::Value> = events
1883 .iter()
1884 .map(|e| {
1885 crate::infrastructure::query::graphql::event_to_json(
1886 e,
1887 &field.fields,
1888 )
1889 })
1890 .collect();
1891 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1892 }
1893 Err(e) => errors.push(GraphQLError {
1894 message: format!("events query failed: {e}"),
1895 }),
1896 }
1897 }
1898 "event" => {
1899 if let Some(id_str) = field.arguments.get("id") {
1900 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1901 match store.get_event_by_id(&id) {
1902 Ok(Some(event)) => {
1903 data.insert(
1904 "event".to_string(),
1905 crate::infrastructure::query::graphql::event_to_json(
1906 &event,
1907 &field.fields,
1908 ),
1909 );
1910 }
1911 Ok(None) => {
1912 data.insert("event".to_string(), serde_json::Value::Null);
1913 }
1914 Err(e) => errors.push(GraphQLError {
1915 message: format!("event lookup failed: {e}"),
1916 }),
1917 }
1918 } else {
1919 errors.push(GraphQLError {
1920 message: format!("Invalid UUID: {id_str}"),
1921 });
1922 }
1923 } else {
1924 errors.push(GraphQLError {
1925 message: "event query requires 'id' argument".to_string(),
1926 });
1927 }
1928 }
1929 "projections" => {
1930 let pm = store.projection_manager();
1931 let names: Vec<serde_json::Value> = pm
1932 .list_projections()
1933 .iter()
1934 .map(|(name, _)| serde_json::Value::String(name.clone()))
1935 .collect();
1936 data.insert("projections".to_string(), serde_json::Value::Array(names));
1937 }
1938 "stats" => {
1939 let stats = store.stats();
1940 data.insert(
1941 "stats".to_string(),
1942 serde_json::json!({
1943 "total_events": stats.total_events,
1944 "total_entities": stats.total_entities,
1945 "total_event_types": stats.total_event_types,
1946 }),
1947 );
1948 }
1949 "__schema" => {
1950 data.insert(
1951 "__schema".to_string(),
1952 crate::infrastructure::query::graphql::introspection_schema(),
1953 );
1954 }
1955 other => {
1956 errors.push(GraphQLError {
1957 message: format!("Unknown field: {other}"),
1958 });
1959 }
1960 }
1961 }
1962
1963 Json(
1964 serde_json::to_value(GraphQLResponse {
1965 data: Some(serde_json::Value::Object(data)),
1966 errors,
1967 })
1968 .unwrap(),
1969 )
1970}
1971
1972pub async fn geo_query(
1974 State(store): State<SharedStore>,
1975 Json(req): Json<GeoQueryRequest>,
1976) -> Json<serde_json::Value> {
1977 let events = store.snapshot_events();
1978 let geo_index = store.geo_index();
1979 let results =
1980 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1981 let total = results.len();
1982 Json(serde_json::json!({
1983 "results": results,
1984 "total": total,
1985 }))
1986}
1987
1988pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1990 let stats = store.geo_index().stats();
1991 Json(serde_json::json!(stats))
1992}
1993
1994pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1996 let stats = store.exactly_once().stats();
1997 Json(serde_json::json!(stats))
1998}
1999
2000pub async fn schema_evolution_history(
2002 State(store): State<SharedStore>,
2003 Path(event_type): Path<String>,
2004) -> Json<serde_json::Value> {
2005 let mgr = store.schema_evolution();
2006 let history = mgr.get_history(&event_type);
2007 let version = mgr.get_version(&event_type);
2008 Json(serde_json::json!({
2009 "event_type": event_type,
2010 "current_version": version,
2011 "history": history,
2012 }))
2013}
2014
2015pub async fn schema_evolution_schema(
2017 State(store): State<SharedStore>,
2018 Path(event_type): Path<String>,
2019) -> Json<serde_json::Value> {
2020 let mgr = store.schema_evolution();
2021 if let Some(schema) = mgr.get_schema(&event_type) {
2022 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2023 Json(serde_json::json!({
2024 "event_type": event_type,
2025 "version": mgr.get_version(&event_type),
2026 "inferred_schema": schema,
2027 "json_schema": json_schema,
2028 }))
2029 } else {
2030 Json(serde_json::json!({
2031 "event_type": event_type,
2032 "error": "No schema inferred for this event type"
2033 }))
2034 }
2035}
2036
2037pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2039 let stats = store.schema_evolution().stats();
2040 let event_types = store.schema_evolution().list_event_types();
2041 Json(serde_json::json!({
2042 "stats": stats,
2043 "tracked_event_types": event_types,
2044 }))
2045}
2046
/// Serves a sync pull: returns stored events as replicated events.
///
/// Only compiled with the `embedded-sync` feature.
///
/// The lower time bound of the query is the smallest `physical_ms` found in
/// the request's version vector; with an empty vector (or a value that cannot
/// be converted to a timestamp) no lower bound is applied. Every returned
/// event gets a synthetic HLC timestamp built from its own millisecond
/// timestamp, a logical counter and a node id of `0`, and is tagged with the
/// origin region `"server"`. The version vector in the response is always
/// empty.
///
/// # Errors
///
/// Propagates any error from the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    let oldest_seen_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since = oldest_seen_ms.and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let filter = crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    };
    let events = store.query(&filter)?;

    let mut replicated = Vec::with_capacity(events.len());

    // The logical counter separates consecutive events that share a
    // millisecond: it increments while the millisecond repeats and restarts
    // at zero when it changes.
    let mut last_ms = 0u64;
    let mut logical = 0u32;

    for event in &events {
        let event_ms = event.timestamp().timestamp_millis() as u64;
        logical = if event_ms == last_ms { logical + 1 } else { 0 };
        last_ms = event_ms;

        let event_data = serde_json::json!({
            "event_type": event.event_type_str(),
            "entity_id": event.entity_id_str(),
            "tenant_id": event.tenant_id_str(),
            "payload": event.payload,
            "metadata": event.metadata,
        });

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data,
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2115
/// Serves a sync push: ingests replicated events sent by a client.
///
/// Only compiled with the `embedded-sync` feature.
///
/// Each event's `event_data` is unpacked into the arguments of
/// `Event::from_strings`. Missing or non-string `event_type` / `entity_id`
/// fall back to `"unknown"`, `tenant_id` falls back to `"default"`, and a
/// missing `payload` becomes an empty JSON object. Events that
/// `from_strings` rejects are counted as `skipped`; the rest are ingested and
/// counted as `accepted`. The version vector in the response is always empty.
///
/// # Errors
///
/// An ingest failure aborts the loop and is propagated. Events ingested
/// before the failure are not undone by this handler.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;

        // Reads a string field, substituting `fallback` when it is missing
        // or not a string.
        let text_field = |key: &str, fallback: &str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(fallback)
                .to_string()
        };

        let event_type = text_field("event_type", "unknown");
        let entity_id = text_field("entity_id", "unknown");
        let tenant_id = text_field("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        let Ok(domain_event) =
            Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
        else {
            skipped += 1;
            continue;
        };

        store.ingest(&domain_event)?;
        accepted += 1;
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2167
2168pub async fn register_consumer(
2174 State(store): State<SharedStore>,
2175 Json(req): Json<RegisterConsumerRequest>,
2176) -> Result<Json<ConsumerResponse>> {
2177 let consumer = store
2178 .consumer_registry()
2179 .register(&req.consumer_id, &req.event_type_filters);
2180
2181 Ok(Json(ConsumerResponse {
2182 consumer_id: consumer.consumer_id,
2183 event_type_filters: consumer.event_type_filters,
2184 cursor_position: consumer.cursor_position,
2185 }))
2186}
2187
2188pub async fn get_consumer(
2190 State(store): State<SharedStore>,
2191 Path(consumer_id): Path<String>,
2192) -> Result<Json<ConsumerResponse>> {
2193 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2194
2195 Ok(Json(ConsumerResponse {
2196 consumer_id: consumer.consumer_id,
2197 event_type_filters: consumer.event_type_filters,
2198 cursor_position: consumer.cursor_position,
2199 }))
2200}
2201
2202#[derive(Debug, Deserialize)]
2204pub struct ConsumerPollQuery {
2205 pub limit: Option<usize>,
2206}
2207
2208pub async fn poll_consumer_events(
2209 State(store): State<SharedStore>,
2210 Path(consumer_id): Path<String>,
2211 Query(query): Query<ConsumerPollQuery>,
2212) -> Result<Json<ConsumerEventsResponse>> {
2213 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2214 let offset = consumer.cursor_position.unwrap_or(0);
2215 let limit = query.limit.unwrap_or(100);
2216
2217 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2218 let count = events.len();
2219
2220 let consumer_events: Vec<ConsumerEventDto> = events
2221 .into_iter()
2222 .map(|(position, event)| ConsumerEventDto {
2223 position,
2224 event: EventDto::from(&event),
2225 })
2226 .collect();
2227
2228 Ok(Json(ConsumerEventsResponse {
2229 events: consumer_events,
2230 count,
2231 }))
2232}
2233
2234pub async fn ack_consumer(
2236 State(store): State<SharedStore>,
2237 Path(consumer_id): Path<String>,
2238 Json(req): Json<AckRequest>,
2239) -> Result<Json<serde_json::Value>> {
2240 let max_offset = store.total_events() as u64;
2241
2242 store
2243 .consumer_registry()
2244 .ack(&consumer_id, req.position, max_offset)
2245 .map_err(crate::error::AllSourceError::InvalidInput)?;
2246
2247 Ok(Json(serde_json::json!({
2248 "status": "ok",
2249 "consumer_id": consumer_id,
2250 "position": req.position,
2251 })))
2252}
2253
2254#[cfg(test)]
2255mod tests {
2256 use super::*;
2257 use crate::{domain::entities::Event, store::EventStore};
2258
2259 fn create_test_store() -> Arc<EventStore> {
2260 Arc::new(EventStore::new())
2261 }
2262
2263 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2264 Event::from_strings(
2265 event_type.to_string(),
2266 entity_id.to_string(),
2267 "test-stream".to_string(),
2268 serde_json::json!({
2269 "name": "Test",
2270 "value": 42
2271 }),
2272 None,
2273 )
2274 .unwrap()
2275 }
2276
2277 #[tokio::test]
2278 async fn test_query_events_has_more_and_total_count() {
2279 let store = create_test_store();
2280
2281 for i in 0..50 {
2283 store
2284 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2285 .unwrap();
2286 }
2287
2288 let req = QueryEventsRequest {
2290 entity_id: None,
2291 event_type: None,
2292 tenant_id: None,
2293 as_of: None,
2294 since: None,
2295 until: None,
2296 limit: Some(10),
2297 event_type_prefix: None,
2298 payload_filter: None,
2299 };
2300
2301 let requested_limit = req.limit;
2302 let unlimited_req = QueryEventsRequest {
2303 limit: None,
2304 ..QueryEventsRequest {
2305 entity_id: req.entity_id,
2306 event_type: req.event_type,
2307 tenant_id: req.tenant_id,
2308 as_of: req.as_of,
2309 since: req.since,
2310 until: req.until,
2311 limit: None,
2312 event_type_prefix: req.event_type_prefix,
2313 payload_filter: req.payload_filter,
2314 }
2315 };
2316 let all_events = store.query(&unlimited_req).unwrap();
2317 let total_count = all_events.len();
2318 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2319 all_events.into_iter().take(limit).collect()
2320 } else {
2321 all_events
2322 };
2323 let count = limited_events.len();
2324 let has_more = count < total_count;
2325
2326 assert_eq!(count, 10);
2327 assert_eq!(total_count, 50);
2328 assert!(has_more);
2329 }
2330
2331 #[tokio::test]
2332 async fn test_query_events_no_more_results() {
2333 let store = create_test_store();
2334
2335 for i in 0..5 {
2337 store
2338 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2339 .unwrap();
2340 }
2341
2342 let all_events = store
2344 .query(&QueryEventsRequest {
2345 entity_id: None,
2346 event_type: None,
2347 tenant_id: None,
2348 as_of: None,
2349 since: None,
2350 until: None,
2351 limit: None,
2352 event_type_prefix: None,
2353 payload_filter: None,
2354 })
2355 .unwrap();
2356 let total_count = all_events.len();
2357 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2358 let count = limited_events.len();
2359 let has_more = count < total_count;
2360
2361 assert_eq!(count, 5);
2362 assert_eq!(total_count, 5);
2363 assert!(!has_more);
2364 }
2365
2366 #[tokio::test]
2367 async fn test_list_entities_by_type_prefix() {
2368 let store = create_test_store();
2369
2370 store
2372 .ingest(&create_test_event("idx-1", "index.created"))
2373 .unwrap();
2374 store
2375 .ingest(&create_test_event("idx-1", "index.updated"))
2376 .unwrap();
2377 store
2378 .ingest(&create_test_event("idx-2", "index.created"))
2379 .unwrap();
2380 store
2381 .ingest(&create_test_event("idx-3", "index.created"))
2382 .unwrap();
2383 store
2385 .ingest(&create_test_event("trade-1", "trade.created"))
2386 .unwrap();
2387 store
2388 .ingest(&create_test_event("trade-2", "trade.created"))
2389 .unwrap();
2390
2391 let req = ListEntitiesRequest {
2393 event_type_prefix: Some("index.".to_string()),
2394 payload_filter: None,
2395 limit: None,
2396 offset: None,
2397 };
2398 let query_req = QueryEventsRequest {
2399 entity_id: None,
2400 event_type: None,
2401 tenant_id: None,
2402 as_of: None,
2403 since: None,
2404 until: None,
2405 limit: None,
2406 event_type_prefix: req.event_type_prefix,
2407 payload_filter: req.payload_filter,
2408 };
2409 let events = store.query(&query_req).unwrap();
2410
2411 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2413 std::collections::HashMap::new();
2414 for event in &events {
2415 entity_map
2416 .entry(event.entity_id().to_string())
2417 .or_default()
2418 .push(event);
2419 }
2420
2421 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2424 assert_eq!(entity_map["idx-3"].len(), 1);
2425 }
2426
2427 fn create_test_event_with_payload(
2428 entity_id: &str,
2429 event_type: &str,
2430 payload: serde_json::Value,
2431 ) -> Event {
2432 Event::from_strings(
2433 event_type.to_string(),
2434 entity_id.to_string(),
2435 "test-stream".to_string(),
2436 payload,
2437 None,
2438 )
2439 .unwrap()
2440 }
2441
2442 #[tokio::test]
2443 async fn test_detect_duplicates_by_payload_fields() {
2444 let store = create_test_store();
2445
2446 store
2448 .ingest(&create_test_event_with_payload(
2449 "idx-1",
2450 "index.created",
2451 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2452 ))
2453 .unwrap();
2454 store
2455 .ingest(&create_test_event_with_payload(
2456 "idx-2",
2457 "index.created",
2458 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2459 ))
2460 .unwrap();
2461 store
2462 .ingest(&create_test_event_with_payload(
2463 "idx-3",
2464 "index.created",
2465 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2466 ))
2467 .unwrap();
2468 store
2469 .ingest(&create_test_event_with_payload(
2470 "idx-4",
2471 "index.created",
2472 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2473 ))
2474 .unwrap();
2475 store
2476 .ingest(&create_test_event_with_payload(
2477 "idx-5",
2478 "index.created",
2479 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2480 ))
2481 .unwrap();
2482
2483 let query_req = QueryEventsRequest {
2485 entity_id: None,
2486 event_type: None,
2487 tenant_id: None,
2488 as_of: None,
2489 since: None,
2490 until: None,
2491 limit: None,
2492 event_type_prefix: Some("index.".to_string()),
2493 payload_filter: None,
2494 };
2495 let events = store.query(&query_req).unwrap();
2496
2497 let group_by_fields = vec!["name"];
2499 let mut entity_latest: std::collections::HashMap<String, &Event> =
2500 std::collections::HashMap::new();
2501 for event in &events {
2502 let eid = event.entity_id().to_string();
2503 entity_latest
2504 .entry(eid)
2505 .and_modify(|existing| {
2506 if event.timestamp() > existing.timestamp() {
2507 *existing = event;
2508 }
2509 })
2510 .or_insert(event);
2511 }
2512
2513 let mut groups: std::collections::HashMap<String, Vec<String>> =
2514 std::collections::HashMap::new();
2515 for (entity_id, event) in &entity_latest {
2516 let payload = event.payload();
2517 let mut key_parts = serde_json::Map::new();
2518 for field in &group_by_fields {
2519 let value = payload
2520 .get(*field)
2521 .cloned()
2522 .unwrap_or(serde_json::Value::Null);
2523 key_parts.insert((*field).to_string(), value);
2524 }
2525 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2526 groups.entry(key_str).or_default().push(entity_id.clone());
2527 }
2528
2529 let duplicate_groups: Vec<_> = groups
2530 .into_iter()
2531 .filter(|(_, ids)| ids.len() > 1)
2532 .collect();
2533
2534 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2536 assert_eq!(ids.len(), 2);
2537 }
2538 }
2539
2540 #[tokio::test]
2541 async fn test_detect_duplicates_no_duplicates() {
2542 let store = create_test_store();
2543
2544 store
2546 .ingest(&create_test_event_with_payload(
2547 "idx-1",
2548 "index.created",
2549 serde_json::json!({"name": "A"}),
2550 ))
2551 .unwrap();
2552 store
2553 .ingest(&create_test_event_with_payload(
2554 "idx-2",
2555 "index.created",
2556 serde_json::json!({"name": "B"}),
2557 ))
2558 .unwrap();
2559
2560 let query_req = QueryEventsRequest {
2561 entity_id: None,
2562 event_type: None,
2563 tenant_id: None,
2564 as_of: None,
2565 since: None,
2566 until: None,
2567 limit: None,
2568 event_type_prefix: Some("index.".to_string()),
2569 payload_filter: None,
2570 };
2571 let events = store.query(&query_req).unwrap();
2572
2573 let mut entity_latest: std::collections::HashMap<String, &Event> =
2574 std::collections::HashMap::new();
2575 for event in &events {
2576 entity_latest
2577 .entry(event.entity_id().to_string())
2578 .or_insert(event);
2579 }
2580
2581 let mut groups: std::collections::HashMap<String, Vec<String>> =
2582 std::collections::HashMap::new();
2583 for (entity_id, event) in &entity_latest {
2584 let key_str =
2585 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2586 .unwrap();
2587 groups.entry(key_str).or_default().push(entity_id.clone());
2588 }
2589
2590 let duplicate_groups: Vec<_> = groups
2591 .into_iter()
2592 .filter(|(_, ids)| ids.len() > 1)
2593 .collect();
2594
2595 assert_eq!(duplicate_groups.len(), 0); }
2597
2598 #[tokio::test]
2599 async fn test_detect_duplicates_multi_field_group_by() {
2600 let store = create_test_store();
2601
2602 store
2604 .ingest(&create_test_event_with_payload(
2605 "idx-1",
2606 "index.created",
2607 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2608 ))
2609 .unwrap();
2610 store
2611 .ingest(&create_test_event_with_payload(
2612 "idx-2",
2613 "index.created",
2614 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2615 ))
2616 .unwrap();
2617 store
2619 .ingest(&create_test_event_with_payload(
2620 "idx-3",
2621 "index.created",
2622 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2623 ))
2624 .unwrap();
2625
2626 let query_req = QueryEventsRequest {
2627 entity_id: None,
2628 event_type: None,
2629 tenant_id: None,
2630 as_of: None,
2631 since: None,
2632 until: None,
2633 limit: None,
2634 event_type_prefix: Some("index.".to_string()),
2635 payload_filter: None,
2636 };
2637 let events = store.query(&query_req).unwrap();
2638
2639 let group_by_fields = vec!["name", "user_id"];
2640 let mut entity_latest: std::collections::HashMap<String, &Event> =
2641 std::collections::HashMap::new();
2642 for event in &events {
2643 entity_latest
2644 .entry(event.entity_id().to_string())
2645 .and_modify(|existing| {
2646 if event.timestamp() > existing.timestamp() {
2647 *existing = event;
2648 }
2649 })
2650 .or_insert(event);
2651 }
2652
2653 let mut groups: std::collections::HashMap<String, Vec<String>> =
2654 std::collections::HashMap::new();
2655 for (entity_id, event) in &entity_latest {
2656 let payload = event.payload();
2657 let mut key_parts = serde_json::Map::new();
2658 for field in &group_by_fields {
2659 let value = payload
2660 .get(*field)
2661 .cloned()
2662 .unwrap_or(serde_json::Value::Null);
2663 key_parts.insert((*field).to_string(), value);
2664 }
2665 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2666 groups.entry(key_str).or_default().push(entity_id.clone());
2667 }
2668
2669 let duplicate_groups: Vec<_> = groups
2670 .into_iter()
2671 .filter(|(_, ids)| ids.len() > 1)
2672 .collect();
2673
2674 assert_eq!(duplicate_groups.len(), 1);
2676 let (_, ref ids) = duplicate_groups[0];
2677 assert_eq!(ids.len(), 2);
2678 let mut sorted_ids = ids.clone();
2679 sorted_ids.sort();
2680 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2681 }
2682
2683 #[tokio::test]
2684 async fn test_projection_state_cache() {
2685 let store = create_test_store();
2686
2687 let cache = store.projection_state_cache();
2689 cache.insert(
2690 "entity_snapshots:user-123".to_string(),
2691 serde_json::json!({"name": "Test User", "age": 30}),
2692 );
2693
2694 let state = cache.get("entity_snapshots:user-123");
2696 assert!(state.is_some());
2697 let state = state.unwrap();
2698 assert_eq!(state["name"], "Test User");
2699 assert_eq!(state["age"], 30);
2700 }
2701
2702 #[tokio::test]
2703 async fn test_projection_manager_list_projections() {
2704 let store = create_test_store();
2705
2706 let projection_manager = store.projection_manager();
2708 let projections = projection_manager.list_projections();
2709
2710 assert!(projections.len() >= 2);
2712
2713 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2714 assert!(names.contains(&"entity_snapshots"));
2715 assert!(names.contains(&"event_counters"));
2716 }
2717
2718 #[tokio::test]
2719 async fn test_projection_state_after_event_ingestion() {
2720 let store = create_test_store();
2721
2722 let event = create_test_event("user-456", "user.created");
2724 store.ingest(&event).unwrap();
2725
2726 let projection_manager = store.projection_manager();
2728 let snapshot_projection = projection_manager
2729 .get_projection("entity_snapshots")
2730 .unwrap();
2731
2732 let state = snapshot_projection.get_state("user-456");
2733 assert!(state.is_some());
2734 let state = state.unwrap();
2735 assert_eq!(state["name"], "Test");
2736 assert_eq!(state["value"], 42);
2737 }
2738
2739 #[tokio::test]
2740 async fn test_projection_state_cache_multiple_entities() {
2741 let store = create_test_store();
2742 let cache = store.projection_state_cache();
2743
2744 for i in 0..10 {
2746 cache.insert(
2747 format!("entity_snapshots:entity-{i}"),
2748 serde_json::json!({"id": i, "status": "active"}),
2749 );
2750 }
2751
2752 assert_eq!(cache.len(), 10);
2754
2755 for i in 0..10 {
2757 let key = format!("entity_snapshots:entity-{i}");
2758 let state = cache.get(&key);
2759 assert!(state.is_some());
2760 assert_eq!(state.unwrap()["id"], i);
2761 }
2762 }
2763
2764 #[tokio::test]
2765 async fn test_projection_state_update() {
2766 let store = create_test_store();
2767 let cache = store.projection_state_cache();
2768
2769 cache.insert(
2771 "entity_snapshots:user-789".to_string(),
2772 serde_json::json!({"balance": 100}),
2773 );
2774
2775 cache.insert(
2777 "entity_snapshots:user-789".to_string(),
2778 serde_json::json!({"balance": 150}),
2779 );
2780
2781 let state = cache.get("entity_snapshots:user-789").unwrap();
2783 assert_eq!(state["balance"], 150);
2784 }
2785
2786 #[tokio::test]
2787 async fn test_event_counter_projection() {
2788 let store = create_test_store();
2789
2790 store
2792 .ingest(&create_test_event("user-1", "user.created"))
2793 .unwrap();
2794 store
2795 .ingest(&create_test_event("user-2", "user.created"))
2796 .unwrap();
2797 store
2798 .ingest(&create_test_event("user-1", "user.updated"))
2799 .unwrap();
2800
2801 let projection_manager = store.projection_manager();
2803 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2804
2805 let created_state = counter_projection.get_state("user.created");
2807 assert!(created_state.is_some());
2808 assert_eq!(created_state.unwrap()["count"], 2);
2809
2810 let updated_state = counter_projection.get_state("user.updated");
2811 assert!(updated_state.is_some());
2812 assert_eq!(updated_state.unwrap()["count"], 1);
2813 }
2814
2815 #[tokio::test]
2816 async fn test_projection_state_cache_key_format() {
2817 let store = create_test_store();
2818 let cache = store.projection_state_cache();
2819
2820 let key = "orders:order-12345".to_string();
2822 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2823
2824 let state = cache.get(&key).unwrap();
2825 assert_eq!(state["total"], 99.99);
2826 }
2827
2828 #[tokio::test]
2829 async fn test_projection_state_cache_removal() {
2830 let store = create_test_store();
2831 let cache = store.projection_state_cache();
2832
2833 cache.insert(
2835 "test:entity-1".to_string(),
2836 serde_json::json!({"data": "value"}),
2837 );
2838 assert_eq!(cache.len(), 1);
2839
2840 cache.remove("test:entity-1");
2841 assert_eq!(cache.len(), 0);
2842 assert!(cache.get("test:entity-1").is_none());
2843 }
2844
2845 #[tokio::test]
2846 async fn test_get_nonexistent_projection() {
2847 let store = create_test_store();
2848 let projection_manager = store.projection_manager();
2849
2850 let projection = projection_manager.get_projection("nonexistent_projection");
2852 assert!(projection.is_none());
2853 }
2854
2855 #[tokio::test]
2856 async fn test_get_nonexistent_entity_state() {
2857 let store = create_test_store();
2858 let projection_manager = store.projection_manager();
2859
2860 let snapshot_projection = projection_manager
2862 .get_projection("entity_snapshots")
2863 .unwrap();
2864 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2865 assert!(state.is_none());
2866 }
2867
2868 #[tokio::test]
2869 async fn test_projection_state_cache_concurrent_access() {
2870 let store = create_test_store();
2871 let cache = store.projection_state_cache();
2872
2873 let handles: Vec<_> = (0..10)
2875 .map(|i| {
2876 let cache_clone = cache.clone();
2877 tokio::spawn(async move {
2878 cache_clone.insert(
2879 format!("concurrent:entity-{i}"),
2880 serde_json::json!({"thread": i}),
2881 );
2882 })
2883 })
2884 .collect();
2885
2886 for handle in handles {
2887 handle.await.unwrap();
2888 }
2889
2890 assert_eq!(cache.len(), 10);
2892 }
2893
2894 #[tokio::test]
2895 async fn test_projection_state_large_payload() {
2896 let store = create_test_store();
2897 let cache = store.projection_state_cache();
2898
2899 let large_array: Vec<serde_json::Value> = (0..1000)
2901 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2902 .collect();
2903
2904 cache.insert(
2905 "large:entity-1".to_string(),
2906 serde_json::json!({"items": large_array}),
2907 );
2908
2909 let state = cache.get("large:entity-1").unwrap();
2910 let items = state["items"].as_array().unwrap();
2911 assert_eq!(items.len(), 1000);
2912 }
2913
2914 #[tokio::test]
2915 async fn test_projection_state_complex_json() {
2916 let store = create_test_store();
2917 let cache = store.projection_state_cache();
2918
2919 let complex_state = serde_json::json!({
2921 "user": {
2922 "id": "user-123",
2923 "profile": {
2924 "name": "John Doe",
2925 "email": "john@example.com",
2926 "settings": {
2927 "theme": "dark",
2928 "notifications": true
2929 }
2930 },
2931 "roles": ["admin", "user"],
2932 "metadata": {
2933 "created_at": "2025-01-01T00:00:00Z",
2934 "last_login": null
2935 }
2936 }
2937 });
2938
2939 cache.insert("complex:user-123".to_string(), complex_state);
2940
2941 let state = cache.get("complex:user-123").unwrap();
2942 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2943 assert_eq!(state["user"]["roles"][0], "admin");
2944 assert!(state["user"]["metadata"]["last_login"].is_null());
2945 }
2946
2947 #[tokio::test]
2948 async fn test_projection_state_cache_iteration() {
2949 let store = create_test_store();
2950 let cache = store.projection_state_cache();
2951
2952 for i in 0..5 {
2954 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2955 }
2956
2957 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2959 assert_eq!(entries.len(), 5);
2960 }
2961
2962 #[tokio::test]
2963 async fn test_projection_manager_get_entity_snapshots() {
2964 let store = create_test_store();
2965 let projection_manager = store.projection_manager();
2966
2967 let projection = projection_manager.get_projection("entity_snapshots");
2969 assert!(projection.is_some());
2970 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2971 }
2972
2973 #[tokio::test]
2974 async fn test_projection_manager_get_event_counters() {
2975 let store = create_test_store();
2976 let projection_manager = store.projection_manager();
2977
2978 let projection = projection_manager.get_projection("event_counters");
2980 assert!(projection.is_some());
2981 assert_eq!(projection.unwrap().name(), "event_counters");
2982 }
2983
2984 #[tokio::test]
2985 async fn test_projection_state_cache_overwrite() {
2986 let store = create_test_store();
2987 let cache = store.projection_state_cache();
2988
2989 cache.insert(
2991 "overwrite:entity-1".to_string(),
2992 serde_json::json!({"version": 1}),
2993 );
2994
2995 cache.insert(
2997 "overwrite:entity-1".to_string(),
2998 serde_json::json!({"version": 2}),
2999 );
3000
3001 cache.insert(
3003 "overwrite:entity-1".to_string(),
3004 serde_json::json!({"version": 3}),
3005 );
3006
3007 let state = cache.get("overwrite:entity-1").unwrap();
3008 assert_eq!(state["version"], 3);
3009
3010 assert_eq!(cache.len(), 1);
3012 }
3013
3014 #[tokio::test]
3015 async fn test_projection_state_multiple_projections() {
3016 let store = create_test_store();
3017 let cache = store.projection_state_cache();
3018
3019 cache.insert(
3021 "entity_snapshots:user-1".to_string(),
3022 serde_json::json!({"name": "Alice"}),
3023 );
3024 cache.insert(
3025 "event_counters:user.created".to_string(),
3026 serde_json::json!({"count": 5}),
3027 );
3028 cache.insert(
3029 "custom_projection:order-1".to_string(),
3030 serde_json::json!({"total": 150.0}),
3031 );
3032
3033 assert_eq!(
3035 cache.get("entity_snapshots:user-1").unwrap()["name"],
3036 "Alice"
3037 );
3038 assert_eq!(
3039 cache.get("event_counters:user.created").unwrap()["count"],
3040 5
3041 );
3042 assert_eq!(
3043 cache.get("custom_projection:order-1").unwrap()["total"],
3044 150.0
3045 );
3046 }
3047
3048 #[tokio::test]
3049 async fn test_bulk_projection_state_access() {
3050 let store = create_test_store();
3051
3052 for i in 0..5 {
3054 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3055 store.ingest(&event).unwrap();
3056 }
3057
3058 let projection_manager = store.projection_manager();
3060 let snapshot_projection = projection_manager
3061 .get_projection("entity_snapshots")
3062 .unwrap();
3063
3064 for i in 0..5 {
3066 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3067 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3068 }
3069 }
3070
3071 #[tokio::test]
3072 async fn test_bulk_save_projection_states() {
3073 let store = create_test_store();
3074 let cache = store.projection_state_cache();
3075
3076 let states = vec![
3078 BulkSaveStateItem {
3079 entity_id: "bulk-entity-1".to_string(),
3080 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3081 },
3082 BulkSaveStateItem {
3083 entity_id: "bulk-entity-2".to_string(),
3084 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3085 },
3086 BulkSaveStateItem {
3087 entity_id: "bulk-entity-3".to_string(),
3088 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3089 },
3090 ];
3091
3092 let projection_name = "test_projection";
3093
3094 for item in &states {
3096 cache.insert(
3097 format!("{projection_name}:{}", item.entity_id),
3098 item.state.clone(),
3099 );
3100 }
3101
3102 assert_eq!(cache.len(), 3);
3104
3105 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3106 assert_eq!(state1["name"], "Entity 1");
3107 assert_eq!(state1["value"], 100);
3108
3109 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3110 assert_eq!(state2["name"], "Entity 2");
3111 assert_eq!(state2["value"], 200);
3112
3113 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3114 assert_eq!(state3["name"], "Entity 3");
3115 assert_eq!(state3["value"], 300);
3116 }
3117
3118 #[tokio::test]
3119 async fn test_bulk_save_empty_states() {
3120 let store = create_test_store();
3121 let cache = store.projection_state_cache();
3122
3123 cache.clear();
3125
3126 let states: Vec<BulkSaveStateItem> = vec![];
3128 assert_eq!(states.len(), 0);
3129
3130 assert_eq!(cache.len(), 0);
3132 }
3133
3134 #[tokio::test]
3135 async fn test_bulk_save_overwrites_existing() {
3136 let store = create_test_store();
3137 let cache = store.projection_state_cache();
3138
3139 cache.insert(
3141 "test:entity-1".to_string(),
3142 serde_json::json!({"version": 1, "data": "initial"}),
3143 );
3144
3145 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3147 cache.insert("test:entity-1".to_string(), new_state);
3148
3149 let state = cache.get("test:entity-1").unwrap();
3151 assert_eq!(state["version"], 2);
3152 assert_eq!(state["data"], "updated");
3153 }
3154
3155 #[tokio::test]
3156 async fn test_bulk_save_high_volume() {
3157 let store = create_test_store();
3158 let cache = store.projection_state_cache();
3159
3160 for i in 0..1000 {
3162 cache.insert(
3163 format!("volume_test:entity-{i}"),
3164 serde_json::json!({"index": i, "status": "active"}),
3165 );
3166 }
3167
3168 assert_eq!(cache.len(), 1000);
3170
3171 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3173 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3174 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3175 }
3176
3177 #[tokio::test]
3178 async fn test_bulk_save_different_projections() {
3179 let store = create_test_store();
3180 let cache = store.projection_state_cache();
3181
3182 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3184
3185 for proj in &projections {
3186 for i in 0..5 {
3187 cache.insert(
3188 format!("{proj}:entity-{i}"),
3189 serde_json::json!({"projection": proj, "id": i}),
3190 );
3191 }
3192 }
3193
3194 assert_eq!(cache.len(), 15);
3196
3197 for proj in &projections {
3199 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3200 assert_eq!(state["projection"], *proj);
3201 }
3202 }
3203
3204 #[tokio::test]
3215 async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3216 let store = create_test_store();
3217 store.projection_state_cache().insert(
3218 "assets:BTC".to_string(),
3219 serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3220 );
3221
3222 let resp = get_projection_state(
3223 State(Arc::clone(&store)),
3224 Path(("assets".to_string(), "BTC".to_string())),
3225 )
3226 .await
3227 .expect("should not error when projection is not registered");
3228
3229 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3230 assert_eq!(resp.0["state"]["symbol"], "BTC");
3231 assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3232 }
3233
3234 #[tokio::test]
3235 async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3236 let store = create_test_store();
3237
3238 let resp = get_projection_state(
3239 State(Arc::clone(&store)),
3240 Path(("assets".to_string(), "UNKNOWN".to_string())),
3241 )
3242 .await
3243 .unwrap();
3244
3245 assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3246 assert_eq!(resp.0["state"], serde_json::Value::Null);
3247 }
3248
3249 #[tokio::test]
3250 async fn get_projection_state_registered_wins_over_cache() {
3251 let store = create_test_store();
3252
3253 let event = create_test_event("user-777", "user.created");
3255 store.ingest(&event).unwrap();
3256
3257 store.projection_state_cache().insert(
3259 "entity_snapshots:user-777".to_string(),
3260 serde_json::json!({"stolen": "value"}),
3261 );
3262
3263 let resp = get_projection_state(
3264 State(Arc::clone(&store)),
3265 Path(("entity_snapshots".to_string(), "user-777".to_string())),
3266 )
3267 .await
3268 .unwrap();
3269
3270 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3273 assert!(
3274 resp.0["state"].get("stolen").is_none(),
3275 "cache entry must not shadow registered projection state: got {:?}",
3276 resp.0["state"]
3277 );
3278 }
3279
3280 #[tokio::test]
3281 async fn get_projection_state_summary_returns_cache_without_registration() {
3282 let store = create_test_store();
3283 let cache = store.projection_state_cache();
3284 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3285 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3286 cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3288
3289 let resp =
3290 get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3291 .await
3292 .unwrap();
3293
3294 assert_eq!(resp.0["total"], 2);
3295 let states = resp.0["states"].as_array().unwrap();
3296 let entity_ids: Vec<&str> = states
3297 .iter()
3298 .map(|s| s["entity_id"].as_str().unwrap())
3299 .collect();
3300 assert!(entity_ids.contains(&"BTC"));
3301 assert!(entity_ids.contains(&"ETH"));
3302 }
3303
3304 #[tokio::test]
3305 async fn bulk_get_projection_states_falls_back_to_cache() {
3306 let store = create_test_store();
3307 let cache = store.projection_state_cache();
3308 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3309 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3310
3311 let req = BulkGetStateRequest {
3312 entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3313 };
3314
3315 let resp = bulk_get_projection_states(
3316 State(Arc::clone(&store)),
3317 Path("assets".to_string()),
3318 Json(req),
3319 )
3320 .await
3321 .unwrap();
3322
3323 assert_eq!(resp.0["total"], 3);
3324 let states = resp.0["states"].as_array().unwrap();
3325 let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3326 .iter()
3327 .map(|s| (s["entity_id"].as_str().unwrap(), s))
3328 .collect();
3329
3330 assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3331 assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3332 assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3333 assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3334 }
3335}