1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 security::middleware::OptionalAuth,
41 web::api_v1::AppState,
42 },
43 store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46 Json, Router,
47 extract::{Path, Query, State, WebSocketUpgrade},
48 response::{IntoResponse, Response},
49 routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57
/// Reference-counted handle to the [`EventStore`], used as the router state
/// shared by the handlers in this file.
type SharedStore = Arc<EventStore>;
59
/// Waits for a follower acknowledgement of the current leader WAL offset.
///
/// Returns immediately when no WAL shipper is configured, when the shipper is
/// in [`ReplicationMode::Async`], or when the leader offset is `0`. The time
/// spent waiting is recorded in the `replication_ack_wait_seconds` metric.
/// If `wait_for_ack` reports that no acknowledgement arrived, a warning is
/// logged and the function returns normally; it never fails the caller.
#[cfg(feature = "replication")]
async fn await_replication_ack(state: &AppState) {
    // Inspect the shipper under the read lock, but let the guard go out of
    // scope before awaiting the acknowledgement.
    let (shipper, mode, target_offset) = {
        let guard = state.wal_shipper.read().await;
        let Some(active) = &*guard else {
            return;
        };

        let mode = active.replication_mode();
        if mode == ReplicationMode::Async {
            return;
        }

        let target_offset = active.current_leader_offset();
        if target_offset == 0 {
            return;
        }

        (Arc::clone(active), mode, target_offset)
    };

    let ack_timer = state
        .store
        .metrics()
        .replication_ack_wait_seconds
        .start_timer();
    let acked = shipper.wait_for_ack(target_offset).await;
    ack_timer.observe_duration();

    if acked {
        return;
    }

    tracing::warn!(
        "Replication ACK timeout in {} mode (offset {}). \
         Write succeeded locally but follower confirmation pending.",
        mode,
        target_offset,
    );
}
/// No-op counterpart compiled when the `replication` feature is disabled, so
/// callers can await it unconditionally.
#[cfg(not(feature = "replication"))]
async fn await_replication_ack(_state: &AppState) {
}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107 let app = Router::new()
108 .route("/health", get(health))
109 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
111 .route("/api/v1/events/batch", post(ingest_events_batch))
112 .route("/api/v1/events/query", get(query_events))
113 .route("/api/v1/events/{event_id}", get(get_event_by_id))
114 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
117 .route("/api/v1/event-types", get(list_event_types))
118 .route("/api/v1/entities/duplicates", get(detect_duplicates))
119 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120 .route(
121 "/api/v1/entities/{entity_id}/snapshot",
122 get(get_entity_snapshot),
123 )
124 .route("/api/v1/stats", get(get_stats))
125 .route("/api/v1/analytics/frequency", get(analytics_frequency))
127 .route("/api/v1/analytics/summary", get(analytics_summary))
128 .route("/api/v1/analytics/correlation", get(analytics_correlation))
129 .route("/api/v1/snapshots", post(create_snapshot))
131 .route("/api/v1/snapshots", get(list_snapshots))
132 .route(
133 "/api/v1/snapshots/{entity_id}/latest",
134 get(get_latest_snapshot),
135 )
136 .route("/api/v1/compaction/trigger", post(trigger_compaction))
138 .route("/api/v1/compaction/stats", get(compaction_stats))
139 .route("/api/v1/schemas", post(register_schema))
141 .route("/api/v1/schemas", get(list_subjects))
142 .route("/api/v1/schemas/{subject}", get(get_schema))
143 .route(
144 "/api/v1/schemas/{subject}/versions",
145 get(list_schema_versions),
146 )
147 .route("/api/v1/schemas/validate", post(validate_event_schema))
148 .route(
149 "/api/v1/schemas/{subject}/compatibility",
150 put(set_compatibility_mode),
151 )
152 .route("/api/v1/replay", post(start_replay))
154 .route("/api/v1/replay", get(list_replays))
155 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157 .route(
158 "/api/v1/replay/{replay_id}",
159 axum::routing::delete(delete_replay),
160 )
161 .route("/api/v1/pipelines", post(register_pipeline))
163 .route("/api/v1/pipelines", get(list_pipelines))
164 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166 .route(
167 "/api/v1/pipelines/{pipeline_id}",
168 axum::routing::delete(remove_pipeline),
169 )
170 .route(
171 "/api/v1/pipelines/{pipeline_id}/stats",
172 get(get_pipeline_stats),
173 )
174 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175 .route("/api/v1/projections", get(list_projections))
177 .route("/api/v1/projections/{name}", get(get_projection))
178 .route(
179 "/api/v1/projections/{name}",
180 axum::routing::delete(delete_projection),
181 )
182 .route(
183 "/api/v1/projections/{name}/state",
184 get(get_projection_state_summary),
185 )
186 .route("/api/v1/projections/{name}/reset", post(reset_projection))
187 .route(
188 "/api/v1/projections/{name}/{entity_id}/state",
189 get(get_projection_state),
190 )
191 .route(
192 "/api/v1/projections/{name}/{entity_id}/state",
193 post(save_projection_state),
194 )
195 .route(
196 "/api/v1/projections/{name}/{entity_id}/state",
197 put(save_projection_state),
198 )
199 .route(
200 "/api/v1/projections/{name}/bulk",
201 post(bulk_get_projection_states),
202 )
203 .route(
204 "/api/v1/projections/{name}/bulk/save",
205 post(bulk_save_projection_states),
206 )
207 .route("/api/v1/webhooks", post(register_webhook))
209 .route("/api/v1/webhooks", get(list_webhooks))
210 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212 .route(
213 "/api/v1/webhooks/{webhook_id}",
214 axum::routing::delete(delete_webhook),
215 )
216 .route(
217 "/api/v1/webhooks/{webhook_id}/deliveries",
218 get(list_webhook_deliveries),
219 )
220 .route("/api/v1/graphql", post(graphql_query))
222 .route("/api/v1/geospatial/query", post(geo_query))
223 .route("/api/v1/geospatial/stats", get(geo_stats))
224 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225 .route(
226 "/api/v1/schema-evolution/history/{event_type}",
227 get(schema_evolution_history),
228 )
229 .route(
230 "/api/v1/schema-evolution/schema/{event_type}",
231 get(schema_evolution_schema),
232 )
233 .route(
234 "/api/v1/schema-evolution/stats",
235 get(schema_evolution_stats),
236 )
237 .layer(
238 CorsLayer::new()
239 .allow_origin(Any)
240 .allow_methods(Any)
241 .allow_headers(Any),
242 )
243 .layer(TraceLayer::new_for_http())
244 .with_state(store);
245
246 let listener = tokio::net::TcpListener::bind(addr).await?;
247 axum::serve(listener, app).await?;
248
249 Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253 Json(serde_json::json!({
254 "status": "healthy",
255 "service": "allsource-core",
256 "version": env!("CARGO_PKG_VERSION")
257 }))
258}
259
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262 let metrics = store.metrics();
263
264 match metrics.encode() {
265 Ok(encoded) => Response::builder()
266 .status(200)
267 .header("Content-Type", "text/plain; version=0.0.4")
268 .body(encoded)
269 .unwrap()
270 .into_response(),
271 Err(e) => Response::builder()
272 .status(500)
273 .body(format!("Error encoding metrics: {e}"))
274 .unwrap()
275 .into_response(),
276 }
277}
278
279pub async fn ingest_event(
280 State(store): State<SharedStore>,
281 Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283 let expected_version = req.expected_version;
284
285 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
286 let event = Event::from_strings(
287 req.event_type,
288 req.entity_id,
289 tenant_id,
290 req.payload,
291 req.metadata,
292 )?;
293
294 let event_id = event.id;
295 let timestamp = event.timestamp;
296
297 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299 tracing::info!("Event ingested: {}", event_id);
300
301 Ok(Json(IngestEventResponse {
302 event_id,
303 timestamp,
304 version: Some(new_version),
305 }))
306}
307
308pub async fn ingest_event_v1(
314 State(state): State<AppState>,
315 Json(req): Json<IngestEventRequest>,
316) -> Result<Json<IngestEventResponse>> {
317 let expected_version = req.expected_version;
318
319 let tenant_id = req.tenant_id.unwrap_or_else(|| "default".to_string());
320
321 let event = Event::from_strings(
322 req.event_type,
323 req.entity_id,
324 tenant_id,
325 req.payload,
326 req.metadata,
327 )?;
328
329 let event_id = event.id;
330 let timestamp = event.timestamp;
331
332 let new_version = state
333 .store
334 .ingest_with_expected_version(&event, expected_version)?;
335
336 await_replication_ack(&state).await;
338
339 tracing::info!("Event ingested: {}", event_id);
340
341 Ok(Json(IngestEventResponse {
342 event_id,
343 timestamp,
344 version: Some(new_version),
345 }))
346}
347
348pub async fn ingest_events_batch(
353 State(store): State<SharedStore>,
354 Json(req): Json<IngestEventsBatchRequest>,
355) -> Result<Json<IngestEventsBatchResponse>> {
356 let total = req.events.len();
357 let mut ingested_events = Vec::with_capacity(total);
358
359 for event_req in req.events {
360 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
361 let expected_version = event_req.expected_version;
362
363 let event = Event::from_strings(
364 event_req.event_type,
365 event_req.entity_id,
366 tenant_id,
367 event_req.payload,
368 event_req.metadata,
369 )?;
370
371 let event_id = event.id;
372 let timestamp = event.timestamp;
373
374 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
375
376 ingested_events.push(IngestEventResponse {
377 event_id,
378 timestamp,
379 version: Some(new_version),
380 });
381 }
382
383 let ingested = ingested_events.len();
384 tracing::info!("Batch ingested {} events", ingested);
385
386 Ok(Json(IngestEventsBatchResponse {
387 total,
388 ingested,
389 events: ingested_events,
390 }))
391}
392
393pub async fn ingest_events_batch_v1(
399 State(state): State<AppState>,
400 Json(req): Json<IngestEventsBatchRequest>,
401) -> Result<Json<IngestEventsBatchResponse>> {
402 let total = req.events.len();
403 let mut ingested_events = Vec::with_capacity(total);
404
405 for event_req in req.events {
406 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
407 let expected_version = event_req.expected_version;
408
409 let event = Event::from_strings(
410 event_req.event_type,
411 event_req.entity_id,
412 tenant_id,
413 event_req.payload,
414 event_req.metadata,
415 )?;
416
417 let event_id = event.id;
418 let timestamp = event.timestamp;
419
420 let new_version = state
421 .store
422 .ingest_with_expected_version(&event, expected_version)?;
423
424 ingested_events.push(IngestEventResponse {
425 event_id,
426 timestamp,
427 version: Some(new_version),
428 });
429 }
430
431 await_replication_ack(&state).await;
433
434 let ingested = ingested_events.len();
435 tracing::info!("Batch ingested {} events", ingested);
436
437 Ok(Json(IngestEventsBatchResponse {
438 total,
439 ingested,
440 events: ingested_events,
441 }))
442}
443
/// Query-string parameter selecting the sort order of an event query.
#[derive(Debug, Deserialize)]
pub struct EventOrderParam {
    /// Requested order. `query_events` accepts `asc` or `desc`
    /// (case-insensitive), treats an absent value as ascending, and rejects
    /// anything else.
    pub order: Option<String>,
}
452
453pub async fn query_events(
454 OptionalAuth(auth): OptionalAuth,
455 Query(req): Query<QueryEventsRequest>,
456 Query(order_param): Query<EventOrderParam>,
457 State(store): State<SharedStore>,
458) -> Result<Json<QueryEventsResponse>> {
459 let requested_limit = req.limit;
460 let queried_entity_id = req.entity_id.clone();
461
462 let descending = match order_param.order.as_deref() {
467 None => false,
468 Some(o) if o.eq_ignore_ascii_case("asc") => false,
469 Some(o) if o.eq_ignore_ascii_case("desc") => true,
470 Some(other) => {
471 return Err(crate::error::AllSourceError::InvalidInput(format!(
472 "invalid 'order' value '{other}': expected 'asc' or 'desc'"
473 )));
474 }
475 };
476
477 let enforced_tenant = req
485 .tenant_id
486 .clone()
487 .or_else(|| auth.as_ref().map(|a| a.tenant_id().to_string()));
488
489 let unlimited_req = QueryEventsRequest {
491 entity_id: req.entity_id,
492 event_type: req.event_type,
493 tenant_id: enforced_tenant,
494 as_of: req.as_of,
495 since: req.since,
496 until: req.until,
497 limit: None,
498 event_type_prefix: req.event_type_prefix,
499 payload_filter: req.payload_filter,
500 };
501 let mut all_events = store.query(&unlimited_req)?;
502 let total_count = all_events.len();
503
504 if descending {
507 all_events.reverse();
508 }
509
510 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
512 all_events.into_iter().take(limit).collect()
513 } else {
514 all_events
515 };
516
517 let count = limited_events.len();
518 let has_more = count < total_count;
519 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
520
521 let entity_version = queried_entity_id
523 .as_deref()
524 .map(|eid| store.get_entity_version(eid));
525
526 tracing::debug!("Query returned {} events (total: {})", count, total_count);
527
528 Ok(Json(QueryEventsResponse {
529 events,
530 count,
531 total_count,
532 has_more,
533 entity_version,
534 }))
535}
536
537pub async fn list_entities(
538 State(store): State<SharedStore>,
539 Query(req): Query<ListEntitiesRequest>,
540) -> Result<Json<ListEntitiesResponse>> {
541 use std::collections::HashMap;
542
543 let query_req = QueryEventsRequest {
545 entity_id: None,
546 event_type: None,
547 tenant_id: None,
548 as_of: None,
549 since: None,
550 until: None,
551 limit: None,
552 event_type_prefix: req.event_type_prefix,
553 payload_filter: req.payload_filter,
554 };
555 let events = store.query(&query_req)?;
556
557 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
559 for event in &events {
560 entity_map
561 .entry(event.entity_id().to_string())
562 .or_default()
563 .push(event);
564 }
565
566 let ascending = match req.order.as_deref() {
570 None => false,
571 Some(o) if o.eq_ignore_ascii_case("desc") => false,
572 Some(o) if o.eq_ignore_ascii_case("asc") => true,
573 Some(other) => {
574 return Err(crate::error::AllSourceError::InvalidInput(format!(
575 "invalid 'order' value '{other}': expected 'asc' or 'desc'"
576 )));
577 }
578 };
579
580 let mut summaries: Vec<EntitySummary> = entity_map
584 .into_iter()
585 .map(|(entity_id, events)| {
586 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
587 EntitySummary {
588 entity_id,
589 event_count: events.len(),
590 last_event_type: last.event_type_str().to_string(),
591 last_event_at: last.timestamp(),
592 }
593 })
594 .collect();
595 summaries.sort_by(|a, b| {
596 let by_time = a.last_event_at.cmp(&b.last_event_at);
597 let by_time = if ascending {
598 by_time
599 } else {
600 by_time.reverse()
601 };
602 by_time.then_with(|| a.entity_id.cmp(&b.entity_id))
603 });
604
605 let total = summaries.len();
606
607 let offset = req.offset.unwrap_or(0);
609 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
610 let summaries = if let Some(limit) = req.limit {
611 let has_more = summaries.len() > limit;
612 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
613 return Ok(Json(ListEntitiesResponse {
614 entities: truncated,
615 total,
616 has_more,
617 }));
618 } else {
619 summaries
620 };
621
622 Ok(Json(ListEntitiesResponse {
623 entities: summaries,
624 total,
625 has_more: false,
626 }))
627}
628
629pub async fn detect_duplicates(
630 State(store): State<SharedStore>,
631 Query(req): Query<DetectDuplicatesRequest>,
632) -> Result<Json<DetectDuplicatesResponse>> {
633 use std::collections::HashMap;
634
635 let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
636
637 let query_req = QueryEventsRequest {
639 entity_id: None,
640 event_type: None,
641 tenant_id: None,
642 as_of: None,
643 since: None,
644 until: None,
645 limit: None,
646 event_type_prefix: Some(req.event_type_prefix),
647 payload_filter: None,
648 };
649 let events = store.query(&query_req)?;
650
651 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
654 for event in &events {
655 let eid = event.entity_id().to_string();
656 entity_latest
657 .entry(eid)
658 .and_modify(|existing| {
659 if event.timestamp() > existing.timestamp() {
660 *existing = event;
661 }
662 })
663 .or_insert(event);
664 }
665
666 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
668 for (entity_id, event) in &entity_latest {
669 let payload = event.payload();
670 let mut key_parts = serde_json::Map::new();
671 for field in &group_by_fields {
672 let value = payload
673 .get(*field)
674 .cloned()
675 .unwrap_or(serde_json::Value::Null);
676 key_parts.insert((*field).to_string(), value);
677 }
678 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
679 groups.entry(key_str).or_default().push(entity_id.clone());
680 }
681
682 let mut duplicate_groups: Vec<DuplicateGroup> = groups
684 .into_iter()
685 .filter(|(_, ids)| ids.len() > 1)
686 .map(|(key_str, mut ids)| {
687 ids.sort();
688 let key: serde_json::Value =
689 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
690 let count = ids.len();
691 DuplicateGroup {
692 key,
693 entity_ids: ids,
694 count,
695 }
696 })
697 .collect();
698
699 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
701
702 let total = duplicate_groups.len();
703
704 let offset = req.offset.unwrap_or(0);
706 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
707
708 if let Some(limit) = req.limit {
709 let has_more = duplicate_groups.len() > limit;
710 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
711 return Ok(Json(DetectDuplicatesResponse {
712 duplicates: truncated,
713 total,
714 has_more,
715 }));
716 }
717
718 Ok(Json(DetectDuplicatesResponse {
719 duplicates: duplicate_groups,
720 total,
721 has_more: false,
722 }))
723}
724
725#[derive(Deserialize)]
726pub struct EntityStateParams {
727 as_of: Option<chrono::DateTime<chrono::Utc>>,
728}
729
730pub async fn get_entity_state(
731 State(store): State<SharedStore>,
732 Path(entity_id): Path<String>,
733 Query(params): Query<EntityStateParams>,
734) -> Result<Json<serde_json::Value>> {
735 let state = store.reconstruct_state(&entity_id, params.as_of)?;
736
737 tracing::info!("State reconstructed for entity: {}", entity_id);
738
739 Ok(Json(state))
740}
741
742pub async fn get_entity_snapshot(
743 State(store): State<SharedStore>,
744 Path(entity_id): Path<String>,
745) -> Result<Json<serde_json::Value>> {
746 let snapshot = store.get_snapshot(&entity_id)?;
747
748 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
749
750 Ok(Json(snapshot))
751}
752
753pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
754 let stats = store.stats();
755 Json(stats)
756}
757
/// Pagination parameters for the stream listing endpoint.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
767
/// Response body of the stream listing endpoint.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams before pagination was applied.
    pub total: usize,
}
774
775pub async fn list_streams(
776 State(store): State<SharedStore>,
777 Query(params): Query<ListStreamsParams>,
778) -> Json<ListStreamsResponse> {
779 let mut streams = store.list_streams();
780 let total = streams.len();
781
782 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
784
785 if let Some(offset) = params.offset {
787 if offset < streams.len() {
788 streams = streams[offset..].to_vec();
789 } else {
790 streams = vec![];
791 }
792 }
793
794 if let Some(limit) = params.limit {
795 streams.truncate(limit);
796 }
797
798 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
799
800 Json(ListStreamsResponse { streams, total })
801}
802
/// Pagination parameters for the event-type listing endpoint.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
812
/// Response body of the event-type listing endpoint.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types before pagination was applied.
    pub total: usize,
}
819
820pub async fn list_event_types(
821 State(store): State<SharedStore>,
822 Query(params): Query<ListEventTypesParams>,
823) -> Json<ListEventTypesResponse> {
824 let mut event_types = store.list_event_types();
825 let total = event_types.len();
826
827 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
829
830 if let Some(offset) = params.offset {
832 if offset < event_types.len() {
833 event_types = event_types[offset..].to_vec();
834 } else {
835 event_types = vec![];
836 }
837 }
838
839 if let Some(limit) = params.limit {
840 event_types.truncate(limit);
841 }
842
843 tracing::debug!(
844 "Listed {} event types (total: {})",
845 event_types.len(),
846 total
847 );
848
849 Json(ListEventTypesResponse { event_types, total })
850}
851
/// Query-string parameters for the event WebSocket endpoint.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// Optional consumer id; when present, the socket is handled through the
    /// consumer-aware path of the WebSocket manager.
    pub consumer_id: Option<String>,
}
857
858pub async fn events_websocket(
859 ws: WebSocketUpgrade,
860 State(store): State<SharedStore>,
861 Query(params): Query<WebSocketParams>,
862) -> Response {
863 let websocket_manager = store.websocket_manager();
864
865 ws.on_upgrade(move |socket| async move {
866 if let Some(consumer_id) = params.consumer_id {
867 websocket_manager
868 .handle_socket_with_consumer(socket, consumer_id, store)
869 .await;
870 } else {
871 websocket_manager.handle_socket(socket).await;
872 }
873 })
874}
875
876pub async fn analytics_frequency(
878 State(store): State<SharedStore>,
879 Query(req): Query<EventFrequencyRequest>,
880) -> Result<Json<EventFrequencyResponse>> {
881 let response = AnalyticsEngine::event_frequency(&store, &req)?;
882
883 tracing::debug!(
884 "Frequency analysis returned {} buckets",
885 response.buckets.len()
886 );
887
888 Ok(Json(response))
889}
890
891pub async fn analytics_summary(
893 State(store): State<SharedStore>,
894 Query(req): Query<StatsSummaryRequest>,
895) -> Result<Json<StatsSummaryResponse>> {
896 let response = AnalyticsEngine::stats_summary(&store, &req)?;
897
898 tracing::debug!(
899 "Stats summary: {} events across {} entities",
900 response.total_events,
901 response.unique_entities
902 );
903
904 Ok(Json(response))
905}
906
907pub async fn analytics_correlation(
909 State(store): State<SharedStore>,
910 Query(req): Query<CorrelationRequest>,
911) -> Result<Json<CorrelationResponse>> {
912 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
913
914 tracing::debug!(
915 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
916 response.correlated_pairs,
917 response.total_a,
918 response.correlation_percentage
919 );
920
921 Ok(Json(response))
922}
923
924pub async fn create_snapshot(
926 State(store): State<SharedStore>,
927 Json(req): Json<CreateSnapshotRequest>,
928) -> Result<Json<CreateSnapshotResponse>> {
929 store.create_snapshot(&req.entity_id)?;
930
931 let snapshot_manager = store.snapshot_manager();
932 let snapshot = snapshot_manager
933 .get_latest_snapshot(&req.entity_id)
934 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
935
936 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
937
938 Ok(Json(CreateSnapshotResponse {
939 snapshot_id: snapshot.id,
940 entity_id: snapshot.entity_id,
941 created_at: snapshot.created_at,
942 event_count: snapshot.event_count,
943 size_bytes: snapshot.metadata.size_bytes,
944 }))
945}
946
947pub async fn list_snapshots(
949 State(store): State<SharedStore>,
950 Query(req): Query<ListSnapshotsRequest>,
951) -> Result<Json<ListSnapshotsResponse>> {
952 let snapshot_manager = store.snapshot_manager();
953
954 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
955 snapshot_manager
956 .get_all_snapshots(&entity_id)
957 .into_iter()
958 .map(SnapshotInfo::from)
959 .collect()
960 } else {
961 let entities = snapshot_manager.list_entities();
963 entities
964 .iter()
965 .flat_map(|entity_id| {
966 snapshot_manager
967 .get_all_snapshots(entity_id)
968 .into_iter()
969 .map(SnapshotInfo::from)
970 })
971 .collect()
972 };
973
974 let total = snapshots.len();
975
976 tracing::debug!("Listed {} snapshots", total);
977
978 Ok(Json(ListSnapshotsResponse { snapshots, total }))
979}
980
981pub async fn get_latest_snapshot(
983 State(store): State<SharedStore>,
984 Path(entity_id): Path<String>,
985) -> Result<Json<serde_json::Value>> {
986 let snapshot_manager = store.snapshot_manager();
987
988 let snapshot = snapshot_manager
989 .get_latest_snapshot(&entity_id)
990 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
991
992 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
993
994 Ok(Json(serde_json::json!({
995 "snapshot_id": snapshot.id,
996 "entity_id": snapshot.entity_id,
997 "created_at": snapshot.created_at,
998 "as_of": snapshot.as_of,
999 "event_count": snapshot.event_count,
1000 "size_bytes": snapshot.metadata.size_bytes,
1001 "snapshot_type": snapshot.metadata.snapshot_type,
1002 "state": snapshot.state
1003 })))
1004}
1005
1006pub async fn trigger_compaction(
1008 State(store): State<SharedStore>,
1009) -> Result<Json<CompactionResult>> {
1010 let compaction_manager = store.compaction_manager().ok_or_else(|| {
1011 crate::error::AllSourceError::InternalError(
1012 "Compaction not enabled (no Parquet storage)".to_string(),
1013 )
1014 })?;
1015
1016 tracing::info!("📦 Manual compaction triggered via API");
1017
1018 let result = compaction_manager.compact_now()?;
1019
1020 Ok(Json(result))
1021}
1022
1023pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
1025 let compaction_manager = store.compaction_manager().ok_or_else(|| {
1026 crate::error::AllSourceError::InternalError(
1027 "Compaction not enabled (no Parquet storage)".to_string(),
1028 )
1029 })?;
1030
1031 let stats = compaction_manager.stats();
1032 let config = compaction_manager.config();
1033
1034 Ok(Json(serde_json::json!({
1035 "stats": stats,
1036 "config": {
1037 "min_files_to_compact": config.min_files_to_compact,
1038 "target_file_size": config.target_file_size,
1039 "max_file_size": config.max_file_size,
1040 "small_file_threshold": config.small_file_threshold,
1041 "compaction_interval_seconds": config.compaction_interval_seconds,
1042 "auto_compact": config.auto_compact,
1043 "strategy": config.strategy
1044 }
1045 })))
1046}
1047
1048pub async fn register_schema(
1050 State(store): State<SharedStore>,
1051 Json(req): Json<RegisterSchemaRequest>,
1052) -> Result<Json<RegisterSchemaResponse>> {
1053 let schema_registry = store.schema_registry();
1054
1055 let response =
1056 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
1057
1058 tracing::info!(
1059 "📋 Schema registered: v{} for '{}'",
1060 response.version,
1061 response.subject
1062 );
1063
1064 Ok(Json(response))
1065}
1066
1067#[derive(Deserialize)]
1069pub struct GetSchemaParams {
1070 version: Option<u32>,
1071}
1072
1073pub async fn get_schema(
1074 State(store): State<SharedStore>,
1075 Path(subject): Path<String>,
1076 Query(params): Query<GetSchemaParams>,
1077) -> Result<Json<serde_json::Value>> {
1078 let schema_registry = store.schema_registry();
1079
1080 let schema = schema_registry.get_schema(&subject, params.version)?;
1081
1082 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1083
1084 Ok(Json(serde_json::json!({
1085 "id": schema.id,
1086 "subject": schema.subject,
1087 "version": schema.version,
1088 "schema": schema.schema,
1089 "created_at": schema.created_at,
1090 "description": schema.description,
1091 "tags": schema.tags
1092 })))
1093}
1094
1095pub async fn list_schema_versions(
1097 State(store): State<SharedStore>,
1098 Path(subject): Path<String>,
1099) -> Result<Json<serde_json::Value>> {
1100 let schema_registry = store.schema_registry();
1101
1102 let versions = schema_registry.list_versions(&subject)?;
1103
1104 Ok(Json(serde_json::json!({
1105 "subject": subject,
1106 "versions": versions
1107 })))
1108}
1109
1110pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1112 let schema_registry = store.schema_registry();
1113
1114 let subjects = schema_registry.list_subjects();
1115
1116 Json(serde_json::json!({
1117 "subjects": subjects,
1118 "total": subjects.len()
1119 }))
1120}
1121
1122pub async fn validate_event_schema(
1124 State(store): State<SharedStore>,
1125 Json(req): Json<ValidateEventRequest>,
1126) -> Result<Json<ValidateEventResponse>> {
1127 let schema_registry = store.schema_registry();
1128
1129 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1130
1131 if response.valid {
1132 tracing::debug!(
1133 "✅ Event validated against schema '{}' v{}",
1134 req.subject,
1135 response.schema_version
1136 );
1137 } else {
1138 tracing::warn!(
1139 "❌ Event validation failed for '{}': {:?}",
1140 req.subject,
1141 response.errors
1142 );
1143 }
1144
1145 Ok(Json(response))
1146}
1147
1148#[derive(Deserialize)]
1150pub struct SetCompatibilityRequest {
1151 compatibility: CompatibilityMode,
1152}
1153
1154pub async fn set_compatibility_mode(
1155 State(store): State<SharedStore>,
1156 Path(subject): Path<String>,
1157 Json(req): Json<SetCompatibilityRequest>,
1158) -> Json<serde_json::Value> {
1159 let schema_registry = store.schema_registry();
1160
1161 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1162
1163 tracing::info!(
1164 "🔧 Set compatibility mode for '{}' to {:?}",
1165 subject,
1166 req.compatibility
1167 );
1168
1169 Json(serde_json::json!({
1170 "subject": subject,
1171 "compatibility": req.compatibility
1172 }))
1173}
1174
1175pub async fn start_replay(
1177 State(store): State<SharedStore>,
1178 Json(req): Json<StartReplayRequest>,
1179) -> Result<Json<StartReplayResponse>> {
1180 let replay_manager = store.replay_manager();
1181
1182 let response = replay_manager.start_replay(store, req)?;
1183
1184 tracing::info!(
1185 "🔄 Started replay {} with {} events",
1186 response.replay_id,
1187 response.total_events
1188 );
1189
1190 Ok(Json(response))
1191}
1192
1193pub async fn get_replay_progress(
1195 State(store): State<SharedStore>,
1196 Path(replay_id): Path<uuid::Uuid>,
1197) -> Result<Json<ReplayProgress>> {
1198 let replay_manager = store.replay_manager();
1199
1200 let progress = replay_manager.get_progress(replay_id)?;
1201
1202 Ok(Json(progress))
1203}
1204
1205pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1207 let replay_manager = store.replay_manager();
1208
1209 let replays = replay_manager.list_replays();
1210
1211 Json(serde_json::json!({
1212 "replays": replays,
1213 "total": replays.len()
1214 }))
1215}
1216
1217pub async fn cancel_replay(
1219 State(store): State<SharedStore>,
1220 Path(replay_id): Path<uuid::Uuid>,
1221) -> Result<Json<serde_json::Value>> {
1222 let replay_manager = store.replay_manager();
1223
1224 replay_manager.cancel_replay(replay_id)?;
1225
1226 tracing::info!("🛑 Cancelled replay {}", replay_id);
1227
1228 Ok(Json(serde_json::json!({
1229 "replay_id": replay_id,
1230 "status": "cancelled"
1231 })))
1232}
1233
1234pub async fn delete_replay(
1236 State(store): State<SharedStore>,
1237 Path(replay_id): Path<uuid::Uuid>,
1238) -> Result<Json<serde_json::Value>> {
1239 let replay_manager = store.replay_manager();
1240
1241 let deleted = replay_manager.delete_replay(replay_id)?;
1242
1243 if deleted {
1244 tracing::info!("🗑️ Deleted replay {}", replay_id);
1245 }
1246
1247 Ok(Json(serde_json::json!({
1248 "replay_id": replay_id,
1249 "deleted": deleted
1250 })))
1251}
1252
1253pub async fn register_pipeline(
1255 State(store): State<SharedStore>,
1256 Json(config): Json<PipelineConfig>,
1257) -> Result<Json<serde_json::Value>> {
1258 let pipeline_manager = store.pipeline_manager();
1259
1260 let pipeline_id = pipeline_manager.register(config.clone());
1261
1262 tracing::info!(
1263 "🔀 Pipeline registered: {} (name: {})",
1264 pipeline_id,
1265 config.name
1266 );
1267
1268 Ok(Json(serde_json::json!({
1269 "pipeline_id": pipeline_id,
1270 "name": config.name,
1271 "enabled": config.enabled
1272 })))
1273}
1274
1275pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1277 let pipeline_manager = store.pipeline_manager();
1278
1279 let pipelines = pipeline_manager.list();
1280
1281 tracing::debug!("Listed {} pipelines", pipelines.len());
1282
1283 Json(serde_json::json!({
1284 "pipelines": pipelines,
1285 "total": pipelines.len()
1286 }))
1287}
1288
1289pub async fn get_pipeline(
1291 State(store): State<SharedStore>,
1292 Path(pipeline_id): Path<uuid::Uuid>,
1293) -> Result<Json<PipelineConfig>> {
1294 let pipeline_manager = store.pipeline_manager();
1295
1296 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1297 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1298 })?;
1299
1300 Ok(Json(pipeline.config().clone()))
1301}
1302
1303pub async fn remove_pipeline(
1305 State(store): State<SharedStore>,
1306 Path(pipeline_id): Path<uuid::Uuid>,
1307) -> Result<Json<serde_json::Value>> {
1308 let pipeline_manager = store.pipeline_manager();
1309
1310 let removed = pipeline_manager.remove(pipeline_id);
1311
1312 if removed {
1313 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1314 }
1315
1316 Ok(Json(serde_json::json!({
1317 "pipeline_id": pipeline_id,
1318 "removed": removed
1319 })))
1320}
1321
1322pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1324 let pipeline_manager = store.pipeline_manager();
1325
1326 let stats = pipeline_manager.all_stats();
1327
1328 Json(serde_json::json!({
1329 "stats": stats,
1330 "total": stats.len()
1331 }))
1332}
1333
1334pub async fn get_pipeline_stats(
1336 State(store): State<SharedStore>,
1337 Path(pipeline_id): Path<uuid::Uuid>,
1338) -> Result<Json<PipelineStats>> {
1339 let pipeline_manager = store.pipeline_manager();
1340
1341 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1342 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1343 })?;
1344
1345 Ok(Json(pipeline.stats()))
1346}
1347
1348pub async fn reset_pipeline(
1350 State(store): State<SharedStore>,
1351 Path(pipeline_id): Path<uuid::Uuid>,
1352) -> Result<Json<serde_json::Value>> {
1353 let pipeline_manager = store.pipeline_manager();
1354
1355 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1356 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1357 })?;
1358
1359 pipeline.reset();
1360
1361 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1362
1363 Ok(Json(serde_json::json!({
1364 "pipeline_id": pipeline_id,
1365 "reset": true
1366 })))
1367}
1368
1369pub async fn get_event_by_id(
1375 State(store): State<SharedStore>,
1376 Path(event_id): Path<uuid::Uuid>,
1377) -> Result<Json<serde_json::Value>> {
1378 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1379 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1380 })?;
1381
1382 let dto = EventDto::from(&event);
1383
1384 tracing::debug!("Event retrieved by ID: {}", event_id);
1385
1386 Ok(Json(serde_json::json!({
1387 "event": dto,
1388 "found": true
1389 })))
1390}
1391
1392pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1398 let projection_manager = store.projection_manager();
1399 let status_map = store.projection_status();
1400
1401 let projections: Vec<serde_json::Value> = projection_manager
1402 .list_projections()
1403 .iter()
1404 .map(|(name, projection)| {
1405 let status = status_map
1406 .get(name)
1407 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1408 serde_json::json!({
1409 "name": name,
1410 "type": format!("{:?}", projection.name()),
1411 "status": status,
1412 })
1413 })
1414 .collect();
1415
1416 tracing::debug!("Listed {} projections", projections.len());
1417
1418 Json(serde_json::json!({
1419 "projections": projections,
1420 "total": projections.len()
1421 }))
1422}
1423
1424pub async fn get_projection(
1426 State(store): State<SharedStore>,
1427 Path(name): Path<String>,
1428) -> Result<Json<serde_json::Value>> {
1429 let projection_manager = store.projection_manager();
1430
1431 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1432 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1433 })?;
1434
1435 Ok(Json(serde_json::json!({
1436 "name": projection.name(),
1437 "found": true
1438 })))
1439}
1440
1441pub async fn get_projection_state(
1454 State(store): State<SharedStore>,
1455 Path((name, entity_id)): Path<(String, String)>,
1456) -> Result<Json<serde_json::Value>> {
1457 let state = store
1458 .projection_manager()
1459 .get_projection(&name)
1460 .and_then(|p| p.get_state(&entity_id))
1461 .or_else(|| {
1462 store
1463 .projection_state_cache()
1464 .get(&format!("{name}:{entity_id}"))
1465 .map(|entry| entry.value().clone())
1466 });
1467
1468 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1469
1470 Ok(Json(serde_json::json!({
1471 "projection": name,
1472 "entity_id": entity_id,
1473 "state": state,
1474 "found": state.is_some()
1475 })))
1476}
1477
1478pub async fn delete_projection(
1483 State(store): State<SharedStore>,
1484 Path(name): Path<String>,
1485) -> Result<Json<serde_json::Value>> {
1486 let projection_manager = store.projection_manager();
1487
1488 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1489 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1490 })?;
1491
1492 projection.clear();
1493
1494 let cache = store.projection_state_cache();
1496 let prefix = format!("{name}:");
1497 let keys_to_remove: Vec<String> = cache
1498 .iter()
1499 .filter(|entry| entry.key().starts_with(&prefix))
1500 .map(|entry| entry.key().clone())
1501 .collect();
1502 for key in keys_to_remove {
1503 cache.remove(&key);
1504 }
1505
1506 tracing::info!("Projection deleted (cleared): {}", name);
1507
1508 Ok(Json(serde_json::json!({
1509 "projection": name,
1510 "deleted": true
1511 })))
1512}
1513
1514pub async fn get_projection_state_summary(
1523 State(store): State<SharedStore>,
1524 Path(name): Path<String>,
1525) -> Result<Json<serde_json::Value>> {
1526 let cache = store.projection_state_cache();
1527 let prefix = format!("{name}:");
1528 let states: Vec<serde_json::Value> = cache
1529 .iter()
1530 .filter(|entry| entry.key().starts_with(&prefix))
1531 .map(|entry| {
1532 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1533 serde_json::json!({
1534 "entity_id": entity_id,
1535 "state": entry.value().clone()
1536 })
1537 })
1538 .collect();
1539
1540 let total = states.len();
1541
1542 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1543
1544 Ok(Json(serde_json::json!({
1545 "projection": name,
1546 "states": states,
1547 "total": total
1548 })))
1549}
1550
1551pub async fn reset_projection(
1555 State(store): State<SharedStore>,
1556 Path(name): Path<String>,
1557) -> Result<Json<serde_json::Value>> {
1558 let reprocessed = store.reset_projection(&name)?;
1559
1560 tracing::info!(
1561 "Projection reset: {} ({} events reprocessed)",
1562 name,
1563 reprocessed
1564 );
1565
1566 Ok(Json(serde_json::json!({
1567 "projection": name,
1568 "reset": true,
1569 "events_reprocessed": reprocessed
1570 })))
1571}
1572
1573pub async fn pause_projection(
1577 State(store): State<SharedStore>,
1578 Path(name): Path<String>,
1579) -> Result<Json<serde_json::Value>> {
1580 let projection_manager = store.projection_manager();
1581
1582 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1584 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1585 })?;
1586
1587 store
1588 .projection_status()
1589 .insert(name.clone(), "paused".to_string());
1590
1591 tracing::info!("Projection paused: {}", name);
1592
1593 Ok(Json(serde_json::json!({
1594 "projection": name,
1595 "status": "paused"
1596 })))
1597}
1598
1599pub async fn start_projection(
1603 State(store): State<SharedStore>,
1604 Path(name): Path<String>,
1605) -> Result<Json<serde_json::Value>> {
1606 let projection_manager = store.projection_manager();
1607
1608 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1610 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1611 })?;
1612
1613 store
1614 .projection_status()
1615 .insert(name.clone(), "running".to_string());
1616
1617 tracing::info!("Projection started: {}", name);
1618
1619 Ok(Json(serde_json::json!({
1620 "projection": name,
1621 "status": "running"
1622 })))
1623}
1624
/// JSON request body accepted by `save_projection_state`.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state to store for the projection/entity pair named in the path.
    pub state: serde_json::Value,
}
1630
1631pub async fn save_projection_state(
1636 State(store): State<SharedStore>,
1637 Path((name, entity_id)): Path<(String, String)>,
1638 Json(req): Json<SaveProjectionStateRequest>,
1639) -> Result<Json<serde_json::Value>> {
1640 let projection_cache = store.projection_state_cache();
1641
1642 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1644
1645 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1646
1647 Ok(Json(serde_json::json!({
1648 "projection": name,
1649 "entity_id": entity_id,
1650 "saved": true
1651 })))
1652}
1653
/// JSON request body accepted by `bulk_get_projection_states`.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entity ids whose projection state should be looked up; one result is
    /// returned per id, in the same order.
    pub entity_ids: Vec<String>,
}
1661
/// JSON request body accepted by `bulk_save_projection_states`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// Entity states to store for the projection named in the request path.
    pub states: Vec<BulkSaveStateItem>,
}
1669
/// One entry of a `BulkSaveStateRequest`: the state to store for a single entity.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity the state belongs to; combined with the projection name to form the cache key.
    pub entity_id: String,
    /// Arbitrary JSON state stored for that entity.
    pub state: serde_json::Value,
}
1675
1676pub async fn bulk_get_projection_states(
1677 State(store): State<SharedStore>,
1678 Path(name): Path<String>,
1679 Json(req): Json<BulkGetStateRequest>,
1680) -> Result<Json<serde_json::Value>> {
1681 let projection = store.projection_manager().get_projection(&name);
1685 let cache = store.projection_state_cache();
1686
1687 let states: Vec<serde_json::Value> = req
1688 .entity_ids
1689 .iter()
1690 .map(|entity_id| {
1691 let state = projection
1692 .as_ref()
1693 .and_then(|p| p.get_state(entity_id))
1694 .or_else(|| {
1695 cache
1696 .get(&format!("{name}:{entity_id}"))
1697 .map(|entry| entry.value().clone())
1698 });
1699 serde_json::json!({
1700 "entity_id": entity_id,
1701 "state": state,
1702 "found": state.is_some()
1703 })
1704 })
1705 .collect();
1706
1707 tracing::debug!(
1708 "Bulk projection state retrieved: {} entities from {}",
1709 states.len(),
1710 name
1711 );
1712
1713 Ok(Json(serde_json::json!({
1714 "projection": name,
1715 "states": states,
1716 "total": states.len()
1717 })))
1718}
1719
1720pub async fn bulk_save_projection_states(
1725 State(store): State<SharedStore>,
1726 Path(name): Path<String>,
1727 Json(req): Json<BulkSaveStateRequest>,
1728) -> Result<Json<serde_json::Value>> {
1729 let projection_cache = store.projection_state_cache();
1730
1731 let mut saved_count = 0;
1732 for item in &req.states {
1733 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1734 saved_count += 1;
1735 }
1736
1737 tracing::info!(
1738 "Bulk projection state saved: {} entities for {}",
1739 saved_count,
1740 name
1741 );
1742
1743 Ok(Json(serde_json::json!({
1744 "projection": name,
1745 "saved": saved_count,
1746 "total": req.states.len()
1747 })))
1748}
1749
/// Query-string parameters accepted by `list_webhooks`.
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When omitted, `list_webhooks`
    /// returns an empty list.
    pub tenant_id: Option<String>,
}
1759
1760pub async fn register_webhook(
1762 State(store): State<SharedStore>,
1763 Json(req): Json<RegisterWebhookRequest>,
1764) -> Json<serde_json::Value> {
1765 let registry = store.webhook_registry();
1766 let webhook = registry.register(req);
1767
1768 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1769
1770 Json(serde_json::json!({
1771 "webhook": webhook,
1772 "created": true
1773 }))
1774}
1775
1776pub async fn list_webhooks(
1778 State(store): State<SharedStore>,
1779 Query(params): Query<ListWebhooksParams>,
1780) -> Json<serde_json::Value> {
1781 let registry = store.webhook_registry();
1782
1783 let webhooks = if let Some(tenant_id) = params.tenant_id {
1784 registry.list_by_tenant(&tenant_id)
1785 } else {
1786 vec![]
1788 };
1789
1790 let total = webhooks.len();
1791
1792 Json(serde_json::json!({
1793 "webhooks": webhooks,
1794 "total": total
1795 }))
1796}
1797
1798pub async fn get_webhook(
1800 State(store): State<SharedStore>,
1801 Path(webhook_id): Path<uuid::Uuid>,
1802) -> Result<Json<serde_json::Value>> {
1803 let registry = store.webhook_registry();
1804
1805 let webhook = registry.get(webhook_id).ok_or_else(|| {
1806 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1807 })?;
1808
1809 Ok(Json(serde_json::json!({
1810 "webhook": webhook,
1811 "found": true
1812 })))
1813}
1814
1815pub async fn update_webhook(
1817 State(store): State<SharedStore>,
1818 Path(webhook_id): Path<uuid::Uuid>,
1819 Json(req): Json<UpdateWebhookRequest>,
1820) -> Result<Json<serde_json::Value>> {
1821 let registry = store.webhook_registry();
1822
1823 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1824 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1825 })?;
1826
1827 tracing::info!("Webhook updated: {}", webhook_id);
1828
1829 Ok(Json(serde_json::json!({
1830 "webhook": webhook,
1831 "updated": true
1832 })))
1833}
1834
1835pub async fn delete_webhook(
1837 State(store): State<SharedStore>,
1838 Path(webhook_id): Path<uuid::Uuid>,
1839) -> Result<Json<serde_json::Value>> {
1840 let registry = store.webhook_registry();
1841
1842 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1843 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1844 })?;
1845
1846 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1847
1848 Ok(Json(serde_json::json!({
1849 "webhook_id": webhook_id,
1850 "deleted": true
1851 })))
1852}
1853
/// Query-string parameters accepted by `list_webhook_deliveries`.
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Limit passed to the registry's `get_deliveries`; the handler uses 50 when omitted.
    pub limit: Option<usize>,
}
1859
1860pub async fn list_webhook_deliveries(
1862 State(store): State<SharedStore>,
1863 Path(webhook_id): Path<uuid::Uuid>,
1864 Query(params): Query<ListDeliveriesParams>,
1865) -> Result<Json<serde_json::Value>> {
1866 let registry = store.webhook_registry();
1867
1868 registry.get(webhook_id).ok_or_else(|| {
1870 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1871 })?;
1872
1873 let limit = params.limit.unwrap_or(50);
1874 let deliveries = registry.get_deliveries(webhook_id, limit);
1875 let total = deliveries.len();
1876
1877 Ok(Json(serde_json::json!({
1878 "webhook_id": webhook_id,
1879 "deliveries": deliveries,
1880 "total": total
1881 })))
1882}
1883
/// Runs an EventQL request against a snapshot of the store's events.
///
/// Only compiled with the `analytics` feature. The response exposes the
/// result's `columns`, `rows` and `row_count`.
///
/// # Errors
/// A failure reported by `execute_eventql` is wrapped in
/// `AllSourceError::InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let snapshot = store.snapshot_events();

    let response = crate::infrastructure::query::eventql::execute_eventql(&snapshot, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1904
1905pub async fn graphql_query(
1907 State(store): State<SharedStore>,
1908 Json(req): Json<GraphQLRequest>,
1909) -> Json<serde_json::Value> {
1910 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1911 Ok(f) => f,
1912 Err(e) => {
1913 return Json(
1914 serde_json::to_value(GraphQLResponse {
1915 data: None,
1916 errors: vec![GraphQLError { message: e }],
1917 })
1918 .unwrap(),
1919 );
1920 }
1921 };
1922
1923 let mut data = serde_json::Map::new();
1924 let mut errors = Vec::new();
1925
1926 for field in &fields {
1927 match field.name.as_str() {
1928 "events" => {
1929 let request = crate::application::dto::QueryEventsRequest {
1930 entity_id: field.arguments.get("entity_id").cloned(),
1931 event_type: field.arguments.get("event_type").cloned(),
1932 tenant_id: field.arguments.get("tenant_id").cloned(),
1933 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1934 as_of: None,
1935 since: None,
1936 until: None,
1937 event_type_prefix: None,
1938 payload_filter: None,
1939 };
1940 match store.query(&request) {
1941 Ok(events) => {
1942 let json_events: Vec<serde_json::Value> = events
1943 .iter()
1944 .map(|e| {
1945 crate::infrastructure::query::graphql::event_to_json(
1946 e,
1947 &field.fields,
1948 )
1949 })
1950 .collect();
1951 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1952 }
1953 Err(e) => errors.push(GraphQLError {
1954 message: format!("events query failed: {e}"),
1955 }),
1956 }
1957 }
1958 "event" => {
1959 if let Some(id_str) = field.arguments.get("id") {
1960 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1961 match store.get_event_by_id(&id) {
1962 Ok(Some(event)) => {
1963 data.insert(
1964 "event".to_string(),
1965 crate::infrastructure::query::graphql::event_to_json(
1966 &event,
1967 &field.fields,
1968 ),
1969 );
1970 }
1971 Ok(None) => {
1972 data.insert("event".to_string(), serde_json::Value::Null);
1973 }
1974 Err(e) => errors.push(GraphQLError {
1975 message: format!("event lookup failed: {e}"),
1976 }),
1977 }
1978 } else {
1979 errors.push(GraphQLError {
1980 message: format!("Invalid UUID: {id_str}"),
1981 });
1982 }
1983 } else {
1984 errors.push(GraphQLError {
1985 message: "event query requires 'id' argument".to_string(),
1986 });
1987 }
1988 }
1989 "projections" => {
1990 let pm = store.projection_manager();
1991 let names: Vec<serde_json::Value> = pm
1992 .list_projections()
1993 .iter()
1994 .map(|(name, _)| serde_json::Value::String(name.clone()))
1995 .collect();
1996 data.insert("projections".to_string(), serde_json::Value::Array(names));
1997 }
1998 "stats" => {
1999 let stats = store.stats();
2000 data.insert(
2001 "stats".to_string(),
2002 serde_json::json!({
2003 "total_events": stats.total_events,
2004 "total_entities": stats.total_entities,
2005 "total_event_types": stats.total_event_types,
2006 }),
2007 );
2008 }
2009 "__schema" => {
2010 data.insert(
2011 "__schema".to_string(),
2012 crate::infrastructure::query::graphql::introspection_schema(),
2013 );
2014 }
2015 other => {
2016 errors.push(GraphQLError {
2017 message: format!("Unknown field: {other}"),
2018 });
2019 }
2020 }
2021 }
2022
2023 Json(
2024 serde_json::to_value(GraphQLResponse {
2025 data: Some(serde_json::Value::Object(data)),
2026 errors,
2027 })
2028 .unwrap(),
2029 )
2030}
2031
2032pub async fn geo_query(
2034 State(store): State<SharedStore>,
2035 Json(req): Json<GeoQueryRequest>,
2036) -> Json<serde_json::Value> {
2037 let events = store.snapshot_events();
2038 let geo_index = store.geo_index();
2039 let results =
2040 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
2041 let total = results.len();
2042 Json(serde_json::json!({
2043 "results": results,
2044 "total": total,
2045 }))
2046}
2047
2048pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2050 let stats = store.geo_index().stats();
2051 Json(serde_json::json!(stats))
2052}
2053
2054pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2056 let stats = store.exactly_once().stats();
2057 Json(serde_json::json!(stats))
2058}
2059
2060pub async fn schema_evolution_history(
2062 State(store): State<SharedStore>,
2063 Path(event_type): Path<String>,
2064) -> Json<serde_json::Value> {
2065 let mgr = store.schema_evolution();
2066 let history = mgr.get_history(&event_type);
2067 let version = mgr.get_version(&event_type);
2068 Json(serde_json::json!({
2069 "event_type": event_type,
2070 "current_version": version,
2071 "history": history,
2072 }))
2073}
2074
2075pub async fn schema_evolution_schema(
2077 State(store): State<SharedStore>,
2078 Path(event_type): Path<String>,
2079) -> Json<serde_json::Value> {
2080 let mgr = store.schema_evolution();
2081 if let Some(schema) = mgr.get_schema(&event_type) {
2082 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2083 Json(serde_json::json!({
2084 "event_type": event_type,
2085 "version": mgr.get_version(&event_type),
2086 "inferred_schema": schema,
2087 "json_schema": json_schema,
2088 }))
2089 } else {
2090 Json(serde_json::json!({
2091 "event_type": event_type,
2092 "error": "No schema inferred for this event type"
2093 }))
2094 }
2095}
2096
2097pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2099 let stats = store.schema_evolution().stats();
2100 let event_types = store.schema_evolution().list_event_types();
2101 Json(serde_json::json!({
2102 "stats": stats,
2103 "tracked_event_types": event_types,
2104 }))
2105}
2106
/// Serves a sync pull: returns stored events as `ReplicatedEvent`s.
///
/// Only compiled with the `embedded-sync` feature.
///
/// The smallest `physical_ms` in the client's version vector becomes the
/// `since` bound of the store query; with an empty vector no bound is applied.
/// Each returned event gets a timestamp built from its millisecond timestamp
/// and a logical counter that increments while consecutive events share the
/// same millisecond and resets to 0 otherwise. The origin region is always
/// `"server"` and the response's version vector is always empty.
///
/// # Errors
/// Propagates the error returned by the store query.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    let oldest_known_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since = oldest_known_ms.and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let query = crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    };
    let events = store.query(&query)?;

    let mut replicated = Vec::with_capacity(events.len());
    let mut previous_ms = 0u64;
    let mut logical = 0u32;

    for event in &events {
        let event_ms = event.timestamp().timestamp_millis() as u64;

        // Same millisecond as the previous event: bump the logical counter;
        // otherwise start again from zero.
        logical = if event_ms == previous_ms {
            logical + 1
        } else {
            0
        };
        previous_ms = event_ms;

        let event_data = serde_json::json!({
            "event_type": event.event_type_str(),
            "entity_id": event.entity_id_str(),
            "tenant_id": event.tenant_id_str(),
            "payload": event.payload,
            "metadata": event.metadata,
        });

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            // NOTE(review): the third argument is a fixed 0 — presumably a node id; confirm.
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data,
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2175
/// Serves a sync push: ingests events sent by a client.
///
/// Only compiled with the `embedded-sync` feature.
///
/// Each pushed event's `event_data` is read as a JSON object. Missing or
/// non-string `event_type` / `entity_id` fall back to `"unknown"`, a missing
/// `tenant_id` to `"default"`, and a missing `payload` to an empty object.
/// Events that cannot be turned into a domain `Event` are counted as
/// `skipped` and logged at warn level; the others are ingested and counted as
/// `accepted`. The response's version vector is always empty.
///
/// # Errors
/// Propagates the first ingest failure. Events ingested earlier in the same
/// request stay ingested, and later events are not processed.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;

        // Reads a string field from the event data, using `fallback` when the
        // field is absent or not a string.
        let text_field = |key: &str, fallback: &str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(fallback)
                .to_string()
        };

        let event_type = text_field("event_type", "unknown");
        let entity_id = text_field("entity_id", "unknown");
        let tenant_id = text_field("tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                store.ingest(&domain_event)?;
                accepted += 1;
            }
            Err(e) => {
                // Keep the best-effort behaviour, but leave a trace of why the
                // event was rejected instead of dropping the error silently.
                tracing::warn!(
                    "Sync push skipped an event that could not be constructed: {:?}",
                    e
                );
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2227
2228pub async fn register_consumer(
2234 State(store): State<SharedStore>,
2235 Json(req): Json<RegisterConsumerRequest>,
2236) -> Result<Json<ConsumerResponse>> {
2237 let consumer = store
2238 .consumer_registry()
2239 .register(&req.consumer_id, &req.event_type_filters);
2240
2241 Ok(Json(ConsumerResponse {
2242 consumer_id: consumer.consumer_id,
2243 event_type_filters: consumer.event_type_filters,
2244 cursor_position: consumer.cursor_position,
2245 }))
2246}
2247
2248pub async fn get_consumer(
2250 State(store): State<SharedStore>,
2251 Path(consumer_id): Path<String>,
2252) -> Result<Json<ConsumerResponse>> {
2253 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2254
2255 Ok(Json(ConsumerResponse {
2256 consumer_id: consumer.consumer_id,
2257 event_type_filters: consumer.event_type_filters,
2258 cursor_position: consumer.cursor_position,
2259 }))
2260}
2261
/// Query-string parameters accepted by `poll_consumer_events`.
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    /// Limit passed to the store's `events_after_offset`; the handler uses 100 when omitted.
    pub limit: Option<usize>,
}
2267
2268pub async fn poll_consumer_events(
2269 State(store): State<SharedStore>,
2270 Path(consumer_id): Path<String>,
2271 Query(query): Query<ConsumerPollQuery>,
2272) -> Result<Json<ConsumerEventsResponse>> {
2273 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2274 let offset = consumer.cursor_position.unwrap_or(0);
2275 let limit = query.limit.unwrap_or(100);
2276
2277 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2278 let count = events.len();
2279
2280 let consumer_events: Vec<ConsumerEventDto> = events
2281 .into_iter()
2282 .map(|(position, event)| ConsumerEventDto {
2283 position,
2284 event: EventDto::from(&event),
2285 })
2286 .collect();
2287
2288 Ok(Json(ConsumerEventsResponse {
2289 events: consumer_events,
2290 count,
2291 }))
2292}
2293
2294pub async fn ack_consumer(
2296 State(store): State<SharedStore>,
2297 Path(consumer_id): Path<String>,
2298 Json(req): Json<AckRequest>,
2299) -> Result<Json<serde_json::Value>> {
2300 let max_offset = store.total_events() as u64;
2301
2302 store
2303 .consumer_registry()
2304 .ack(&consumer_id, req.position, max_offset)
2305 .map_err(crate::error::AllSourceError::InvalidInput)?;
2306
2307 Ok(Json(serde_json::json!({
2308 "status": "ok",
2309 "consumer_id": consumer_id,
2310 "position": req.position,
2311 })))
2312}
2313
2314#[cfg(test)]
2315mod tests {
2316 use super::*;
2317 use crate::{domain::entities::Event, store::EventStore};
2318
2319 fn create_test_store() -> Arc<EventStore> {
2320 Arc::new(EventStore::new())
2321 }
2322
2323 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2324 Event::from_strings(
2325 event_type.to_string(),
2326 entity_id.to_string(),
2327 "test-stream".to_string(),
2328 serde_json::json!({
2329 "name": "Test",
2330 "value": 42
2331 }),
2332 None,
2333 )
2334 .unwrap()
2335 }
2336
2337 #[tokio::test]
2338 async fn test_query_events_has_more_and_total_count() {
2339 let store = create_test_store();
2340
2341 for i in 0..50 {
2343 store
2344 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2345 .unwrap();
2346 }
2347
2348 let req = QueryEventsRequest {
2350 entity_id: None,
2351 event_type: None,
2352 tenant_id: None,
2353 as_of: None,
2354 since: None,
2355 until: None,
2356 limit: Some(10),
2357 event_type_prefix: None,
2358 payload_filter: None,
2359 };
2360
2361 let requested_limit = req.limit;
2362 let unlimited_req = QueryEventsRequest {
2363 limit: None,
2364 ..QueryEventsRequest {
2365 entity_id: req.entity_id,
2366 event_type: req.event_type,
2367 tenant_id: req.tenant_id,
2368 as_of: req.as_of,
2369 since: req.since,
2370 until: req.until,
2371 limit: None,
2372 event_type_prefix: req.event_type_prefix,
2373 payload_filter: req.payload_filter,
2374 }
2375 };
2376 let all_events = store.query(&unlimited_req).unwrap();
2377 let total_count = all_events.len();
2378 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2379 all_events.into_iter().take(limit).collect()
2380 } else {
2381 all_events
2382 };
2383 let count = limited_events.len();
2384 let has_more = count < total_count;
2385
2386 assert_eq!(count, 10);
2387 assert_eq!(total_count, 50);
2388 assert!(has_more);
2389 }
2390
2391 #[tokio::test]
2392 async fn test_query_events_no_more_results() {
2393 let store = create_test_store();
2394
2395 for i in 0..5 {
2397 store
2398 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2399 .unwrap();
2400 }
2401
2402 let all_events = store
2404 .query(&QueryEventsRequest {
2405 entity_id: None,
2406 event_type: None,
2407 tenant_id: None,
2408 as_of: None,
2409 since: None,
2410 until: None,
2411 limit: None,
2412 event_type_prefix: None,
2413 payload_filter: None,
2414 })
2415 .unwrap();
2416 let total_count = all_events.len();
2417 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2418 let count = limited_events.len();
2419 let has_more = count < total_count;
2420
2421 assert_eq!(count, 5);
2422 assert_eq!(total_count, 5);
2423 assert!(!has_more);
2424 }
2425
2426 #[tokio::test]
2430 async fn test_query_events_order_desc_returns_latest() {
2431 let store = create_test_store();
2432
2433 let base = chrono::Utc::now();
2436 for i in 0..3i64 {
2437 let mut event = create_test_event("org-1", "auth.org.updated");
2438 event.timestamp = base + chrono::Duration::seconds(i);
2439 event.version = i + 1;
2440 store.ingest(&event).unwrap();
2441 }
2442
2443 let req = QueryEventsRequest {
2444 entity_id: Some("org-1".to_string()),
2445 ..Default::default()
2446 };
2447 let ascending = store.query(&req).unwrap();
2448 assert_eq!(ascending.len(), 3);
2449 assert!(ascending[0].timestamp < ascending[1].timestamp);
2451 assert!(ascending[1].timestamp < ascending[2].timestamp);
2452 let oldest_ts = ascending[0].timestamp;
2453 let newest_ts = ascending[2].timestamp;
2454
2455 let mut descending = ascending.clone();
2457 descending.reverse();
2458 let latest: Vec<Event> = descending.into_iter().take(1).collect();
2459 assert_eq!(latest.len(), 1);
2460 assert_eq!(
2461 latest[0].timestamp, newest_ts,
2462 "order=desc&limit=1 must yield the newest event"
2463 );
2464
2465 let oldest: Vec<Event> = ascending.into_iter().take(1).collect();
2467 assert_eq!(oldest[0].timestamp, oldest_ts);
2468 }
2469
2470 #[tokio::test]
2471 async fn test_list_entities_by_type_prefix() {
2472 let store = create_test_store();
2473
2474 store
2476 .ingest(&create_test_event("idx-1", "index.created"))
2477 .unwrap();
2478 store
2479 .ingest(&create_test_event("idx-1", "index.updated"))
2480 .unwrap();
2481 store
2482 .ingest(&create_test_event("idx-2", "index.created"))
2483 .unwrap();
2484 store
2485 .ingest(&create_test_event("idx-3", "index.created"))
2486 .unwrap();
2487 store
2489 .ingest(&create_test_event("trade-1", "trade.created"))
2490 .unwrap();
2491 store
2492 .ingest(&create_test_event("trade-2", "trade.created"))
2493 .unwrap();
2494
2495 let req = ListEntitiesRequest {
2497 event_type_prefix: Some("index.".to_string()),
2498 ..Default::default()
2499 };
2500 let query_req = QueryEventsRequest {
2501 entity_id: None,
2502 event_type: None,
2503 tenant_id: None,
2504 as_of: None,
2505 since: None,
2506 until: None,
2507 limit: None,
2508 event_type_prefix: req.event_type_prefix,
2509 payload_filter: req.payload_filter,
2510 };
2511 let events = store.query(&query_req).unwrap();
2512
2513 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2515 std::collections::HashMap::new();
2516 for event in &events {
2517 entity_map
2518 .entry(event.entity_id().to_string())
2519 .or_default()
2520 .push(event);
2521 }
2522
2523 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2526 assert_eq!(entity_map["idx-3"].len(), 1);
2527 }
2528
2529 #[tokio::test]
2532 async fn test_list_entities_order_and_pagination() {
2533 let store = create_test_store();
2534
2535 let base = chrono::Utc::now();
2537 for (i, eid) in ["org-a", "org-b", "org-c"].iter().enumerate() {
2538 let mut event = create_test_event(eid, "auth.org.created");
2539 event.timestamp = base + chrono::Duration::seconds(i as i64);
2540 store.ingest(&event).unwrap();
2541 }
2542 let prefix = || Some("auth.org.".to_string());
2543
2544 let desc = list_entities(
2546 State(store.clone()),
2547 Query(ListEntitiesRequest {
2548 event_type_prefix: prefix(),
2549 ..Default::default()
2550 }),
2551 )
2552 .await
2553 .unwrap();
2554 let desc_ids: Vec<&str> = desc
2555 .0
2556 .entities
2557 .iter()
2558 .map(|e| e.entity_id.as_str())
2559 .collect();
2560 assert_eq!(desc_ids, ["org-c", "org-b", "org-a"]);
2561
2562 let asc = list_entities(
2564 State(store.clone()),
2565 Query(ListEntitiesRequest {
2566 event_type_prefix: prefix(),
2567 order: Some("asc".to_string()),
2568 ..Default::default()
2569 }),
2570 )
2571 .await
2572 .unwrap();
2573 let asc_ids: Vec<&str> = asc
2574 .0
2575 .entities
2576 .iter()
2577 .map(|e| e.entity_id.as_str())
2578 .collect();
2579 assert_eq!(asc_ids, ["org-a", "org-b", "org-c"]);
2580
2581 let page2 = list_entities(
2583 State(store.clone()),
2584 Query(ListEntitiesRequest {
2585 event_type_prefix: prefix(),
2586 order: Some("asc".to_string()),
2587 limit: Some(1),
2588 offset: Some(1),
2589 ..Default::default()
2590 }),
2591 )
2592 .await
2593 .unwrap();
2594 assert_eq!(page2.0.entities.len(), 1);
2595 assert_eq!(page2.0.entities[0].entity_id, "org-b");
2596 assert_eq!(page2.0.total, 3);
2597 assert!(page2.0.has_more);
2598
2599 let err = list_entities(
2601 State(store.clone()),
2602 Query(ListEntitiesRequest {
2603 event_type_prefix: prefix(),
2604 order: Some("sideways".to_string()),
2605 ..Default::default()
2606 }),
2607 )
2608 .await;
2609 assert!(err.is_err(), "invalid order value must be rejected");
2610 }
2611
2612 fn create_test_event_with_payload(
2613 entity_id: &str,
2614 event_type: &str,
2615 payload: serde_json::Value,
2616 ) -> Event {
2617 Event::from_strings(
2618 event_type.to_string(),
2619 entity_id.to_string(),
2620 "test-stream".to_string(),
2621 payload,
2622 None,
2623 )
2624 .unwrap()
2625 }
2626
2627 #[tokio::test]
2628 async fn test_detect_duplicates_by_payload_fields() {
2629 let store = create_test_store();
2630
2631 store
2633 .ingest(&create_test_event_with_payload(
2634 "idx-1",
2635 "index.created",
2636 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2637 ))
2638 .unwrap();
2639 store
2640 .ingest(&create_test_event_with_payload(
2641 "idx-2",
2642 "index.created",
2643 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2644 ))
2645 .unwrap();
2646 store
2647 .ingest(&create_test_event_with_payload(
2648 "idx-3",
2649 "index.created",
2650 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2651 ))
2652 .unwrap();
2653 store
2654 .ingest(&create_test_event_with_payload(
2655 "idx-4",
2656 "index.created",
2657 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2658 ))
2659 .unwrap();
2660 store
2661 .ingest(&create_test_event_with_payload(
2662 "idx-5",
2663 "index.created",
2664 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2665 ))
2666 .unwrap();
2667
2668 let query_req = QueryEventsRequest {
2670 entity_id: None,
2671 event_type: None,
2672 tenant_id: None,
2673 as_of: None,
2674 since: None,
2675 until: None,
2676 limit: None,
2677 event_type_prefix: Some("index.".to_string()),
2678 payload_filter: None,
2679 };
2680 let events = store.query(&query_req).unwrap();
2681
2682 let group_by_fields = vec!["name"];
2684 let mut entity_latest: std::collections::HashMap<String, &Event> =
2685 std::collections::HashMap::new();
2686 for event in &events {
2687 let eid = event.entity_id().to_string();
2688 entity_latest
2689 .entry(eid)
2690 .and_modify(|existing| {
2691 if event.timestamp() > existing.timestamp() {
2692 *existing = event;
2693 }
2694 })
2695 .or_insert(event);
2696 }
2697
2698 let mut groups: std::collections::HashMap<String, Vec<String>> =
2699 std::collections::HashMap::new();
2700 for (entity_id, event) in &entity_latest {
2701 let payload = event.payload();
2702 let mut key_parts = serde_json::Map::new();
2703 for field in &group_by_fields {
2704 let value = payload
2705 .get(*field)
2706 .cloned()
2707 .unwrap_or(serde_json::Value::Null);
2708 key_parts.insert((*field).to_string(), value);
2709 }
2710 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2711 groups.entry(key_str).or_default().push(entity_id.clone());
2712 }
2713
2714 let duplicate_groups: Vec<_> = groups
2715 .into_iter()
2716 .filter(|(_, ids)| ids.len() > 1)
2717 .collect();
2718
2719 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2721 assert_eq!(ids.len(), 2);
2722 }
2723 }
2724
2725 #[tokio::test]
2726 async fn test_detect_duplicates_no_duplicates() {
2727 let store = create_test_store();
2728
2729 store
2731 .ingest(&create_test_event_with_payload(
2732 "idx-1",
2733 "index.created",
2734 serde_json::json!({"name": "A"}),
2735 ))
2736 .unwrap();
2737 store
2738 .ingest(&create_test_event_with_payload(
2739 "idx-2",
2740 "index.created",
2741 serde_json::json!({"name": "B"}),
2742 ))
2743 .unwrap();
2744
2745 let query_req = QueryEventsRequest {
2746 entity_id: None,
2747 event_type: None,
2748 tenant_id: None,
2749 as_of: None,
2750 since: None,
2751 until: None,
2752 limit: None,
2753 event_type_prefix: Some("index.".to_string()),
2754 payload_filter: None,
2755 };
2756 let events = store.query(&query_req).unwrap();
2757
2758 let mut entity_latest: std::collections::HashMap<String, &Event> =
2759 std::collections::HashMap::new();
2760 for event in &events {
2761 entity_latest
2762 .entry(event.entity_id().to_string())
2763 .or_insert(event);
2764 }
2765
2766 let mut groups: std::collections::HashMap<String, Vec<String>> =
2767 std::collections::HashMap::new();
2768 for (entity_id, event) in &entity_latest {
2769 let key_str =
2770 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2771 .unwrap();
2772 groups.entry(key_str).or_default().push(entity_id.clone());
2773 }
2774
2775 let duplicate_groups: Vec<_> = groups
2776 .into_iter()
2777 .filter(|(_, ids)| ids.len() > 1)
2778 .collect();
2779
2780 assert_eq!(duplicate_groups.len(), 0); }
2782
2783 #[tokio::test]
2784 async fn test_detect_duplicates_multi_field_group_by() {
2785 let store = create_test_store();
2786
2787 store
2789 .ingest(&create_test_event_with_payload(
2790 "idx-1",
2791 "index.created",
2792 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2793 ))
2794 .unwrap();
2795 store
2796 .ingest(&create_test_event_with_payload(
2797 "idx-2",
2798 "index.created",
2799 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2800 ))
2801 .unwrap();
2802 store
2804 .ingest(&create_test_event_with_payload(
2805 "idx-3",
2806 "index.created",
2807 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2808 ))
2809 .unwrap();
2810
2811 let query_req = QueryEventsRequest {
2812 entity_id: None,
2813 event_type: None,
2814 tenant_id: None,
2815 as_of: None,
2816 since: None,
2817 until: None,
2818 limit: None,
2819 event_type_prefix: Some("index.".to_string()),
2820 payload_filter: None,
2821 };
2822 let events = store.query(&query_req).unwrap();
2823
2824 let group_by_fields = vec!["name", "user_id"];
2825 let mut entity_latest: std::collections::HashMap<String, &Event> =
2826 std::collections::HashMap::new();
2827 for event in &events {
2828 entity_latest
2829 .entry(event.entity_id().to_string())
2830 .and_modify(|existing| {
2831 if event.timestamp() > existing.timestamp() {
2832 *existing = event;
2833 }
2834 })
2835 .or_insert(event);
2836 }
2837
2838 let mut groups: std::collections::HashMap<String, Vec<String>> =
2839 std::collections::HashMap::new();
2840 for (entity_id, event) in &entity_latest {
2841 let payload = event.payload();
2842 let mut key_parts = serde_json::Map::new();
2843 for field in &group_by_fields {
2844 let value = payload
2845 .get(*field)
2846 .cloned()
2847 .unwrap_or(serde_json::Value::Null);
2848 key_parts.insert((*field).to_string(), value);
2849 }
2850 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2851 groups.entry(key_str).or_default().push(entity_id.clone());
2852 }
2853
2854 let duplicate_groups: Vec<_> = groups
2855 .into_iter()
2856 .filter(|(_, ids)| ids.len() > 1)
2857 .collect();
2858
2859 assert_eq!(duplicate_groups.len(), 1);
2861 let (_, ref ids) = duplicate_groups[0];
2862 assert_eq!(ids.len(), 2);
2863 let mut sorted_ids = ids.clone();
2864 sorted_ids.sort();
2865 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2866 }
2867
2868 #[tokio::test]
2869 async fn test_projection_state_cache() {
2870 let store = create_test_store();
2871
2872 let cache = store.projection_state_cache();
2874 cache.insert(
2875 "entity_snapshots:user-123".to_string(),
2876 serde_json::json!({"name": "Test User", "age": 30}),
2877 );
2878
2879 let state = cache.get("entity_snapshots:user-123");
2881 assert!(state.is_some());
2882 let state = state.unwrap();
2883 assert_eq!(state["name"], "Test User");
2884 assert_eq!(state["age"], 30);
2885 }
2886
2887 #[tokio::test]
2888 async fn test_projection_manager_list_projections() {
2889 let store = create_test_store();
2890
2891 let projection_manager = store.projection_manager();
2893 let projections = projection_manager.list_projections();
2894
2895 assert!(projections.len() >= 2);
2897
2898 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2899 assert!(names.contains(&"entity_snapshots"));
2900 assert!(names.contains(&"event_counters"));
2901 }
2902
2903 #[tokio::test]
2904 async fn test_projection_state_after_event_ingestion() {
2905 let store = create_test_store();
2906
2907 let event = create_test_event("user-456", "user.created");
2909 store.ingest(&event).unwrap();
2910
2911 let projection_manager = store.projection_manager();
2913 let snapshot_projection = projection_manager
2914 .get_projection("entity_snapshots")
2915 .unwrap();
2916
2917 let state = snapshot_projection.get_state("user-456");
2918 assert!(state.is_some());
2919 let state = state.unwrap();
2920 assert_eq!(state["name"], "Test");
2921 assert_eq!(state["value"], 42);
2922 }
2923
2924 #[tokio::test]
2925 async fn test_projection_state_cache_multiple_entities() {
2926 let store = create_test_store();
2927 let cache = store.projection_state_cache();
2928
2929 for i in 0..10 {
2931 cache.insert(
2932 format!("entity_snapshots:entity-{i}"),
2933 serde_json::json!({"id": i, "status": "active"}),
2934 );
2935 }
2936
2937 assert_eq!(cache.len(), 10);
2939
2940 for i in 0..10 {
2942 let key = format!("entity_snapshots:entity-{i}");
2943 let state = cache.get(&key);
2944 assert!(state.is_some());
2945 assert_eq!(state.unwrap()["id"], i);
2946 }
2947 }
2948
2949 #[tokio::test]
2950 async fn test_projection_state_update() {
2951 let store = create_test_store();
2952 let cache = store.projection_state_cache();
2953
2954 cache.insert(
2956 "entity_snapshots:user-789".to_string(),
2957 serde_json::json!({"balance": 100}),
2958 );
2959
2960 cache.insert(
2962 "entity_snapshots:user-789".to_string(),
2963 serde_json::json!({"balance": 150}),
2964 );
2965
2966 let state = cache.get("entity_snapshots:user-789").unwrap();
2968 assert_eq!(state["balance"], 150);
2969 }
2970
2971 #[tokio::test]
2972 async fn test_event_counter_projection() {
2973 let store = create_test_store();
2974
2975 store
2977 .ingest(&create_test_event("user-1", "user.created"))
2978 .unwrap();
2979 store
2980 .ingest(&create_test_event("user-2", "user.created"))
2981 .unwrap();
2982 store
2983 .ingest(&create_test_event("user-1", "user.updated"))
2984 .unwrap();
2985
2986 let projection_manager = store.projection_manager();
2988 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2989
2990 let created_state = counter_projection.get_state("user.created");
2992 assert!(created_state.is_some());
2993 assert_eq!(created_state.unwrap()["count"], 2);
2994
2995 let updated_state = counter_projection.get_state("user.updated");
2996 assert!(updated_state.is_some());
2997 assert_eq!(updated_state.unwrap()["count"], 1);
2998 }
2999
3000 #[tokio::test]
3001 async fn test_projection_state_cache_key_format() {
3002 let store = create_test_store();
3003 let cache = store.projection_state_cache();
3004
3005 let key = "orders:order-12345".to_string();
3007 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
3008
3009 let state = cache.get(&key).unwrap();
3010 assert_eq!(state["total"], 99.99);
3011 }
3012
3013 #[tokio::test]
3014 async fn test_projection_state_cache_removal() {
3015 let store = create_test_store();
3016 let cache = store.projection_state_cache();
3017
3018 cache.insert(
3020 "test:entity-1".to_string(),
3021 serde_json::json!({"data": "value"}),
3022 );
3023 assert_eq!(cache.len(), 1);
3024
3025 cache.remove("test:entity-1");
3026 assert_eq!(cache.len(), 0);
3027 assert!(cache.get("test:entity-1").is_none());
3028 }
3029
3030 #[tokio::test]
3031 async fn test_get_nonexistent_projection() {
3032 let store = create_test_store();
3033 let projection_manager = store.projection_manager();
3034
3035 let projection = projection_manager.get_projection("nonexistent_projection");
3037 assert!(projection.is_none());
3038 }
3039
3040 #[tokio::test]
3041 async fn test_get_nonexistent_entity_state() {
3042 let store = create_test_store();
3043 let projection_manager = store.projection_manager();
3044
3045 let snapshot_projection = projection_manager
3047 .get_projection("entity_snapshots")
3048 .unwrap();
3049 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
3050 assert!(state.is_none());
3051 }
3052
3053 #[tokio::test]
3054 async fn test_projection_state_cache_concurrent_access() {
3055 let store = create_test_store();
3056 let cache = store.projection_state_cache();
3057
3058 let handles: Vec<_> = (0..10)
3060 .map(|i| {
3061 let cache_clone = cache.clone();
3062 tokio::spawn(async move {
3063 cache_clone.insert(
3064 format!("concurrent:entity-{i}"),
3065 serde_json::json!({"thread": i}),
3066 );
3067 })
3068 })
3069 .collect();
3070
3071 for handle in handles {
3072 handle.await.unwrap();
3073 }
3074
3075 assert_eq!(cache.len(), 10);
3077 }
3078
3079 #[tokio::test]
3080 async fn test_projection_state_large_payload() {
3081 let store = create_test_store();
3082 let cache = store.projection_state_cache();
3083
3084 let large_array: Vec<serde_json::Value> = (0..1000)
3086 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
3087 .collect();
3088
3089 cache.insert(
3090 "large:entity-1".to_string(),
3091 serde_json::json!({"items": large_array}),
3092 );
3093
3094 let state = cache.get("large:entity-1").unwrap();
3095 let items = state["items"].as_array().unwrap();
3096 assert_eq!(items.len(), 1000);
3097 }
3098
3099 #[tokio::test]
3100 async fn test_projection_state_complex_json() {
3101 let store = create_test_store();
3102 let cache = store.projection_state_cache();
3103
3104 let complex_state = serde_json::json!({
3106 "user": {
3107 "id": "user-123",
3108 "profile": {
3109 "name": "John Doe",
3110 "email": "john@example.com",
3111 "settings": {
3112 "theme": "dark",
3113 "notifications": true
3114 }
3115 },
3116 "roles": ["admin", "user"],
3117 "metadata": {
3118 "created_at": "2025-01-01T00:00:00Z",
3119 "last_login": null
3120 }
3121 }
3122 });
3123
3124 cache.insert("complex:user-123".to_string(), complex_state);
3125
3126 let state = cache.get("complex:user-123").unwrap();
3127 assert_eq!(state["user"]["profile"]["name"], "John Doe");
3128 assert_eq!(state["user"]["roles"][0], "admin");
3129 assert!(state["user"]["metadata"]["last_login"].is_null());
3130 }
3131
3132 #[tokio::test]
3133 async fn test_projection_state_cache_iteration() {
3134 let store = create_test_store();
3135 let cache = store.projection_state_cache();
3136
3137 for i in 0..5 {
3139 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
3140 }
3141
3142 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
3144 assert_eq!(entries.len(), 5);
3145 }
3146
3147 #[tokio::test]
3148 async fn test_projection_manager_get_entity_snapshots() {
3149 let store = create_test_store();
3150 let projection_manager = store.projection_manager();
3151
3152 let projection = projection_manager.get_projection("entity_snapshots");
3154 assert!(projection.is_some());
3155 assert_eq!(projection.unwrap().name(), "entity_snapshots");
3156 }
3157
3158 #[tokio::test]
3159 async fn test_projection_manager_get_event_counters() {
3160 let store = create_test_store();
3161 let projection_manager = store.projection_manager();
3162
3163 let projection = projection_manager.get_projection("event_counters");
3165 assert!(projection.is_some());
3166 assert_eq!(projection.unwrap().name(), "event_counters");
3167 }
3168
3169 #[tokio::test]
3170 async fn test_projection_state_cache_overwrite() {
3171 let store = create_test_store();
3172 let cache = store.projection_state_cache();
3173
3174 cache.insert(
3176 "overwrite:entity-1".to_string(),
3177 serde_json::json!({"version": 1}),
3178 );
3179
3180 cache.insert(
3182 "overwrite:entity-1".to_string(),
3183 serde_json::json!({"version": 2}),
3184 );
3185
3186 cache.insert(
3188 "overwrite:entity-1".to_string(),
3189 serde_json::json!({"version": 3}),
3190 );
3191
3192 let state = cache.get("overwrite:entity-1").unwrap();
3193 assert_eq!(state["version"], 3);
3194
3195 assert_eq!(cache.len(), 1);
3197 }
3198
3199 #[tokio::test]
3200 async fn test_projection_state_multiple_projections() {
3201 let store = create_test_store();
3202 let cache = store.projection_state_cache();
3203
3204 cache.insert(
3206 "entity_snapshots:user-1".to_string(),
3207 serde_json::json!({"name": "Alice"}),
3208 );
3209 cache.insert(
3210 "event_counters:user.created".to_string(),
3211 serde_json::json!({"count": 5}),
3212 );
3213 cache.insert(
3214 "custom_projection:order-1".to_string(),
3215 serde_json::json!({"total": 150.0}),
3216 );
3217
3218 assert_eq!(
3220 cache.get("entity_snapshots:user-1").unwrap()["name"],
3221 "Alice"
3222 );
3223 assert_eq!(
3224 cache.get("event_counters:user.created").unwrap()["count"],
3225 5
3226 );
3227 assert_eq!(
3228 cache.get("custom_projection:order-1").unwrap()["total"],
3229 150.0
3230 );
3231 }
3232
3233 #[tokio::test]
3234 async fn test_bulk_projection_state_access() {
3235 let store = create_test_store();
3236
3237 for i in 0..5 {
3239 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3240 store.ingest(&event).unwrap();
3241 }
3242
3243 let projection_manager = store.projection_manager();
3245 let snapshot_projection = projection_manager
3246 .get_projection("entity_snapshots")
3247 .unwrap();
3248
3249 for i in 0..5 {
3251 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3252 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3253 }
3254 }
3255
3256 #[tokio::test]
3257 async fn test_bulk_save_projection_states() {
3258 let store = create_test_store();
3259 let cache = store.projection_state_cache();
3260
3261 let states = vec![
3263 BulkSaveStateItem {
3264 entity_id: "bulk-entity-1".to_string(),
3265 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3266 },
3267 BulkSaveStateItem {
3268 entity_id: "bulk-entity-2".to_string(),
3269 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3270 },
3271 BulkSaveStateItem {
3272 entity_id: "bulk-entity-3".to_string(),
3273 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3274 },
3275 ];
3276
3277 let projection_name = "test_projection";
3278
3279 for item in &states {
3281 cache.insert(
3282 format!("{projection_name}:{}", item.entity_id),
3283 item.state.clone(),
3284 );
3285 }
3286
3287 assert_eq!(cache.len(), 3);
3289
3290 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3291 assert_eq!(state1["name"], "Entity 1");
3292 assert_eq!(state1["value"], 100);
3293
3294 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3295 assert_eq!(state2["name"], "Entity 2");
3296 assert_eq!(state2["value"], 200);
3297
3298 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3299 assert_eq!(state3["name"], "Entity 3");
3300 assert_eq!(state3["value"], 300);
3301 }
3302
3303 #[tokio::test]
3304 async fn test_bulk_save_empty_states() {
3305 let store = create_test_store();
3306 let cache = store.projection_state_cache();
3307
3308 cache.clear();
3310
3311 let states: Vec<BulkSaveStateItem> = vec![];
3313 assert_eq!(states.len(), 0);
3314
3315 assert_eq!(cache.len(), 0);
3317 }
3318
3319 #[tokio::test]
3320 async fn test_bulk_save_overwrites_existing() {
3321 let store = create_test_store();
3322 let cache = store.projection_state_cache();
3323
3324 cache.insert(
3326 "test:entity-1".to_string(),
3327 serde_json::json!({"version": 1, "data": "initial"}),
3328 );
3329
3330 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3332 cache.insert("test:entity-1".to_string(), new_state);
3333
3334 let state = cache.get("test:entity-1").unwrap();
3336 assert_eq!(state["version"], 2);
3337 assert_eq!(state["data"], "updated");
3338 }
3339
3340 #[tokio::test]
3341 async fn test_bulk_save_high_volume() {
3342 let store = create_test_store();
3343 let cache = store.projection_state_cache();
3344
3345 for i in 0..1000 {
3347 cache.insert(
3348 format!("volume_test:entity-{i}"),
3349 serde_json::json!({"index": i, "status": "active"}),
3350 );
3351 }
3352
3353 assert_eq!(cache.len(), 1000);
3355
3356 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3358 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3359 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3360 }
3361
3362 #[tokio::test]
3363 async fn test_bulk_save_different_projections() {
3364 let store = create_test_store();
3365 let cache = store.projection_state_cache();
3366
3367 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3369
3370 for proj in &projections {
3371 for i in 0..5 {
3372 cache.insert(
3373 format!("{proj}:entity-{i}"),
3374 serde_json::json!({"projection": proj, "id": i}),
3375 );
3376 }
3377 }
3378
3379 assert_eq!(cache.len(), 15);
3381
3382 for proj in &projections {
3384 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3385 assert_eq!(state["projection"], *proj);
3386 }
3387 }
3388
3389 #[tokio::test]
3400 async fn get_projection_state_falls_back_to_cache_when_unregistered() {
3401 let store = create_test_store();
3402 store.projection_state_cache().insert(
3403 "assets:BTC".to_string(),
3404 serde_json::json!({"symbol": "BTC", "altname": "Bitcoin"}),
3405 );
3406
3407 let resp = get_projection_state(
3408 State(Arc::clone(&store)),
3409 Path(("assets".to_string(), "BTC".to_string())),
3410 )
3411 .await
3412 .expect("should not error when projection is not registered");
3413
3414 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3415 assert_eq!(resp.0["state"]["symbol"], "BTC");
3416 assert_eq!(resp.0["state"]["altname"], "Bitcoin");
3417 }
3418
3419 #[tokio::test]
3420 async fn get_projection_state_returns_not_found_when_absent_everywhere() {
3421 let store = create_test_store();
3422
3423 let resp = get_projection_state(
3424 State(Arc::clone(&store)),
3425 Path(("assets".to_string(), "UNKNOWN".to_string())),
3426 )
3427 .await
3428 .unwrap();
3429
3430 assert_eq!(resp.0["found"], serde_json::Value::Bool(false));
3431 assert_eq!(resp.0["state"], serde_json::Value::Null);
3432 }
3433
3434 #[tokio::test]
3435 async fn get_projection_state_registered_wins_over_cache() {
3436 let store = create_test_store();
3437
3438 let event = create_test_event("user-777", "user.created");
3440 store.ingest(&event).unwrap();
3441
3442 store.projection_state_cache().insert(
3444 "entity_snapshots:user-777".to_string(),
3445 serde_json::json!({"stolen": "value"}),
3446 );
3447
3448 let resp = get_projection_state(
3449 State(Arc::clone(&store)),
3450 Path(("entity_snapshots".to_string(), "user-777".to_string())),
3451 )
3452 .await
3453 .unwrap();
3454
3455 assert_eq!(resp.0["found"], serde_json::Value::Bool(true));
3458 assert!(
3459 resp.0["state"].get("stolen").is_none(),
3460 "cache entry must not shadow registered projection state: got {:?}",
3461 resp.0["state"]
3462 );
3463 }
3464
3465 #[tokio::test]
3466 async fn get_projection_state_summary_returns_cache_without_registration() {
3467 let store = create_test_store();
3468 let cache = store.projection_state_cache();
3469 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3470 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3471 cache.insert("trades:t-1".into(), serde_json::json!({"x": 1}));
3473
3474 let resp =
3475 get_projection_state_summary(State(Arc::clone(&store)), Path("assets".to_string()))
3476 .await
3477 .unwrap();
3478
3479 assert_eq!(resp.0["total"], 2);
3480 let states = resp.0["states"].as_array().unwrap();
3481 let entity_ids: Vec<&str> = states
3482 .iter()
3483 .map(|s| s["entity_id"].as_str().unwrap())
3484 .collect();
3485 assert!(entity_ids.contains(&"BTC"));
3486 assert!(entity_ids.contains(&"ETH"));
3487 }
3488
3489 #[tokio::test]
3490 async fn bulk_get_projection_states_falls_back_to_cache() {
3491 let store = create_test_store();
3492 let cache = store.projection_state_cache();
3493 cache.insert("assets:BTC".into(), serde_json::json!({"symbol": "BTC"}));
3494 cache.insert("assets:ETH".into(), serde_json::json!({"symbol": "ETH"}));
3495
3496 let req = BulkGetStateRequest {
3497 entity_ids: vec!["BTC".into(), "ETH".into(), "MISSING".into()],
3498 };
3499
3500 let resp = bulk_get_projection_states(
3501 State(Arc::clone(&store)),
3502 Path("assets".to_string()),
3503 Json(req),
3504 )
3505 .await
3506 .unwrap();
3507
3508 assert_eq!(resp.0["total"], 3);
3509 let states = resp.0["states"].as_array().unwrap();
3510 let by_id: std::collections::HashMap<&str, &serde_json::Value> = states
3511 .iter()
3512 .map(|s| (s["entity_id"].as_str().unwrap(), s))
3513 .collect();
3514
3515 assert_eq!(by_id["BTC"]["found"], serde_json::Value::Bool(true));
3516 assert_eq!(by_id["BTC"]["state"]["symbol"], "BTC");
3517 assert_eq!(by_id["ETH"]["found"], serde_json::Value::Bool(true));
3518 assert_eq!(by_id["MISSING"]["found"], serde_json::Value::Bool(false));
3519 }
3520}