1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 security::middleware::OptionalAuth,
41 web::api_v1::AppState,
42 },
43 store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46 Json, Router,
47 extract::{Path, Query, State, WebSocketUpgrade},
48 response::{IntoResponse, Response},
49 routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57
/// Shared, thread-safe handle to the event store; cloned into each handler via axum state.
type SharedStore = Arc<EventStore>;
59
#[cfg(feature = "replication")]
/// Blocks the ingest path until the follower acknowledges the current leader
/// WAL offset, when the configured replication mode requires it.
///
/// Behavior:
/// - No-op when no WAL shipper is configured, when the mode is `Async`,
///   or when the leader offset is still 0 (nothing shipped yet).
/// - On ACK timeout the write is NOT rolled back: it already succeeded
///   locally, so we only log a warning about pending follower confirmation.
async fn await_replication_ack(state: &AppState) {
    let shipper_guard = state.wal_shipper.read().await;
    if let Some(ref shipper) = *shipper_guard {
        let mode = shipper.replication_mode();
        if mode == ReplicationMode::Async {
            return;
        }

        let target_offset = shipper.current_leader_offset();
        if target_offset == 0 {
            return;
        }

        // Clone the Arc and release the read lock BEFORE awaiting the ACK,
        // so other requests are not blocked behind this guard.
        let shipper = Arc::clone(shipper);
        drop(shipper_guard);

        // Time spent waiting for the follower ACK, exported as a metric.
        let timer = state
            .store
            .metrics()
            .replication_ack_wait_seconds
            .start_timer();
        let acked = shipper.wait_for_ack(target_offset).await;
        timer.observe_duration();

        if !acked {
            tracing::warn!(
                "Replication ACK timeout in {} mode (offset {}). \
                 Write succeeded locally but follower confirmation pending.",
                mode,
                target_offset,
            );
        }
    }
}
#[cfg(not(feature = "replication"))]
/// No-op stand-in so ingest handlers compile unchanged when the
/// `replication` feature is disabled.
async fn await_replication_ack(_state: &AppState) {
}
105
/// Builds the HTTP router (all REST + WebSocket endpoints), applies a
/// permissive CORS layer and HTTP tracing, binds `addr`, and serves forever.
///
/// NOTE(review): `list_entities`, `ingest_event_v1`, and
/// `ingest_events_batch_v1` are defined in this file but not routed here —
/// presumably they are registered by another router (e.g. `web::api_v1`);
/// verify against the rest of the crate.
pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
    let app = Router::new()
        // Operational endpoints
        .route("/health", get(health))
        .route("/metrics", get(prometheus_metrics))
        // Event ingestion & querying
        .route("/api/v1/events", post(ingest_event))
        .route("/api/v1/events/batch", post(ingest_events_batch))
        .route("/api/v1/events/query", get(query_events))
        .route("/api/v1/events/{event_id}", get(get_event_by_id))
        .route("/api/v1/events/stream", get(events_websocket))
        .route("/api/v1/streams", get(list_streams))
        .route("/api/v1/event-types", get(list_event_types))
        // Entity inspection
        .route("/api/v1/entities/duplicates", get(detect_duplicates))
        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(get_entity_snapshot),
        )
        // Stats & analytics
        .route("/api/v1/stats", get(get_stats))
        .route("/api/v1/analytics/frequency", get(analytics_frequency))
        .route("/api/v1/analytics/summary", get(analytics_summary))
        .route("/api/v1/analytics/correlation", get(analytics_correlation))
        // Snapshots
        .route("/api/v1/snapshots", post(create_snapshot))
        .route("/api/v1/snapshots", get(list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(get_latest_snapshot),
        )
        // Compaction
        .route("/api/v1/compaction/trigger", post(trigger_compaction))
        .route("/api/v1/compaction/stats", get(compaction_stats))
        // Schema registry
        .route("/api/v1/schemas", post(register_schema))
        .route("/api/v1/schemas", get(list_subjects))
        .route("/api/v1/schemas/{subject}", get(get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(list_schema_versions),
        )
        .route("/api/v1/schemas/validate", post(validate_event_schema))
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(set_compatibility_mode),
        )
        // Replay
        .route("/api/v1/replay", post(start_replay))
        .route("/api/v1/replay", get(list_replays))
        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
        .route(
            "/api/v1/replay/{replay_id}",
            axum::routing::delete(delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(register_pipeline))
        .route("/api/v1/pipelines", get(list_pipelines))
        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            axum::routing::delete(remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(get_pipeline_stats),
        )
        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
        // Projections
        .route("/api/v1/projections", get(list_projections))
        .route("/api/v1/projections/{name}", get(get_projection))
        .route(
            "/api/v1/projections/{name}",
            axum::routing::delete(delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(get_projection_state_summary),
        )
        .route("/api/v1/projections/{name}/reset", post(reset_projection))
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(get_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(register_webhook))
        .route("/api/v1/webhooks", get(list_webhooks))
        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            axum::routing::delete(delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(list_webhook_deliveries),
        )
        // GraphQL / geospatial / misc
        .route("/api/v1/graphql", post(graphql_query))
        .route("/api/v1/geospatial/query", post(geo_query))
        .route("/api/v1/geospatial/stats", get(geo_stats))
        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(schema_evolution_stats),
        )
        // Wide-open CORS: any origin/method/header. Fine for internal use;
        // NOTE(review): confirm this is intended for production deployments.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http())
        .with_state(store);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;

    Ok(())
}
251
252pub async fn health() -> impl IntoResponse {
253 Json(serde_json::json!({
254 "status": "healthy",
255 "service": "allsource-core",
256 "version": env!("CARGO_PKG_VERSION")
257 }))
258}
259
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262 let metrics = store.metrics();
263
264 match metrics.encode() {
265 Ok(encoded) => Response::builder()
266 .status(200)
267 .header("Content-Type", "text/plain; version=0.0.4")
268 .body(encoded)
269 .unwrap()
270 .into_response(),
271 Err(e) => Response::builder()
272 .status(500)
273 .body(format!("Error encoding metrics: {e}"))
274 .unwrap()
275 .into_response(),
276 }
277}
278
279pub async fn ingest_event(
280 State(store): State<SharedStore>,
281 Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283 let expected_version = req.expected_version;
284
285 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286 let event = Event::from_strings(
287 req.event_type,
288 req.entity_id,
289 tenant_id,
290 req.payload,
291 req.metadata,
292 )?;
293
294 let event_id = event.id;
295 let timestamp = event.timestamp;
296
297 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299 tracing::info!("Event ingested: {}", event_id);
300
301 Ok(Json(IngestEventResponse {
302 event_id,
303 timestamp,
304 version: Some(new_version),
305 }))
306}
307
308pub async fn ingest_event_v1(
314 State(state): State<AppState>,
315 Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317 let expected_version = req.expected_version;
318
319 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321 let event = Event::from_strings(
322 req.event_type,
323 req.entity_id,
324 tenant_id,
325 req.payload,
326 req.metadata,
327 )?;
328
329 let event_id = event.id;
330 let timestamp = event.timestamp;
331
332 let new_version = state
333 .store
334 .ingest_with_expected_version(&event, expected_version)?;
335
336 await_replication_ack(&state).await;
338
339 tracing::info!("Event ingested: {}", event_id);
340
341 Ok(Json(IngestEventResponse {
342 event_id,
343 timestamp,
344 version: Some(new_version),
345 }))
346}
347
348pub async fn ingest_events_batch(
353 State(store): State<SharedStore>,
354 Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356 let total = req.events.len();
357 let mut ingested_events = Vec::with_capacity(total);
358
359 for event_req in req.events {
360 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361 let expected_version = event_req.expected_version;
362
363 let event = Event::from_strings(
364 event_req.event_type,
365 event_req.entity_id,
366 tenant_id,
367 event_req.payload,
368 event_req.metadata,
369 )?;
370
371 let event_id = event.id;
372 let timestamp = event.timestamp;
373
374 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376 ingested_events.push(IngestEventResponse {
377 event_id,
378 timestamp,
379 version: Some(new_version),
380 });
381 }
382
383 let ingested = ingested_events.len();
384 tracing::info!("Batch ingested {} events", ingested);
385
386 Ok(Json(IngestEventsBatchResponse {
387 total,
388 ingested,
389 events: ingested_events,
390 }))
391}
392
393pub async fn ingest_events_batch_v1(
399 State(state): State<AppState>,
400 Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402 let total = req.events.len();
403 let mut ingested_events = Vec::with_capacity(total);
404
405 for event_req in req.events {
406 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407 let expected_version = event_req.expected_version;
408
409 let event = Event::from_strings(
410 event_req.event_type,
411 event_req.entity_id,
412 tenant_id,
413 event_req.payload,
414 event_req.metadata,
415 )?;
416
417 let event_id = event.id;
418 let timestamp = event.timestamp;
419
420 let new_version = state
421 .store
422 .ingest_with_expected_version(&event, expected_version)?;
423
424 ingested_events.push(IngestEventResponse {
425 event_id,
426 timestamp,
427 version: Some(new_version),
428 });
429 }
430
431 await_replication_ack(&state).await;
433
434 let ingested = ingested_events.len();
435 tracing::info!("Batch ingested {} events", ingested);
436
437 Ok(Json(IngestEventsBatchResponse {
438 total,
439 ingested,
440 events: ingested_events,
441 }))
442}
443
/// Extra query-string parameter controlling result ordering for `query_events`.
#[derive(Debug, Deserialize)]
pub struct EventOrderParam {
    // "asc" (default when absent) or "desc", case-insensitive; any other
    // value is rejected with an InvalidInput error.
    pub order: Option<String>,
}
452
453pub async fn query_events(
454 OptionalAuth(auth): OptionalAuth,
455 Query(req): Query<QueryEventsRequest>,
456 Query(order_param): Query<EventOrderParam>,
457 State(store): State<SharedStore>,
458) -> Result<Json<QueryEventsResponse>> {
459 let requested_limit = req.limit;
460 let queried_entity_id = req.entity_id.clone();
461
462 let descending = match order_param.order.as_deref() {
467 None => false,
468 Some(o) if o.eq_ignore_ascii_case("asc") => false,
469 Some(o) if o.eq_ignore_ascii_case("desc") => true,
470 Some(other) => {
471 return Err(crate::error::AllSourceError::InvalidInput(format!(
472 "invalid 'order' value '{other}': expected 'asc' or 'desc'"
473 )));
474 }
475 };
476
477 let enforced_tenant = req
485 .tenant_id
486 .clone()
487 .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
488
489 let unlimited_req = QueryEventsRequest {
491 entity_id: req.entity_id,
492 event_type: req.event_type,
493 tenant_id: enforced_tenant,
494 as_of: req.as_of,
495 since: req.since,
496 until: req.until,
497 limit: None,
498 event_type_prefix: req.event_type_prefix,
499 payload_filter: req.payload_filter,
500 };
501 let mut all_events = store.query(&unlimited_req)?;
502 let total_count = all_events.len();
503
504 if descending {
507 all_events.reverse();
508 }
509
510 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
512 all_events.into_iter().take(limit).collect()
513 } else {
514 all_events
515 };
516
517 let count = limited_events.len();
518 let has_more = count < total_count;
519 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
520
521 let entity_version = queried_entity_id
523 .as_deref()
524 .map(|eid| store.get_entity_version(eid));
525
526 tracing::debug!("Query returned {} events (total: {})", count, total_count);
527
528 Ok(Json(QueryEventsResponse {
529 events,
530 count,
531 total_count,
532 has_more,
533 entity_version,
534 }))
535}
536
537pub async fn list_entities(
538 State(store): State<SharedStore>,
539 Query(req): Query<ListEntitiesRequest>,
540) -> Result<Json<ListEntitiesResponse>> {
541 use std::collections::HashMap;
542
543 let query_req = QueryEventsRequest {
545 entity_id: None,
546 event_type: None,
547 tenant_id: None,
548 as_of: None,
549 since: None,
550 until: None,
551 limit: None,
552 event_type_prefix: req.event_type_prefix,
553 payload_filter: req.payload_filter,
554 };
555 let events = store.query(&query_req)?;
556
557 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
559 for event in &events {
560 entity_map
561 .entry(event.entity_id().to_string())
562 .or_default()
563 .push(event);
564 }
565
566 let mut summaries: Vec<EntitySummary> = entity_map
568 .into_iter()
569 .map(|(entity_id, events)| {
570 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
571 EntitySummary {
572 entity_id,
573 event_count: events.len(),
574 last_event_type: last.event_type_str().to_string(),
575 last_event_at: last.timestamp(),
576 }
577 })
578 .collect();
579 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
580
581 let total = summaries.len();
582
583 let offset = req.offset.unwrap_or(0);
585 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
586 let summaries = if let Some(limit) = req.limit {
587 let has_more = summaries.len() > limit;
588 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
589 return Ok(Json(ListEntitiesResponse {
590 entities: truncated,
591 total,
592 has_more,
593 }));
594 } else {
595 summaries
596 };
597
598 Ok(Json(ListEntitiesResponse {
599 entities: summaries,
600 total,
601 has_more: false,
602 }))
603}
604
/// Detects duplicate entities by grouping each entity's LATEST event payload
/// on the comma-separated fields in `req.group_by`.
///
/// Algorithm: (1) query all events under the type prefix, (2) keep only the
/// newest event per entity, (3) build a canonical grouping key by serializing
/// the selected payload fields to a JSON string (missing fields become null),
/// (4) report every key shared by more than one entity, largest groups first,
/// then paginate with offset/limit.
pub async fn detect_duplicates(
    State(store): State<SharedStore>,
    Query(req): Query<DetectDuplicatesRequest>,
) -> Result<Json<DetectDuplicatesResponse>> {
    use std::collections::HashMap;

    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();

    let query_req = QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        event_type_prefix: Some(req.event_type_prefix),
        payload_filter: None,
    };
    let events = store.query(&query_req)?;

    // Newest event per entity, held by reference to avoid cloning events.
    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
    for event in &events {
        let eid = event.entity_id().to_string();
        entity_latest
            .entry(eid)
            .and_modify(|existing| {
                if event.timestamp() > existing.timestamp() {
                    *existing = event;
                }
            })
            .or_insert(event);
    }

    // Canonical key: JSON-serialized map of the group-by fields. Using the
    // serialized string as the HashMap key makes structurally-equal payload
    // subsets collide regardless of source entity.
    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
    for (entity_id, event) in &entity_latest {
        let payload = event.payload();
        let mut key_parts = serde_json::Map::new();
        for field in &group_by_fields {
            // Absent fields participate as explicit nulls so they still group.
            let value = payload
                .get(*field)
                .cloned()
                .unwrap_or(serde_json::Value::Null);
            key_parts.insert((*field).to_string(), value);
        }
        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
        groups.entry(key_str).or_default().push(entity_id.clone());
    }

    // Only groups with 2+ entities are duplicates; entity ids are sorted for
    // a deterministic response.
    let mut duplicate_groups: Vec<DuplicateGroup> = groups
        .into_iter()
        .filter(|(_, ids)| ids.len() > 1)
        .map(|(key_str, mut ids)| {
            ids.sort();
            let key: serde_json::Value =
                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
            let count = ids.len();
            DuplicateGroup {
                key,
                entity_ids: ids,
                count,
            }
        })
        .collect();

    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));

    let total = duplicate_groups.len();

    let offset = req.offset.unwrap_or(0);
    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();

    if let Some(limit) = req.limit {
        let has_more = duplicate_groups.len() > limit;
        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
        return Ok(Json(DetectDuplicatesResponse {
            duplicates: truncated,
            total,
            has_more,
        }));
    }

    Ok(Json(DetectDuplicatesResponse {
        duplicates: duplicate_groups,
        total,
        has_more: false,
    }))
}
700
/// Query parameters for `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    // Optional point-in-time: reconstruct state as of this instant (UTC).
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
705
706pub async fn get_entity_state(
707 State(store): State<SharedStore>,
708 Path(entity_id): Path<String>,
709 Query(params): Query<EntityStateParams>,
710) -> Result<Json<serde_json::Value>> {
711 let state = store.reconstruct_state(&entity_id, params.as_of)?;
712
713 tracing::info!("State reconstructed for entity: {}", entity_id);
714
715 Ok(Json(state))
716}
717
718pub async fn get_entity_snapshot(
719 State(store): State<SharedStore>,
720 Path(entity_id): Path<String>,
721) -> Result<Json<serde_json::Value>> {
722 let snapshot = store.get_snapshot(&entity_id)?;
723
724 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
725
726 Ok(Json(snapshot))
727}
728
729pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
730 let stats = store.stats();
731 Json(stats)
732}
733
/// Pagination parameters for `list_streams`.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    // Maximum number of streams to return (applied after offset).
    pub limit: Option<usize>,
    // Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
