1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 web::api_v1::AppState,
41 },
42 store::{EventStore, EventTypeInfo, StreamInfo},
43};
44use axum::{
45 Json, Router,
46 extract::{Path, Query, State, WebSocketUpgrade},
47 response::{IntoResponse, Response},
48 routing::{get, post, put},
49};
50use serde::Deserialize;
51use std::sync::Arc;
52use tower_http::{
53 cors::{Any, CorsLayer},
54 trace::TraceLayer,
55};
56
/// Reference-counted handle to the [`EventStore`]; used as the axum state type
/// of the router built in `serve`.
type SharedStore = Arc<EventStore>;
58
59#[cfg(feature = "replication")]
65async fn await_replication_ack(state: &AppState) {
66 let shipper_guard = state.wal_shipper.read().await;
67 if let Some(ref shipper) = *shipper_guard {
68 let mode = shipper.replication_mode();
69 if mode == ReplicationMode::Async {
70 return;
71 }
72
73 let target_offset = shipper.current_leader_offset();
74 if target_offset == 0 {
75 return;
76 }
77
78 let shipper = Arc::clone(shipper);
79 drop(shipper_guard);
81
82 let timer = state
83 .store
84 .metrics()
85 .replication_ack_wait_seconds
86 .start_timer();
87 let acked = shipper.wait_for_ack(target_offset).await;
88 timer.observe_duration();
89
90 if !acked {
91 tracing::warn!(
92 "Replication ACK timeout in {} mode (offset {}). \
93 Write succeeded locally but follower confirmation pending.",
94 mode,
95 target_offset,
96 );
97 }
98 }
99}
100#[cfg(not(feature = "replication"))]
101async fn await_replication_ack(_state: &AppState) {
102 }
104
105pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
106 let app = Router::new()
107 .route("/health", get(health))
108 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
110 .route("/api/v1/events/batch", post(ingest_events_batch))
111 .route("/api/v1/events/query", get(query_events))
112 .route("/api/v1/events/{event_id}", get(get_event_by_id))
113 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
116 .route("/api/v1/event-types", get(list_event_types))
117 .route("/api/v1/entities/duplicates", get(detect_duplicates))
118 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
119 .route(
120 "/api/v1/entities/{entity_id}/snapshot",
121 get(get_entity_snapshot),
122 )
123 .route("/api/v1/stats", get(get_stats))
124 .route("/api/v1/analytics/frequency", get(analytics_frequency))
126 .route("/api/v1/analytics/summary", get(analytics_summary))
127 .route("/api/v1/analytics/correlation", get(analytics_correlation))
128 .route("/api/v1/snapshots", post(create_snapshot))
130 .route("/api/v1/snapshots", get(list_snapshots))
131 .route(
132 "/api/v1/snapshots/{entity_id}/latest",
133 get(get_latest_snapshot),
134 )
135 .route("/api/v1/compaction/trigger", post(trigger_compaction))
137 .route("/api/v1/compaction/stats", get(compaction_stats))
138 .route("/api/v1/schemas", post(register_schema))
140 .route("/api/v1/schemas", get(list_subjects))
141 .route("/api/v1/schemas/{subject}", get(get_schema))
142 .route(
143 "/api/v1/schemas/{subject}/versions",
144 get(list_schema_versions),
145 )
146 .route("/api/v1/schemas/validate", post(validate_event_schema))
147 .route(
148 "/api/v1/schemas/{subject}/compatibility",
149 put(set_compatibility_mode),
150 )
151 .route("/api/v1/replay", post(start_replay))
153 .route("/api/v1/replay", get(list_replays))
154 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
155 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
156 .route(
157 "/api/v1/replay/{replay_id}",
158 axum::routing::delete(delete_replay),
159 )
160 .route("/api/v1/pipelines", post(register_pipeline))
162 .route("/api/v1/pipelines", get(list_pipelines))
163 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
164 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
165 .route(
166 "/api/v1/pipelines/{pipeline_id}",
167 axum::routing::delete(remove_pipeline),
168 )
169 .route(
170 "/api/v1/pipelines/{pipeline_id}/stats",
171 get(get_pipeline_stats),
172 )
173 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
174 .route("/api/v1/projections", get(list_projections))
176 .route("/api/v1/projections/{name}", get(get_projection))
177 .route(
178 "/api/v1/projections/{name}",
179 axum::routing::delete(delete_projection),
180 )
181 .route(
182 "/api/v1/projections/{name}/state",
183 get(get_projection_state_summary),
184 )
185 .route("/api/v1/projections/{name}/reset", post(reset_projection))
186 .route(
187 "/api/v1/projections/{name}/{entity_id}/state",
188 get(get_projection_state),
189 )
190 .route(
191 "/api/v1/projections/{name}/{entity_id}/state",
192 post(save_projection_state),
193 )
194 .route(
195 "/api/v1/projections/{name}/{entity_id}/state",
196 put(save_projection_state),
197 )
198 .route(
199 "/api/v1/projections/{name}/bulk",
200 post(bulk_get_projection_states),
201 )
202 .route(
203 "/api/v1/projections/{name}/bulk/save",
204 post(bulk_save_projection_states),
205 )
206 .route("/api/v1/webhooks", post(register_webhook))
208 .route("/api/v1/webhooks", get(list_webhooks))
209 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
210 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
211 .route(
212 "/api/v1/webhooks/{webhook_id}",
213 axum::routing::delete(delete_webhook),
214 )
215 .route(
216 "/api/v1/webhooks/{webhook_id}/deliveries",
217 get(list_webhook_deliveries),
218 )
219 .route("/api/v1/graphql", post(graphql_query))
221 .route("/api/v1/geospatial/query", post(geo_query))
222 .route("/api/v1/geospatial/stats", get(geo_stats))
223 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
224 .route(
225 "/api/v1/schema-evolution/history/{event_type}",
226 get(schema_evolution_history),
227 )
228 .route(
229 "/api/v1/schema-evolution/schema/{event_type}",
230 get(schema_evolution_schema),
231 )
232 .route(
233 "/api/v1/schema-evolution/stats",
234 get(schema_evolution_stats),
235 )
236 .layer(
237 CorsLayer::new()
238 .allow_origin(Any)
239 .allow_methods(Any)
240 .allow_headers(Any),
241 )
242 .layer(TraceLayer::new_for_http())
243 .with_state(store);
244
245 let listener = tokio::net::TcpListener::bind(addr).await?;
246 axum::serve(listener, app).await?;
247
248 Ok(())
249}
250
251pub async fn health() -> impl IntoResponse {
252 Json(serde_json::json!({
253 "status": "healthy",
254 "service": "allsource-core",
255 "version": env!("CARGO_PKG_VERSION")
256 }))
257}
258
259pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
261 let metrics = store.metrics();
262
263 match metrics.encode() {
264 Ok(encoded) => Response::builder()
265 .status(200)
266 .header("Content-Type", "text/plain; version=0.0.4")
267 .body(encoded)
268 .unwrap()
269 .into_response(),
270 Err(e) => Response::builder()
271 .status(500)
272 .body(format!("Error encoding metrics: {e}"))
273 .unwrap()
274 .into_response(),
275 }
276}
277
278pub async fn ingest_event(
279 State(store): State<SharedStore>,
280 Json(req): Json<IngestEventRequest>,
281) -> Result<Json<IngestEventResponse>> {
282 let expected_version = req.expected_version;
283
284 let event = Event::from_strings(
286 req.event_type,
287 req.entity_id,
288 "default".to_string(),
289 req.payload,
290 req.metadata,
291 )?;
292
293 let event_id = event.id;
294 let timestamp = event.timestamp;
295
296 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
297
298 tracing::info!("Event ingested: {}", event_id);
299
300 Ok(Json(IngestEventResponse {
301 event_id,
302 timestamp,
303 version: Some(new_version),
304 }))
305}
306
307pub async fn ingest_event_v1(
311 State(state): State<AppState>,
312 Json(req): Json<IngestEventRequest>,
313) -> Result<Json<IngestEventResponse>> {
314 let expected_version = req.expected_version;
315
316 let event = Event::from_strings(
317 req.event_type,
318 req.entity_id,
319 "default".to_string(),
320 req.payload,
321 req.metadata,
322 )?;
323
324 let event_id = event.id;
325 let timestamp = event.timestamp;
326
327 let new_version = state
328 .store
329 .ingest_with_expected_version(&event, expected_version)?;
330
331 await_replication_ack(&state).await;
333
334 tracing::info!("Event ingested: {}", event_id);
335
336 Ok(Json(IngestEventResponse {
337 event_id,
338 timestamp,
339 version: Some(new_version),
340 }))
341}
342
343pub async fn ingest_events_batch(
348 State(store): State<SharedStore>,
349 Json(req): Json<IngestEventsBatchRequest>,
350) -> Result<Json<IngestEventsBatchResponse>> {
351 let total = req.events.len();
352 let mut ingested_events = Vec::with_capacity(total);
353
354 for event_req in req.events {
355 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
356 let expected_version = event_req.expected_version;
357
358 let event = Event::from_strings(
359 event_req.event_type,
360 event_req.entity_id,
361 tenant_id,
362 event_req.payload,
363 event_req.metadata,
364 )?;
365
366 let event_id = event.id;
367 let timestamp = event.timestamp;
368
369 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
370
371 ingested_events.push(IngestEventResponse {
372 event_id,
373 timestamp,
374 version: Some(new_version),
375 });
376 }
377
378 let ingested = ingested_events.len();
379 tracing::info!("Batch ingested {} events", ingested);
380
381 Ok(Json(IngestEventsBatchResponse {
382 total,
383 ingested,
384 events: ingested_events,
385 }))
386}
387
388pub async fn ingest_events_batch_v1(
392 State(state): State<AppState>,
393 Json(req): Json<IngestEventsBatchRequest>,
394) -> Result<Json<IngestEventsBatchResponse>> {
395 let total = req.events.len();
396 let mut ingested_events = Vec::with_capacity(total);
397
398 for event_req in req.events {
399 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
400 let expected_version = event_req.expected_version;
401
402 let event = Event::from_strings(
403 event_req.event_type,
404 event_req.entity_id,
405 tenant_id,
406 event_req.payload,
407 event_req.metadata,
408 )?;
409
410 let event_id = event.id;
411 let timestamp = event.timestamp;
412
413 let new_version = state
414 .store
415 .ingest_with_expected_version(&event, expected_version)?;
416
417 ingested_events.push(IngestEventResponse {
418 event_id,
419 timestamp,
420 version: Some(new_version),
421 });
422 }
423
424 await_replication_ack(&state).await;
426
427 let ingested = ingested_events.len();
428 tracing::info!("Batch ingested {} events", ingested);
429
430 Ok(Json(IngestEventsBatchResponse {
431 total,
432 ingested,
433 events: ingested_events,
434 }))
435}
436
437pub async fn query_events(
438 State(store): State<SharedStore>,
439 Query(req): Query<QueryEventsRequest>,
440) -> Result<Json<QueryEventsResponse>> {
441 let requested_limit = req.limit;
442 let queried_entity_id = req.entity_id.clone();
443
444 let unlimited_req = QueryEventsRequest {
446 entity_id: req.entity_id,
447 event_type: req.event_type,
448 tenant_id: req.tenant_id,
449 as_of: req.as_of,
450 since: req.since,
451 until: req.until,
452 limit: None,
453 event_type_prefix: req.event_type_prefix,
454 payload_filter: req.payload_filter,
455 };
456 let all_events = store.query(&unlimited_req)?;
457 let total_count = all_events.len();
458
459 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
461 all_events.into_iter().take(limit).collect()
462 } else {
463 all_events
464 };
465
466 let count = limited_events.len();
467 let has_more = count < total_count;
468 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
469
470 let entity_version = queried_entity_id
472 .as_deref()
473 .map(|eid| store.get_entity_version(eid));
474
475 tracing::debug!("Query returned {} events (total: {})", count, total_count);
476
477 Ok(Json(QueryEventsResponse {
478 events,
479 count,
480 total_count,
481 has_more,
482 entity_version,
483 }))
484}
485
486pub async fn list_entities(
487 State(store): State<SharedStore>,
488 Query(req): Query<ListEntitiesRequest>,
489) -> Result<Json<ListEntitiesResponse>> {
490 use std::collections::HashMap;
491
492 let query_req = QueryEventsRequest {
494 entity_id: None,
495 event_type: None,
496 tenant_id: None,
497 as_of: None,
498 since: None,
499 until: None,
500 limit: None,
501 event_type_prefix: req.event_type_prefix,
502 payload_filter: req.payload_filter,
503 };
504 let events = store.query(&query_req)?;
505
506 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
508 for event in &events {
509 entity_map
510 .entry(event.entity_id().to_string())
511 .or_default()
512 .push(event);
513 }
514
515 let mut summaries: Vec<EntitySummary> = entity_map
517 .into_iter()
518 .map(|(entity_id, events)| {
519 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
520 EntitySummary {
521 entity_id,
522 event_count: events.len(),
523 last_event_type: last.event_type_str().to_string(),
524 last_event_at: last.timestamp(),
525 }
526 })
527 .collect();
528 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
529
530 let total = summaries.len();
531
532 let offset = req.offset.unwrap_or(0);
534 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
535 let summaries = if let Some(limit) = req.limit {
536 let has_more = summaries.len() > limit;
537 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
538 return Ok(Json(ListEntitiesResponse {
539 entities: truncated,
540 total,
541 has_more,
542 }));
543 } else {
544 summaries
545 };
546
547 Ok(Json(ListEntitiesResponse {
548 entities: summaries,
549 total,
550 has_more: false,
551 }))
552}
553
554pub async fn detect_duplicates(
555 State(store): State<SharedStore>,
556 Query(req): Query<DetectDuplicatesRequest>,
557) -> Result<Json<DetectDuplicatesResponse>> {
558 use std::collections::HashMap;
559
560 let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
561
562 let query_req = QueryEventsRequest {
564 entity_id: None,
565 event_type: None,
566 tenant_id: None,
567 as_of: None,
568 since: None,
569 until: None,
570 limit: None,
571 event_type_prefix: Some(req.event_type_prefix),
572 payload_filter: None,
573 };
574 let events = store.query(&query_req)?;
575
576 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
579 for event in &events {
580 let eid = event.entity_id().to_string();
581 entity_latest
582 .entry(eid)
583 .and_modify(|existing| {
584 if event.timestamp() > existing.timestamp() {
585 *existing = event;
586 }
587 })
588 .or_insert(event);
589 }
590
591 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
593 for (entity_id, event) in &entity_latest {
594 let payload = event.payload();
595 let mut key_parts = serde_json::Map::new();
596 for field in &group_by_fields {
597 let value = payload
598 .get(*field)
599 .cloned()
600 .unwrap_or(serde_json::Value::Null);
601 key_parts.insert((*field).to_string(), value);
602 }
603 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
604 groups.entry(key_str).or_default().push(entity_id.clone());
605 }
606
607 let mut duplicate_groups: Vec<DuplicateGroup> = groups
609 .into_iter()
610 .filter(|(_, ids)| ids.len() > 1)
611 .map(|(key_str, mut ids)| {
612 ids.sort();
613 let key: serde_json::Value =
614 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
615 let count = ids.len();
616 DuplicateGroup {
617 key,
618 entity_ids: ids,
619 count,
620 }
621 })
622 .collect();
623
624 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
626
627 let total = duplicate_groups.len();
628
629 let offset = req.offset.unwrap_or(0);
631 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
632
633 if let Some(limit) = req.limit {
634 let has_more = duplicate_groups.len() > limit;
635 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
636 return Ok(Json(DetectDuplicatesResponse {
637 duplicates: truncated,
638 total,
639 has_more,
640 }));
641 }
642
643 Ok(Json(DetectDuplicatesResponse {
644 duplicates: duplicate_groups,
645 total,
646 has_more: false,
647 }))
648}
649
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional timestamp that `get_entity_state` forwards to the store's
    /// `reconstruct_state`.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
