1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 security::middleware::OptionalAuth,
41 web::api_v1::AppState,
42 },
43 store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46 Json, Router,
47 extract::{Path, Query, State, WebSocketUpgrade},
48 response::{IntoResponse, Response},
49 routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57
58type SharedStore = Arc<EventStore>;
59
60#[cfg(feature = "replication")]
66async fn await_replication_ack(state: &AppState) {
67 let shipper_guard = state.wal_shipper.read().await;
68 if let Some(ref shipper) = *shipper_guard {
69 let mode = shipper.replication_mode();
70 if mode == ReplicationMode::Async {
71 return;
72 }
73
74 let target_offset = shipper.current_leader_offset();
75 if target_offset == 0 {
76 return;
77 }
78
79 let shipper = Arc::clone(shipper);
80 drop(shipper_guard);
82
83 let timer = state
84 .store
85 .metrics()
86 .replication_ack_wait_seconds
87 .start_timer();
88 let acked = shipper.wait_for_ack(target_offset).await;
89 timer.observe_duration();
90
91 if !acked {
92 tracing::warn!(
93 "Replication ACK timeout in {} mode (offset {}). \
94 Write succeeded locally but follower confirmation pending.",
95 mode,
96 target_offset,
97 );
98 }
99 }
100}
101#[cfg(not(feature = "replication"))]
102async fn await_replication_ack(_state: &AppState) {
103 }
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107 let app = Router::new()
108 .route("/health", get(health))
109 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
111 .route("/api/v1/events/batch", post(ingest_events_batch))
112 .route("/api/v1/events/query", get(query_events))
113 .route("/api/v1/events/{event_id}", get(get_event_by_id))
114 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
117 .route("/api/v1/event-types", get(list_event_types))
118 .route("/api/v1/entities/duplicates", get(detect_duplicates))
119 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120 .route(
121 "/api/v1/entities/{entity_id}/snapshot",
122 get(get_entity_snapshot),
123 )
124 .route("/api/v1/stats", get(get_stats))
125 .route("/api/v1/analytics/frequency", get(analytics_frequency))
127 .route("/api/v1/analytics/summary", get(analytics_summary))
128 .route("/api/v1/analytics/correlation", get(analytics_correlation))
129 .route("/api/v1/snapshots", post(create_snapshot))
131 .route("/api/v1/snapshots", get(list_snapshots))
132 .route(
133 "/api/v1/snapshots/{entity_id}/latest",
134 get(get_latest_snapshot),
135 )
136 .route("/api/v1/compaction/trigger", post(trigger_compaction))
138 .route("/api/v1/compaction/stats", get(compaction_stats))
139 .route("/api/v1/schemas", post(register_schema))
141 .route("/api/v1/schemas", get(list_subjects))
142 .route("/api/v1/schemas/{subject}", get(get_schema))
143 .route(
144 "/api/v1/schemas/{subject}/versions",
145 get(list_schema_versions),
146 )
147 .route("/api/v1/schemas/validate", post(validate_event_schema))
148 .route(
149 "/api/v1/schemas/{subject}/compatibility",
150 put(set_compatibility_mode),
151 )
152 .route("/api/v1/replay", post(start_replay))
154 .route("/api/v1/replay", get(list_replays))
155 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157 .route(
158 "/api/v1/replay/{replay_id}",
159 axum::routing::delete(delete_replay),
160 )
161 .route("/api/v1/pipelines", post(register_pipeline))
163 .route("/api/v1/pipelines", get(list_pipelines))
164 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166 .route(
167 "/api/v1/pipelines/{pipeline_id}",
168 axum::routing::delete(remove_pipeline),
169 )
170 .route(
171 "/api/v1/pipelines/{pipeline_id}/stats",
172 get(get_pipeline_stats),
173 )
174 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175 .route("/api/v1/projections", get(list_projections))
177 .route("/api/v1/projections/{name}", get(get_projection))
178 .route(
179 "/api/v1/projections/{name}",
180 axum::routing::delete(delete_projection),
181 )
182 .route(
183 "/api/v1/projections/{name}/state",
184 get(get_projection_state_summary),
185 )
186 .route("/api/v1/projections/{name}/reset", post(reset_projection))
187 .route(
188 "/api/v1/projections/{name}/{entity_id}/state",
189 get(get_projection_state),
190 )
191 .route(
192 "/api/v1/projections/{name}/{entity_id}/state",
193 post(save_projection_state),
194 )
195 .route(
196 "/api/v1/projections/{name}/{entity_id}/state",
197 put(save_projection_state),
198 )
199 .route(
200 "/api/v1/projections/{name}/bulk",
201 post(bulk_get_projection_states),
202 )
203 .route(
204 "/api/v1/projections/{name}/bulk/save",
205 post(bulk_save_projection_states),
206 )
207 .route("/api/v1/webhooks", post(register_webhook))
209 .route("/api/v1/webhooks", get(list_webhooks))
210 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212 .route(
213 "/api/v1/webhooks/{webhook_id}",
214 axum::routing::delete(delete_webhook),
215 )
216 .route(
217 "/api/v1/webhooks/{webhook_id}/deliveries",
218 get(list_webhook_deliveries),
219 )
220 .route("/api/v1/graphql", post(graphql_query))
222 .route("/api/v1/geospatial/query", post(geo_query))
223 .route("/api/v1/geospatial/stats", get(geo_stats))
224 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225 .route(
226 "/api/v1/schema-evolution/history/{event_type}",
227 get(schema_evolution_history),
228 )
229 .route(
230 "/api/v1/schema-evolution/schema/{event_type}",
231 get(schema_evolution_schema),
232 )
233 .route(
234 "/api/v1/schema-evolution/stats",
235 get(schema_evolution_stats),
236 )
237 .layer(
238 CorsLayer::new()
239 .allow_origin(Any)
240 .allow_methods(Any)
241 .allow_headers(Any),
242 )
243 .layer(TraceLayer::new_for_http())
244 .with_state(store);
245
246 let listener = tokio::net::TcpListener::bind(addr).await?;
247 axum::serve(listener, app).await?;
248
249 Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253 Json(serde_json::json!({
254 "status": "healthy",
255 "service": "allsource-core",
256 "version": env!("CARGO_PKG_VERSION")
257 }))
258}
259
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262 let metrics = store.metrics();
263
264 match metrics.encode() {
265 Ok(encoded) => Response::builder()
266 .status(200)
267 .header("Content-Type", "text/plain; version=0.0.4")
268 .body(encoded)
269 .unwrap()
270 .into_response(),
271 Err(e) => Response::builder()
272 .status(500)
273 .body(format!("Error encoding metrics: {e}"))
274 .unwrap()
275 .into_response(),
276 }
277}
278
279pub async fn ingest_event(
280 State(store): State<SharedStore>,
281 Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283 let expected_version = req.expected_version;
284
285 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286 let event = Event::from_strings(
287 req.event_type,
288 req.entity_id,
289 tenant_id,
290 req.payload,
291 req.metadata,
292 )?;
293
294 let event_id = event.id;
295 let timestamp = event.timestamp;
296
297 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299 tracing::info!("Event ingested: {}", event_id);
300
301 Ok(Json(IngestEventResponse {
302 event_id,
303 timestamp,
304 version: Some(new_version),
305 }))
306}
307
308pub async fn ingest_event_v1(
314 State(state): State<AppState>,
315 Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317 let expected_version = req.expected_version;
318
319 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321 let event = Event::from_strings(
322 req.event_type,
323 req.entity_id,
324 tenant_id,
325 req.payload,
326 req.metadata,
327 )?;
328
329 let event_id = event.id;
330 let timestamp = event.timestamp;
331
332 let new_version = state
333 .store
334 .ingest_with_expected_version(&event, expected_version)?;
335
336 await_replication_ack(&state).await;
338
339 tracing::info!("Event ingested: {}", event_id);
340
341 Ok(Json(IngestEventResponse {
342 event_id,
343 timestamp,
344 version: Some(new_version),
345 }))
346}
347
348pub async fn ingest_events_batch(
353 State(store): State<SharedStore>,
354 Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356 let total = req.events.len();
357 let mut ingested_events = Vec::with_capacity(total);
358
359 for event_req in req.events {
360 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361 let expected_version = event_req.expected_version;
362
363 let event = Event::from_strings(
364 event_req.event_type,
365 event_req.entity_id,
366 tenant_id,
367 event_req.payload,
368 event_req.metadata,
369 )?;
370
371 let event_id = event.id;
372 let timestamp = event.timestamp;
373
374 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376 ingested_events.push(IngestEventResponse {
377 event_id,
378 timestamp,
379 version: Some(new_version),
380 });
381 }
382
383 let ingested = ingested_events.len();
384 tracing::info!("Batch ingested {} events", ingested);
385
386 Ok(Json(IngestEventsBatchResponse {
387 total,
388 ingested,
389 events: ingested_events,
390 }))
391}
392
393pub async fn ingest_events_batch_v1(
399 State(state): State<AppState>,
400 Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402 let total = req.events.len();
403 let mut ingested_events = Vec::with_capacity(total);
404
405 for event_req in req.events {
406 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407 let expected_version = event_req.expected_version;
408
409 let event = Event::from_strings(
410 event_req.event_type,
411 event_req.entity_id,
412 tenant_id,
413 event_req.payload,
414 event_req.metadata,
415 )?;
416
417 let event_id = event.id;
418 let timestamp = event.timestamp;
419
420 let new_version = state
421 .store
422 .ingest_with_expected_version(&event, expected_version)?;
423
424 ingested_events.push(IngestEventResponse {
425 event_id,
426 timestamp,
427 version: Some(new_version),
428 });
429 }
430
431 await_replication_ack(&state).await;
433
434 let ingested = ingested_events.len();
435 tracing::info!("Batch ingested {} events", ingested);
436
437 Ok(Json(IngestEventsBatchResponse {
438 total,
439 ingested,
440 events: ingested_events,
441 }))
442}
443
444pub async fn query_events(
445 OptionalAuth(auth): OptionalAuth,
446 Query(req): Query<QueryEventsRequest>,
447 State(store): State<SharedStore>,
448) -> Result<Json<QueryEventsResponse>> {
449 let requested_limit = req.limit;
450 let queried_entity_id = req.entity_id.clone();
451
452 let enforced_tenant = req
460 .tenant_id
461 .clone()
462 .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
463
464 let unlimited_req = QueryEventsRequest {
466 entity_id: req.entity_id,
467 event_type: req.event_type,
468 tenant_id: enforced_tenant,
469 as_of: req.as_of,
470 since: req.since,
471 until: req.until,
472 limit: None,
473 event_type_prefix: req.event_type_prefix,
474 payload_filter: req.payload_filter,
475 };
476 let all_events = store.query(&unlimited_req)?;
477 let total_count = all_events.len();
478
479 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
481 all_events.into_iter().take(limit).collect()
482 } else {
483 all_events
484 };
485
486 let count = limited_events.len();
487 let has_more = count < total_count;
488 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
489
490 let entity_version = queried_entity_id
492 .as_deref()
493 .map(|eid| store.get_entity_version(eid));
494
495 tracing::debug!("Query returned {} events (total: {})", count, total_count);
496
497 Ok(Json(QueryEventsResponse {
498 events,
499 count,
500 total_count,
501 has_more,
502 entity_version,
503 }))
504}
505
506pub async fn list_entities(
507 State(store): State<SharedStore>,
508 Query(req): Query<ListEntitiesRequest>,
509) -> Result<Json<ListEntitiesResponse>> {
510 use std::collections::HashMap;
511
512 let query_req = QueryEventsRequest {
514 entity_id: None,
515 event_type: None,
516 tenant_id: None,
517 as_of: None,
518 since: None,
519 until: None,
520 limit: None,
521 event_type_prefix: req.event_type_prefix,
522 payload_filter: req.payload_filter,
523 };
524 let events = store.query(&query_req)?;
525
526 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
528 for event in &events {
529 entity_map
530 .entry(event.entity_id().to_string())
531 .or_default()
532 .push(event);
533 }
534
535 let mut summaries: Vec<EntitySummary> = entity_map
537 .into_iter()
538 .map(|(entity_id, events)| {
539 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
540 EntitySummary {
541 entity_id,
542 event_count: events.len(),
543 last_event_type: last.event_type_str().to_string(),
544 last_event_at: last.timestamp(),
545 }
546 })
547 .collect();
548 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
549
550 let total = summaries.len();
551
552 let offset = req.offset.unwrap_or(0);
554 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
555 let summaries = if let Some(limit) = req.limit {
556 let has_more = summaries.len() > limit;
557 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
558 return Ok(Json(ListEntitiesResponse {
559 entities: truncated,
560 total,
561 has_more,
562 }));
563 } else {
564 summaries
565 };
566
567 Ok(Json(ListEntitiesResponse {
568 entities: summaries,
569 total,
570 has_more: false,
571 }))
572}
573
574pub async fn detect_duplicates(
575 State(store): State<SharedStore>,
576 Query(req): Query<DetectDuplicatesRequest>,
577) -> Result<Json<DetectDuplicatesResponse>> {
578 use std::collections::HashMap;
579
580 let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
581
582 let query_req = QueryEventsRequest {
584 entity_id: None,
585 event_type: None,
586 tenant_id: None,
587 as_of: None,
588 since: None,
589 until: None,
590 limit: None,
591 event_type_prefix: Some(req.event_type_prefix),
592 payload_filter: None,
593 };
594 let events = store.query(&query_req)?;
595
596 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
599 for event in &events {
600 let eid = event.entity_id().to_string();
601 entity_latest
602 .entry(eid)
603 .and_modify(|existing| {
604 if event.timestamp() > existing.timestamp() {
605 *existing = event;
606 }
607 })
608 .or_insert(event);
609 }
610
611 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
613 for (entity_id, event) in &entity_latest {
614 let payload = event.payload();
615 let mut key_parts = serde_json::Map::new();
616 for field in &group_by_fields {
617 let value = payload
618 .get(*field)
619 .cloned()
620 .unwrap_or(serde_json::Value::Null);
621 key_parts.insert((*field).to_string(), value);
622 }
623 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
624 groups.entry(key_str).or_default().push(entity_id.clone());
625 }
626
627 let mut duplicate_groups: Vec<DuplicateGroup> = groups
629 .into_iter()
630 .filter(|(_, ids)| ids.len() > 1)
631 .map(|(key_str, mut ids)| {
632 ids.sort();
633 let key: serde_json::Value =
634 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
635 let count = ids.len();
636 DuplicateGroup {
637 key,
638 entity_ids: ids,
639 count,
640 }
641 })
642 .collect();
643
644 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
646
647 let total = duplicate_groups.len();
648
649 let offset = req.offset.unwrap_or(0);
651 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
652
653 if let Some(limit) = req.limit {
654 let has_more = duplicate_groups.len() > limit;
655 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
656 return Ok(Json(DetectDuplicatesResponse {
657 duplicates: truncated,
658 total,
659 has_more,
660 }));
661 }
662
663 Ok(Json(DetectDuplicatesResponse {
664 duplicates: duplicate_groups,
665 total,
666 has_more: false,
667 }))
668}
669
670#[derive(Deserialize)]
671pub struct EntityStateParams {
672 as_of: Option<chrono::DateTime<chrono::Utc>>,
673}
674
675pub async fn get_entity_state(
676 State(store): State<SharedStore>,
677 Path(entity_id): Path<String>,
678 Query(params): Query<EntityStateParams>,
679) -> Result<Json<serde_json::Value>> {
680 let state = store.reconstruct_state(&entity_id, params.as_of)?;
681
682 tracing::info!("State reconstructed for entity: {}", entity_id);
683
684 Ok(Json(state))
685}
686
687pub async fn get_entity_snapshot(
688 State(store): State<SharedStore>,
689 Path(entity_id): Path<String>,
690) -> Result<Json<serde_json::Value>> {
691 let snapshot = store.get_snapshot(&entity_id)?;
692
693 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
694
695 Ok(Json(snapshot))
696}
697
698pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
699 let stats = store.stats();
700 Json(stats)
701}
702
703#[derive(Debug, Deserialize)]
706pub struct ListStreamsParams {
707 pub limit: Option<usize>,
709 pub offset: Option<usize>,
711}
712
713#[derive(Debug, serde::Serialize)]
715pub struct ListStreamsResponse {
716 pub streams: Vec<StreamInfo>,
717 pub total: usize,
718}
719
720pub async fn list_streams(
721 State(store): State<SharedStore>,
722 Query(params): Query<ListStreamsParams>,
723) -> Json<ListStreamsResponse> {
724 let mut streams = store.list_streams();
725 let total = streams.len();
726
727 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
729
730 if let Some(offset) = params.offset {
732 if offset < streams.len() {
733 streams = streams[offset..].to_vec();
734 } else {
735 streams = vec![];
736 }
737 }
738
739 if let Some(limit) = params.limit {
740 streams.truncate(limit);
741 }
742
743 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
744
745 Json(ListStreamsResponse { streams, total })
746}
747
748#[derive(Debug, Deserialize)]
751pub struct ListEventTypesParams {
752 pub limit: Option<usize>,
754 pub offset: Option<usize>,
756}
757
758#[derive(Debug, serde::Serialize)]
760pub struct ListEventTypesResponse {
761 pub event_types: Vec<EventTypeInfo>,
762 pub total: usize,
763}
764
765pub async fn list_event_types(
766 State(store): State<SharedStore>,
767 Query(params): Query<ListEventTypesParams>,
768) -> Json<ListEventTypesResponse> {
769 let mut event_types = store.list_event_types();
770 let total = event_types.len();
771
772 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
774
775 if let Some(offset) = params.offset {
777 if offset < event_types.len() {
778 event_types = event_types[offset..].to_vec();
779 } else {
780 event_types = vec![];
781 }
782 }
783
784 if let Some(limit) = params.limit {
785 event_types.truncate(limit);
786 }
787
788 tracing::debug!(
789 "Listed {} event types (total: {})",
790 event_types.len(),
791 total
792 );
793
794 Json(ListEventTypesResponse { event_types, total })
795}
796
797#[derive(Debug, Deserialize)]
799pub struct WebSocketParams {
800 pub consumer_id: Option<String>,
801}
802
803pub async fn events_websocket(
804 ws: WebSocketUpgrade,
805 State(store): State<SharedStore>,
806 Query(params): Query<WebSocketParams>,
807) -> Response {
808 let websocket_manager = store.websocket_manager();
809
810 ws.on_upgrade(move |socket| async move {
811 if let Some(consumer_id) = params.consumer_id {
812 websocket_manager
813 .handle_socket_with_consumer(socket, consumer_id, store)
814 .await;
815 } else {
816 websocket_manager.handle_socket(socket).await;
817 }
818 })
819}
820
821pub async fn analytics_frequency(
823 State(store): State<SharedStore>,
824 Query(req): Query<EventFrequencyRequest>,
825) -> Result<Json<EventFrequencyResponse>> {
826 let response = AnalyticsEngine::event_frequency(&store, &req)?;
827
828 tracing::debug!(
829 "Frequency analysis returned {} buckets",
830 response.buckets.len()
831 );
832
833 Ok(Json(response))
834}
835
836pub async fn analytics_summary(
838 State(store): State<SharedStore>,
839 Query(req): Query<StatsSummaryRequest>,
840) -> Result<Json<StatsSummaryResponse>> {
841 let response = AnalyticsEngine::stats_summary(&store, &req)?;
842
843 tracing::debug!(
844 "Stats summary: {} events across {} entities",
845 response.total_events,
846 response.unique_entities
847 );
848
849 Ok(Json(response))
850}
851
852pub async fn analytics_correlation(
854 State(store): State<SharedStore>,
855 Query(req): Query<CorrelationRequest>,
856) -> Result<Json<CorrelationResponse>> {
857 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
858
859 tracing::debug!(
860 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
861 response.correlated_pairs,
862 response.total_a,
863 response.correlation_percentage
864 );
865
866 Ok(Json(response))
867}
868
869pub async fn create_snapshot(
871 State(store): State<SharedStore>,
872 Json(req): Json<CreateSnapshotRequest>,
873) -> Result<Json<CreateSnapshotResponse>> {
874 store.create_snapshot(&req.entity_id)?;
875
876 let snapshot_manager = store.snapshot_manager();
877 let snapshot = snapshot_manager
878 .get_latest_snapshot(&req.entity_id)
879 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
880
881 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
882
883 Ok(Json(CreateSnapshotResponse {
884 snapshot_id: snapshot.id,
885 entity_id: snapshot.entity_id,
886 created_at: snapshot.created_at,
887 event_count: snapshot.event_count,
888 size_bytes: snapshot.metadata.size_bytes,
889 }))
890}
891
892pub async fn list_snapshots(
894 State(store): State<SharedStore>,
895 Query(req): Query<ListSnapshotsRequest>,
896) -> Result<Json<ListSnapshotsResponse>> {
897 let snapshot_manager = store.snapshot_manager();
898
899 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
900 snapshot_manager
901 .get_all_snapshots(&entity_id)
902 .into_iter()
903 .map(SnapshotInfo::from)
904 .collect()
905 } else {
906 let entities = snapshot_manager.list_entities();
908 entities
909 .iter()
910 .flat_map(|entity_id| {
911 snapshot_manager
912 .get_all_snapshots(entity_id)
913 .into_iter()
914 .map(SnapshotInfo::from)
915 })
916 .collect()
917 };
918
919 let total = snapshots.len();
920
921 tracing::debug!("Listed {} snapshots", total);
922
923 Ok(Json(ListSnapshotsResponse { snapshots, total }))
924}
925
926pub async fn get_latest_snapshot(
928 State(store): State<SharedStore>,
929 Path(entity_id): Path<String>,
930) -> Result<Json<serde_json::Value>> {
931 let snapshot_manager = store.snapshot_manager();
932
933 let snapshot = snapshot_manager
934 .get_latest_snapshot(&entity_id)
935 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
936
937 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
938
939 Ok(Json(serde_json::json!({
940 "snapshot_id": snapshot.id,
941 "entity_id": snapshot.entity_id,
942 "created_at": snapshot.created_at,
943 "as_of": snapshot.as_of,
944 "event_count": snapshot.event_count,
945 "size_bytes": snapshot.metadata.size_bytes,
946 "snapshot_type": snapshot.metadata.snapshot_type,
947 "state": snapshot.state
948 })))
949}
950
951pub async fn trigger_compaction(
953 State(store): State<SharedStore>,
954) -> Result<Json<CompactionResult>> {
955 let compaction_manager = store.compaction_manager().ok_or_else(|| {
956 crate::error::AllSourceError::InternalError(
957 "Compaction not enabled (no Parquet storage)".to_string(),
958 )
959 })?;
960
961 tracing::info!("📦 Manual compaction triggered via API");
962
963 let result = compaction_manager.compact_now()?;
964
965 Ok(Json(result))
966}
967
968pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
970 let compaction_manager = store.compaction_manager().ok_or_else(|| {
971 crate::error::AllSourceError::InternalError(
972 "Compaction not enabled (no Parquet storage)".to_string(),
973 )
974 })?;
975
976 let stats = compaction_manager.stats();
977 let config = compaction_manager.config();
978
979 Ok(Json(serde_json::json!({
980 "stats": stats,
981 "config": {
982 "min_files_to_compact": config.min_files_to_compact,
983 "target_file_size": config.target_file_size,
984 "max_file_size": config.max_file_size,
985 "small_file_threshold": config.small_file_threshold,
986 "compaction_interval_seconds": config.compaction_interval_seconds,
987 "auto_compact": config.auto_compact,
988 "strategy": config.strategy
989 }
990 })))
991}
992
993pub async fn register_schema(
995 State(store): State<SharedStore>,
996 Json(req): Json<RegisterSchemaRequest>,
997) -> Result<Json<RegisterSchemaResponse>> {
998 let schema_registry = store.schema_registry();
999
1000 let response =
1001 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1002
1003 tracing::info!(
1004 "📋 Schema registered: v{} for '{}'",
1005 response.version,
1006 response.subject
1007 );
1008
1009 Ok(Json(response))
1010}
1011
1012#[derive(Deserialize)]
1014pub struct GetSchemaParams {
1015 version: Option<u32>,
1016}
1017
1018pub async fn get_schema(
1019 State(store): State<SharedStore>,
1020 Path(subject): Path<String>,
1021 Query(params): Query<GetSchemaParams>,
1022) -> Result<Json<serde_json::Value>> {
1023 let schema_registry = store.schema_registry();
1024
1025 let schema = schema_registry.get_schema(&subject, params.version)?;
1026
1027 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1028
1029 Ok(Json(serde_json::json!({
1030 "id": schema.id,
1031 "subject": schema.subject,
1032 "version": schema.version,
1033 "schema": schema.schema,
1034 "created_at": schema.created_at,
1035 "description": schema.description,
1036 "tags": schema.tags
1037 })))
1038}
1039
1040pub async fn list_schema_versions(
1042 State(store): State<SharedStore>,
1043 Path(subject): Path<String>,
1044) -> Result<Json<serde_json::Value>> {
1045 let schema_registry = store.schema_registry();
1046
1047 let versions = schema_registry.list_versions(&subject)?;
1048
1049 Ok(Json(serde_json::json!({
1050 "subject": subject,
1051 "versions": versions
1052 })))
1053}
1054
1055pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1057 let schema_registry = store.schema_registry();
1058
1059 let subjects = schema_registry.list_subjects();
1060
1061 Json(serde_json::json!({
1062 "subjects": subjects,
1063 "total": subjects.len()
1064 }))
1065}
1066
1067pub async fn validate_event_schema(
1069 State(store): State<SharedStore>,
1070 Json(req): Json<ValidateEventRequest>,
1071) -> Result<Json<ValidateEventResponse>> {
1072 let schema_registry = store.schema_registry();
1073
1074 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1075
1076 if response.valid {
1077 tracing::debug!(
1078 "✅ Event validated against schema '{}' v{}",
1079 req.subject,
1080 response.schema_version
1081 );
1082 } else {
1083 tracing::warn!(
1084 "❌ Event validation failed for '{}': {:?}",
1085 req.subject,
1086 response.errors
1087 );
1088 }
1089
1090 Ok(Json(response))
1091}
1092
1093#[derive(Deserialize)]
1095pub struct SetCompatibilityRequest {
1096 compatibility: CompatibilityMode,
1097}
1098
1099pub async fn set_compatibility_mode(
1100 State(store): State<SharedStore>,
1101 Path(subject): Path<String>,
1102 Json(req): Json<SetCompatibilityRequest>,
1103) -> Json<serde_json::Value> {
1104 let schema_registry = store.schema_registry();
1105
1106 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1107
1108 tracing::info!(
1109 "🔧 Set compatibility mode for '{}' to {:?}",
1110 subject,
1111 req.compatibility
1112 );
1113
1114 Json(serde_json::json!({
1115 "subject": subject,
1116 "compatibility": req.compatibility
1117 }))
1118}
1119
1120pub async fn start_replay(
1122 State(store): State<SharedStore>,
1123 Json(req): Json<StartReplayRequest>,
1124) -> Result<Json<StartReplayResponse>> {
1125 let replay_manager = store.replay_manager();
1126
1127 let response = replay_manager.start_replay(store, req)?;
1128
1129 tracing::info!(
1130 "🔄 Started replay {} with {} events",
1131 response.replay_id,
1132 response.total_events
1133 );
1134
1135 Ok(Json(response))
1136}
1137
1138pub async fn get_replay_progress(
1140 State(store): State<SharedStore>,
1141 Path(replay_id): Path<uuid::Uuid>,
1142) -> Result<Json<ReplayProgress>> {
1143 let replay_manager = store.replay_manager();
1144
1145 let progress = replay_manager.get_progress(replay_id)?;
1146
1147 Ok(Json(progress))
1148}
1149
1150pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1152 let replay_manager = store.replay_manager();
1153
1154 let replays = replay_manager.list_replays();
1155
1156 Json(serde_json::json!({
1157 "replays": replays,
1158 "total": replays.len()
1159 }))
1160}
1161
1162pub async fn cancel_replay(
1164 State(store): State<SharedStore>,
1165 Path(replay_id): Path<uuid::Uuid>,
1166) -> Result<Json<serde_json::Value>> {
1167 let replay_manager = store.replay_manager();
1168
1169 replay_manager.cancel_replay(replay_id)?;
1170
1171 tracing::info!("🛑 Cancelled replay {}", replay_id);
1172
1173 Ok(Json(serde_json::json!({
1174 "replay_id": replay_id,
1175 "status": "cancelled"
1176 })))
1177}
1178
1179pub async fn delete_replay(
1181 State(store): State<SharedStore>,
1182 Path(replay_id): Path<uuid::Uuid>,
1183) -> Result<Json<serde_json::Value>> {
1184 let replay_manager = store.replay_manager();
1185
1186 let deleted = replay_manager.delete_replay(replay_id)?;
1187
1188 if deleted {
1189 tracing::info!("🗑️ Deleted replay {}", replay_id);
1190 }
1191
1192 Ok(Json(serde_json::json!({
1193 "replay_id": replay_id,
1194 "deleted": deleted
1195 })))
1196}
1197
1198pub async fn register_pipeline(
1200 State(store): State<SharedStore>,
1201 Json(config): Json<PipelineConfig>,
1202) -> Result<Json<serde_json::Value>> {
1203 let pipeline_manager = store.pipeline_manager();
1204
1205 let pipeline_id = pipeline_manager.register(config.clone());
1206
1207 tracing::info!(
1208 "🔀 Pipeline registered: {} (name: {})",
1209 pipeline_id,
1210 config.name
1211 );
1212
1213 Ok(Json(serde_json::json!({
1214 "pipeline_id": pipeline_id,
1215 "name": config.name,
1216 "enabled": config.enabled
1217 })))
1218}
1219
1220pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1222 let pipeline_manager = store.pipeline_manager();
1223
1224 let pipelines = pipeline_manager.list();
1225
1226 tracing::debug!("Listed {} pipelines", pipelines.len());
1227
1228 Json(serde_json::json!({
1229 "pipelines": pipelines,
1230 "total": pipelines.len()
1231 }))
1232}
1233
1234pub async fn get_pipeline(
1236 State(store): State<SharedStore>,
1237 Path(pipeline_id): Path<uuid::Uuid>,
1238) -> Result<Json<PipelineConfig>> {
1239 let pipeline_manager = store.pipeline_manager();
1240
1241 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1242 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1243 })?;
1244
1245 Ok(Json(pipeline.config().clone()))
1246}
1247
1248pub async fn remove_pipeline(
1250 State(store): State<SharedStore>,
1251 Path(pipeline_id): Path<uuid::Uuid>,
1252) -> Result<Json<serde_json::Value>> {
1253 let pipeline_manager = store.pipeline_manager();
1254
1255 let removed = pipeline_manager.remove(pipeline_id);
1256
1257 if removed {
1258 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1259 }
1260
1261 Ok(Json(serde_json::json!({
1262 "pipeline_id": pipeline_id,
1263 "removed": removed
1264 })))
1265}
1266
1267pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1269 let pipeline_manager = store.pipeline_manager();
1270
1271 let stats = pipeline_manager.all_stats();
1272
1273 Json(serde_json::json!({
1274 "stats": stats,
1275 "total": stats.len()
1276 }))
1277}
1278
1279pub async fn get_pipeline_stats(
1281 State(store): State<SharedStore>,
1282 Path(pipeline_id): Path<uuid::Uuid>,
1283) -> Result<Json<PipelineStats>> {
1284 let pipeline_manager = store.pipeline_manager();
1285
1286 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1287 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1288 })?;
1289
1290 Ok(Json(pipeline.stats()))
1291}
1292
1293pub async fn reset_pipeline(
1295 State(store): State<SharedStore>,
1296 Path(pipeline_id): Path<uuid::Uuid>,
1297) -> Result<Json<serde_json::Value>> {
1298 let pipeline_manager = store.pipeline_manager();
1299
1300 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1301 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1302 })?;
1303
1304 pipeline.reset();
1305
1306 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1307
1308 Ok(Json(serde_json::json!({
1309 "pipeline_id": pipeline_id,
1310 "reset": true
1311 })))
1312}
1313
1314pub async fn get_event_by_id(
1320 State(store): State<SharedStore>,
1321 Path(event_id): Path<uuid::Uuid>,
1322) -> Result<Json<serde_json::Value>> {
1323 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1324 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1325 })?;
1326
1327 let dto = EventDto::from(&event);
1328
1329 tracing::debug!("Event retrieved by ID: {}", event_id);
1330
1331 Ok(Json(serde_json::json!({
1332 "event": dto,
1333 "found": true
1334 })))
1335}
1336
1337pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1343 let projection_manager = store.projection_manager();
1344 let status_map = store.projection_status();
1345
1346 let projections: Vec<serde_json::Value> = projection_manager
1347 .list_projections()
1348 .iter()
1349 .map(|(name, projection)| {
1350 let status = status_map
1351 .get(name)
1352 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1353 serde_json::json!({
1354 "name": name,
1355 "type": format!("{:?}", projection.name()),
1356 "status": status,
1357 })
1358 })
1359 .collect();
1360
1361 tracing::debug!("Listed {} projections", projections.len());
1362
1363 Json(serde_json::json!({
1364 "projections": projections,
1365 "total": projections.len()
1366 }))
1367}
1368
1369pub async fn get_projection(
1371 State(store): State<SharedStore>,
1372 Path(name): Path<String>,
1373) -> Result<Json<serde_json::Value>> {
1374 let projection_manager = store.projection_manager();
1375
1376 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1377 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1378 })?;
1379
1380 Ok(Json(serde_json::json!({
1381 "name": projection.name(),
1382 "found": true
1383 })))
1384}
1385
1386pub async fn get_projection_state(
1399 State(store): State<SharedStore>,
1400 Path((name, entity_id)): Path<(String, String)>,
1401) -> Result<Json<serde_json::Value>> {
1402 let state = store
1403 .projection_manager()
1404 .get_projection(&name)
1405 .and_then(|p| p.get_state(&entity_id))
1406 .or_else(|| {
1407 store
1408 .projection_state_cache()
1409 .get(&format!("{name}:{entity_id}"))
1410 .map(|entry| entry.value().clone())
1411 });
1412
1413 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1414
1415 Ok(Json(serde_json::json!({
1416 "projection": name,
1417 "entity_id": entity_id,
1418 "state": state,
1419 "found": state.is_some()
1420 })))
1421}
1422
1423pub async fn delete_projection(
1428 State(store): State<SharedStore>,
1429 Path(name): Path<String>,
1430) -> Result<Json<serde_json::Value>> {
1431 let projection_manager = store.projection_manager();
1432
1433 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1434 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1435 })?;
1436
1437 projection.clear();
1438
1439 let cache = store.projection_state_cache();
1441 let prefix = format!("{name}:");
1442 let keys_to_remove: Vec<String> = cache
1443 .iter()
1444 .filter(|entry| entry.key().starts_with(&prefix))
1445 .map(|entry| entry.key().clone())
1446 .collect();
1447 for key in keys_to_remove {
1448 cache.remove(&key);
1449 }
1450
1451 tracing::info!("Projection deleted (cleared): {}", name);
1452
1453 Ok(Json(serde_json::json!({
1454 "projection": name,
1455 "deleted": true
1456 })))
1457}
1458
1459pub async fn get_projection_state_summary(
1468 State(store): State<SharedStore>,
1469 Path(name): Path<String>,
1470) -> Result<Json<serde_json::Value>> {
1471 let cache = store.projection_state_cache();
1472 let prefix = format!("{name}:");
1473 let states: Vec<serde_json::Value> = cache
1474 .iter()
1475 .filter(|entry| entry.key().starts_with(&prefix))
1476 .map(|entry| {
1477 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1478 serde_json::json!({
1479 "entity_id": entity_id,
1480 "state": entry.value().clone()
1481 })
1482 })
1483 .collect();
1484
1485 let total = states.len();
1486
1487 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1488
1489 Ok(Json(serde_json::json!({
1490 "projection": name,
1491 "states": states,
1492 "total": total
1493 })))
1494}
1495
1496pub async fn reset_projection(
1500 State(store): State<SharedStore>,
1501 Path(name): Path<String>,
1502) -> Result<Json<serde_json::Value>> {
1503 let reprocessed = store.reset_projection(&name)?;
1504
1505 tracing::info!(
1506 "Projection reset: {} ({} events reprocessed)",
1507 name,
1508 reprocessed
1509 );
1510
1511 Ok(Json(serde_json::json!({
1512 "projection": name,
1513 "reset": true,
1514 "events_reprocessed": reprocessed
1515 })))
1516}
1517
1518pub async fn pause_projection(
1522 State(store): State<SharedStore>,
1523 Path(name): Path<String>,
1524) -> Result<Json<serde_json::Value>> {
1525 let projection_manager = store.projection_manager();
1526
1527 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1529 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1530 })?;
1531
1532 store
1533 .projection_status()
1534 .insert(name.clone(), "paused".to_string());
1535
1536 tracing::info!("Projection paused: {}", name);
1537
1538 Ok(Json(serde_json::json!({
1539 "projection": name,
1540 "status": "paused"
1541 })))
1542}
1543
1544pub async fn start_projection(
1548 State(store): State<SharedStore>,
1549 Path(name): Path<String>,
1550) -> Result<Json<serde_json::Value>> {
1551 let projection_manager = store.projection_manager();
1552
1553 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1555 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1556 })?;
1557
1558 store
1559 .projection_status()
1560 .insert(name.clone(), "running".to_string());
1561
1562 tracing::info!("Projection started: {}", name);
1563
1564 Ok(Json(serde_json::json!({
1565 "projection": name,
1566 "status": "running"
1567 })))
1568}
1569
1570#[derive(Debug, Deserialize)]
1572pub struct SaveProjectionStateRequest {
1573 pub state: serde_json::Value,
1574}
1575
1576pub async fn save_projection_state(
1581 State(store): State<SharedStore>,
1582 Path((name, entity_id)): Path<(String, String)>,
1583 Json(req): Json<SaveProjectionStateRequest>,
1584) -> Result<Json<serde_json::Value>> {
1585 let projection_cache = store.projection_state_cache();
1586
1587 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1589
1590 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1591
1592 Ok(Json(serde_json::json!({
1593 "projection": name,
1594 "entity_id": entity_id,
1595 "saved": true
1596 })))
1597}
1598
1599#[derive(Debug, Deserialize)]
1603pub struct BulkGetStateRequest {
1604 pub entity_ids: Vec<String>,
1605}
1606
1607#[derive(Debug, Deserialize)]
1611pub struct BulkSaveStateRequest {
1612 pub states: Vec<BulkSaveStateItem>,
1613}
1614
1615#[derive(Debug, Deserialize)]
1616pub struct BulkSaveStateItem {
1617 pub entity_id: String,
1618 pub state: serde_json::Value,
1619}
1620
1621pub async fn bulk_get_projection_states(
1622 State(store): State<SharedStore>,
1623 Path(name): Path<String>,
1624 Json(req): Json<BulkGetStateRequest>,
1625) -> Result<Json<serde_json::Value>> {
1626 let projection = store.projection_manager().get_projection(&name);
1630 let cache = store.projection_state_cache();
1631
1632 let states: Vec<serde_json::Value> = req
1633 .entity_ids
1634 .iter()
1635 .map(|entity_id| {
1636 let state = projection
1637 .as_ref()
1638 .and_then(|p| p.get_state(entity_id))
1639 .or_else(|| {
1640 cache
1641 .get(&format!("{name}:{entity_id}"))
1642 .map(|entry| entry.value().clone())
1643 });
1644 serde_json::json!({
1645 "entity_id": entity_id,
1646 "state": state,
1647 "found": state.is_some()
1648 })
1649 })
1650 .collect();
1651
1652 tracing::debug!(
1653 "Bulk projection state retrieved: {} entities from {}",
1654 states.len(),
1655 name
1656 );
1657
1658 Ok(Json(serde_json::json!({
1659 "projection": name,
1660 "states": states,
1661 "total": states.len()
1662 })))
1663}
1664
1665pub async fn bulk_save_projection_states(
1670 State(store): State<SharedStore>,
1671 Path(name): Path<String>,
1672 Json(req): Json<BulkSaveStateRequest>,
1673) -> Result<Json<serde_json::Value>> {
1674 let projection_cache = store.projection_state_cache();
1675
1676 let mut saved_count = 0;
1677 for item in &req.states {
1678 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1679 saved_count += 1;
1680 }
1681
1682 tracing::info!(
1683 "Bulk projection state saved: {} entities for {}",
1684 saved_count,
1685 name
1686 );
1687
1688 Ok(Json(serde_json::json!({
1689 "projection": name,
1690 "saved": saved_count,
1691 "total": req.states.len()
1692 })))
1693}
1694
1695#[derive(Debug, Deserialize)]
1701pub struct ListWebhooksParams {
1702 pub tenant_id: Option<String>,
1703}
1704
1705pub async fn register_webhook(
1707 State(store): State<SharedStore>,
1708 Json(req): Json<RegisterWebhookRequest>,
1709) -> Json<serde_json::Value> {
1710 let registry = store.webhook_registry();
1711 let webhook = registry.register(req);
1712
1713 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1714
1715 Json(serde_json::json!({
1716 "webhook": webhook,
1717 "created": true
1718 }))
1719}
1720
1721pub async fn list_webhooks(
1723 State(store): State<SharedStore>,
1724 Query(params): Query<ListWebhooksParams>,
1725) -> Json<serde_json::Value> {
1726 let registry = store.webhook_registry();
1727
1728 let webhooks = if let Some(tenant_id) = params.tenant_id {
1729 registry.list_by_tenant(&tenant_id)
1730 } else {
1731 vec![]
1733 };
1734
1735 let total = webhooks.len();
1736
1737 Json(serde_json::json!({
1738 "webhooks": webhooks,
1739 "total": total
1740 }))
1741}
1742
1743pub async fn get_webhook(
1745 State(store): State<SharedStore>,
1746 Path(webhook_id): Path<uuid::Uuid>,
1747) -> Result<Json<serde_json::Value>> {
1748 let registry = store.webhook_registry();
1749
1750 let webhook = registry.get(webhook_id).ok_or_else(|| {
1751 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1752 })?;
1753
1754 Ok(Json(serde_json::json!({
1755 "webhook": webhook,
1756 "found": true
1757 })))
1758}
1759
1760pub async fn update_webhook(
1762 State(store): State<SharedStore>,
1763 Path(webhook_id): Path<uuid::Uuid>,
1764 Json(req): Json<UpdateWebhookRequest>,
1765) -> Result<Json<serde_json::Value>> {
1766 let registry = store.webhook_registry();
1767
1768 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1769 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1770 })?;
1771
1772 tracing::info!("Webhook updated: {}", webhook_id);
1773
1774 Ok(Json(serde_json::json!({
1775 "webhook": webhook,
1776 "updated": true
1777 })))
1778}
1779
1780pub async fn delete_webhook(
1782 State(store): State<SharedStore>,
1783 Path(webhook_id): Path<uuid::Uuid>,
1784) -> Result<Json<serde_json::Value>> {
1785 let registry = store.webhook_registry();
1786
1787 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1788 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1789 })?;
1790
1791 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1792
1793 Ok(Json(serde_json::json!({
1794 "webhook_id": webhook_id,
1795 "deleted": true
1796 })))
1797}
1798
1799#[derive(Debug, Deserialize)]
1801pub struct ListDeliveriesParams {
1802 pub limit: Option<usize>,
1803}
1804
1805pub async fn list_webhook_deliveries(
1807 State(store): State<SharedStore>,
1808 Path(webhook_id): Path<uuid::Uuid>,
1809 Query(params): Query<ListDeliveriesParams>,
1810) -> Result<Json<serde_json::Value>> {
1811 let registry = store.webhook_registry();
1812
1813 registry.get(webhook_id).ok_or_else(|| {
1815 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1816 })?;
1817
1818 let limit = params.limit.unwrap_or(50);
1819 let deliveries = registry.get_deliveries(webhook_id, limit);
1820 let total = deliveries.len();
1821
1822 Ok(Json(serde_json::json!({
1823 "webhook_id": webhook_id,
1824 "deliveries": deliveries,
1825 "total": total
1826 })))
1827}
1828
1829#[cfg(feature = "analytics")]
1835pub async fn eventql_query(
1836 State(store): State<SharedStore>,
1837 Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
1838) -> Result<Json<serde_json::Value>> {
1839 let events = store.snapshot_events();
1840 match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1841 Ok(response) => Ok(Json(serde_json::json!({
1842 "columns": response.columns,
1843 "rows": response.rows,
1844 "row_count": response.row_count,
1845 }))),
1846 Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1847 }
1848}
1849
1850pub async fn graphql_query(
1852 State(store): State<SharedStore>,
1853 Json(req): Json<GraphQLRequest>,
1854) -> Json<serde_json::Value> {
1855 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1856 Ok(f) => f,
1857 Err(e) => {
1858 return Json(
1859 serde_json::to_value(GraphQLResponse {
1860 data: None,
1861 errors: vec![GraphQLError { message: e }],
1862 })
1863 .unwrap(),
1864 );
1865 }
1866 };
1867
1868 let mut data = serde_json::Map::new();
1869 let mut errors = Vec::new();
1870
1871 for field in &fields {
1872 match field.name.as_str() {
1873 "events" => {
1874 let request = crate::application::dto::QueryEventsRequest {
1875 entity_id: field.arguments.get("entity_id").cloned(),
1876 event_type: field.arguments.get("event_type").cloned(),
1877 tenant_id: field.arguments.get("tenant_id").cloned(),
1878 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1879 as_of: None,
1880 since: None,
1881 until: None,
1882 event_type_prefix: None,
1883 payload_filter: None,
1884 };
1885 match store.query(&request) {
1886 Ok(events) => {
1887 let json_events: Vec<serde_json::Value> = events
1888 .iter()
1889 .map(|e| {
1890 crate::infrastructure::query::graphql::event_to_json(
1891 e,
1892 &field.fields,
1893 )
1894 })
1895 .collect();
1896 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1897 }
1898 Err(e) => errors.push(GraphQLError {
1899 message: format!("events query failed: {e}"),
1900 }),
1901 }
1902 }
1903 "event" => {
1904 if let Some(id_str) = field.arguments.get("id") {
1905 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1906 match store.get_event_by_id(&id) {
1907 Ok(Some(event)) => {
1908 data.insert(
1909 "event".to_string(),
1910 crate::infrastructure::query::graphql::event_to_json(
1911 &event,
1912 &field.fields,
1913 ),
1914 );
1915 }
1916 Ok(None) => {
1917 data.insert("event".to_string(), serde_json::Value::Null);
1918 }
1919 Err(e) => errors.push(GraphQLError {
1920 message: format!("event lookup failed: {e}"),
1921 }),
1922 }
1923 } else {
1924 errors.push(GraphQLError {
1925 message: format!("Invalid UUID: {id_str}"),
1926 });
1927 }
1928 } else {
1929 errors.push(GraphQLError {
1930 message: "event query requires 'id' argument".to_string(),
1931 });
1932 }
1933 }
1934 "projections" => {
1935 let pm = store.projection_manager();
1936 let names: Vec<serde_json::Value> = pm
1937 .list_projections()
1938 .iter()
1939 .map(|(name, _)| serde_json::Value::String(name.clone()))
1940 .collect();
1941 data.insert("projections".to_string(), serde_json::Value::Array(names));
1942 }
1943 "stats" => {
1944 let stats = store.stats();
1945 data.insert(
1946 "stats".to_string(),
1947 serde_json::json!({
1948 "total_events": stats.total_events,
1949 "total_entities": stats.total_entities,
1950 "total_event_types": stats.total_event_types,
1951 }),
1952 );
1953 }
1954 "__schema" => {
1955 data.insert(
1956 "__schema".to_string(),
1957 crate::infrastructure::query::graphql::introspection_schema(),
1958 );
1959 }
1960 other => {
1961 errors.push(GraphQLError {
1962 message: format!("Unknown field: {other}"),
1963 });
1964 }
1965 }
1966 }
1967
1968 Json(
1969 serde_json::to_value(GraphQLResponse {
1970 data: Some(serde_json::Value::Object(data)),
1971 errors,
1972 })
1973 .unwrap(),
1974 )
1975}
1976
1977pub async fn geo_query(
1979 State(store): State<SharedStore>,
1980 Json(req): Json<GeoQueryRequest>,
1981) -> Json<serde_json::Value> {
1982 let events = store.snapshot_events();
1983 let geo_index = store.geo_index();
1984 let results =
1985 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1986 let total = results.len();
1987 Json(serde_json::json!({
1988 "results": results,
1989 "total": total,
1990 }))
1991}
1992
1993pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1995 let stats = store.geo_index().stats();
1996 Json(serde_json::json!(stats))
1997}
1998
1999pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2001 let stats = store.exactly_once().stats();
2002 Json(serde_json::json!(stats))
2003}
2004
2005pub async fn schema_evolution_history(
2007 State(store): State<SharedStore>,
2008 Path(event_type): Path<String>,
2009) -> Json<serde_json::Value> {
2010 let mgr = store.schema_evolution();
2011 let history = mgr.get_history(&event_type);
2012 let version = mgr.get_version(&event_type);
2013 Json(serde_json::json!({
2014 "event_type": event_type,
2015 "current_version": version,
2016 "history": history,
2017 }))
2018}
2019
2020pub async fn schema_evolution_schema(
2022 State(store): State<SharedStore>,
2023 Path(event_type): Path<String>,
2024) -> Json<serde_json::Value> {
2025 let mgr = store.schema_evolution();
2026 if let Some(schema) = mgr.get_schema(&event_type) {
2027 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2028 Json(serde_json::json!({
2029 "event_type": event_type,
2030 "version": mgr.get_version(&event_type),
2031 "inferred_schema": schema,
2032 "json_schema": json_schema,
2033 }))
2034 } else {
2035 Json(serde_json::json!({
2036 "event_type": event_type,
2037 "error": "No schema inferred for this event type"
2038 }))
2039 }
2040}
2041
2042pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2044 let stats = store.schema_evolution().stats();
2045 let event_types = store.schema_evolution().list_event_types();
2046 Json(serde_json::json!({
2047 "stats": stats,
2048 "tracked_event_types": event_types,
2049 }))
2050}
2051
2052#[cfg(feature = "embedded-sync")]
2058pub async fn sync_pull_handler(
2059 State(state): State<AppState>,
2060 Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
2061) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
2062 use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};
2063
2064 let store = &state.store;
2065
2066 let since = request
2069 .version_vector
2070 .values()
2071 .map(|ts| ts.physical_ms)
2072 .min()
2073 .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));
2074
2075 let events = store.query(&crate::application::dto::QueryEventsRequest {
2076 entity_id: None,
2077 event_type: None,
2078 tenant_id: None,
2079 as_of: None,
2080 since,
2081 until: None,
2082 limit: None,
2083 event_type_prefix: None,
2084 payload_filter: None,
2085 })?;
2086
2087 let mut replicated = Vec::with_capacity(events.len());
2089 let mut last_ms = 0u64;
2090 let mut logical = 0u32;
2091
2092 for event in &events {
2093 let event_ms = event.timestamp().timestamp_millis() as u64;
2094 if event_ms == last_ms {
2095 logical += 1;
2096 } else {
2097 last_ms = event_ms;
2098 logical = 0;
2099 }
2100
2101 replicated.push(ReplicatedEvent {
2102 event_id: event.id().to_string(),
2103 hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
2104 origin_region: "server".to_string(),
2105 event_data: serde_json::json!({
2106 "event_type": event.event_type_str(),
2107 "entity_id": event.entity_id_str(),
2108 "tenant_id": event.tenant_id_str(),
2109 "payload": event.payload,
2110 "metadata": event.metadata,
2111 }),
2112 });
2113 }
2114
2115 Ok(Json(crate::embedded::sync_types::SyncPullResponse {
2116 events: replicated,
2117 version_vector: std::collections::BTreeMap::new(),
2118 }))
2119}
2120
2121#[cfg(feature = "embedded-sync")]
2123pub async fn sync_push_handler(
2124 State(state): State<AppState>,
2125 Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
2126) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
2127 let store = &state.store;
2128
2129 let mut accepted = 0usize;
2130 let mut skipped = 0usize;
2131
2132 for rep_event in &request.events {
2133 let event_data = &rep_event.event_data;
2134 let event_type = event_data
2135 .get("event_type")
2136 .and_then(|v| v.as_str())
2137 .unwrap_or("unknown")
2138 .to_string();
2139 let entity_id = event_data
2140 .get("entity_id")
2141 .and_then(|v| v.as_str())
2142 .unwrap_or("unknown")
2143 .to_string();
2144 let tenant_id = event_data
2145 .get("tenant_id")
2146 .and_then(|v| v.as_str())
2147 .unwrap_or("default")
2148 .to_string();
2149 let payload = event_data
2150 .get("payload")
2151 .cloned()
2152 .unwrap_or(serde_json::json!({}));
2153 let metadata = event_data.get("metadata").cloned();
2154
2155 match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
2156 Ok(domain_event) => {
2157 store.ingest(&domain_event)?;
2158 accepted += 1;
2159 }
2160 Err(_) => {
2161 skipped += 1;
2162 }
2163 }
2164 }
2165
2166 Ok(Json(crate::embedded::sync_types::SyncPushResponse {
2167 accepted,
2168 skipped,
2169 version_vector: std::collections::BTreeMap::new(),
2170 }))
2171}
2172
2173pub async fn register_consumer(
2179 State(store): State<SharedStore>,
2180 Json(req): Json<RegisterConsumerRequest>,
2181) -> Result<Json<ConsumerResponse>> {
2182 let consumer = store
2183 .consumer_registry()
2184 .register(&req.consumer_id, &req.event_type_filters);
2185
2186 Ok(Json(ConsumerResponse {
2187 consumer_id: consumer.consumer_id,
2188 event_type_filters: consumer.event_type_filters,
2189 cursor_position: consumer.cursor_position,
2190 }))
2191}
2192
2193pub async fn get_consumer(
2195 State(store): State<SharedStore>,
2196 Path(consumer_id): Path<String>,
2197) -> Result<Json<ConsumerResponse>> {
2198 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2199
2200 Ok(Json(ConsumerResponse {
2201 consumer_id: consumer.consumer_id,
2202 event_type_filters: consumer.event_type_filters,
2203 cursor_position: consumer.cursor_position,
2204 }))
2205}
2206
2207#[derive(Debug, Deserialize)]
2209pub struct ConsumerPollQuery {
2210 pub limit: Option<usize>,
2211}
2212
2213pub async fn poll_consumer_events(
2214 State(store): State<SharedStore>,
2215 Path(consumer_id): Path<String>,
2216 Query(query): Query<ConsumerPollQuery>,
2217) -> Result<Json<ConsumerEventsResponse>> {
2218 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2219 let offset = consumer.cursor_position.unwrap_or(0);
2220 let limit = query.limit.unwrap_or(100);
2221
2222 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2223 let count = events.len();
2224
2225 let consumer_events: Vec<ConsumerEventDto> = events
2226 .into_iter()
2227 .map(|(position, event)| ConsumerEventDto {
2228 position,
2229 event: EventDto::from(&event),
2230 })
2231 .collect();
2232
2233 Ok(Json(ConsumerEventsResponse {
2234 events: consumer_events,
2235 count,
2236 }))
2237}
2238
2239pub async fn ack_consumer(
2241 State(store): State<SharedStore>,
2242 Path(consumer_id): Path<String>,
2243 Json(req): Json<AckRequest>,
2244) -> Result<Json<serde_json::Value>> {
2245 let max_offset = store.total_events() as u64;
2246
2247 store
2248 .consumer_registry()
2249 .ack(&consumer_id, req.position, max_offset)
2250 .map_err(crate::error::AllSourceError::InvalidInput)?;
2251
2252 Ok(Json(serde_json::json!({
2253 "status": "ok",
2254 "consumer_id": consumer_id,
2255 "position": req.position,
2256 })))
2257}
2258
2259#[cfg(test)]
2260mod tests {
2261 use super::*;
2262 use crate::{domain::entities::Event, store::EventStore};
2263
2264 fn create_test_store() -> Arc<EventStore> {
2265 Arc::new(EventStore::new())
2266 }
2267
2268 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2269 Event::from_strings(
2270 event_type.to_string(),
2271 entity_id.to_string(),
2272 "test-stream".to_string(),
2273 serde_json::json!({
2274 "name": "Test",
2275 "value": 42
2276 }),
2277 None,
2278 )
2279 .unwrap()
2280 }
2281
2282 #[tokio::test]
2283 async fn test_query_events_has_more_and_total_count() {
2284 let store = create_test_store();
2285
2286 for i in 0..50 {
2288 store
2289 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2290 .unwrap();
2291 }
2292
2293 let req = QueryEventsRequest {
2295 entity_id: None,
2296 event_type: None,
2297 tenant_id: None,
2298 as_of: None,
2299 since: None,
2300 until: None,
2301 limit: Some(10),
2302 event_type_prefix: None,
2303 payload_filter: None,
2304 };
2305
2306 let requested_limit = req.limit;
2307 let unlimited_req = QueryEventsRequest {
2308 limit: None,
2309 ..QueryEventsRequest {
2310 entity_id: req.entity_id,
2311 event_type: req.event_type,
2312 tenant_id: req.tenant_id,
2313 as_of: req.as_of,
2314 since: req.since,
2315 until: req.until,
2316 limit: None,
2317 event_type_prefix: req.event_type_prefix,
2318 payload_filter: req.payload_filter,
2319 }
2320 };
2321 let all_events = store.query(&unlimited_req).unwrap();
2322 let total_count = all_events.len();
2323 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2324 all_events.into_iter().take(limit).collect()
2325 } else {
2326 all_events
2327 };
2328 let count = limited_events.len();
2329 let has_more = count < total_count;
2330
2331 assert_eq!(count, 10);
2332 assert_eq!(total_count, 50);
2333 assert!(has_more);
2334 }
2335
2336 #[tokio::test]
2337 async fn test_query_events_no_more_results() {
2338 let store = create_test_store();
2339
2340 for i in 0..5 {
2342 store
2343 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2344 .unwrap();
2345 }
2346
2347 let all_events = store
2349 .query(&QueryEventsRequest {
2350 entity_id: None,
2351 event_type: None,
2352 tenant_id: None,
2353 as_of: None,
2354 since: None,
2355 until: None,
2356 limit: None,
2357 event_type_prefix: None,
2358 payload_filter: None,
2359 })
2360 .unwrap();
2361 let total_count = all_events.len();
2362 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2363 let count = limited_events.len();
2364 let has_more = count < total_count;
2365
2366 assert_eq!(count, 5);
2367 assert_eq!(total_count, 5);
2368 assert!(!has_more);
2369 }
2370
2371 #[tokio::test]
2372 async fn test_list_entities_by_type_prefix() {
2373 let store = create_test_store();
2374
2375 store
2377 .ingest(&create_test_event("idx-1", "index.created"))
2378 .unwrap();
2379 store
2380 .ingest(&create_test_event("idx-1", "index.updated"))
2381 .unwrap();
2382 store
2383 .ingest(&create_test_event("idx-2", "index.created"))
2384 .unwrap();
2385 store
2386 .ingest(&create_test_event("idx-3", "index.created"))
2387 .unwrap();
2388 store
2390 .ingest(&create_test_event("trade-1", "trade.created"))
2391 .unwrap();
2392 store
2393 .ingest(&create_test_event("trade-2", "trade.created"))
2394 .unwrap();
2395
2396 let req = ListEntitiesRequest {
2398 event_type_prefix: Some("index.".to_string()),
2399 payload_filter: None,
2400 limit: None,
2401 offset: None,
2402 };
2403 let query_req = QueryEventsRequest {
2404 entity_id: None,
2405 event_type: None,
2406 tenant_id: None,
2407 as_of: None,
2408 since: None,
2409 until: None,
2410 limit: None,
2411 event_type_prefix: req.event_type_prefix,
2412 payload_filter: req.payload_filter,
2413 };
2414 let events = store.query(&query_req).unwrap();
2415
2416 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2418 std::collections::HashMap::new();
2419 for event in &events {
2420 entity_map
2421 .entry(event.entity_id().to_string())
2422 .or_default()
2423 .push(event);
2424 }
2425
2426 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2429 assert_eq!(entity_map["idx-3"].len(), 1);
2430 }
2431
2432 fn create_test_event_with_payload(
2433 entity_id: &str,
2434 event_type: &str,
2435 payload: serde_json::Value,
2436 ) -> Event {
2437 Event::from_strings(
2438 event_type.to_string(),
2439 entity_id.to_string(),
2440 "test-stream".to_string(),
2441 payload,
2442 None,
2443 )
2444 .unwrap()
2445 }
2446
2447 #[tokio::test]
2448 async fn test_detect_duplicates_by_payload_fields() {
2449 let store = create_test_store();
2450
2451 store
2453 .ingest(&create_test_event_with_payload(
2454 "idx-1",
2455 "index.created",
2456 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2457 ))
2458 .unwrap();
2459 store
2460 .ingest(&create_test_event_with_payload(
2461 "idx-2",
2462 "index.created",
2463 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2464 ))
2465 .unwrap();
2466 store
2467 .ingest(&create_test_event_with_payload(
2468 "idx-3",
2469 "index.created",
2470 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2471 ))
2472 .unwrap();
2473 store
2474 .ingest(&create_test_event_with_payload(
2475 "idx-4",
2476 "index.created",
2477 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2478 ))
2479 .unwrap();
2480 store
2481 .ingest(&create_test_event_with_payload(
2482 "idx-5",
2483 "index.created",
2484 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2485 ))
2486 .unwrap();
2487
2488 let query_req = QueryEventsRequest {
2490 entity_id: None,
2491 event_type: None,
2492 tenant_id: None,
2493 as_of: None,
2494 since: None,
2495 until: None,
2496 limit: None,
2497 event_type_prefix: Some("index.".to_string()),
2498 payload_filter: None,
2499 };
2500 let events = store.query(&query_req).unwrap();
2501
2502 let group_by_fields = vec!["name"];
2504 let mut entity_latest: std::collections::HashMap<String, &Event> =
2505 std::collections::HashMap::new();
2506 for event in &events {
2507 let eid = event.entity_id().to_string();
2508 entity_latest
2509 .entry(eid)
2510 .and_modify(|existing| {
2511 if event.timestamp() > existing.timestamp() {
2512 *existing = event;
2513 }
2514 })
2515 .or_insert(event);
2516 }
2517
2518 let mut groups: std::collections::HashMap<String, Vec<String>> =
2519 std::collections::HashMap::new();
2520 for (entity_id, event) in &entity_latest {
2521 let payload = event.payload();
2522 let mut key_parts = serde_json::Map::new();
2523 for field in &group_by_fields {
2524 let value = payload
2525 .get(*field)
2526 .cloned()
2527 .unwrap_or(serde_json::Value::Null);
2528 key_parts.insert((*field).to_string(), value);
2529 }
2530 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2531 groups.entry(key_str).or_default().push(entity_id.clone());
2532 }
2533
2534 let duplicate_groups: Vec<_> = groups
2535 .into_iter()
2536 .filter(|(_, ids)| ids.len() > 1)
2537 .collect();
2538
2539 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2541 assert_eq!(ids.len(), 2);
2542 }
2543 }
2544
2545 #[tokio::test]
2546 async fn test_detect_duplicates_no_duplicates() {
2547 let store = create_test_store();
2548
2549 store
2551 .ingest(&create_test_event_with_payload(
2552 "idx-1",
2553 "index.created",
2554 serde_json::json!({"name": "A"}),
2555 ))
2556 .unwrap();
2557 store
2558 .ingest(&create_test_event_with_payload(
2559 "idx-2",
2560 "index.created",
2561 serde_json::json!({"name": "B"}),
2562 ))
2563 .unwrap();
2564
2565 let query_req = QueryEventsRequest {
2566 entity_id: None,
2567 event_type: None,
2568 tenant_id: None,
2569 as_of: None,
2570 since: None,
2571 until: None,
2572 limit: None,
2573 event_type_prefix: Some("index.".to_string()),
2574 payload_filter: None,
2575 };
2576 let events = store.query(&query_req).unwrap();
2577
2578 let mut entity_latest: std::collections::HashMap<String, &Event> =
2579 std::collections::HashMap::new();
2580 for event in &events {
2581 entity_latest
2582 .entry(event.entity_id().to_string())
2583 .or_insert(event);
2584 }
2585
2586 let mut groups: std::collections::HashMap<String, Vec<String>> =
2587 std::collections::HashMap::new();
2588 for (entity_id, event) in &entity_latest {
2589 let key_str =
2590 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2591 .unwrap();
2592 groups.entry(key_str).or_default().push(entity_id.clone());
2593 }
2594
2595 let duplicate_groups: Vec<_> = groups
2596 .into_iter()
2597 .filter(|(_, ids)| ids.len() > 1)
2598 .collect();
2599
2600 assert_eq!(duplicate_groups.len(), 0); }
2602
2603 #[tokio::test]
2604 async fn test_detect_duplicates_multi_field_group_by() {
2605 let store = create_test_store();
2606
2607 store
2609 .ingest(&create_test_event_with_payload(
2610 "idx-1",
2611 "index.created",
2612 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2613 ))
2614 .unwrap();
2615 store
2616 .ingest(&create_test_event_with_payload(
2617 "idx-2",
2618 "index.created",
2619 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2620 ))
2621 .unwrap();
2622 store
2624 .ingest(&create_test_event_with_payload(
2625 "idx-3",
2626 "index.created",
2627 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2628 ))
2629 .unwrap();
2630
2631 let query_req = QueryEventsRequest {
2632 entity_id: None,
2633 event_type: None,
2634 tenant_id: None,
2635 as_of: None,
2636 since: None,
2637 until: None,
2638 limit: None,
2639 event_type_prefix: Some("index.".to_string()),
2640 payload_filter: None,
2641 };
2642 let events = store.query(&query_req).unwrap();
2643
2644 let group_by_fields = vec!["name", "user_id"];
2645 let mut entity_latest: std::collections::HashMap<String, &Event> =
2646 std::collections::HashMap::new();
2647 for event in &events {
2648 entity_latest
2649 .entry(event.entity_id().to_string())
2650 .and_modify(|existing| {
2651 if event.timestamp() > existing.timestamp() {
2652 *existing = event;
2653 }
2654 })
2655 .or_insert(event);
2656 }
2657
2658 let mut groups: std::collections::HashMap<String, Vec<String>> =
2659 std::collections::HashMap::new();
2660 for (entity_id, event) in &entity_latest {
2661 let payload = event.payload();
2662 let mut key_parts = serde_json::Map::new();
2663 for field in &group_by_fields {
2664 let value = payload
2665 .get(*field)
2666 .cloned()
2667 .unwrap_or(serde_json::Value::Null);
2668 key_parts.insert((*field).to_string(), value);
2669 }
2670 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2671 groups.entry(key_str).or_default().push(entity_id.clone());
2672 }
2673
2674 let duplicate_groups: Vec<_> = groups
2675 .into_iter()
2676 .filter(|(_, ids)| ids.len() > 1)
2677 .collect();
2678
2679 assert_eq!(duplicate_groups.len(), 1);
2681 let (_, ref ids) = duplicate_groups[0];
2682 assert_eq!(ids.len(), 2);
2683 let mut sorted_ids = ids.clone();
2684 sorted_ids.sort();
2685 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2686 }
2687
2688 #[tokio::test]
2689 async fn test_projection_state_cache() {
2690 let store = create_test_store();
2691
2692 let cache = store.projection_state_cache();
2694 cache.insert(
2695 "entity_snapshots:user-123".to_string(),
2696 serde_json::json!({"name": "Test User", "age": 30}),
2697 );
2698
2699 let state = cache.get("entity_snapshots:user-123");
2701 assert!(state.is_some());
2702 let state = state.unwrap();
2703 assert_eq!(state["name"], "Test User");
2704 assert_eq!(state["age"], 30);
2705 }
2706
2707 #[tokio::test]
2708 async fn test_projection_manager_list_projections() {
2709 let store = create_test_store();
2710
2711 let projection_manager = store.projection_manager();
2713 let projections = projection_manager.list_projections();
2714
2715 assert!(projections.len() >= 2);
2717
2718 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2719 assert!(names.contains(&"entity_snapshots"));
2720 assert!(names.contains(&"event_counters"));
2721 }
2722
2723 #[tokio::test]
2724 async fn test_projection_state_after_event_ingestion() {
2725 let store = create_test_store();
2726
2727 let event = create_test_event("user-456", "user.created");
2729 store.ingest(&event).unwrap();
2730
2731 let projection_manager = store.projection_manager();
2733 let snapshot_projection = projection_manager
2734 .get_projection("entity_snapshots")
2735 .unwrap();
2736
2737 let state = snapshot_projection.get_state("user-456");
2738 assert!(state.is_some());
2739 let state = state.unwrap();
2740 assert_eq!(state["name"], "Test");
2741 assert_eq!(state["value"], 42);
2742 }
2743
2744 #[tokio::test]
2745 async fn test_projection_state_cache_multiple_entities() {
2746 let store = create_test_store();
2747 let cache = store.projection_state_cache();
2748
2749 for i in 0..10 {
2751 cache.insert(
2752 format!("entity_snapshots:entity-{i}"),
2753 serde_json::json!({"id": i, "status": "active"}),
2754 );
2755 }
2756
2757 assert_eq!(cache.len(), 10);
2759
2760 for i in 0..10 {
2762 let key = format!("entity_snapshots:entity-{i}");
2763 let state = cache.get(&key);
2764 assert!(state.is_some());
2765 assert_eq!(state.unwrap()["id"], i);
2766 }
2767 }
2768
2769 #[tokio::test]
2770 async fn test_projection_state_update() {
2771 let store = create_test_store();
2772 let cache = store.projection_state_cache();
2773
2774 cache.insert(
2776 "entity_snapshots:user-789".to_string(),
2777 serde_json::json!({"balance": 100}),
2778 );
2779
2780 cache.insert(
2782 "entity_snapshots:user-789".to_string(),
2783 serde_json::json!({"balance": 150}),
2784 );
2785
2786 let state = cache.get("entity_snapshots:user-789").unwrap();
2788 assert_eq!(state["balance"], 150);
2789 }
2790
2791 #[tokio::test]
2792 async fn test_event_counter_projection() {
2793 let store = create_test_store();
2794
2795 store
2797 .ingest(&create_test_event("user-1", "user.created"))
2798 .unwrap();
2799 store
2800 .ingest(&create_test_event("user-2", "user.created"))
2801 .unwrap();
2802 store
2803 .ingest(&create_test_event("user-1", "user.updated"))
2804 .unwrap();
2805
2806 let projection_manager = store.projection_manager();
2808 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2809
2810 let created_state = counter_projection.get_state("user.created");
2812 assert!(created_state.is_some());
2813 assert_eq!(created_state.unwrap()["count"], 2);
2814
2815 let updated_state = counter_projection.get_state("user.updated");
2816 assert!(updated_state.is_some());
2817 assert_eq!(updated_state.unwrap()["count"], 1);
2818 }
2819
2820 #[tokio::test]
2821 async fn test_projection_state_cache_key_format() {
2822 let store = create_test_store();
2823 let cache = store.projection_state_cache();
2824
2825 let key = "orders:order-12345".to_string();
2827 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2828
2829 let state = cache.get(&key).unwrap();
2830 assert_eq!(state["total"], 99.99);
2831 }
2832
2833 #[tokio::test]
2834 async fn test_projection_state_cache_removal() {
2835 let store = create_test_store();
2836 let cache = store.projection_state_cache();
2837
2838 cache.insert(
2840 "test:entity-1".to_string(),
2841 serde_json::json!({"data": "value"}),
2842 );
2843 assert_eq!(cache.len(), 1);
2844
2845 cache.remove("test:entity-1");
2846 assert_eq!(cache.len(), 0);
2847 assert!(cache.get("test:entity-1").is_none());
2848 }
2849
2850 #[tokio::test]
2851 async fn test_get_nonexistent_projection() {
2852 let store = create_test_store();
2853 let projection_manager = store.projection_manager();
2854
2855 let projection = projection_manager.get_projection("nonexistent_projection");
2857 assert!(projection.is_none());
2858 }
2859
2860 #[tokio::test]
2861 async fn test_get_nonexistent_entity_state() {
2862 let store = create_test_store();
2863 let projection_manager = store.projection_manager();
2864
2865 let snapshot_projection = projection_manager
2867 .get_projection("entity_snapshots")
2868 .unwrap();
2869 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2870 assert!(state.is_none());
2871 }
2872
2873 #[tokio::test]
2874 async fn test_projection_state_cache_concurrent_access() {
2875 let store = create_test_store();
2876 let cache = store.projection_state_cache();
2877
2878 let handles: Vec<_> = (0..10)
2880 .map(|i| {
2881 let cache_clone = cache.clone();
2882 tokio::spawn(async move {
2883 cache_clone.insert(
2884 format!("concurrent:entity-{i}"),
2885 serde_json::json!({"thread": i}),
2886 );
2887 })
2888 })
2889 .collect();
2890
2891 for handle in handles {
2892 handle.await.unwrap();
2893 }
2894
2895 assert_eq!(cache.len(), 10);
2897 }
2898
2899 #[tokio::test]
2900 async fn test_projection_state_large_payload() {
2901 let store = create_test_store();
2902 let cache = store.projection_state_cache();
2903
2904 let large_array: Vec<serde_json::Value> = (0..1000)
2906 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2907 .collect();
2908
2909 cache.insert(
2910 "large:entity-1".to_string(),
2911 serde_json::json!({"items": large_array}),
2912 );
2913
2914 let state = cache.get("large:entity-1").unwrap();
2915 let items = state["items"].as_array().unwrap();
2916 assert_eq!(items.len(), 1000);
2917 }
2918
2919 #[tokio::test]
2920 async fn test_projection_state_complex_json() {
2921 let store = create_test_store();
2922 let cache = store.projection_state_cache();
2923
2924 let complex_state = serde_json::json!({
2926 "user": {
2927 "id": "user-123",
2928 "profile": {
2929 "name": "John Doe",
2930 "email": "john@example.com",
2931 "settings": {
2932 "theme": "dark",
2933 "notifications": true
2934 }
2935 },
2936 "roles": ["admin", "user"],
2937 "metadata": {
2938 "created_at": "2025-01-01T00:00:00Z",
2939 "last_login": null
2940 }
2941 }
2942 });
2943
2944 cache.insert("complex:user-123".to_string(), complex_state);
2945
2946 let state = cache.get("complex:user-123").unwrap();
2947 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2948 assert_eq!(state["user"]["roles"][0], "admin");
2949 assert!(state["user"]["metadata"]["last_login"].is_null());
2950 }
2951
2952 #[tokio::test]
2953 async fn test_projection_state_cache_iteration() {
2954 let store = create_test_store();
2955 let cache = store.projection_state_cache();
2956
2957 for i in 0..5 {
2959 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2960 }
2961
2962 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2964 assert_eq!(entries.len(), 5);
2965 }
2966
2967 #[tokio::test]
2968 async fn test_projection_manager_get_entity_snapshots() {
2969 let store = create_test_store();
2970 let projection_manager = store.projection_manager();
2971
2972 let projection = projection_manager.get_projection("entity_snapshots");
2974 assert!(projection.is_some());
2975 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2976 }
2977
2978 #[tokio::test]
2979 async fn test_projection_manager_get_event_counters() {
2980 let store = create_test_store();
2981 let projection_manager = store.projection_manager();
2982
2983 let projection = projection_manager.get_projection("event_counters");
2985 assert!(projection.is_some());
2986 assert_eq!(projection.unwrap().name(), "event_counters");
2987 }
2988
2989 #[tokio::test]
2990 async fn test_projection_state_cache_overwrite() {
2991 let store = create_test_store();
2992 let cache = store.projection_state_cache();
2993
2994 cache.insert(
2996 "overwrite:entity-1".to_string(),
2997 serde_json::json!({"version": 1}),
2998 );
2999
3000 cache.insert(
3002 "overwrite:entity-1".to_string(),
3003 serde_json::json!({"version": 2}),
3004 );
3005
3006 cache.insert(
3008 "overwrite:entity-1".to_string(),
3009 serde_json::json!({"version": 3}),
3010 );
3011
3012 let state = cache.get("overwrite:entity-1").unwrap();
3013 assert_eq!(state["version"], 3);
3014
3015 assert_eq!(cache.len(), 1);
3017 }
3018
3019 #[tokio::test]
3020 async fn test_projection_state_multiple_projections() {
3021 let store = create_test_store();
3022 let cache = store.projection_state_cache();
3023
3024 cache.insert(
3026 "entity_snapshots:user-1".to_string(),
3027 serde_json::json!({"name": "Alice"}),
3028 );
3029 cache.insert(
3030 "event_counters:user.created".to_string(),
3031 serde_json::json!({"count": 5}),
3032 );
3033 cache.insert(
3034 "custom_projection:order-1".to_string(),
3035 serde_json::json!({"total": 150.0}),
3036 );
3037
3038 assert_eq!(
3040 cache.get("entity_snapshots:user-1").unwrap()["name"],
3041 "Alice"
3042 );
3043 assert_eq!(
3044 cache.get("event_counters:user.created").unwrap()["count"],
3045 5
3046 );
3047 assert_eq!(
3048 cache.get("custom_projection:order-1").unwrap()["total"],
3049 150.0
3050 );
3051 }
3052
3053 #[tokio::test]
3054 async fn test_bulk_projection_state_access() {
3055 let store = create_test_store();
3056
3057 for i in 0..5 {
3059 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3060 store.ingest(&event).unwrap();
3061 }
3062
3063 let projection_manager = store.projection_manager();
3065 let snapshot_projection = projection_manager
3066 .get_projection("entity_snapshots")
3067 .unwrap();
3068
3069 for i in 0..5 {
3071 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3072 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3073 }
3074 }
3075
3076 #[tokio::test]
3077 async fn test_bulk_save_projection_states() {
3078 let store = create_test_store();
3079 let cache = store.projection_state_cache();
3080
3081 let states = vec![
3083 BulkSaveStateItem {
3084 entity_id: "bulk-entity-1".to_string(),
3085 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3086 },
3087 BulkSaveStateItem {
3088 entity_id: "bulk-entity-2".to_string(),
3089 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3090 },
3091 BulkSaveStateItem {
3092 entity_id: "bulk-entity-3".to_string(),
3093 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3094 },
3095 ];
3096
3097 let projection_name = "test_projection";
3098
3099 for item in &states {
3101 cache.insert(
3102 format!("{projection_name}:{}", item.entity_id),
3103 item.state.clone(),
3104 );
3105 }
3106
3107 assert_eq!(cache.len(), 3);
3109
3110 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3111 assert_eq!(state1["name"], "Entity 1");
3112 assert_eq!(state1["value"], 100);
3113
3114 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3115 assert_eq!(state2["name"], "Entity 2");
3116 assert_eq!(state2["value"], 200);
3117
3118 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3119 assert_eq!(state3["name"], "Entity 3");
3120 assert_eq!(state3["value"], 300);
3121 }
3122
3123 #[tokio::test]
3124 async fn test_bulk_save_empty_states() {
3125 let store = create_test_store();
3126 let cache = store.projection_state_cache();
3127
3128 cache.clear();
3130
3131 let states: Vec<BulkSaveStateItem> = vec![];
3133 assert_eq!(states.len(), 0);
3134
3135 assert_eq!(cache.len(), 0);
3137 }
3138
3139 #[tokio::test]
3140 async fn test_bulk_save_overwrites_existing() {
3141 let store = create_test_store();
3142 let cache = store.projection_state_cache();
3143
3144 cache.insert(
3146 "test:entity-1".to_string(),
3147 serde_json::json!({"version": 1, "data": "initial"}),
3148 );
3149
3150 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3152 cache.insert("test:entity-1".to_string(), new_state);
3153
3154 let state = cache.get("test:entity-1").unwrap();
3156 assert_eq!(state["version"], 2);
3157 assert_eq!(state["data"], "updated");
3158 }
3159
3160 #[tokio::test]
3161 async fn test_bulk_save_high_volume() {
3162 let store = create_test_store();
3163 let cache = store.projection_state_cache();
3164
3165 for i in 0..1000 {
3167 cache.insert(
3168 format!("volume_test:entity-{i}"),
3169 serde_json::json!({"index": i, "status": "active"}),
3170 );
3171 }
3172
3173 assert_eq!(cache.len(), 1000);
3175
3176 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3178 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3179 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3180 }
3181
3182 #[tokio::test]
3183 async fn test_bulk_save_different_projections() {
3184 let store = create_test_store();
3185 let cache = store.projection_state_cache();
3186
3187 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3189
3190 for proj in &projections {
3191 for i in 0..5 {
3192 cache.insert(
3193 format!("{proj}:entity-{i}"),
3194 serde_json::json!({"projection": proj, "id": i}),
3195 );
3196 }
3197 }
3198
3199 assert_eq!(cache.len(), 15);
3201
3202 for proj in &projections {
3204 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3205 assert_eq!(state["projection"], *proj);
3206 }
3207 }
3208
3209 #[tokio::test]
3220 async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3221 let store = create_test_store();
3222 store.projection_state_cache().insert(
3223 "assets:BTC".to_string(),
3224 serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3225 );
3226
3227 let resp = get_projection_state(
3228 State(Arc::clone(&store)),
3229 Path(("assets".to_string(), "BTC".to_string())),
3230 )
3231 .await
3232 .expect("should not error when projection is not registered");
3233
3234 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3235 assert_eq!(resp.0["state"]["symbol"], "BTC");
3236 assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3237 }
3238
3239 #[tokio::test]
3240 async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3241 let store = create_test_store();
3242
3243 let resp = get_projection_state(
3244 State(Arc::clone(&store)),
3245 Path(("assets".to_string(), "UNKNOWN".to_string())),
3246 )
3247 .await
3248 .unwrap();
3249
3250 assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3251 assert_eq!(resp.0["state"], serde_json::Value::Null);
3252 }
3253
3254 #[tokio::test]
3255 async fn get_projection_state_registered_wins_over_cache() {
3256 let store = create_test_store();
3257
3258 let event = create_test_event("user-777", "user.created");
3260 store.ingest(&event).unwrap();
3261
3262 store.projection_state_cache().insert(
3264 "entity_snapshots:user-777".to_string(),
3265 serde_json::json!({"stolen": "value"}),
3266 );
3267
3268 let resp = get_projection_state(
3269 State(Arc::clone(&store)),
3270 Path(("entity_snapshots".to_string(), "user-777".to_string())),
3271 )
3272 .await
3273 .unwrap();
3274
3275 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3278 assert!(
3279 resp.0["state"].get("stolen").is_none(),
3280 "cache entry must not shadow registered projection state: got {:?}",
3281 resp.0["state"]
3282 );
3283 }
3284
3285 #[tokio::test]
3286 async fn get_projection_state_summary_returns_cache_without_registration() {
3287 let store = create_test_store();
3288 let cache = store.projection_state_cache();
3289 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3290 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3291 cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3293
3294 let resp =
3295 get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3296 .await
3297 .unwrap();
3298
3299 assert_eq!(resp.0["total"], 2);
3300 let states = resp.0["states"].as_array().unwrap();
3301 let entity_ids: Vec<&str> = states
3302 .iter()
3303 .map(|s| s["entity_id"].as_str().unwrap())
3304 .collect();
3305 assert!(entity_ids.contains(&"BTC"));
3306 assert!(entity_ids.contains(&"ETH"));
3307 }
3308
3309 #[tokio::test]
3310 async fn bulk_get_projection_states_falls_back_to_cache() {
3311 let store = create_test_store();
3312 let cache = store.projection_state_cache();
3313 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3314 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3315
3316 let req = BulkGetStateRequest {
3317 entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3318 };
3319
3320 let resp = bulk_get_projection_states(
3321 State(Arc::clone(&store)),
3322 Path("assets".to_string()),
3323 Json(req),
3324 )
3325 .await
3326 .unwrap();
3327
3328 assert_eq!(resp.0["total"], 3);
3329 let states = resp.0["states"].as_array().unwrap();
3330 let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3331 .iter()
3332 .map(|s| (s["entity_id"].as_str().unwrap(), s))
3333 .collect();
3334
3335 assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3336 assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3337 assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3338 assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3339 }
3340}