743
/// Response body for `list_streams`; `total` is the pre-pagination count.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub total: usize,
}
750
751pub async fn list_streams(
752 State(store): State<SharedStore>,
753 Query(params): Query<ListStreamsParams>,
754) -> Json<ListStreamsResponse> {
755 let mut streams = store.list_streams();
756 let total = streams.len();
757
758 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
760
761 if let Some(offset) = params.offset {
763 if offset < streams.len() {
764 streams = streams[offset..].to_vec();
765 } else {
766 streams = vec![];
767 }
768 }
769
770 if let Some(limit) = params.limit {
771 streams.truncate(limit);
772 }
773
774 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
775
776 Json(ListStreamsResponse { streams, total })
777}
778
/// Pagination parameters for `list_event_types`.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    // Maximum number of event types to return (applied after offset).
    pub limit: Option<usize>,
    // Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
788
/// Response body for `list_event_types`; `total` is the pre-pagination count.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    pub event_types: Vec<EventTypeInfo>,
    pub total: usize,
}
795
796pub async fn list_event_types(
797 State(store): State<SharedStore>,
798 Query(params): Query<ListEventTypesParams>,
799) -> Json<ListEventTypesResponse> {
800 let mut event_types = store.list_event_types();
801 let total = event_types.len();
802
803 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
805
806 if let Some(offset) = params.offset {
808 if offset < event_types.len() {
809 event_types = event_types[offset..].to_vec();
810 } else {
811 event_types = vec![];
812 }
813 }
814
815 if let Some(limit) = params.limit {
816 event_types.truncate(limit);
817 }
818
819 tracing::debug!(
820 "Listed {} event types (total: {})",
821 event_types.len(),
822 total
823 );
824
825 Json(ListEventTypesResponse { event_types, total })
826}
827
/// Query parameters for the events WebSocket endpoint.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    // When present, the socket is bound to a registered consumer identity.
    pub consumer_id: Option<String>,
}
833
834pub async fn events_websocket(
835 ws: WebSocketUpgrade,
836 State(store): State<SharedStore>,
837 Query(params): Query<WebSocketParams>,
838) -> Response {
839 let websocket_manager = store.websocket_manager();
840
841 ws.on_upgrade(move |socket| async move {
842 if let Some(consumer_id) = params.consumer_id {
843 websocket_manager
844 .handle_socket_with_consumer(socket, consumer_id, store)
845 .await;
846 } else {
847 websocket_manager.handle_socket(socket).await;
848 }
849 })
850}
851
852pub async fn analytics_frequency(
854 State(store): State<SharedStore>,
855 Query(req): Query<EventFrequencyRequest>,
856) -> Result<Json<EventFrequencyResponse>> {
857 let response = AnalyticsEngine::event_frequency(&store, &req)?;
858
859 tracing::debug!(
860 "Frequency analysis returned {} buckets",
861 response.buckets.len()
862 );
863
864 Ok(Json(response))
865}
866
867pub async fn analytics_summary(
869 State(store): State<SharedStore>,
870 Query(req): Query<StatsSummaryRequest>,
871) -> Result<Json<StatsSummaryResponse>> {
872 let response = AnalyticsEngine::stats_summary(&store, &req)?;
873
874 tracing::debug!(
875 "Stats summary: {} events across {} entities",
876 response.total_events,
877 response.unique_entities
878 );
879
880 Ok(Json(response))
881}
882
883pub async fn analytics_correlation(
885 State(store): State<SharedStore>,
886 Query(req): Query<CorrelationRequest>,
887) -> Result<Json<CorrelationResponse>> {
888 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
889
890 tracing::debug!(
891 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
892 response.correlated_pairs,
893 response.total_a,
894 response.correlation_percentage
895 );
896
897 Ok(Json(response))
898}
899
900pub async fn create_snapshot(
902 State(store): State<SharedStore>,
903 Json(req): Json<CreateSnapshotRequest>,
904) -> Result<Json<CreateSnapshotResponse>> {
905 store.create_snapshot(&req.entity_id)?;
906
907 let snapshot_manager = store.snapshot_manager();
908 let snapshot = snapshot_manager
909 .get_latest_snapshot(&req.entity_id)
910 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
911
912 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
913
914 Ok(Json(CreateSnapshotResponse {
915 snapshot_id: snapshot.id,
916 entity_id: snapshot.entity_id,
917 created_at: snapshot.created_at,
918 event_count: snapshot.event_count,
919 size_bytes: snapshot.metadata.size_bytes,
920 }))
921}
922
923pub async fn list_snapshots(
925 State(store): State<SharedStore>,
926 Query(req): Query<ListSnapshotsRequest>,
927) -> Result<Json<ListSnapshotsResponse>> {
928 let snapshot_manager = store.snapshot_manager();
929
930 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
931 snapshot_manager
932 .get_all_snapshots(&entity_id)
933 .into_iter()
934 .map(SnapshotInfo::from)
935 .collect()
936 } else {
937 let entities = snapshot_manager.list_entities();
939 entities
940 .iter()
941 .flat_map(|entity_id| {
942 snapshot_manager
943 .get_all_snapshots(entity_id)
944 .into_iter()
945 .map(SnapshotInfo::from)
946 })
947 .collect()
948 };
949
950 let total = snapshots.len();
951
952 tracing::debug!("Listed {} snapshots", total);
953
954 Ok(Json(ListSnapshotsResponse { snapshots, total }))
955}
956
957pub async fn get_latest_snapshot(
959 State(store): State<SharedStore>,
960 Path(entity_id): Path<String>,
961) -> Result<Json<serde_json::Value>> {
962 let snapshot_manager = store.snapshot_manager();
963
964 let snapshot = snapshot_manager
965 .get_latest_snapshot(&entity_id)
966 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
967
968 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
969
970 Ok(Json(serde_json::json!({
971 "snapshot_id": snapshot.id,
972 "entity_id": snapshot.entity_id,
973 "created_at": snapshot.created_at,
974 "as_of": snapshot.as_of,
975 "event_count": snapshot.event_count,
976 "size_bytes": snapshot.metadata.size_bytes,
977 "snapshot_type": snapshot.metadata.snapshot_type,
978 "state": snapshot.state
979 })))
980}
981
982pub async fn trigger_compaction(
984 State(store): State<SharedStore>,
985) -> Result<Json<CompactionResult>> {
986 let compaction_manager = store.compaction_manager().ok_or_else(|| {
987 crate::error::AllSourceError::InternalError(
988 "Compaction not enabled (no Parquet storage)".to_string(),
989 )
990 })?;
991
992 tracing::info!("📦 Manual compaction triggered via API");
993
994 let result = compaction_manager.compact_now()?;
995
996 Ok(Json(result))
997}
998
999pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
1001 let compaction_manager = store.compaction_manager().ok_or_else(|| {
1002 crate::error::AllSourceError::InternalError(
1003 "Compaction not enabled (no Parquet storage)".to_string(),
1004 )
1005 })?;
1006
1007 let stats = compaction_manager.stats();
1008 let config = compaction_manager.config();
1009
1010 Ok(Json(serde_json::json!({
1011 "stats": stats,
1012 "config": {
1013 "min_files_to_compact": config.min_files_to_compact,
1014 "target_file_size": config.target_file_size,
1015 "max_file_size": config.max_file_size,
1016 "small_file_threshold": config.small_file_threshold,
1017 "compaction_interval_seconds": config.compaction_interval_seconds,
1018 "auto_compact": config.auto_compact,
1019 "strategy": config.strategy
1020 }
1021 })))
1022}
1023
1024pub async fn register_schema(
1026 State(store): State<SharedStore>,
1027 Json(req): Json<RegisterSchemaRequest>,
1028) -> Result<Json<RegisterSchemaResponse>> {
1029 let schema_registry = store.schema_registry();
1030
1031 let response =
1032 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1033
1034 tracing::info!(
1035 "📋 Schema registered: v{} for '{}'",
1036 response.version,
1037 response.subject
1038 );
1039
1040 Ok(Json(response))
1041}
1042
/// Query parameters for `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    // Specific schema version to fetch; latest when absent.
    version: Option<u32>,
}
1048
1049pub async fn get_schema(
1050 State(store): State<SharedStore>,
1051 Path(subject): Path<String>,
1052 Query(params): Query<GetSchemaParams>,
1053) -> Result<Json<serde_json::Value>> {
1054 let schema_registry = store.schema_registry();
1055
1056 let schema = schema_registry.get_schema(&subject, params.version)?;
1057
1058 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1059
1060 Ok(Json(serde_json::json!({
1061 "id": schema.id,
1062 "subject": schema.subject,
1063 "version": schema.version,
1064 "schema": schema.schema,
1065 "created_at": schema.created_at,
1066 "description": schema.description,
1067 "tags": schema.tags
1068 })))
1069}
1070
1071pub async fn list_schema_versions(
1073 State(store): State<SharedStore>,
1074 Path(subject): Path<String>,
1075) -> Result<Json<serde_json::Value>> {
1076 let schema_registry = store.schema_registry();
1077
1078 let versions = schema_registry.list_versions(&subject)?;
1079
1080 Ok(Json(serde_json::json!({
1081 "subject": subject,
1082 "versions": versions
1083 })))
1084}
1085
1086pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1088 let schema_registry = store.schema_registry();
1089
1090 let subjects = schema_registry.list_subjects();
1091
1092 Json(serde_json::json!({
1093 "subjects": subjects,
1094 "total": subjects.len()
1095 }))
1096}
1097
1098pub async fn validate_event_schema(
1100 State(store): State<SharedStore>,
1101 Json(req): Json<ValidateEventRequest>,
1102) -> Result<Json<ValidateEventResponse>> {
1103 let schema_registry = store.schema_registry();
1104
1105 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1106
1107 if response.valid {
1108 tracing::debug!(
1109 "✅ Event validated against schema '{}' v{}",
1110 req.subject,
1111 response.schema_version
1112 );
1113 } else {
1114 tracing::warn!(
1115 "❌ Event validation failed for '{}': {:?}",
1116 req.subject,
1117 response.errors
1118 );
1119 }
1120
1121 Ok(Json(response))
1122}
1123
/// Request body for `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    // Desired compatibility policy for the subject.
    compatibility: CompatibilityMode,
}
1129
1130pub async fn set_compatibility_mode(
1131 State(store): State<SharedStore>,
1132 Path(subject): Path<String>,
1133 Json(req): Json<SetCompatibilityRequest>,
1134) -> Json<serde_json::Value> {
1135 let schema_registry = store.schema_registry();
1136
1137 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1138
1139 tracing::info!(
1140 "🔧 Set compatibility mode for '{}' to {:?}",
1141 subject,
1142 req.compatibility
1143 );
1144
1145 Json(serde_json::json!({
1146 "subject": subject,
1147 "compatibility": req.compatibility
1148 }))
1149}
1150
1151pub async fn start_replay(
1153 State(store): State<SharedStore>,
1154 Json(req): Json<StartReplayRequest>,
1155) -> Result<Json<StartReplayResponse>> {
1156 let replay_manager = store.replay_manager();
1157
1158 let response = replay_manager.start_replay(store, req)?;
1159
1160 tracing::info!(
1161 "🔄 Started replay {} with {} events",
1162 response.replay_id,
1163 response.total_events
1164 );
1165
1166 Ok(Json(response))
1167}
1168
1169pub async fn get_replay_progress(
1171 State(store): State<SharedStore>,
1172 Path(replay_id): Path<uuid::Uuid>,
1173) -> Result<Json<ReplayProgress>> {
1174 let replay_manager = store.replay_manager();
1175
1176 let progress = replay_manager.get_progress(replay_id)?;
1177
1178 Ok(Json(progress))
1179}
1180
1181pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1183 let replay_manager = store.replay_manager();
1184
1185 let replays = replay_manager.list_replays();
1186
1187 Json(serde_json::json!({
1188 "replays": replays,
1189 "total": replays.len()
1190 }))
1191}
1192
1193pub async fn cancel_replay(
1195 State(store): State<SharedStore>,
1196 Path(replay_id): Path<uuid::Uuid>,
1197) -> Result<Json<serde_json::Value>> {
1198 let replay_manager = store.replay_manager();
1199
1200 replay_manager.cancel_replay(replay_id)?;
1201
1202 tracing::info!("🛑 Cancelled replay {}", replay_id);
1203
1204 Ok(Json(serde_json::json!({
1205 "replay_id": replay_id,
1206 "status": "cancelled"
1207 })))
1208}
1209
1210pub async fn delete_replay(
1212 State(store): State<SharedStore>,
1213 Path(replay_id): Path<uuid::Uuid>,
1214) -> Result<Json<serde_json::Value>> {
1215 let replay_manager = store.replay_manager();
1216
1217 let deleted = replay_manager.delete_replay(replay_id)?;
1218
1219 if deleted {
1220 tracing::info!("🗑️ Deleted replay {}", replay_id);
1221 }
1222
1223 Ok(Json(serde_json::json!({
1224 "replay_id": replay_id,
1225 "deleted": deleted
1226 })))
1227}
1228
1229pub async fn register_pipeline(
1231 State(store): State<SharedStore>,
1232 Json(config): Json<PipelineConfig>,
1233) -> Result<Json<serde_json::Value>> {
1234 let pipeline_manager = store.pipeline_manager();
1235
1236 let pipeline_id = pipeline_manager.register(config.clone());
1237
1238 tracing::info!(
1239 "🔀 Pipeline registered: {} (name: {})",
1240 pipeline_id,
1241 config.name
1242 );
1243
1244 Ok(Json(serde_json::json!({
1245 "pipeline_id": pipeline_id,
1246 "name": config.name,
1247 "enabled": config.enabled
1248 })))
1249}
1250
1251pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1253 let pipeline_manager = store.pipeline_manager();
1254
1255 let pipelines = pipeline_manager.list();
1256
1257 tracing::debug!("Listed {} pipelines", pipelines.len());
1258
1259 Json(serde_json::json!({
1260 "pipelines": pipelines,
1261 "total": pipelines.len()
1262 }))
1263}
1264
1265pub async fn get_pipeline(
1267 State(store): State<SharedStore>,
1268 Path(pipeline_id): Path<uuid::Uuid>,
1269) -> Result<Json<PipelineConfig>> {
1270 let pipeline_manager = store.pipeline_manager();
1271
1272 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1273 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1274 })?;
1275
1276 Ok(Json(pipeline.config().clone()))
1277}
1278
1279pub async fn remove_pipeline(
1281 State(store): State<SharedStore>,
1282 Path(pipeline_id): Path<uuid::Uuid>,
1283) -> Result<Json<serde_json::Value>> {
1284 let pipeline_manager = store.pipeline_manager();
1285
1286 let removed = pipeline_manager.remove(pipeline_id);
1287
1288 if removed {
1289 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1290 }
1291
1292 Ok(Json(serde_json::json!({
1293 "pipeline_id": pipeline_id,
1294 "removed": removed
1295 })))
1296}
1297
1298pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1300 let pipeline_manager = store.pipeline_manager();
1301
1302 let stats = pipeline_manager.all_stats();
1303
1304 Json(serde_json::json!({
1305 "stats": stats,
1306 "total": stats.len()
1307 }))
1308}
1309
1310pub async fn get_pipeline_stats(
1312 State(store): State<SharedStore>,
1313 Path(pipeline_id): Path<uuid::Uuid>,
1314) -> Result<Json<PipelineStats>> {
1315 let pipeline_manager = store.pipeline_manager();
1316
1317 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1318 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1319 })?;
1320
1321 Ok(Json(pipeline.stats()))
1322}
1323
1324pub async fn reset_pipeline(
1326 State(store): State<SharedStore>,
1327 Path(pipeline_id): Path<uuid::Uuid>,
1328) -> Result<Json<serde_json::Value>> {
1329 let pipeline_manager = store.pipeline_manager();
1330
1331 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1332 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1333 })?;
1334
1335 pipeline.reset();
1336
1337 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1338
1339 Ok(Json(serde_json::json!({
1340 "pipeline_id": pipeline_id,
1341 "reset": true
1342 })))
1343}
1344
1345pub async fn get_event_by_id(
1351 State(store): State<SharedStore>,
1352 Path(event_id): Path<uuid::Uuid>,
1353) -> Result<Json<serde_json::Value>> {
1354 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1355 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1356 })?;
1357
1358 let dto = EventDto::from(&event);
1359
1360 tracing::debug!("Event retrieved by ID: {}", event_id);
1361
1362 Ok(Json(serde_json::json!({
1363 "event": dto,
1364 "found": true
1365 })))
1366}
1367
/// GET handler: list every registered projection together with its
/// lifecycle status. Projections with no entry in the status map are
/// reported as "running".
pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
    let projection_manager = store.projection_manager();
    let status_map = store.projection_status();

    let projections: Vec<serde_json::Value> = projection_manager
        .list_projections()
        .iter()
        .map(|(name, projection)| {
            // Absent from the status map => assumed running.
            let status = status_map
                .get(name)
                .map_or_else(|| "running".to_string(), |s| s.value().clone());
            serde_json::json!({
                "name": name,
                // NOTE(review): Debug-formatting `projection.name()` will add
                // surrounding quotes if it returns a string — confirm this
                // "type" field is rendered as intended.
                "type": format!("{:?}", projection.name()),
                "status": status,
            })
        })
        .collect();

    tracing::debug!("Listed {} projections", projections.len());

    Json(serde_json::json!({
        "projections": projections,
        "total": projections.len()
    }))
}
1399
1400pub async fn get_projection(
1402 State(store): State<SharedStore>,
1403 Path(name): Path<String>,
1404) -> Result<Json<serde_json::Value>> {
1405 let projection_manager = store.projection_manager();
1406
1407 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1408 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1409 })?;
1410
1411 Ok(Json(serde_json::json!({
1412 "name": projection.name(),
1413 "found": true
1414 })))
1415}
1416
1417pub async fn get_projection_state(
1430 State(store): State<SharedStore>,
1431 Path((name, entity_id)): Path<(String, String)>,
1432) -> Result<Json<serde_json::Value>> {
1433 let state = store
1434 .projection_manager()
1435 .get_projection(&name)
1436 .and_then(|p| p.get_state(&entity_id))
1437 .or_else(|| {
1438 store
1439 .projection_state_cache()
1440 .get(&format!("{name}:{entity_id}"))
1441 .map(|entry| entry.value().clone())
1442 });
1443
1444 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1445
1446 Ok(Json(serde_json::json!({
1447 "projection": name,
1448 "entity_id": entity_id,
1449 "state": state,
1450 "found": state.is_some()
1451 })))
1452}
1453
1454pub async fn delete_projection(
1459 State(store): State<SharedStore>,
1460 Path(name): Path<String>,
1461) -> Result<Json<serde_json::Value>> {
1462 let projection_manager = store.projection_manager();
1463
1464 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1465 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1466 })?;
1467
1468 projection.clear();
1469
1470 let cache = store.projection_state_cache();
1472 let prefix = format!("{name}:");
1473 let keys_to_remove: Vec<String> = cache
1474 .iter()
1475 .filter(|entry| entry.key().starts_with(&prefix))
1476 .map(|entry| entry.key().clone())
1477 .collect();
1478 for key in keys_to_remove {
1479 cache.remove(&key);
1480 }
1481
1482 tracing::info!("Projection deleted (cleared): {}", name);
1483
1484 Ok(Json(serde_json::json!({
1485 "projection": name,
1486 "deleted": true
1487 })))
1488}
1489
1490pub async fn get_projection_state_summary(
1499 State(store): State<SharedStore>,
1500 Path(name): Path<String>,
1501) -> Result<Json<serde_json::Value>> {
1502 let cache = store.projection_state_cache();
1503 let prefix = format!("{name}:");
1504 let states: Vec<serde_json::Value> = cache
1505 .iter()
1506 .filter(|entry| entry.key().starts_with(&prefix))
1507 .map(|entry| {
1508 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1509 serde_json::json!({
1510 "entity_id": entity_id,
1511 "state": entry.value().clone()
1512 })
1513 })
1514 .collect();
1515
1516 let total = states.len();
1517
1518 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1519
1520 Ok(Json(serde_json::json!({
1521 "projection": name,
1522 "states": states,
1523 "total": total
1524 })))
1525}
1526
1527pub async fn reset_projection(
1531 State(store): State<SharedStore>,
1532 Path(name): Path<String>,
1533) -> Result<Json<serde_json::Value>> {
1534 let reprocessed = store.reset_projection(&name)?;
1535
1536 tracing::info!(
1537 "Projection reset: {} ({} events reprocessed)",
1538 name,
1539 reprocessed
1540 );
1541
1542 Ok(Json(serde_json::json!({
1543 "projection": name,
1544 "reset": true,
1545 "events_reprocessed": reprocessed
1546 })))
1547}
1548
1549pub async fn pause_projection(
1553 State(store): State<SharedStore>,
1554 Path(name): Path<String>,
1555) -> Result<Json<serde_json::Value>> {
1556 let projection_manager = store.projection_manager();
1557
1558 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1560 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1561 })?;
1562
1563 store
1564 .projection_status()
1565 .insert(name.clone(), "paused".to_string());
1566
1567 tracing::info!("Projection paused: {}", name);
1568
1569 Ok(Json(serde_json::json!({
1570 "projection": name,
1571 "status": "paused"
1572 })))
1573}
1574
1575pub async fn start_projection(
1579 State(store): State<SharedStore>,
1580 Path(name): Path<String>,
1581) -> Result<Json<serde_json::Value>> {
1582 let projection_manager = store.projection_manager();
1583
1584 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1586 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1587 })?;
1588
1589 store
1590 .projection_status()
1591 .insert(name.clone(), "running".to_string());
1592
1593 tracing::info!("Projection started: {}", name);
1594
1595 Ok(Json(serde_json::json!({
1596 "projection": name,
1597 "status": "running"
1598 })))
1599}
1600
/// Request body for `save_projection_state`: the full JSON state to persist
/// for one (projection, entity) pair.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    // Arbitrary JSON blob stored verbatim in the projection state cache.
    pub state: serde_json::Value,
}
1606
1607pub async fn save_projection_state(
1612 State(store): State<SharedStore>,
1613 Path((name, entity_id)): Path<(String, String)>,
1614 Json(req): Json<SaveProjectionStateRequest>,
1615) -> Result<Json<serde_json::Value>> {
1616 let projection_cache = store.projection_state_cache();
1617
1618 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1620
1621 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1622
1623 Ok(Json(serde_json::json!({
1624 "projection": name,
1625 "entity_id": entity_id,
1626 "saved": true
1627 })))
1628}
1629
/// Request body for `bulk_get_projection_states`: the entity ids whose
/// states should be fetched in one round trip.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    // Entity ids to look up; order is preserved in the response.
    pub entity_ids: Vec<String>,
}
1637
/// Request body for `bulk_save_projection_states`: a batch of per-entity
/// states to write in one round trip.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    // One entry per entity; each is written independently to the cache.
    pub states: Vec<BulkSaveStateItem>,
}
1645
/// One (entity, state) pair within a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    // Entity the state belongs to; combined with the projection name to
    // form the cache key "{projection}:{entity_id}".
    pub entity_id: String,
    // Arbitrary JSON state stored verbatim.
    pub state: serde_json::Value,
}
1651
1652pub async fn bulk_get_projection_states(
1653 State(store): State<SharedStore>,
1654 Path(name): Path<String>,
1655 Json(req): Json<BulkGetStateRequest>,
1656) -> Result<Json<serde_json::Value>> {
1657 let projection = store.projection_manager().get_projection(&name);
1661 let cache = store.projection_state_cache();
1662
1663 let states: Vec<serde_json::Value> = req
1664 .entity_ids
1665 .iter()
1666 .map(|entity_id| {
1667 let state = projection
1668 .as_ref()
1669 .and_then(|p| p.get_state(entity_id))
1670 .or_else(|| {
1671 cache
1672 .get(&format!("{name}:{entity_id}"))
1673 .map(|entry| entry.value().clone())
1674 });
1675 serde_json::json!({
1676 "entity_id": entity_id,
1677 "state": state,
1678 "found": state.is_some()
1679 })
1680 })
1681 .collect();
1682
1683 tracing::debug!(
1684 "Bulk projection state retrieved: {} entities from {}",
1685 states.len(),
1686 name
1687 );
1688
1689 Ok(Json(serde_json::json!({
1690 "projection": name,
1691 "states": states,
1692 "total": states.len()
1693 })))
1694}
1695
1696pub async fn bulk_save_projection_states(
1701 State(store): State<SharedStore>,
1702 Path(name): Path<String>,
1703 Json(req): Json<BulkSaveStateRequest>,
1704) -> Result<Json<serde_json::Value>> {
1705 let projection_cache = store.projection_state_cache();
1706
1707 let mut saved_count = 0;
1708 for item in &req.states {
1709 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1710 saved_count += 1;
1711 }
1712
1713 tracing::info!(
1714 "Bulk projection state saved: {} entities for {}",
1715 saved_count,
1716 name
1717 );
1718
1719 Ok(Json(serde_json::json!({
1720 "projection": name,
1721 "saved": saved_count,
1722 "total": req.states.len()
1723 })))
1724}
1725
/// Query-string parameters for `list_webhooks`.
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    // Tenant to filter by; when omitted the handler returns an empty list.
    pub tenant_id: Option<String>,
}
1735
1736pub async fn register_webhook(
1738 State(store): State<SharedStore>,
1739 Json(req): Json<RegisterWebhookRequest>,
1740) -> Json<serde_json::Value> {
1741 let registry = store.webhook_registry();
1742 let webhook = registry.register(req);
1743
1744 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1745
1746 Json(serde_json::json!({
1747 "webhook": webhook,
1748 "created": true
1749 }))
1750}
1751
/// GET handler: list webhooks for a tenant.
///
/// NOTE(review): when `tenant_id` is omitted the handler returns an empty
/// list rather than all webhooks — confirm this is the intended contract.
pub async fn list_webhooks(
    State(store): State<SharedStore>,
    Query(params): Query<ListWebhooksParams>,
) -> Json<serde_json::Value> {
    let registry = store.webhook_registry();

    let webhooks = if let Some(tenant_id) = params.tenant_id {
        registry.list_by_tenant(&tenant_id)
    } else {
        // No tenant filter => empty result (see NOTE above).
        vec![]
    };

    let total = webhooks.len();

    Json(serde_json::json!({
        "webhooks": webhooks,
        "total": total
    }))
}
1773
1774pub async fn get_webhook(
1776 State(store): State<SharedStore>,
1777 Path(webhook_id): Path<uuid::Uuid>,
1778) -> Result<Json<serde_json::Value>> {
1779 let registry = store.webhook_registry();
1780
1781 let webhook = registry.get(webhook_id).ok_or_else(|| {
1782 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1783 })?;
1784
1785 Ok(Json(serde_json::json!({
1786 "webhook": webhook,
1787 "found": true
1788 })))
1789}
1790
1791pub async fn update_webhook(
1793 State(store): State<SharedStore>,
1794 Path(webhook_id): Path<uuid::Uuid>,
1795 Json(req): Json<UpdateWebhookRequest>,
1796) -> Result<Json<serde_json::Value>> {
1797 let registry = store.webhook_registry();
1798
1799 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1800 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1801 })?;
1802
1803 tracing::info!("Webhook updated: {}", webhook_id);
1804
1805 Ok(Json(serde_json::json!({
1806 "webhook": webhook,
1807 "updated": true
1808 })))
1809}
1810
1811pub async fn delete_webhook(
1813 State(store): State<SharedStore>,
1814 Path(webhook_id): Path<uuid::Uuid>,
1815) -> Result<Json<serde_json::Value>> {
1816 let registry = store.webhook_registry();
1817
1818 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1819 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1820 })?;
1821
1822 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1823
1824 Ok(Json(serde_json::json!({
1825 "webhook_id": webhook_id,
1826 "deleted": true
1827 })))
1828}
1829
/// Query-string parameters for `list_webhook_deliveries`.
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    // Maximum number of delivery records to return; defaults to 50.
    pub limit: Option<usize>,
}
1835
1836pub async fn list_webhook_deliveries(
1838 State(store): State<SharedStore>,
1839 Path(webhook_id): Path<uuid::Uuid>,
1840 Query(params): Query<ListDeliveriesParams>,
1841) -> Result<Json<serde_json::Value>> {
1842 let registry = store.webhook_registry();
1843
1844 registry.get(webhook_id).ok_or_else(|| {
1846 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1847 })?;
1848
1849 let limit = params.limit.unwrap_or(50);
1850 let deliveries = registry.get_deliveries(webhook_id, limit);
1851 let total = deliveries.len();
1852
1853 Ok(Json(serde_json::json!({
1854 "webhook_id": webhook_id,
1855 "deliveries": deliveries,
1856 "total": total
1857 })))
1858}
1859
/// POST handler: execute an EventQL query against a snapshot of all events.
///
/// Query-engine failures surface as `InvalidQuery`. The original matched on
/// the result only to re-wrap both arms; `map_err` + `?` is equivalent and
/// idiomatic.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();
    let response = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1880