654
655pub async fn get_entity_state(
656 State(store): State<SharedStore>,
657 Path(entity_id): Path<String>,
658 Query(params): Query<EntityStateParams>,
659) -> Result<Json<serde_json::Value>> {
660 let state = store.reconstruct_state(&entity_id, params.as_of)?;
661
662 tracing::info!("State reconstructed for entity: {}", entity_id);
663
664 Ok(Json(state))
665}
666
667pub async fn get_entity_snapshot(
668 State(store): State<SharedStore>,
669 Path(entity_id): Path<String>,
670) -> Result<Json<serde_json::Value>> {
671 let snapshot = store.get_snapshot(&entity_id)?;
672
673 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
674
675 Ok(Json(snapshot))
676}
677
678pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
679 let stats = store.stats();
680 Json(stats)
681}
682
/// Pagination parameters for `list_streams`.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return (applied after `offset`).
    pub limit: Option<usize>,
    /// Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
692
/// Response body of `list_streams`.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams before `offset`/`limit` were applied.
    pub total: usize,
}
699
700pub async fn list_streams(
701 State(store): State<SharedStore>,
702 Query(params): Query<ListStreamsParams>,
703) -> Json<ListStreamsResponse> {
704 let mut streams = store.list_streams();
705 let total = streams.len();
706
707 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
709
710 if let Some(offset) = params.offset {
712 if offset < streams.len() {
713 streams = streams[offset..].to_vec();
714 } else {
715 streams = vec![];
716 }
717 }
718
719 if let Some(limit) = params.limit {
720 streams.truncate(limit);
721 }
722
723 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
724
725 Json(ListStreamsResponse { streams, total })
726}
727
/// Pagination parameters for `list_event_types`.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return (applied after `offset`).
    pub limit: Option<usize>,
    /// Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