/// POST handler: execute a (minimal, hand-rolled) GraphQL query.
///
/// Supports the top-level fields `events`, `event`, `projections`, `stats`
/// and `__schema`. Field errors are accumulated into the response's
/// `errors` array rather than aborting the whole query, matching GraphQL's
/// partial-result semantics. Parse failures short-circuit with an
/// error-only response.
pub async fn graphql_query(
    State(store): State<SharedStore>,
    Json(req): Json<GraphQLRequest>,
) -> Json<serde_json::Value> {
    // A query that fails to parse returns immediately with no data.
    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
        Ok(f) => f,
        Err(e) => {
            return Json(
                serde_json::to_value(GraphQLResponse {
                    data: None,
                    errors: vec![GraphQLError { message: e }],
                })
                // Serializing our own response struct cannot fail.
                .unwrap(),
            );
        }
    };

    let mut data = serde_json::Map::new();
    let mut errors = Vec::new();

    // Resolve each requested top-level field independently.
    for field in &fields {
        match field.name.as_str() {
            // `events(...)` maps its arguments onto a standard events query.
            "events" => {
                let request = crate::application::dto::QueryEventsRequest {
                    entity_id: field.arguments.get("entity_id").cloned(),
                    event_type: field.arguments.get("event_type").cloned(),
                    tenant_id: field.arguments.get("tenant_id").cloned(),
                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
                    as_of: None,
                    since: None,
                    until: None,
                    event_type_prefix: None,
                    payload_filter: None,
                };
                match store.query(&request) {
                    Ok(events) => {
                        // Project each event down to the selected sub-fields.
                        let json_events: Vec<serde_json::Value> = events
                            .iter()
                            .map(|e| {
                                crate::infrastructure::query::graphql::event_to_json(
                                    e,
                                    &field.fields,
                                )
                            })
                            .collect();
                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
                    }
                    Err(e) => errors.push(GraphQLError {
                        message: format!("events query failed: {e}"),
                    }),
                }
            }
            // `event(id: ...)` looks up a single event by UUID.
            "event" => {
                if let Some(id_str) = field.arguments.get("id") {
                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
                        match store.get_event_by_id(&id) {
                            Ok(Some(event)) => {
                                data.insert(
                                    "event".to_string(),
                                    crate::infrastructure::query::graphql::event_to_json(
                                        &event,
                                        &field.fields,
                                    ),
                                );
                            }
                            // A missing event is `null`, not an error.
                            Ok(None) => {
                                data.insert("event".to_string(), serde_json::Value::Null);
                            }
                            Err(e) => errors.push(GraphQLError {
                                message: format!("event lookup failed: {e}"),
                            }),
                        }
                    } else {
                        errors.push(GraphQLError {
                            message: format!("Invalid UUID: {id_str}"),
                        });
                    }
                } else {
                    errors.push(GraphQLError {
                        message: "event query requires 'id' argument".to_string(),
                    });
                }
            }
            // `projections` lists registered projection names.
            "projections" => {
                let pm = store.projection_manager();
                let names: Vec<serde_json::Value> = pm
                    .list_projections()
                    .iter()
                    .map(|(name, _)| serde_json::Value::String(name.clone()))
                    .collect();
                data.insert("projections".to_string(), serde_json::Value::Array(names));
            }
            // `stats` exposes a fixed subset of store-wide counters.
            "stats" => {
                let stats = store.stats();
                data.insert(
                    "stats".to_string(),
                    serde_json::json!({
                        "total_events": stats.total_events,
                        "total_entities": stats.total_entities,
                        "total_event_types": stats.total_event_types,
                    }),
                );
            }
            // Minimal introspection support.
            "__schema" => {
                data.insert(
                    "__schema".to_string(),
                    crate::infrastructure::query::graphql::introspection_schema(),
                );
            }
            // Unknown fields are reported but do not abort the query.
            other => {
                errors.push(GraphQLError {
                    message: format!("Unknown field: {other}"),
                });
            }
        }
    }

    Json(
        serde_json::to_value(GraphQLResponse {
            data: Some(serde_json::Value::Object(data)),
            errors,
        })
        // Serializing our own response struct cannot fail.
        .unwrap(),
    )
}
2007
2008pub async fn geo_query(
2010 State(store): State<SharedStore>,
2011 Json(req): Json<GeoQueryRequest>,
2012) -> Json<serde_json::Value> {
2013 let events = store.snapshot_events();
2014 let geo_index = store.geo_index();
2015 let results =
2016 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
2017 let total = results.len();
2018 Json(serde_json::json!({
2019 "results": results,
2020 "total": total,
2021 }))
2022}
2023
2024pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2026 let stats = store.geo_index().stats();
2027 Json(serde_json::json!(stats))
2028}
2029
2030pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2032 let stats = store.exactly_once().stats();
2033 Json(serde_json::json!(stats))
2034}
2035
2036pub async fn schema_evolution_history(
2038 State(store): State<SharedStore>,
2039 Path(event_type): Path<String>,
2040) -> Json<serde_json::Value> {
2041 let mgr = store.schema_evolution();
2042 let history = mgr.get_history(&event_type);
2043 let version = mgr.get_version(&event_type);
2044 Json(serde_json::json!({
2045 "event_type": event_type,
2046 "current_version": version,
2047 "history": history,
2048 }))
2049}
2050
2051pub async fn schema_evolution_schema(
2053 State(store): State<SharedStore>,
2054 Path(event_type): Path<String>,
2055) -> Json<serde_json::Value> {
2056 let mgr = store.schema_evolution();
2057 if let Some(schema) = mgr.get_schema(&event_type) {
2058 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2059 Json(serde_json::json!({
2060 "event_type": event_type,
2061 "version": mgr.get_version(&event_type),
2062 "inferred_schema": schema,
2063 "json_schema": json_schema,
2064 }))
2065 } else {
2066 Json(serde_json::json!({
2067 "event_type": event_type,
2068 "error": "No schema inferred for this event type"
2069 }))
2070 }
2071}
2072
2073pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2075 let stats = store.schema_evolution().stats();
2076 let event_types = store.schema_evolution().list_event_types();
2077 Json(serde_json::json!({
2078 "stats": stats,
2079 "tracked_event_types": event_types,
2080 }))
2081}
2082
/// Pull-side sync endpoint for embedded replicas.
///
/// The client sends its version vector; we return every event newer than
/// the *oldest* physical timestamp in that vector, wrapped as
/// `ReplicatedEvent`s with synthesized HLC timestamps. The returned version
/// vector is left empty (the client reconciles from the events themselves).
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Lower bound: the minimum physical time across the client's vector.
    // NOTE(review): `ms as i64` would wrap for timestamps past i64::MAX ms —
    // unreachable for realistic clocks, but worth a checked conversion.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    let mut replicated = Vec::with_capacity(events.len());
    // HLC synthesis: events sharing a physical millisecond get increasing
    // logical counters so their ordering is preserved.
    let mut last_ms = 0u64;
    let mut logical = 0u32;

    for event in &events {
        let event_ms = event.timestamp().timestamp_millis() as u64;
        if event_ms == last_ms {
            logical += 1;
        } else {
            last_ms = event_ms;
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            // All events served from this node are attributed to "server".
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2151
/// Push-side sync endpoint: accept events from an embedded replica and
/// ingest them locally.
///
/// Each replicated event carries a JSON envelope (`event_type`, `entity_id`,
/// `tenant_id`, `payload`, `metadata`). Events that fail domain validation
/// are counted as skipped; a storage error during ingest still aborts the
/// whole request (unchanged contract). The string-field extraction is
/// factored into a closure, and the empty-payload default is built lazily
/// with `unwrap_or_else` instead of eagerly on every event.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;
        // Missing or non-string envelope fields fall back to fixed defaults.
        let str_field = |key: &str, default: &str| {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(default)
                .to_string()
        };
        let event_type = str_field("event_type", "unknown");
        let entity_id = str_field("entity_id", "unknown");
        let tenant_id = str_field("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                store.ingest(&domain_event)?;
                accepted += 1;
            }
            Err(_) => {
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2203
2204pub async fn register_consumer(
2210 State(store): State<SharedStore>,
2211 Json(req): Json<RegisterConsumerRequest>,
2212) -> Result<Json<ConsumerResponse>> {
2213 let consumer = store
2214 .consumer_registry()
2215 .register(&req.consumer_id, &req.event_type_filters);
2216
2217 Ok(Json(ConsumerResponse {
2218 consumer_id: consumer.consumer_id,
2219 event_type_filters: consumer.event_type_filters,
2220 cursor_position: consumer.cursor_position,
2221 }))
2222}
2223
2224pub async fn get_consumer(
2226 State(store): State<SharedStore>,
2227 Path(consumer_id): Path<String>,
2228) -> Result<Json<ConsumerResponse>> {
2229 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2230
2231 Ok(Json(ConsumerResponse {
2232 consumer_id: consumer.consumer_id,
2233 event_type_filters: consumer.event_type_filters,
2234 cursor_position: consumer.cursor_position,
2235 }))
2236}
2237
/// Query-string parameters for `poll_consumer_events`.
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    // Maximum number of events to return per poll; defaults to 100.
    pub limit: Option<usize>,
}
2243
2244pub async fn poll_consumer_events(
2245 State(store): State<SharedStore>,
2246 Path(consumer_id): Path<String>,
2247 Query(query): Query<ConsumerPollQuery>,
2248) -> Result<Json<ConsumerEventsResponse>> {
2249 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2250 let offset = consumer.cursor_position.unwrap_or(0);
2251 let limit = query.limit.unwrap_or(100);
2252
2253 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2254 let count = events.len();
2255
2256 let consumer_events: Vec<ConsumerEventDto> = events
2257 .into_iter()
2258 .map(|(position, event)| ConsumerEventDto {
2259 position,
2260 event: EventDto::from(&event),
2261 })
2262 .collect();
2263
2264 Ok(Json(ConsumerEventsResponse {
2265 events: consumer_events,
2266 count,
2267 }))
2268}
2269
2270pub async fn ack_consumer(
2272 State(store): State<SharedStore>,
2273 Path(consumer_id): Path<String>,
2274 Json(req): Json<AckRequest>,
2275) -> Result<Json<serde_json::Value>> {
2276 let max_offset = store.total_events() as u64;
2277
2278 store
2279 .consumer_registry()
2280 .ack(&consumer_id, req.position, max_offset)
2281 .map_err(crate::error::AllSourceError::InvalidInput)?;
2282
2283 Ok(Json(serde_json::json!({
2284 "status": "ok",
2285 "consumer_id": consumer_id,
2286 "position": req.position,
2287 })))
2288}
2289
2290#[cfg(test)]
2291mod tests {
2292 use super::*;
2293 use crate::{domain::entities::Event, store::EventStore};
2294
2295 fn create_test_store() -> Arc<EventStore> {
2296 Arc::new(EventStore::new())
2297 }
2298
2299 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2300 Event::from_strings(
2301 event_type.to_string(),
2302 entity_id.to_string(),
2303 "test-stream".to_string(),
2304 serde_json::json!({
2305 "name": "Test",
2306 "value": 42
2307 }),
2308 None,
2309 )
2310 .unwrap()
2311 }
2312
2313 #[tokio::test]
2314 async fn test_query_events_has_more_and_total_count() {
2315 let store = create_test_store();
2316
2317 for i in 0..50 {
2319 store
2320 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2321 .unwrap();
2322 }
2323
2324 let req = QueryEventsRequest {
2326 entity_id: None,
2327 event_type: None,
2328 tenant_id: None,
2329 as_of: None,
2330 since: None,
2331 until: None,
2332 limit: Some(10),
2333 event_type_prefix: None,
2334 payload_filter: None,
2335 };
2336
2337 let requested_limit = req.limit;
2338 let unlimited_req = QueryEventsRequest {
2339 limit: None,
2340 ..QueryEventsRequest {
2341 entity_id: req.entity_id,
2342 event_type: req.event_type,
2343 tenant_id: req.tenant_id,
2344 as_of: req.as_of,
2345 since: req.since,
2346 until: req.until,
2347 limit: None,
2348 event_type_prefix: req.event_type_prefix,
2349 payload_filter: req.payload_filter,
2350 }
2351 };
2352 let all_events = store.query(&unlimited_req).unwrap();
2353 let total_count = all_events.len();
2354 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2355 all_events.into_iter().take(limit).collect()
2356 } else {
2357 all_events
2358 };
2359 let count = limited_events.len();
2360 let has_more = count < total_count;
2361
2362 assert_eq!(count, 10);
2363 assert_eq!(total_count, 50);
2364 assert!(has_more);
2365 }
2366
2367 #[tokio::test]
2368 async fn test_query_events_no_more_results() {
2369 let store = create_test_store();
2370
2371 for i in 0..5 {
2373 store
2374 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2375 .unwrap();
2376 }
2377
2378 let all_events = store
2380 .query(&QueryEventsRequest {
2381 entity_id: None,
2382 event_type: None,
2383 tenant_id: None,
2384 as_of: None,
2385 since: None,
2386 until: None,
2387 limit: None,
2388 event_type_prefix: None,
2389 payload_filter: None,
2390 })
2391 .unwrap();
2392 let total_count = all_events.len();
2393 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2394 let count = limited_events.len();
2395 let has_more = count < total_count;
2396
2397 assert_eq!(count, 5);
2398 assert_eq!(total_count, 5);
2399 assert!(!has_more);
2400 }
2401
    /// The store returns events in ascending timestamp order; a descending
    /// view is produced by reversing, so `order=desc&limit=1` must yield the
    /// newest event and the ascending head the oldest.
    #[tokio::test]
    async fn test_query_events_order_desc_returns_latest() {
        let store = create_test_store();

        // Three versions of the same entity, one second apart.
        let base = chrono::Utc::now();
        for i in 0..3i64 {
            let mut event = create_test_event("org-1", "auth.org.updated");
            event.timestamp = base + chrono::Duration::seconds(i);
            event.version = i + 1;
            store.ingest(&event).unwrap();
        }

        let req = QueryEventsRequest {
            entity_id: Some("org-1".to_string()),
            ..Default::default()
        };
        let ascending = store.query(&req).unwrap();
        assert_eq!(ascending.len(), 3);
        // Sanity-check the store's ascending ordering before deriving views.
        assert!(ascending[0].timestamp < ascending[1].timestamp);
        assert!(ascending[1].timestamp < ascending[2].timestamp);
        let oldest_ts = ascending[0].timestamp;
        let newest_ts = ascending[2].timestamp;

        // Descending view = clone + reverse; take(1) models limit=1.
        let mut descending = ascending.clone();
        descending.reverse();
        let latest: Vec<Event> = descending.into_iter().take(1).collect();
        assert_eq!(latest.len(), 1);
        assert_eq!(
            latest[0].timestamp, newest_ts,
            "order=desc&limit=1 must yield the newest event"
        );

        // Ascending head must be the oldest event.
        let oldest: Vec<Event> = ascending.into_iter().take(1).collect();
        assert_eq!(oldest[0].timestamp, oldest_ts);
    }
2445
2446 #[tokio::test]
2447 async fn test_list_entities_by_type_prefix() {
2448 let store = create_test_store();
2449
2450 store
2452 .ingest(&create_test_event("idx-1", "index.created"))
2453 .unwrap();
2454 store
2455 .ingest(&create_test_event("idx-1", "index.updated"))
2456 .unwrap();
2457 store
2458 .ingest(&create_test_event("idx-2", "index.created"))
2459 .unwrap();
2460 store
2461 .ingest(&create_test_event("idx-3", "index.created"))
2462 .unwrap();
2463 store
2465 .ingest(&create_test_event("trade-1", "trade.created"))
2466 .unwrap();
2467 store
2468 .ingest(&create_test_event("trade-2", "trade.created"))
2469 .unwrap();
2470
2471 let req = ListEntitiesRequest {
2473 event_type_prefix: Some("index.".to_string()),
2474 payload_filter: None,
2475 limit: None,
2476 offset: None,
2477 };
2478 let query_req = QueryEventsRequest {
2479 entity_id: None,
2480 event_type: None,
2481 tenant_id: None,
2482 as_of: None,
2483 since: None,
2484 until: None,
2485 limit: None,
2486 event_type_prefix: req.event_type_prefix,
2487 payload_filter: req.payload_filter,
2488 };
2489 let events = store.query(&query_req).unwrap();
2490
2491 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2493 std::collections::HashMap::new();
2494 for event in &events {
2495 entity_map
2496 .entry(event.entity_id().to_string())
2497 .or_default()
2498 .push(event);
2499 }
2500
2501 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2504 assert_eq!(entity_map["idx-3"].len(), 1);
2505 }
2506
2507 fn create_test_event_with_payload(
2508 entity_id: &str,
2509 event_type: &str,
2510 payload: serde_json::Value,
2511 ) -> Event {
2512 Event::from_strings(
2513 event_type.to_string(),
2514 entity_id.to_string(),
2515 "test-stream".to_string(),
2516 payload,
2517 None,
2518 )
2519 .unwrap()
2520 }
2521
2522 #[tokio::test]
2523 async fn test_detect_duplicates_by_payload_fields() {
2524 let store = create_test_store();
2525
2526 store
2528 .ingest(&create_test_event_with_payload(
2529 "idx-1",
2530 "index.created",
2531 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2532 ))
2533 .unwrap();
2534 store
2535 .ingest(&create_test_event_with_payload(
2536 "idx-2",
2537 "index.created",
2538 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2539 ))
2540 .unwrap();
2541 store
2542 .ingest(&create_test_event_with_payload(
2543 "idx-3",
2544 "index.created",
2545 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2546 ))
2547 .unwrap();
2548 store
2549 .ingest(&create_test_event_with_payload(
2550 "idx-4",
2551 "index.created",
2552 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2553 ))
2554 .unwrap();
2555 store
2556 .ingest(&create_test_event_with_payload(
2557 "idx-5",
2558 "index.created",
2559 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2560 ))
2561 .unwrap();
2562
2563 let query_req = QueryEventsRequest {
2565 entity_id: None,
2566 event_type: None,
2567 tenant_id: None,
2568 as_of: None,
2569 since: None,
2570 until: None,
2571 limit: None,
2572 event_type_prefix: Some("index.".to_string()),
2573 payload_filter: None,
2574 };
2575 let events = store.query(&query_req).unwrap();
2576
2577 let group_by_fields = vec!["name"];
2579 let mut entity_latest: std::collections::HashMap<String, &Event> =
2580 std::collections::HashMap::new();
2581 for event in &events {
2582 let eid = event.entity_id().to_string();
2583 entity_latest
2584 .entry(eid)
2585 .and_modify(|existing| {
2586 if event.timestamp() > existing.timestamp() {
2587 *existing = event;
2588 }
2589 })
2590 .or_insert(event);
2591 }
2592
2593 let mut groups: std::collections::HashMap<String, Vec<String>> =
2594 std::collections::HashMap::new();
2595 for (entity_id, event) in &entity_latest {
2596 let payload = event.payload();
2597 let mut key_parts = serde_json::Map::new();
2598 for field in &group_by_fields {
2599 let value = payload
2600 .get(*field)
2601 .cloned()
2602 .unwrap_or(serde_json::Value::Null);
2603 key_parts.insert((*field).to_string(), value);
2604 }
2605 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2606 groups.entry(key_str).or_default().push(entity_id.clone());
2607 }
2608
2609 let duplicate_groups: Vec<_> = groups
2610 .into_iter()
2611 .filter(|(_, ids)| ids.len() > 1)
2612 .collect();
2613
2614 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2616 assert_eq!(ids.len(), 2);
2617 }
2618 }
2619
2620 #[tokio::test]
2621 async fn test_detect_duplicates_no_duplicates() {
2622 let store = create_test_store();
2623
2624 store
2626 .ingest(&create_test_event_with_payload(
2627 "idx-1",
2628 "index.created",
2629 serde_json::json!({"name": "A"}),
2630 ))
2631 .unwrap();
2632 store
2633 .ingest(&create_test_event_with_payload(
2634 "idx-2",
2635 "index.created",
2636 serde_json::json!({"name": "B"}),
2637 ))
2638 .unwrap();
2639
2640 let query_req = QueryEventsRequest {
2641 entity_id: None,
2642 event_type: None,
2643 tenant_id: None,
2644 as_of: None,
2645 since: None,
2646 until: None,
2647 limit: None,
2648 event_type_prefix: Some("index.".to_string()),
2649 payload_filter: None,
2650 };
2651 let events = store.query(&query_req).unwrap();
2652
2653 let mut entity_latest: std::collections::HashMap<String, &Event> =
2654 std::collections::HashMap::new();
2655 for event in &events {
2656 entity_latest
2657 .entry(event.entity_id().to_string())
2658 .or_insert(event);
2659 }
2660
2661 let mut groups: std::collections::HashMap<String, Vec<String>> =
2662 std::collections::HashMap::new();
2663 for (entity_id, event) in &entity_latest {
2664 let key_str =
2665 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2666 .unwrap();
2667 groups.entry(key_str).or_default().push(entity_id.clone());
2668 }
2669
2670 let duplicate_groups: Vec<_> = groups
2671 .into_iter()
2672 .filter(|(_, ids)| ids.len() > 1)
2673 .collect();
2674
2675 assert_eq!(duplicate_groups.len(), 0); }
2677
    /// Duplicate detection keyed on BOTH `name` and `user_id`: only entities
    /// whose latest payloads agree on every group-by field form one group.
    #[tokio::test]
    async fn test_detect_duplicates_multi_field_group_by() {
        let store = create_test_store();

        // idx-1 and idx-2 share (name, user_id); idx-3 matches on name only,
        // so it must NOT join their group.
        store
            .ingest(&create_test_event_with_payload(
                "idx-1",
                "index.created",
                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "idx-2",
                "index.created",
                serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
            ))
            .unwrap();
        store
            .ingest(&create_test_event_with_payload(
                "idx-3",
                "index.created",
                serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
            ))
            .unwrap();

        // Fetch every "index.*" event; all other filters disabled.
        let query_req = QueryEventsRequest {
            entity_id: None,
            event_type: None,
            tenant_id: None,
            as_of: None,
            since: None,
            until: None,
            limit: None,
            event_type_prefix: Some("index.".to_string()),
            payload_filter: None,
        };
        let events = store.query(&query_req).unwrap();

        // Reduce to the latest event per entity (latest-wins on timestamp).
        let group_by_fields = vec!["name", "user_id"];
        let mut entity_latest: std::collections::HashMap<String, &Event> =
            std::collections::HashMap::new();
        for event in &events {
            entity_latest
                .entry(event.entity_id().to_string())
                .and_modify(|existing| {
                    if event.timestamp() > existing.timestamp() {
                        *existing = event;
                    }
                })
                .or_insert(event);
        }

        // Build a composite grouping key from all group-by fields; missing
        // fields serialize as null so entities still bucket deterministically.
        let mut groups: std::collections::HashMap<String, Vec<String>> =
            std::collections::HashMap::new();
        for (entity_id, event) in &entity_latest {
            let payload = event.payload();
            let mut key_parts = serde_json::Map::new();
            for field in &group_by_fields {
                let value = payload
                    .get(*field)
                    .cloned()
                    .unwrap_or(serde_json::Value::Null);
                key_parts.insert((*field).to_string(), value);
            }
            let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
            groups.entry(key_str).or_default().push(entity_id.clone());
        }

        // Only buckets with 2+ entities count as duplicate groups.
        let duplicate_groups: Vec<_> = groups
            .into_iter()
            .filter(|(_, ids)| ids.len() > 1)
            .collect();

        // Exactly one group: idx-1 + idx-2. Sort before comparing because
        // HashMap iteration order is unspecified.
        assert_eq!(duplicate_groups.len(), 1);
        let (_, ref ids) = duplicate_groups[0];
        assert_eq!(ids.len(), 2);
        let mut sorted_ids = ids.clone();
        sorted_ids.sort();
        assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
    }