737
/// Response body of `list_event_types`.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types before `offset`/`limit` were applied.
    pub total: usize,
}
744
745pub async fn list_event_types(
746 State(store): State<SharedStore>,
747 Query(params): Query<ListEventTypesParams>,
748) -> Json<ListEventTypesResponse> {
749 let mut event_types = store.list_event_types();
750 let total = event_types.len();
751
752 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
754
755 if let Some(offset) = params.offset {
757 if offset < event_types.len() {
758 event_types = event_types[offset..].to_vec();
759 } else {
760 event_types = vec![];
761 }
762 }
763
764 if let Some(limit) = params.limit {
765 event_types.truncate(limit);
766 }
767
768 tracing::debug!(
769 "Listed {} event types (total: {})",
770 event_types.len(),
771 total
772 );
773
774 Json(ListEventTypesResponse { event_types, total })
775}
776
/// Query-string parameters accepted by `events_websocket`.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, `events_websocket` serves the socket through
    /// `handle_socket_with_consumer` with this id; otherwise it uses
    /// `handle_socket`.
    pub consumer_id: Option<String>,
}
782
783pub async fn events_websocket(
784 ws: WebSocketUpgrade,
785 State(store): State<SharedStore>,
786 Query(params): Query<WebSocketParams>,
787) -> Response {
788 let websocket_manager = store.websocket_manager();
789
790 ws.on_upgrade(move |socket| async move {
791 if let Some(consumer_id) = params.consumer_id {
792 websocket_manager
793 .handle_socket_with_consumer(socket, consumer_id, store)
794 .await;
795 } else {
796 websocket_manager.handle_socket(socket).await;
797 }
798 })
799}
800
801pub async fn analytics_frequency(
803 State(store): State<SharedStore>,
804 Query(req): Query<EventFrequencyRequest>,
805) -> Result<Json<EventFrequencyResponse>> {
806 let response = AnalyticsEngine::event_frequency(&store, &req)?;
807
808 tracing::debug!(
809 "Frequency analysis returned {} buckets",
810 response.buckets.len()
811 );
812
813 Ok(Json(response))
814}
815
816pub async fn analytics_summary(
818 State(store): State<SharedStore>,
819 Query(req): Query<StatsSummaryRequest>,
820) -> Result<Json<StatsSummaryResponse>> {
821 let response = AnalyticsEngine::stats_summary(&store, &req)?;
822
823 tracing::debug!(
824 "Stats summary: {} events across {} entities",
825 response.total_events,
826 response.unique_entities
827 );
828
829 Ok(Json(response))
830}
831
832pub async fn analytics_correlation(
834 State(store): State<SharedStore>,
835 Query(req): Query<CorrelationRequest>,
836) -> Result<Json<CorrelationResponse>> {
837 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
838
839 tracing::debug!(
840 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
841 response.correlated_pairs,
842 response.total_a,
843 response.correlation_percentage
844 );
845
846 Ok(Json(response))
847}
848
849pub async fn create_snapshot(
851 State(store): State<SharedStore>,
852 Json(req): Json<CreateSnapshotRequest>,
853) -> Result<Json<CreateSnapshotResponse>> {
854 store.create_snapshot(&req.entity_id)?;
855
856 let snapshot_manager = store.snapshot_manager();
857 let snapshot = snapshot_manager
858 .get_latest_snapshot(&req.entity_id)
859 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
860
861 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
862
863 Ok(Json(CreateSnapshotResponse {
864 snapshot_id: snapshot.id,
865 entity_id: snapshot.entity_id,
866 created_at: snapshot.created_at,
867 event_count: snapshot.event_count,
868 size_bytes: snapshot.metadata.size_bytes,
869 }))
870}
871
872pub async fn list_snapshots(
874 State(store): State<SharedStore>,
875 Query(req): Query<ListSnapshotsRequest>,
876) -> Result<Json<ListSnapshotsResponse>> {
877 let snapshot_manager = store.snapshot_manager();
878
879 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
880 snapshot_manager
881 .get_all_snapshots(&entity_id)
882 .into_iter()
883 .map(SnapshotInfo::from)
884 .collect()
885 } else {
886 let entities = snapshot_manager.list_entities();
888 entities
889 .iter()
890 .flat_map(|entity_id| {
891 snapshot_manager
892 .get_all_snapshots(entity_id)
893 .into_iter()
894 .map(SnapshotInfo::from)
895 })
896 .collect()
897 };
898
899 let total = snapshots.len();
900
901 tracing::debug!("Listed {} snapshots", total);
902
903 Ok(Json(ListSnapshotsResponse { snapshots, total }))
904}
905
906pub async fn get_latest_snapshot(
908 State(store): State<SharedStore>,
909 Path(entity_id): Path<String>,
910) -> Result<Json<serde_json::Value>> {
911 let snapshot_manager = store.snapshot_manager();
912
913 let snapshot = snapshot_manager
914 .get_latest_snapshot(&entity_id)
915 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
916
917 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
918
919 Ok(Json(serde_json::json!({
920 "snapshot_id": snapshot.id,
921 "entity_id": snapshot.entity_id,
922 "created_at": snapshot.created_at,
923 "as_of": snapshot.as_of,
924 "event_count": snapshot.event_count,
925 "size_bytes": snapshot.metadata.size_bytes,
926 "snapshot_type": snapshot.metadata.snapshot_type,
927 "state": snapshot.state
928 })))
929}
930
931pub async fn trigger_compaction(
933 State(store): State<SharedStore>,
934) -> Result<Json<CompactionResult>> {
935 let compaction_manager = store.compaction_manager().ok_or_else(|| {
936 crate::error::AllSourceError::InternalError(
937 "Compaction not enabled (no Parquet storage)".to_string(),
938 )
939 })?;
940
941 tracing::info!("📦 Manual compaction triggered via API");
942
943 let result = compaction_manager.compact_now()?;
944
945 Ok(Json(result))
946}
947
948pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
950 let compaction_manager = store.compaction_manager().ok_or_else(|| {
951 crate::error::AllSourceError::InternalError(
952 "Compaction not enabled (no Parquet storage)".to_string(),
953 )
954 })?;
955
956 let stats = compaction_manager.stats();
957 let config = compaction_manager.config();
958
959 Ok(Json(serde_json::json!({
960 "stats": stats,
961 "config": {
962 "min_files_to_compact": config.min_files_to_compact,
963 "target_file_size": config.target_file_size,
964 "max_file_size": config.max_file_size,
965 "small_file_threshold": config.small_file_threshold,
966 "compaction_interval_seconds": config.compaction_interval_seconds,
967 "auto_compact": config.auto_compact,
968 "strategy": config.strategy
969 }
970 })))
971}
972
973pub async fn register_schema(
975 State(store): State<SharedStore>,
976 Json(req): Json<RegisterSchemaRequest>,
977) -> Result<Json<RegisterSchemaResponse>> {
978 let schema_registry = store.schema_registry();
979
980 let response =
981 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
982
983 tracing::info!(
984 "📋 Schema registered: v{} for '{}'",
985 response.version,
986 response.subject
987 );
988
989 Ok(Json(response))
990}
991
/// Query-string parameters accepted by `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version forwarded to the registry's `get_schema`.
    // NOTE(review): presumably `None` selects the latest version — confirm
    // against the schema registry.
    version: Option<u32>,
}
997
998pub async fn get_schema(
999 State(store): State<SharedStore>,
1000 Path(subject): Path<String>,
1001 Query(params): Query<GetSchemaParams>,
1002) -> Result<Json<serde_json::Value>> {
1003 let schema_registry = store.schema_registry();
1004
1005 let schema = schema_registry.get_schema(&subject, params.version)?;
1006
1007 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1008
1009 Ok(Json(serde_json::json!({
1010 "id": schema.id,
1011 "subject": schema.subject,
1012 "version": schema.version,
1013 "schema": schema.schema,
1014 "created_at": schema.created_at,
1015 "description": schema.description,
1016 "tags": schema.tags
1017 })))
1018}
1019
1020pub async fn list_schema_versions(
1022 State(store): State<SharedStore>,
1023 Path(subject): Path<String>,
1024) -> Result<Json<serde_json::Value>> {
1025 let schema_registry = store.schema_registry();
1026
1027 let versions = schema_registry.list_versions(&subject)?;
1028
1029 Ok(Json(serde_json::json!({
1030 "subject": subject,
1031 "versions": versions
1032 })))
1033}
1034
1035pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1037 let schema_registry = store.schema_registry();
1038
1039 let subjects = schema_registry.list_subjects();
1040
1041 Json(serde_json::json!({
1042 "subjects": subjects,
1043 "total": subjects.len()
1044 }))
1045}
1046
1047pub async fn validate_event_schema(
1049 State(store): State<SharedStore>,
1050 Json(req): Json<ValidateEventRequest>,
1051) -> Result<Json<ValidateEventResponse>> {
1052 let schema_registry = store.schema_registry();
1053
1054 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1055
1056 if response.valid {
1057 tracing::debug!(
1058 "✅ Event validated against schema '{}' v{}",
1059 req.subject,
1060 response.schema_version
1061 );
1062 } else {
1063 tracing::warn!(
1064 "❌ Event validation failed for '{}': {:?}",
1065 req.subject,
1066 response.errors
1067 );
1068 }
1069
1070 Ok(Json(response))
1071}
1072
/// JSON body accepted by `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to store for the subject named in the request path.
    compatibility: CompatibilityMode,
}
1078
1079pub async fn set_compatibility_mode(
1080 State(store): State<SharedStore>,
1081 Path(subject): Path<String>,
1082 Json(req): Json<SetCompatibilityRequest>,
1083) -> Json<serde_json::Value> {
1084 let schema_registry = store.schema_registry();
1085
1086 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1087
1088 tracing::info!(
1089 "🔧 Set compatibility mode for '{}' to {:?}",
1090 subject,
1091 req.compatibility
1092 );
1093
1094 Json(serde_json::json!({
1095 "subject": subject,
1096 "compatibility": req.compatibility
1097 }))
1098}
1099
1100pub async fn start_replay(
1102 State(store): State<SharedStore>,
1103 Json(req): Json<StartReplayRequest>,
1104) -> Result<Json<StartReplayResponse>> {
1105 let replay_manager = store.replay_manager();
1106
1107 let response = replay_manager.start_replay(store, req)?;
1108
1109 tracing::info!(
1110 "🔄 Started replay {} with {} events",
1111 response.replay_id,
1112 response.total_events
1113 );
1114
1115 Ok(Json(response))
1116}
1117
1118pub async fn get_replay_progress(
1120 State(store): State<SharedStore>,
1121 Path(replay_id): Path<uuid::Uuid>,
1122) -> Result<Json<ReplayProgress>> {
1123 let replay_manager = store.replay_manager();
1124
1125 let progress = replay_manager.get_progress(replay_id)?;
1126
1127 Ok(Json(progress))
1128}
1129
1130pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1132 let replay_manager = store.replay_manager();
1133
1134 let replays = replay_manager.list_replays();
1135
1136 Json(serde_json::json!({
1137 "replays": replays,
1138 "total": replays.len()
1139 }))
1140}
1141
1142pub async fn cancel_replay(
1144 State(store): State<SharedStore>,
1145 Path(replay_id): Path<uuid::Uuid>,
1146) -> Result<Json<serde_json::Value>> {
1147 let replay_manager = store.replay_manager();
1148
1149 replay_manager.cancel_replay(replay_id)?;
1150
1151 tracing::info!("🛑 Cancelled replay {}", replay_id);
1152
1153 Ok(Json(serde_json::json!({
1154 "replay_id": replay_id,
1155 "status": "cancelled"
1156 })))
1157}
1158
1159pub async fn delete_replay(
1161 State(store): State<SharedStore>,
1162 Path(replay_id): Path<uuid::Uuid>,
1163) -> Result<Json<serde_json::Value>> {
1164 let replay_manager = store.replay_manager();
1165
1166 let deleted = replay_manager.delete_replay(replay_id)?;
1167
1168 if deleted {
1169 tracing::info!("🗑️ Deleted replay {}", replay_id);
1170 }
1171
1172 Ok(Json(serde_json::json!({
1173 "replay_id": replay_id,
1174 "deleted": deleted
1175 })))
1176}
1177
1178pub async fn register_pipeline(
1180 State(store): State<SharedStore>,
1181 Json(config): Json<PipelineConfig>,
1182) -> Result<Json<serde_json::Value>> {
1183 let pipeline_manager = store.pipeline_manager();
1184
1185 let pipeline_id = pipeline_manager.register(config.clone());
1186
1187 tracing::info!(
1188 "🔀 Pipeline registered: {} (name: {})",
1189 pipeline_id,
1190 config.name
1191 );
1192
1193 Ok(Json(serde_json::json!({
1194 "pipeline_id": pipeline_id,
1195 "name": config.name,
1196 "enabled": config.enabled
1197 })))
1198}
1199
1200pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1202 let pipeline_manager = store.pipeline_manager();
1203
1204 let pipelines = pipeline_manager.list();
1205
1206 tracing::debug!("Listed {} pipelines", pipelines.len());
1207
1208 Json(serde_json::json!({
1209 "pipelines": pipelines,
1210 "total": pipelines.len()
1211 }))
1212}
1213
1214pub async fn get_pipeline(
1216 State(store): State<SharedStore>,
1217 Path(pipeline_id): Path<uuid::Uuid>,
1218) -> Result<Json<PipelineConfig>> {
1219 let pipeline_manager = store.pipeline_manager();
1220
1221 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1222 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1223 })?;
1224
1225 Ok(Json(pipeline.config().clone()))
1226}
1227
1228pub async fn remove_pipeline(
1230 State(store): State<SharedStore>,
1231 Path(pipeline_id): Path<uuid::Uuid>,
1232) -> Result<Json<serde_json::Value>> {
1233 let pipeline_manager = store.pipeline_manager();
1234
1235 let removed = pipeline_manager.remove(pipeline_id);
1236
1237 if removed {
1238 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1239 }
1240
1241 Ok(Json(serde_json::json!({
1242 "pipeline_id": pipeline_id,
1243 "removed": removed
1244 })))
1245}
1246
1247pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1249 let pipeline_manager = store.pipeline_manager();
1250
1251 let stats = pipeline_manager.all_stats();
1252
1253 Json(serde_json::json!({
1254 "stats": stats,
1255 "total": stats.len()
1256 }))
1257}
1258
1259pub async fn get_pipeline_stats(
1261 State(store): State<SharedStore>,
1262 Path(pipeline_id): Path<uuid::Uuid>,
1263) -> Result<Json<PipelineStats>> {
1264 let pipeline_manager = store.pipeline_manager();
1265
1266 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1267 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1268 })?;
1269
1270 Ok(Json(pipeline.stats()))
1271}
1272
1273pub async fn reset_pipeline(
1275 State(store): State<SharedStore>,
1276 Path(pipeline_id): Path<uuid::Uuid>,
1277) -> Result<Json<serde_json::Value>> {
1278 let pipeline_manager = store.pipeline_manager();
1279
1280 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1281 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1282 })?;
1283
1284 pipeline.reset();
1285
1286 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1287
1288 Ok(Json(serde_json::json!({
1289 "pipeline_id": pipeline_id,
1290 "reset": true
1291 })))
1292}
1293
1294pub async fn get_event_by_id(
1300 State(store): State<SharedStore>,
1301 Path(event_id): Path<uuid::Uuid>,
1302) -> Result<Json<serde_json::Value>> {
1303 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1304 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1305 })?;
1306
1307 let dto = EventDto::from(&event);
1308
1309 tracing::debug!("Event retrieved by ID: {}", event_id);
1310
1311 Ok(Json(serde_json::json!({
1312 "event": dto,
1313 "found": true
1314 })))
1315}
1316
1317pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1323 let projection_manager = store.projection_manager();
1324 let status_map = store.projection_status();
1325
1326 let projections: Vec<serde_json::Value> = projection_manager
1327 .list_projections()
1328 .iter()
1329 .map(|(name, projection)| {
1330 let status = status_map
1331 .get(name)
1332 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1333 serde_json::json!({
1334 "name": name,
1335 "type": format!("{:?}", projection.name()),
1336 "status": status,
1337 })
1338 })
1339 .collect();
1340
1341 tracing::debug!("Listed {} projections", projections.len());
1342
1343 Json(serde_json::json!({
1344 "projections": projections,
1345 "total": projections.len()
1346 }))
1347}
1348
1349pub async fn get_projection(
1351 State(store): State<SharedStore>,
1352 Path(name): Path<String>,
1353) -> Result<Json<serde_json::Value>> {
1354 let projection_manager = store.projection_manager();
1355
1356 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1357 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1358 })?;
1359
1360 Ok(Json(serde_json::json!({
1361 "name": projection.name(),
1362 "found": true
1363 })))
1364}
1365
1366pub async fn get_projection_state(
1371 State(store): State<SharedStore>,
1372 Path((name, entity_id)): Path<(String, String)>,
1373) -> Result<Json<serde_json::Value>> {
1374 let projection_manager = store.projection_manager();
1375
1376 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1377 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1378 })?;
1379
1380 let state = projection.get_state(&entity_id);
1381
1382 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1383
1384 Ok(Json(serde_json::json!({
1385 "projection": name,
1386 "entity_id": entity_id,
1387 "state": state,
1388 "found": state.is_some()
1389 })))
1390}
1391
1392pub async fn delete_projection(
1397 State(store): State<SharedStore>,
1398 Path(name): Path<String>,
1399) -> Result<Json<serde_json::Value>> {
1400 let projection_manager = store.projection_manager();
1401
1402 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1403 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1404 })?;
1405
1406 projection.clear();
1407
1408 let cache = store.projection_state_cache();
1410 let prefix = format!("{name}:");
1411 let keys_to_remove: Vec<String> = cache
1412 .iter()
1413 .filter(|entry| entry.key().starts_with(&prefix))
1414 .map(|entry| entry.key().clone())
1415 .collect();
1416 for key in keys_to_remove {
1417 cache.remove(&key);
1418 }
1419
1420 tracing::info!("Projection deleted (cleared): {}", name);
1421
1422 Ok(Json(serde_json::json!({
1423 "projection": name,
1424 "deleted": true
1425 })))
1426}
1427
1428pub async fn get_projection_state_summary(
1432 State(store): State<SharedStore>,
1433 Path(name): Path<String>,
1434) -> Result<Json<serde_json::Value>> {
1435 let projection_manager = store.projection_manager();
1436
1437 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1438 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1439 })?;
1440
1441 let cache = store.projection_state_cache();
1443 let prefix = format!("{name}:");
1444 let states: Vec<serde_json::Value> = cache
1445 .iter()
1446 .filter(|entry| entry.key().starts_with(&prefix))
1447 .map(|entry| {
1448 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1449 serde_json::json!({
1450 "entity_id": entity_id,
1451 "state": entry.value().clone()
1452 })
1453 })
1454 .collect();
1455
1456 let total = states.len();
1457
1458 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1459
1460 Ok(Json(serde_json::json!({
1461 "projection": name,
1462 "states": states,
1463 "total": total
1464 })))
1465}
1466
1467pub async fn reset_projection(
1471 State(store): State<SharedStore>,
1472 Path(name): Path<String>,
1473) -> Result<Json<serde_json::Value>> {
1474 let reprocessed = store.reset_projection(&name)?;
1475
1476 tracing::info!(
1477 "Projection reset: {} ({} events reprocessed)",
1478 name,
1479 reprocessed
1480 );
1481
1482 Ok(Json(serde_json::json!({
1483 "projection": name,
1484 "reset": true,
1485 "events_reprocessed": reprocessed
1486 })))
1487}
1488
1489pub async fn pause_projection(
1493 State(store): State<SharedStore>,
1494 Path(name): Path<String>,
1495) -> Result<Json<serde_json::Value>> {
1496 let projection_manager = store.projection_manager();
1497
1498 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1500 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1501 })?;
1502
1503 store
1504 .projection_status()
1505 .insert(name.clone(), "paused".to_string());
1506
1507 tracing::info!("Projection paused: {}", name);
1508
1509 Ok(Json(serde_json::json!({
1510 "projection": name,
1511 "status": "paused"
1512 })))
1513}
1514
1515pub async fn start_projection(
1519 State(store): State<SharedStore>,
1520 Path(name): Path<String>,
1521) -> Result<Json<serde_json::Value>> {
1522 let projection_manager = store.projection_manager();
1523
1524 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1526 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1527 })?;
1528
1529 store
1530 .projection_status()
1531 .insert(name.clone(), "running".to_string());
1532
1533 tracing::info!("Projection started: {}", name);
1534
1535 Ok(Json(serde_json::json!({
1536 "projection": name,
1537 "status": "running"
1538 })))
1539}
1540
1541#[derive(Debug, Deserialize)]
1543pub struct SaveProjectionStateRequest {
1544 pub state: serde_json::Value,
1545}
1546
1547pub async fn save_projection_state(
1552 State(store): State<SharedStore>,
1553 Path((name, entity_id)): Path<(String, String)>,
1554 Json(req): Json<SaveProjectionStateRequest>,
1555) -> Result<Json<serde_json::Value>> {
1556 let projection_cache = store.projection_state_cache();
1557
1558 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1560
1561 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1562
1563 Ok(Json(serde_json::json!({
1564 "projection": name,
1565 "entity_id": entity_id,
1566 "saved": true
1567 })))
1568}
1569
1570#[derive(Debug, Deserialize)]
1574pub struct BulkGetStateRequest {
1575 pub entity_ids: Vec<String>,
1576}
1577
1578#[derive(Debug, Deserialize)]
1582pub struct BulkSaveStateRequest {
1583 pub states: Vec<BulkSaveStateItem>,
1584}
1585
1586#[derive(Debug, Deserialize)]
1587pub struct BulkSaveStateItem {
1588 pub entity_id: String,
1589 pub state: serde_json::Value,
1590}
1591
1592pub async fn bulk_get_projection_states(
1593 State(store): State<SharedStore>,
1594 Path(name): Path<String>,
1595 Json(req): Json<BulkGetStateRequest>,
1596) -> Result<Json<serde_json::Value>> {
1597 let projection_manager = store.projection_manager();
1598
1599 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1600 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1601 })?;
1602
1603 let states: Vec<serde_json::Value> = req
1604 .entity_ids
1605 .iter()
1606 .map(|entity_id| {
1607 let state = projection.get_state(entity_id);
1608 serde_json::json!({
1609 "entity_id": entity_id,
1610 "state": state,
1611 "found": state.is_some()
1612 })
1613 })
1614 .collect();
1615
1616 tracing::debug!(
1617 "Bulk projection state retrieved: {} entities from {}",
1618 states.len(),
1619 name
1620 );
1621
1622 Ok(Json(serde_json::json!({
1623 "projection": name,
1624 "states": states,
1625 "total": states.len()
1626 })))
1627}
1628
1629pub async fn bulk_save_projection_states(
1634 State(store): State<SharedStore>,
1635 Path(name): Path<String>,
1636 Json(req): Json<BulkSaveStateRequest>,
1637) -> Result<Json<serde_json::Value>> {
1638 let projection_cache = store.projection_state_cache();
1639
1640 let mut saved_count = 0;
1641 for item in &req.states {
1642 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1643 saved_count += 1;
1644 }
1645
1646 tracing::info!(
1647 "Bulk projection state saved: {} entities for {}",
1648 saved_count,
1649 name
1650 );
1651
1652 Ok(Json(serde_json::json!({
1653 "projection": name,
1654 "saved": saved_count,
1655 "total": req.states.len()
1656 })))
1657}
1658
1659#[derive(Debug, Deserialize)]
1665pub struct ListWebhooksParams {
1666 pub tenant_id: Option<String>,
1667}
1668
1669pub async fn register_webhook(
1671 State(store): State<SharedStore>,
1672 Json(req): Json<RegisterWebhookRequest>,
1673) -> Json<serde_json::Value> {
1674 let registry = store.webhook_registry();
1675 let webhook = registry.register(req);
1676
1677 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1678
1679 Json(serde_json::json!({
1680 "webhook": webhook,
1681 "created": true
1682 }))
1683}
1684
1685pub async fn list_webhooks(
1687 State(store): State<SharedStore>,
1688 Query(params): Query<ListWebhooksParams>,
1689) -> Json<serde_json::Value> {
1690 let registry = store.webhook_registry();
1691
1692 let webhooks = if let Some(tenant_id) = params.tenant_id {
1693 registry.list_by_tenant(&tenant_id)
1694 } else {
1695 vec![]
1697 };
1698
1699 let total = webhooks.len();
1700
1701 Json(serde_json::json!({
1702 "webhooks": webhooks,
1703 "total": total
1704 }))
1705}
1706
1707pub async fn get_webhook(
1709 State(store): State<SharedStore>,
1710 Path(webhook_id): Path<uuid::Uuid>,
1711) -> Result<Json<serde_json::Value>> {
1712 let registry = store.webhook_registry();
1713
1714 let webhook = registry.get(webhook_id).ok_or_else(|| {
1715 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1716 })?;
1717
1718 Ok(Json(serde_json::json!({
1719 "webhook": webhook,
1720 "found": true
1721 })))
1722}
1723
1724pub async fn update_webhook(
1726 State(store): State<SharedStore>,
1727 Path(webhook_id): Path<uuid::Uuid>,
1728 Json(req): Json<UpdateWebhookRequest>,
1729) -> Result<Json<serde_json::Value>> {
1730 let registry = store.webhook_registry();
1731
1732 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1733 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1734 })?;
1735
1736 tracing::info!("Webhook updated: {}", webhook_id);
1737
1738 Ok(Json(serde_json::json!({
1739 "webhook": webhook,
1740 "updated": true
1741 })))
1742}
1743
1744pub async fn delete_webhook(
1746 State(store): State<SharedStore>,
1747 Path(webhook_id): Path<uuid::Uuid>,
1748) -> Result<Json<serde_json::Value>> {
1749 let registry = store.webhook_registry();
1750
1751 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1752 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1753 })?;
1754
1755 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1756
1757 Ok(Json(serde_json::json!({
1758 "webhook_id": webhook_id,
1759 "deleted": true
1760 })))
1761}
1762
1763#[derive(Debug, Deserialize)]
1765pub struct ListDeliveriesParams {
1766 pub limit: Option<usize>,
1767}
1768
1769pub async fn list_webhook_deliveries(
1771 State(store): State<SharedStore>,
1772 Path(webhook_id): Path<uuid::Uuid>,
1773 Query(params): Query<ListDeliveriesParams>,
1774) -> Result<Json<serde_json::Value>> {
1775 let registry = store.webhook_registry();
1776
1777 registry.get(webhook_id).ok_or_else(|| {
1779 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1780 })?;
1781
1782 let limit = params.limit.unwrap_or(50);
1783 let deliveries = registry.get_deliveries(webhook_id, limit);
1784 let total = deliveries.len();
1785
1786 Ok(Json(serde_json::json!({
1787 "webhook_id": webhook_id,
1788 "deliveries": deliveries,
1789 "total": total
1790 })))
1791}
1792
/// Runs an EventQL request against a snapshot of the store's events.
///
/// The response contains the `columns`, `rows` and `row_count` of the query result.
///
/// # Errors
///
/// Returns `AllSourceError::InvalidQuery` wrapping the error reported by `execute_eventql`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();

    let response = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1813