2762
2763 #[tokio::test]
2764 async fn test_projection_state_cache() {
2765 let store = create_test_store();
2766
2767 let cache = store.projection_state_cache();
2769 cache.insert(
2770 "entity_snapshots:user-123".to_string(),
2771 serde_json::json!({"name": "Test User", "age": 30}),
2772 );
2773
2774 let state = cache.get("entity_snapshots:user-123");
2776 assert!(state.is_some());
2777 let state = state.unwrap();
2778 assert_eq!(state["name"], "Test User");
2779 assert_eq!(state["age"], 30);
2780 }
2781
2782 #[tokio::test]
2783 async fn test_projection_manager_list_projections() {
2784 let store = create_test_store();
2785
2786 let projection_manager = store.projection_manager();
2788 let projections = projection_manager.list_projections();
2789
2790 assert!(projections.len() >= 2);
2792
2793 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2794 assert!(names.contains(&"entity_snapshots"));
2795 assert!(names.contains(&"event_counters"));
2796 }
2797
2798 #[tokio::test]
2799 async fn test_projection_state_after_event_ingestion() {
2800 let store = create_test_store();
2801
2802 let event = create_test_event("user-456", "user.created");
2804 store.ingest(&event).unwrap();
2805
2806 let projection_manager = store.projection_manager();
2808 let snapshot_projection = projection_manager
2809 .get_projection("entity_snapshots")
2810 .unwrap();
2811
2812 let state = snapshot_projection.get_state("user-456");
2813 assert!(state.is_some());
2814 let state = state.unwrap();
2815 assert_eq!(state["name"], "Test");
2816 assert_eq!(state["value"], 42);
2817 }
2818
2819 #[tokio::test]
2820 async fn test_projection_state_cache_multiple_entities() {
2821 let store = create_test_store();
2822 let cache = store.projection_state_cache();
2823
2824 for i in 0..10 {
2826 cache.insert(
2827 format!("entity_snapshots:entity-{i}"),
2828 serde_json::json!({"id": i, "status": "active"}),
2829 );
2830 }
2831
2832 assert_eq!(cache.len(), 10);
2834
2835 for i in 0..10 {
2837 let key = format!("entity_snapshots:entity-{i}");
2838 let state = cache.get(&key);
2839 assert!(state.is_some());
2840 assert_eq!(state.unwrap()["id"], i);
2841 }
2842 }
2843
2844 #[tokio::test]
2845 async fn test_projection_state_update() {
2846 let store = create_test_store();
2847 let cache = store.projection_state_cache();
2848
2849 cache.insert(
2851 "entity_snapshots:user-789".to_string(),
2852 serde_json::json!({"balance": 100}),
2853 );
2854
2855 cache.insert(
2857 "entity_snapshots:user-789".to_string(),
2858 serde_json::json!({"balance": 150}),
2859 );
2860
2861 let state = cache.get("entity_snapshots:user-789").unwrap();
2863 assert_eq!(state["balance"], 150);
2864 }
2865
2866 #[tokio::test]
2867 async fn test_event_counter_projection() {
2868 let store = create_test_store();
2869
2870 store
2872 .ingest(&create_test_event("user-1", "user.created"))
2873 .unwrap();
2874 store
2875 .ingest(&create_test_event("user-2", "user.created"))
2876 .unwrap();
2877 store
2878 .ingest(&create_test_event("user-1", "user.updated"))
2879 .unwrap();
2880
2881 let projection_manager = store.projection_manager();
2883 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2884
2885 let created_state = counter_projection.get_state("user.created");
2887 assert!(created_state.is_some());
2888 assert_eq!(created_state.unwrap()["count"], 2);
2889
2890 let updated_state = counter_projection.get_state("user.updated");
2891 assert!(updated_state.is_some());
2892 assert_eq!(updated_state.unwrap()["count"], 1);
2893 }
2894
2895 #[tokio::test]
2896 async fn test_projection_state_cache_key_format() {
2897 let store = create_test_store();
2898 let cache = store.projection_state_cache();
2899
2900 let key = "orders:order-12345".to_string();
2902 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2903
2904 let state = cache.get(&key).unwrap();
2905 assert_eq!(state["total"], 99.99);
2906 }
2907
2908 #[tokio::test]
2909 async fn test_projection_state_cache_removal() {
2910 let store = create_test_store();
2911 let cache = store.projection_state_cache();
2912
2913 cache.insert(
2915 "test:entity-1".to_string(),
2916 serde_json::json!({"data": "value"}),
2917 );
2918 assert_eq!(cache.len(), 1);
2919
2920 cache.remove("test:entity-1");
2921 assert_eq!(cache.len(), 0);
2922 assert!(cache.get("test:entity-1").is_none());
2923 }
2924
2925 #[tokio::test]
2926 async fn test_get_nonexistent_projection() {
2927 let store = create_test_store();
2928 let projection_manager = store.projection_manager();
2929
2930 let projection = projection_manager.get_projection("nonexistent_projection");
2932 assert!(projection.is_none());
2933 }
2934
2935 #[tokio::test]
2936 async fn test_get_nonexistent_entity_state() {
2937 let store = create_test_store();
2938 let projection_manager = store.projection_manager();
2939
2940 let snapshot_projection = projection_manager
2942 .get_projection("entity_snapshots")
2943 .unwrap();
2944 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2945 assert!(state.is_none());
2946 }
2947
2948 #[tokio::test]
2949 async fn test_projection_state_cache_concurrent_access() {
2950 let store = create_test_store();
2951 let cache = store.projection_state_cache();
2952
2953 let handles: Vec<_> = (0..10)
2955 .map(|i| {
2956 let cache_clone = cache.clone();
2957 tokio::spawn(async move {
2958 cache_clone.insert(
2959 format!("concurrent:entity-{i}"),
2960 serde_json::json!({"thread": i}),
2961 );
2962 })
2963 })
2964 .collect();
2965
2966 for handle in handles {
2967 handle.await.unwrap();
2968 }
2969
2970 assert_eq!(cache.len(), 10);
2972 }
2973
2974 #[tokio::test]
2975 async fn test_projection_state_large_payload() {
2976 let store = create_test_store();
2977 let cache = store.projection_state_cache();
2978
2979 let large_array: Vec<serde_json::Value> = (0..1000)
2981 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2982 .collect();
2983
2984 cache.insert(
2985 "large:entity-1".to_string(),
2986 serde_json::json!({"items": large_array}),
2987 );
2988
2989 let state = cache.get("large:entity-1").unwrap();
2990 let items = state["items"].as_array().unwrap();
2991 assert_eq!(items.len(), 1000);
2992 }
2993
2994 #[tokio::test]
2995 async fn test_projection_state_complex_json() {
2996 let store = create_test_store();
2997 let cache = store.projection_state_cache();
2998
2999 let complex_state = serde_json::json!({
3001 "user": {
3002 "id": "user-123",
3003 "profile": {
3004 "name": "John Doe",
3005 "email": "john@example.com",
3006 "settings": {
3007 "theme": "dark",
3008 "notifications": true
3009 }
3010 },
3011 "roles": ["admin", "user"],
3012 "metadata": {
3013 "created_at": "2025-01-01T00:00:00Z",
3014 "last_login": null
3015 }
3016 }
3017 });
3018
3019 cache.insert("complex:user-123".to_string(), complex_state);
3020
3021 let state = cache.get("complex:user-123").unwrap();
3022 assert_eq!(state["user"]["profile"]["name"], "John Doe");
3023 assert_eq!(state["user"]["roles"][0], "admin");
3024 assert!(state["user"]["metadata"]["last_login"].is_null());
3025 }
3026
3027 #[tokio::test]
3028 async fn test_projection_state_cache_iteration() {
3029 let store = create_test_store();
3030 let cache = store.projection_state_cache();
3031
3032 for i in 0..5 {
3034 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
3035 }
3036
3037 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
3039 assert_eq!(entries.len(), 5);
3040 }
3041
3042 #[tokio::test]
3043 async fn test_projection_manager_get_entity_snapshots() {
3044 let store = create_test_store();
3045 let projection_manager = store.projection_manager();
3046
3047 let projection = projection_manager.get_projection("entity_snapshots");
3049 assert!(projection.is_some());
3050 assert_eq!(projection.unwrap().name(), "entity_snapshots");
3051 }
3052
3053 #[tokio::test]
3054 async fn test_projection_manager_get_event_counters() {
3055 let store = create_test_store();
3056 let projection_manager = store.projection_manager();
3057
3058 let projection = projection_manager.get_projection("event_counters");
3060 assert!(projection.is_some());
3061 assert_eq!(projection.unwrap().name(), "event_counters");
3062 }
3063
3064 #[tokio::test]
3065 async fn test_projection_state_cache_overwrite() {
3066 let store = create_test_store();
3067 let cache = store.projection_state_cache();
3068
3069 cache.insert(
3071 "overwrite:entity-1".to_string(),
3072 serde_json::json!({"version": 1}),
3073 );
3074
3075 cache.insert(
3077 "overwrite:entity-1".to_string(),
3078 serde_json::json!({"version": 2}),
3079 );
3080
3081 cache.insert(
3083 "overwrite:entity-1".to_string(),
3084 serde_json::json!({"version": 3}),
3085 );
3086
3087 let state = cache.get("overwrite:entity-1").unwrap();
3088 assert_eq!(state["version"], 3);
3089
3090 assert_eq!(cache.len(), 1);
3092 }
3093
3094 #[tokio::test]
3095 async fn test_projection_state_multiple_projections() {
3096 let store = create_test_store();
3097 let cache = store.projection_state_cache();
3098
3099 cache.insert(
3101 "entity_snapshots:user-1".to_string(),
3102 serde_json::json!({"name": "Alice"}),
3103 );
3104 cache.insert(
3105 "event_counters:user.created".to_string(),
3106 serde_json::json!({"count": 5}),
3107 );
3108 cache.insert(
3109 "custom_projection:order-1".to_string(),
3110 serde_json::json!({"total": 150.0}),
3111 );
3112
3113 assert_eq!(
3115 cache.get("entity_snapshots:user-1").unwrap()["name"],
3116 "Alice"
3117 );
3118 assert_eq!(
3119 cache.get("event_counters:user.created").unwrap()["count"],
3120 5
3121 );
3122 assert_eq!(
3123 cache.get("custom_projection:order-1").unwrap()["total"],
3124 150.0
3125 );
3126 }
3127
3128 #[tokio::test]
3129 async fn test_bulk_projection_state_access() {
3130 let store = create_test_store();
3131
3132 for i in 0..5 {
3134 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3135 store.ingest(&event).unwrap();
3136 }
3137
3138 let projection_manager = store.projection_manager();
3140 let snapshot_projection = projection_manager
3141 .get_projection("entity_snapshots")
3142 .unwrap();
3143
3144 for i in 0..5 {
3146 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3147 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3148 }
3149 }
3150
3151 #[tokio::test]
3152 async fn test_bulk_save_projection_states() {
3153 let store = create_test_store();
3154 let cache = store.projection_state_cache();
3155
3156 let states = vec![
3158 BulkSaveStateItem {
3159 entity_id: "bulk-entity-1".to_string(),
3160 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3161 },
3162 BulkSaveStateItem {
3163 entity_id: "bulk-entity-2".to_string(),
3164 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3165 },
3166 BulkSaveStateItem {
3167 entity_id: "bulk-entity-3".to_string(),
3168 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3169 },
3170 ];
3171
3172 let projection_name = "test_projection";
3173
3174 for item in &states {
3176 cache.insert(
3177 format!("{projection_name}:{}", item.entity_id),
3178 item.state.clone(),
3179 );
3180 }
3181
3182 assert_eq!(cache.len(), 3);
3184
3185 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3186 assert_eq!(state1["name"], "Entity 1");
3187 assert_eq!(state1["value"], 100);
3188
3189 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3190 assert_eq!(state2["name"], "Entity 2");
3191 assert_eq!(state2["value"], 200);
3192
3193 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3194 assert_eq!(state3["name"], "Entity 3");
3195 assert_eq!(state3["value"], 300);
3196 }
3197
3198 #[tokio::test]
3199 async fn test_bulk_save_empty_states() {
3200 let store = create_test_store();
3201 let cache = store.projection_state_cache();
3202
3203 cache.clear();
3205
3206 let states: Vec<BulkSaveStateItem> = vec![];
3208 assert_eq!(states.len(), 0);
3209
3210 assert_eq!(cache.len(), 0);
3212 }
3213
3214 #[tokio::test]
3215 async fn test_bulk_save_overwrites_existing() {
3216 let store = create_test_store();
3217 let cache = store.projection_state_cache();
3218
3219 cache.insert(
3221 "test:entity-1".to_string(),
3222 serde_json::json!({"version": 1, "data": "initial"}),
3223 );
3224
3225 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3227 cache.insert("test:entity-1".to_string(), new_state);
3228
3229 let state = cache.get("test:entity-1").unwrap();
3231 assert_eq!(state["version"], 2);
3232 assert_eq!(state["data"], "updated");
3233 }
3234
3235 #[tokio::test]
3236 async fn test_bulk_save_high_volume() {
3237 let store = create_test_store();
3238 let cache = store.projection_state_cache();
3239
3240 for i in 0..1000 {
3242 cache.insert(
3243 format!("volume_test:entity-{i}"),
3244 serde_json::json!({"index": i, "status": "active"}),
3245 );
3246 }
3247
3248 assert_eq!(cache.len(), 1000);
3250
3251 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3253 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3254 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3255 }
3256
3257 #[tokio::test]
3258 async fn test_bulk_save_different_projections() {
3259 let store = create_test_store();
3260 let cache = store.projection_state_cache();
3261
3262 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3264
3265 for proj in &projections {
3266 for i in 0..5 {
3267 cache.insert(
3268 format!("{proj}:entity-{i}"),
3269 serde_json::json!({"projection": proj, "id": i}),
3270 );
3271 }
3272 }
3273
3274 assert_eq!(cache.len(), 15);
3276
3277 for proj in &projections {
3279 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3280 assert_eq!(state["projection"], *proj);
3281 }
3282 }
3283
3284 #[tokio::test]
3295 async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3296 let store = create_test_store();
3297 store.projection_state_cache().insert(
3298 "assets:BTC".to_string(),
3299 serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3300 );
3301
3302 let resp = get_projection_state(
3303 State(Arc::clone(&store)),
3304 Path(("assets".to_string(), "BTC".to_string())),
3305 )
3306 .await
3307 .expect("should not error when projection is not registered");
3308
3309 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3310 assert_eq!(resp.0["state"]["symbol"], "BTC");
3311 assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3312 }
3313
3314 #[tokio::test]
3315 async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3316 let store = create_test_store();
3317
3318 let resp = get_projection_state(
3319 State(Arc::clone(&store)),
3320 Path(("assets".to_string(), "UNKNOWN".to_string())),
3321 )
3322 .await
3323 .unwrap();
3324
3325 assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3326 assert_eq!(resp.0["state"], serde_json::Value::Null);
3327 }
3328
3329 #[tokio::test]
3330 async fn get_projection_state_registered_wins_over_cache() {
3331 let store = create_test_store();
3332
3333 let event = create_test_event("user-777", "user.created");
3335 store.ingest(&event).unwrap();
3336
3337 store.projection_state_cache().insert(
3339 "entity_snapshots:user-777".to_string(),
3340 serde_json::json!({"stolen": "value"}),
3341 );
3342
3343 let resp = get_projection_state(
3344 State(Arc::clone(&store)),
3345 Path(("entity_snapshots".to_string(), "user-777".to_string())),
3346 )
3347 .await
3348 .unwrap();
3349
3350 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3353 assert!(
3354 resp.0["state"].get("stolen").is_none(),
3355 "cache entry must not shadow registered projection state: got {:?}",
3356 resp.0["state"]
3357 );
3358 }
3359
3360 #[tokio::test]
3361 async fn get_projection_state_summary_returns_cache_without_registration() {
3362 let store = create_test_store();
3363 let cache = store.projection_state_cache();
3364 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3365 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3366 cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3368
3369 let resp =
3370 get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3371 .await
3372 .unwrap();
3373
3374 assert_eq!(resp.0["total"], 2);
3375 let states = resp.0["states"].as_array().unwrap();
3376 let entity_ids: Vec<&str> = states
3377 .iter()
3378 .map(|s| s["entity_id"].as_str().unwrap())
3379 .collect();
3380 assert!(entity_ids.contains(&"BTC"));
3381 assert!(entity_ids.contains(&"ETH"));
3382 }
3383
3384 #[tokio::test]
3385 async fn bulk_get_projection_states_falls_back_to_cache() {
3386 let store = create_test_store();
3387 let cache = store.projection_state_cache();
3388 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3389 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3390
3391 let req = BulkGetStateRequest {
3392 entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3393 };
3394
3395 let resp = bulk_get_projection_states(
3396 State(Arc::clone(&store)),
3397 Path("assets".to_string()),
3398 Json(req),
3399 )
3400 .await
3401 .unwrap();
3402
3403 assert_eq!(resp.0["total"], 3);
3404 let states = resp.0["states"].as_array().unwrap();
3405 let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3406 .iter()
3407 .map(|s| (s["entity_id"].as_str().unwrap(), s))
3408 .collect();
3409
3410 assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3411 assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3412 assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3413 assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3414 }
3415}