1814pub async fn graphql_query(
1816 State(store): State<SharedStore>,
1817 Json(req): Json<GraphQLRequest>,
1818) -> Json<serde_json::Value> {
1819 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1820 Ok(f) => f,
1821 Err(e) => {
1822 return Json(
1823 serde_json::to_value(GraphQLResponse {
1824 data: None,
1825 errors: vec![GraphQLError { message: e }],
1826 })
1827 .unwrap(),
1828 );
1829 }
1830 };
1831
1832 let mut data = serde_json::Map::new();
1833 let mut errors = Vec::new();
1834
1835 for field in &fields {
1836 match field.name.as_str() {
1837 "events" => {
1838 let request = crate::application::dto::QueryEventsRequest {
1839 entity_id: field.arguments.get("entity_id").cloned(),
1840 event_type: field.arguments.get("event_type").cloned(),
1841 tenant_id: field.arguments.get("tenant_id").cloned(),
1842 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1843 as_of: None,
1844 since: None,
1845 until: None,
1846 event_type_prefix: None,
1847 payload_filter: None,
1848 };
1849 match store.query(&request) {
1850 Ok(events) => {
1851 let json_events: Vec<serde_json::Value> = events
1852 .iter()
1853 .map(|e| {
1854 crate::infrastructure::query::graphql::event_to_json(
1855 e,
1856 &field.fields,
1857 )
1858 })
1859 .collect();
1860 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1861 }
1862 Err(e) => errors.push(GraphQLError {
1863 message: format!("events query failed: {e}"),
1864 }),
1865 }
1866 }
1867 "event" => {
1868 if let Some(id_str) = field.arguments.get("id") {
1869 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1870 match store.get_event_by_id(&id) {
1871 Ok(Some(event)) => {
1872 data.insert(
1873 "event".to_string(),
1874 crate::infrastructure::query::graphql::event_to_json(
1875 &event,
1876 &field.fields,
1877 ),
1878 );
1879 }
1880 Ok(None) => {
1881 data.insert("event".to_string(), serde_json::Value::Null);
1882 }
1883 Err(e) => errors.push(GraphQLError {
1884 message: format!("event lookup failed: {e}"),
1885 }),
1886 }
1887 } else {
1888 errors.push(GraphQLError {
1889 message: format!("Invalid UUID: {id_str}"),
1890 });
1891 }
1892 } else {
1893 errors.push(GraphQLError {
1894 message: "event query requires 'id' argument".to_string(),
1895 });
1896 }
1897 }
1898 "projections" => {
1899 let pm = store.projection_manager();
1900 let names: Vec<serde_json::Value> = pm
1901 .list_projections()
1902 .iter()
1903 .map(|(name, _)| serde_json::Value::String(name.clone()))
1904 .collect();
1905 data.insert("projections".to_string(), serde_json::Value::Array(names));
1906 }
1907 "stats" => {
1908 let stats = store.stats();
1909 data.insert(
1910 "stats".to_string(),
1911 serde_json::json!({
1912 "total_events": stats.total_events,
1913 "total_entities": stats.total_entities,
1914 "total_event_types": stats.total_event_types,
1915 }),
1916 );
1917 }
1918 "__schema" => {
1919 data.insert(
1920 "__schema".to_string(),
1921 crate::infrastructure::query::graphql::introspection_schema(),
1922 );
1923 }
1924 other => {
1925 errors.push(GraphQLError {
1926 message: format!("Unknown field: {other}"),
1927 });
1928 }
1929 }
1930 }
1931
1932 Json(
1933 serde_json::to_value(GraphQLResponse {
1934 data: Some(serde_json::Value::Object(data)),
1935 errors,
1936 })
1937 .unwrap(),
1938 )
1939}
1940
1941pub async fn geo_query(
1943 State(store): State<SharedStore>,
1944 Json(req): Json<GeoQueryRequest>,
1945) -> Json<serde_json::Value> {
1946 let events = store.snapshot_events();
1947 let geo_index = store.geo_index();
1948 let results =
1949 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1950 let total = results.len();
1951 Json(serde_json::json!({
1952 "results": results,
1953 "total": total,
1954 }))
1955}
1956
1957pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1959 let stats = store.geo_index().stats();
1960 Json(serde_json::json!(stats))
1961}
1962
1963pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1965 let stats = store.exactly_once().stats();
1966 Json(serde_json::json!(stats))
1967}
1968
1969pub async fn schema_evolution_history(
1971 State(store): State<SharedStore>,
1972 Path(event_type): Path<String>,
1973) -> Json<serde_json::Value> {
1974 let mgr = store.schema_evolution();
1975 let history = mgr.get_history(&event_type);
1976 let version = mgr.get_version(&event_type);
1977 Json(serde_json::json!({
1978 "event_type": event_type,
1979 "current_version": version,
1980 "history": history,
1981 }))
1982}
1983
1984pub async fn schema_evolution_schema(
1986 State(store): State<SharedStore>,
1987 Path(event_type): Path<String>,
1988) -> Json<serde_json::Value> {
1989 let mgr = store.schema_evolution();
1990 if let Some(schema) = mgr.get_schema(&event_type) {
1991 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1992 Json(serde_json::json!({
1993 "event_type": event_type,
1994 "version": mgr.get_version(&event_type),
1995 "inferred_schema": schema,
1996 "json_schema": json_schema,
1997 }))
1998 } else {
1999 Json(serde_json::json!({
2000 "event_type": event_type,
2001 "error": "No schema inferred for this event type"
2002 }))
2003 }
2004}
2005
2006pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2008 let stats = store.schema_evolution().stats();
2009 let event_types = store.schema_evolution().list_event_types();
2010 Json(serde_json::json!({
2011 "stats": stats,
2012 "tracked_event_types": event_types,
2013 }))
2014}
2015
/// Returns events for a sync client as [`ReplicatedEvent`]s.
///
/// The lower time bound of the store query is the smallest `physical_ms` found in the
/// request's version vector; with an empty vector (or a value that cannot be represented
/// as a timestamp) no lower bound is applied.
///
/// Each returned event gets an HLC timestamp built from its millisecond timestamp and a
/// logical counter that increments while consecutive events share the same millisecond.
/// The origin region is always `"server"` and the returned version vector is empty.
///
/// # Errors
///
/// Propagates any error returned by the store query.
///
/// [`ReplicatedEvent`]: crate::infrastructure::cluster::crdt::ReplicatedEvent
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Checked conversion: an out-of-range physical time means "no lower bound" rather than
    // silently wrapping into an unrelated timestamp.
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| i64::try_from(ms).ok())
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    let mut replicated = Vec::with_capacity(events.len());
    // `None` until the first event is seen, so an event at millisecond 0 is not mistaken
    // for a repeat of an initial sentinel value.
    let mut last_ms: Option<u64> = None;
    let mut logical = 0u32;

    for event in &events {
        // Pre-epoch timestamps are clamped to 0 instead of wrapping to a huge value.
        let event_ms = u64::try_from(event.timestamp().timestamp_millis()).unwrap_or(0);
        if last_ms == Some(event_ms) {
            logical += 1;
        } else {
            last_ms = Some(event_ms);
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2084
/// Ingests events pushed by a sync client.
///
/// Each pushed event's `event_data` is read for `event_type`, `entity_id`, `tenant_id`,
/// `payload` and `metadata`. Missing or non-string type/entity values default to
/// `"unknown"`, a missing tenant defaults to `"default"`, and a missing payload defaults
/// to an empty JSON object.
///
/// Events that cannot be turned into a domain [`Event`] are counted as `skipped` and
/// logged at warn level; the rest are ingested and counted as `accepted`. The returned
/// version vector is empty.
///
/// # Errors
///
/// Propagates the first ingest error. Events ingested before that point stay ingested.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;

        // Reads a string field from the event data, falling back to `default`.
        let string_field = |key: &str, default: &str| -> String {
            event_data
                .get(key)
                .and_then(|v| v.as_str())
                .unwrap_or(default)
                .to_string()
        };

        let event_type = string_field("event_type", "unknown");
        let entity_id = string_field("entity_id", "unknown");
        let tenant_id = string_field("tenant_id", "default");
        // Build the empty-object default only when the payload is actually missing.
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or_else(|| serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                store.ingest(&domain_event)?;
                accepted += 1;
            }
            Err(err) => {
                // Still counted in the response; the log records why it was rejected.
                tracing::warn!("Skipping invalid pushed event: {:?}", err);
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2136
2137pub async fn register_consumer(
2143 State(store): State<SharedStore>,
2144 Json(req): Json<RegisterConsumerRequest>,
2145) -> Result<Json<ConsumerResponse>> {
2146 let consumer = store
2147 .consumer_registry()
2148 .register(&req.consumer_id, &req.event_type_filters);
2149
2150 Ok(Json(ConsumerResponse {
2151 consumer_id: consumer.consumer_id,
2152 event_type_filters: consumer.event_type_filters,
2153 cursor_position: consumer.cursor_position,
2154 }))
2155}
2156
2157pub async fn get_consumer(
2159 State(store): State<SharedStore>,
2160 Path(consumer_id): Path<String>,
2161) -> Result<Json<ConsumerResponse>> {
2162 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2163
2164 Ok(Json(ConsumerResponse {
2165 consumer_id: consumer.consumer_id,
2166 event_type_filters: consumer.event_type_filters,
2167 cursor_position: consumer.cursor_position,
2168 }))
2169}
2170
2171#[derive(Debug, Deserialize)]
2173pub struct ConsumerPollQuery {
2174 pub limit: Option<usize>,
2175}
2176
2177pub async fn poll_consumer_events(
2178 State(store): State<SharedStore>,
2179 Path(consumer_id): Path<String>,
2180 Query(query): Query<ConsumerPollQuery>,
2181) -> Result<Json<ConsumerEventsResponse>> {
2182 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2183 let offset = consumer.cursor_position.unwrap_or(0);
2184 let limit = query.limit.unwrap_or(100);
2185
2186 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2187 let count = events.len();
2188
2189 let consumer_events: Vec<ConsumerEventDto> = events
2190 .into_iter()
2191 .map(|(position, event)| ConsumerEventDto {
2192 position,
2193 event: EventDto::from(&event),
2194 })
2195 .collect();
2196
2197 Ok(Json(ConsumerEventsResponse {
2198 events: consumer_events,
2199 count,
2200 }))
2201}
2202
2203pub async fn ack_consumer(
2205 State(store): State<SharedStore>,
2206 Path(consumer_id): Path<String>,
2207 Json(req): Json<AckRequest>,
2208) -> Result<Json<serde_json::Value>> {
2209 let max_offset = store.total_events() as u64;
2210
2211 store
2212 .consumer_registry()
2213 .ack(&consumer_id, req.position, max_offset)
2214 .map_err(crate::error::AllSourceError::InvalidInput)?;
2215
2216 Ok(Json(serde_json::json!({
2217 "status": "ok",
2218 "consumer_id": consumer_id,
2219 "position": req.position,
2220 })))
2221}
2222
2223#[cfg(test)]
2224mod tests {
2225 use super::*;
2226 use crate::{domain::entities::Event, store::EventStore};
2227
2228 fn create_test_store() -> Arc<EventStore> {
2229 Arc::new(EventStore::new())
2230 }
2231
2232 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2233 Event::from_strings(
2234 event_type.to_string(),
2235 entity_id.to_string(),
2236 "test-stream".to_string(),
2237 serde_json::json!({
2238 "name": "Test",
2239 "value": 42
2240 }),
2241 None,
2242 )
2243 .unwrap()
2244 }
2245
2246 #[tokio::test]
2247 async fn test_query_events_has_more_and_total_count() {
2248 let store = create_test_store();
2249
2250 for i in 0..50 {
2252 store
2253 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2254 .unwrap();
2255 }
2256
2257 let req = QueryEventsRequest {
2259 entity_id: None,
2260 event_type: None,
2261 tenant_id: None,
2262 as_of: None,
2263 since: None,
2264 until: None,
2265 limit: Some(10),
2266 event_type_prefix: None,
2267 payload_filter: None,
2268 };
2269
2270 let requested_limit = req.limit;
2271 let unlimited_req = QueryEventsRequest {
2272 limit: None,
2273 ..QueryEventsRequest {
2274 entity_id: req.entity_id,
2275 event_type: req.event_type,
2276 tenant_id: req.tenant_id,
2277 as_of: req.as_of,
2278 since: req.since,
2279 until: req.until,
2280 limit: None,
2281 event_type_prefix: req.event_type_prefix,
2282 payload_filter: req.payload_filter,
2283 }
2284 };
2285 let all_events = store.query(&unlimited_req).unwrap();
2286 let total_count = all_events.len();
2287 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2288 all_events.into_iter().take(limit).collect()
2289 } else {
2290 all_events
2291 };
2292 let count = limited_events.len();
2293 let has_more = count < total_count;
2294
2295 assert_eq!(count, 10);
2296 assert_eq!(total_count, 50);
2297 assert!(has_more);
2298 }
2299
2300 #[tokio::test]
2301 async fn test_query_events_no_more_results() {
2302 let store = create_test_store();
2303
2304 for i in 0..5 {
2306 store
2307 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2308 .unwrap();
2309 }
2310
2311 let all_events = store
2313 .query(&QueryEventsRequest {
2314 entity_id: None,
2315 event_type: None,
2316 tenant_id: None,
2317 as_of: None,
2318 since: None,
2319 until: None,
2320 limit: None,
2321 event_type_prefix: None,
2322 payload_filter: None,
2323 })
2324 .unwrap();
2325 let total_count = all_events.len();
2326 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2327 let count = limited_events.len();
2328 let has_more = count < total_count;
2329
2330 assert_eq!(count, 5);
2331 assert_eq!(total_count, 5);
2332 assert!(!has_more);
2333 }
2334
2335 #[tokio::test]
2336 async fn test_list_entities_by_type_prefix() {
2337 let store = create_test_store();
2338
2339 store
2341 .ingest(&create_test_event("idx-1", "index.created"))
2342 .unwrap();
2343 store
2344 .ingest(&create_test_event("idx-1", "index.updated"))
2345 .unwrap();
2346 store
2347 .ingest(&create_test_event("idx-2", "index.created"))
2348 .unwrap();
2349 store
2350 .ingest(&create_test_event("idx-3", "index.created"))
2351 .unwrap();
2352 store
2354 .ingest(&create_test_event("trade-1", "trade.created"))
2355 .unwrap();
2356 store
2357 .ingest(&create_test_event("trade-2", "trade.created"))
2358 .unwrap();
2359
2360 let req = ListEntitiesRequest {
2362 event_type_prefix: Some("index.".to_string()),
2363 payload_filter: None,
2364 limit: None,
2365 offset: None,
2366 };
2367 let query_req = QueryEventsRequest {
2368 entity_id: None,
2369 event_type: None,
2370 tenant_id: None,
2371 as_of: None,
2372 since: None,
2373 until: None,
2374 limit: None,
2375 event_type_prefix: req.event_type_prefix,
2376 payload_filter: req.payload_filter,
2377 };
2378 let events = store.query(&query_req).unwrap();
2379
2380 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2382 std::collections::HashMap::new();
2383 for event in &events {
2384 entity_map
2385 .entry(event.entity_id().to_string())
2386 .or_default()
2387 .push(event);
2388 }
2389
2390 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2393 assert_eq!(entity_map["idx-3"].len(), 1);
2394 }
2395
2396 fn create_test_event_with_payload(
2397 entity_id: &str,
2398 event_type: &str,
2399 payload: serde_json::Value,
2400 ) -> Event {
2401 Event::from_strings(
2402 event_type.to_string(),
2403 entity_id.to_string(),
2404 "test-stream".to_string(),
2405 payload,
2406 None,
2407 )
2408 .unwrap()
2409 }
2410
2411 #[tokio::test]
2412 async fn test_detect_duplicates_by_payload_fields() {
2413 let store = create_test_store();
2414
2415 store
2417 .ingest(&create_test_event_with_payload(
2418 "idx-1",
2419 "index.created",
2420 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2421 ))
2422 .unwrap();
2423 store
2424 .ingest(&create_test_event_with_payload(
2425 "idx-2",
2426 "index.created",
2427 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2428 ))
2429 .unwrap();
2430 store
2431 .ingest(&create_test_event_with_payload(
2432 "idx-3",
2433 "index.created",
2434 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2435 ))
2436 .unwrap();
2437 store
2438 .ingest(&create_test_event_with_payload(
2439 "idx-4",
2440 "index.created",
2441 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2442 ))
2443 .unwrap();
2444 store
2445 .ingest(&create_test_event_with_payload(
2446 "idx-5",
2447 "index.created",
2448 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2449 ))
2450 .unwrap();
2451
2452 let query_req = QueryEventsRequest {
2454 entity_id: None,
2455 event_type: None,
2456 tenant_id: None,
2457 as_of: None,
2458 since: None,
2459 until: None,
2460 limit: None,
2461 event_type_prefix: Some("index.".to_string()),
2462 payload_filter: None,
2463 };
2464 let events = store.query(&query_req).unwrap();
2465
2466 let group_by_fields = vec!["name"];
2468 let mut entity_latest: std::collections::HashMap<String, &Event> =
2469 std::collections::HashMap::new();
2470 for event in &events {
2471 let eid = event.entity_id().to_string();
2472 entity_latest
2473 .entry(eid)
2474 .and_modify(|existing| {
2475 if event.timestamp() > existing.timestamp() {
2476 *existing = event;
2477 }
2478 })
2479 .or_insert(event);
2480 }
2481
2482 let mut groups: std::collections::HashMap<String, Vec<String>> =
2483 std::collections::HashMap::new();
2484 for (entity_id, event) in &entity_latest {
2485 let payload = event.payload();
2486 let mut key_parts = serde_json::Map::new();
2487 for field in &group_by_fields {
2488 let value = payload
2489 .get(*field)
2490 .cloned()
2491 .unwrap_or(serde_json::Value::Null);
2492 key_parts.insert((*field).to_string(), value);
2493 }
2494 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2495 groups.entry(key_str).or_default().push(entity_id.clone());
2496 }
2497
2498 let duplicate_groups: Vec<_> = groups
2499 .into_iter()
2500 .filter(|(_, ids)| ids.len() > 1)
2501 .collect();
2502
2503 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2505 assert_eq!(ids.len(), 2);
2506 }
2507 }
2508
2509 #[tokio::test]
2510 async fn test_detect_duplicates_no_duplicates() {
2511 let store = create_test_store();
2512
2513 store
2515 .ingest(&create_test_event_with_payload(
2516 "idx-1",
2517 "index.created",
2518 serde_json::json!({"name": "A"}),
2519 ))
2520 .unwrap();
2521 store
2522 .ingest(&create_test_event_with_payload(
2523 "idx-2",
2524 "index.created",
2525 serde_json::json!({"name": "B"}),
2526 ))
2527 .unwrap();
2528
2529 let query_req = QueryEventsRequest {
2530 entity_id: None,
2531 event_type: None,
2532 tenant_id: None,
2533 as_of: None,
2534 since: None,
2535 until: None,
2536 limit: None,
2537 event_type_prefix: Some("index.".to_string()),
2538 payload_filter: None,
2539 };
2540 let events = store.query(&query_req).unwrap();
2541
2542 let mut entity_latest: std::collections::HashMap<String, &Event> =
2543 std::collections::HashMap::new();
2544 for event in &events {
2545 entity_latest
2546 .entry(event.entity_id().to_string())
2547 .or_insert(event);
2548 }
2549
2550 let mut groups: std::collections::HashMap<String, Vec<String>> =
2551 std::collections::HashMap::new();
2552 for (entity_id, event) in &entity_latest {
2553 let key_str =
2554 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2555 .unwrap();
2556 groups.entry(key_str).or_default().push(entity_id.clone());
2557 }
2558
2559 let duplicate_groups: Vec<_> = groups
2560 .into_iter()
2561 .filter(|(_, ids)| ids.len() > 1)
2562 .collect();
2563
2564 assert_eq!(duplicate_groups.len(), 0); }
2566
2567 #[tokio::test]
2568 async fn test_detect_duplicates_multi_field_group_by() {
2569 let store = create_test_store();
2570
2571 store
2573 .ingest(&create_test_event_with_payload(
2574 "idx-1",
2575 "index.created",
2576 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2577 ))
2578 .unwrap();
2579 store
2580 .ingest(&create_test_event_with_payload(
2581 "idx-2",
2582 "index.created",
2583 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2584 ))
2585 .unwrap();
2586 store
2588 .ingest(&create_test_event_with_payload(
2589 "idx-3",
2590 "index.created",
2591 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2592 ))
2593 .unwrap();
2594
2595 let query_req = QueryEventsRequest {
2596 entity_id: None,
2597 event_type: None,
2598 tenant_id: None,
2599 as_of: None,
2600 since: None,
2601 until: None,
2602 limit: None,
2603 event_type_prefix: Some("index.".to_string()),
2604 payload_filter: None,
2605 };
2606 let events = store.query(&query_req).unwrap();
2607
2608 let group_by_fields = vec!["name", "user_id"];
2609 let mut entity_latest: std::collections::HashMap<String, &Event> =
2610 std::collections::HashMap::new();
2611 for event in &events {
2612 entity_latest
2613 .entry(event.entity_id().to_string())
2614 .and_modify(|existing| {
2615 if event.timestamp() > existing.timestamp() {
2616 *existing = event;
2617 }
2618 })
2619 .or_insert(event);
2620 }
2621
2622 let mut groups: std::collections::HashMap<String, Vec<String>> =
2623 std::collections::HashMap::new();
2624 for (entity_id, event) in &entity_latest {
2625 let payload = event.payload();
2626 let mut key_parts = serde_json::Map::new();
2627 for field in &group_by_fields {
2628 let value = payload
2629 .get(*field)
2630 .cloned()
2631 .unwrap_or(serde_json::Value::Null);
2632 key_parts.insert((*field).to_string(), value);
2633 }
2634 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2635 groups.entry(key_str).or_default().push(entity_id.clone());
2636 }
2637
2638 let duplicate_groups: Vec<_> = groups
2639 .into_iter()
2640 .filter(|(_, ids)| ids.len() > 1)
2641 .collect();
2642
2643 assert_eq!(duplicate_groups.len(), 1);
2645 let (_, ref ids) = duplicate_groups[0];
2646 assert_eq!(ids.len(), 2);
2647 let mut sorted_ids = ids.clone();
2648 sorted_ids.sort();
2649 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2650 }
2651
2652 #[tokio::test]
2653 async fn test_projection_state_cache() {
2654 let store = create_test_store();
2655
2656 let cache = store.projection_state_cache();
2658 cache.insert(
2659 "entity_snapshots:user-123".to_string(),
2660 serde_json::json!({"name": "Test User", "age": 30}),
2661 );
2662
2663 let state = cache.get("entity_snapshots:user-123");
2665 assert!(state.is_some());
2666 let state = state.unwrap();
2667 assert_eq!(state["name"], "Test User");
2668 assert_eq!(state["age"], 30);
2669 }
2670
2671 #[tokio::test]
2672 async fn test_projection_manager_list_projections() {
2673 let store = create_test_store();
2674
2675 let projection_manager = store.projection_manager();
2677 let projections = projection_manager.list_projections();
2678
2679 assert!(projections.len() >= 2);
2681
2682 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2683 assert!(names.contains(&"entity_snapshots"));
2684 assert!(names.contains(&"event_counters"));
2685 }
2686
2687 #[tokio::test]
2688 async fn test_projection_state_after_event_ingestion() {
2689 let store = create_test_store();
2690
2691 let event = create_test_event("user-456", "user.created");
2693 store.ingest(&event).unwrap();
2694
2695 let projection_manager = store.projection_manager();
2697 let snapshot_projection = projection_manager
2698 .get_projection("entity_snapshots")
2699 .unwrap();
2700
2701 let state = snapshot_projection.get_state("user-456");
2702 assert!(state.is_some());
2703 let state = state.unwrap();
2704 assert_eq!(state["name"], "Test");
2705 assert_eq!(state["value"], 42);
2706 }
2707
2708 #[tokio::test]
2709 async fn test_projection_state_cache_multiple_entities() {
2710 let store = create_test_store();
2711 let cache = store.projection_state_cache();
2712
2713 for i in 0..10 {
2715 cache.insert(
2716 format!("entity_snapshots:entity-{i}"),
2717 serde_json::json!({"id": i, "status": "active"}),
2718 );
2719 }
2720
2721 assert_eq!(cache.len(), 10);
2723
2724 for i in 0..10 {
2726 let key = format!("entity_snapshots:entity-{i}");
2727 let state = cache.get(&key);
2728 assert!(state.is_some());
2729 assert_eq!(state.unwrap()["id"], i);
2730 }
2731 }
2732
2733 #[tokio::test]
2734 async fn test_projection_state_update() {
2735 let store = create_test_store();
2736 let cache = store.projection_state_cache();
2737
2738 cache.insert(
2740 "entity_snapshots:user-789".to_string(),
2741 serde_json::json!({"balance": 100}),
2742 );
2743
2744 cache.insert(
2746 "entity_snapshots:user-789".to_string(),
2747 serde_json::json!({"balance": 150}),
2748 );
2749
2750 let state = cache.get("entity_snapshots:user-789").unwrap();
2752 assert_eq!(state["balance"], 150);
2753 }
2754
2755 #[tokio::test]
2756 async fn test_event_counter_projection() {
2757 let store = create_test_store();
2758
2759 store
2761 .ingest(&create_test_event("user-1", "user.created"))
2762 .unwrap();
2763 store
2764 .ingest(&create_test_event("user-2", "user.created"))
2765 .unwrap();
2766 store
2767 .ingest(&create_test_event("user-1", "user.updated"))
2768 .unwrap();
2769
2770 let projection_manager = store.projection_manager();
2772 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2773
2774 let created_state = counter_projection.get_state("user.created");
2776 assert!(created_state.is_some());
2777 assert_eq!(created_state.unwrap()["count"], 2);
2778
2779 let updated_state = counter_projection.get_state("user.updated");
2780 assert!(updated_state.is_some());
2781 assert_eq!(updated_state.unwrap()["count"], 1);
2782 }
2783
2784 #[tokio::test]
2785 async fn test_projection_state_cache_key_format() {
2786 let store = create_test_store();
2787 let cache = store.projection_state_cache();
2788
2789 let key = "orders:order-12345".to_string();
2791 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2792
2793 let state = cache.get(&key).unwrap();
2794 assert_eq!(state["total"], 99.99);
2795 }
2796
2797 #[tokio::test]
2798 async fn test_projection_state_cache_removal() {
2799 let store = create_test_store();
2800 let cache = store.projection_state_cache();
2801
2802 cache.insert(
2804 "test:entity-1".to_string(),
2805 serde_json::json!({"data": "value"}),
2806 );
2807 assert_eq!(cache.len(), 1);
2808
2809 cache.remove("test:entity-1");
2810 assert_eq!(cache.len(), 0);
2811 assert!(cache.get("test:entity-1").is_none());
2812 }
2813
2814 #[tokio::test]
2815 async fn test_get_nonexistent_projection() {
2816 let store = create_test_store();
2817 let projection_manager = store.projection_manager();
2818
2819 let projection = projection_manager.get_projection("nonexistent_projection");
2821 assert!(projection.is_none());
2822 }
2823
2824 #[tokio::test]
2825 async fn test_get_nonexistent_entity_state() {
2826 let store = create_test_store();
2827 let projection_manager = store.projection_manager();
2828
2829 let snapshot_projection = projection_manager
2831 .get_projection("entity_snapshots")
2832 .unwrap();
2833 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2834 assert!(state.is_none());
2835 }
2836
2837 #[tokio::test]
2838 async fn test_projection_state_cache_concurrent_access() {
2839 let store = create_test_store();
2840 let cache = store.projection_state_cache();
2841
2842 let handles: Vec<_> = (0..10)
2844 .map(|i| {
2845 let cache_clone = cache.clone();
2846 tokio::spawn(async move {
2847 cache_clone.insert(
2848 format!("concurrent:entity-{i}"),
2849 serde_json::json!({"thread": i}),
2850 );
2851 })
2852 })
2853 .collect();
2854
2855 for handle in handles {
2856 handle.await.unwrap();
2857 }
2858
2859 assert_eq!(cache.len(), 10);
2861 }
2862
2863 #[tokio::test]
2864 async fn test_projection_state_large_payload() {
2865 let store = create_test_store();
2866 let cache = store.projection_state_cache();
2867
2868 let large_array: Vec<serde_json::Value> = (0..1000)
2870 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2871 .collect();
2872
2873 cache.insert(
2874 "large:entity-1".to_string(),
2875 serde_json::json!({"items": large_array}),
2876 );
2877
2878 let state = cache.get("large:entity-1").unwrap();
2879 let items = state["items"].as_array().unwrap();
2880 assert_eq!(items.len(), 1000);
2881 }
2882
2883 #[tokio::test]
2884 async fn test_projection_state_complex_json() {
2885 let store = create_test_store();
2886 let cache = store.projection_state_cache();
2887
2888 let complex_state = serde_json::json!({
2890 "user": {
2891 "id": "user-123",
2892 "profile": {
2893 "name": "John Doe",
2894 "email": "john@example.com",
2895 "settings": {
2896 "theme": "dark",
2897 "notifications": true
2898 }
2899 },
2900 "roles": ["admin", "user"],
2901 "metadata": {
2902 "created_at": "2025-01-01T00:00:00Z",
2903 "last_login": null
2904 }
2905 }
2906 });
2907
2908 cache.insert("complex:user-123".to_string(), complex_state);
2909
2910 let state = cache.get("complex:user-123").unwrap();
2911 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2912 assert_eq!(state["user"]["roles"][0], "admin");
2913 assert!(state["user"]["metadata"]["last_login"].is_null());
2914 }
2915
2916 #[tokio::test]
2917 async fn test_projection_state_cache_iteration() {
2918 let store = create_test_store();
2919 let cache = store.projection_state_cache();
2920
2921 for i in 0..5 {
2923 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2924 }
2925
2926 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2928 assert_eq!(entries.len(), 5);
2929 }
2930
2931 #[tokio::test]
2932 async fn test_projection_manager_get_entity_snapshots() {
2933 let store = create_test_store();
2934 let projection_manager = store.projection_manager();
2935
2936 let projection = projection_manager.get_projection("entity_snapshots");
2938 assert!(projection.is_some());
2939 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2940 }
2941
2942 #[tokio::test]
2943 async fn test_projection_manager_get_event_counters() {
2944 let store = create_test_store();
2945 let projection_manager = store.projection_manager();
2946
2947 let projection = projection_manager.get_projection("event_counters");
2949 assert!(projection.is_some());
2950 assert_eq!(projection.unwrap().name(), "event_counters");
2951 }
2952
2953 #[tokio::test]
2954 async fn test_projection_state_cache_overwrite() {
2955 let store = create_test_store();
2956 let cache = store.projection_state_cache();
2957
2958 cache.insert(
2960 "overwrite:entity-1".to_string(),
2961 serde_json::json!({"version": 1}),
2962 );
2963
2964 cache.insert(
2966 "overwrite:entity-1".to_string(),
2967 serde_json::json!({"version": 2}),
2968 );
2969
2970 cache.insert(
2972 "overwrite:entity-1".to_string(),
2973 serde_json::json!({"version": 3}),
2974 );
2975
2976 let state = cache.get("overwrite:entity-1").unwrap();
2977 assert_eq!(state["version"], 3);
2978
2979 assert_eq!(cache.len(), 1);
2981 }
2982
2983 #[tokio::test]
2984 async fn test_projection_state_multiple_projections() {
2985 let store = create_test_store();
2986 let cache = store.projection_state_cache();
2987
2988 cache.insert(
2990 "entity_snapshots:user-1".to_string(),
2991 serde_json::json!({"name": "Alice"}),
2992 );
2993 cache.insert(
2994 "event_counters:user.created".to_string(),
2995 serde_json::json!({"count": 5}),
2996 );
2997 cache.insert(
2998 "custom_projection:order-1".to_string(),
2999 serde_json::json!({"total": 150.0}),
3000 );
3001
3002 assert_eq!(
3004 cache.get("entity_snapshots:user-1").unwrap()["name"],
3005 "Alice"
3006 );
3007 assert_eq!(
3008 cache.get("event_counters:user.created").unwrap()["count"],
3009 5
3010 );
3011 assert_eq!(
3012 cache.get("custom_projection:order-1").unwrap()["total"],
3013 150.0
3014 );
3015 }
3016
3017 #[tokio::test]
3018 async fn test_bulk_projection_state_access() {
3019 let store = create_test_store();
3020
3021 for i in 0..5 {
3023 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3024 store.ingest(&event).unwrap();
3025 }
3026
3027 let projection_manager = store.projection_manager();
3029 let snapshot_projection = projection_manager
3030 .get_projection("entity_snapshots")
3031 .unwrap();
3032
3033 for i in 0..5 {
3035 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3036 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3037 }
3038 }
3039
3040 #[tokio::test]
3041 async fn test_bulk_save_projection_states() {
3042 let store = create_test_store();
3043 let cache = store.projection_state_cache();
3044
3045 let states = vec![
3047 BulkSaveStateItem {
3048 entity_id: "bulk-entity-1".to_string(),
3049 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3050 },
3051 BulkSaveStateItem {
3052 entity_id: "bulk-entity-2".to_string(),
3053 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3054 },
3055 BulkSaveStateItem {
3056 entity_id: "bulk-entity-3".to_string(),
3057 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3058 },
3059 ];
3060
3061 let projection_name = "test_projection";
3062
3063 for item in &states {
3065 cache.insert(
3066 format!("{projection_name}:{}", item.entity_id),
3067 item.state.clone(),
3068 );
3069 }
3070
3071 assert_eq!(cache.len(), 3);
3073
3074 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3075 assert_eq!(state1["name"], "Entity 1");
3076 assert_eq!(state1["value"], 100);
3077
3078 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3079 assert_eq!(state2["name"], "Entity 2");
3080 assert_eq!(state2["value"], 200);
3081
3082 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3083 assert_eq!(state3["name"], "Entity 3");
3084 assert_eq!(state3["value"], 300);
3085 }
3086
3087 #[tokio::test]
3088 async fn test_bulk_save_empty_states() {
3089 let store = create_test_store();
3090 let cache = store.projection_state_cache();
3091
3092 cache.clear();
3094
3095 let states: Vec<BulkSaveStateItem> = vec![];
3097 assert_eq!(states.len(), 0);
3098
3099 assert_eq!(cache.len(), 0);
3101 }
3102
3103 #[tokio::test]
3104 async fn test_bulk_save_overwrites_existing() {
3105 let store = create_test_store();
3106 let cache = store.projection_state_cache();
3107
3108 cache.insert(
3110 "test:entity-1".to_string(),
3111 serde_json::json!({"version": 1, "data": "initial"}),
3112 );
3113
3114 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3116 cache.insert("test:entity-1".to_string(), new_state);
3117
3118 let state = cache.get("test:entity-1").unwrap();
3120 assert_eq!(state["version"], 2);
3121 assert_eq!(state["data"], "updated");
3122 }
3123
3124 #[tokio::test]
3125 async fn test_bulk_save_high_volume() {
3126 let store = create_test_store();
3127 let cache = store.projection_state_cache();
3128
3129 for i in 0..1000 {
3131 cache.insert(
3132 format!("volume_test:entity-{i}"),
3133 serde_json::json!({"index": i, "status": "active"}),
3134 );
3135 }
3136
3137 assert_eq!(cache.len(), 1000);
3139
3140 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3142 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3143 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3144 }
3145
3146 #[tokio::test]
3147 async fn test_bulk_save_different_projections() {
3148 let store = create_test_store();
3149 let cache = store.projection_state_cache();
3150
3151 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3153
3154 for proj in &projections {
3155 for i in 0..5 {
3156 cache.insert(
3157 format!("{proj}:entity-{i}"),
3158 serde_json::json!({"projection": proj, "id": i}),
3159 );
3160 }
3161 }
3162
3163 assert_eq!(cache.len(), 15);
3165
3166 for proj in &projections {
3168 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3169 assert_eq!(state["projection"], *proj);
3170 }
3171 }
3172}