1use crate::{
2 application::{
3 dto::{
4 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
5 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
6 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
7 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
8 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
9 },
10 services::{
11 analytics::{
12 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
13 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
14 },
15 pipeline::{PipelineConfig, PipelineStats},
16 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
17 schema::{
18 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
19 ValidateEventRequest, ValidateEventResponse,
20 },
21 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
22 },
23 },
24 domain::entities::Event,
25 error::Result,
26 infrastructure::{
27 persistence::{
28 compaction::CompactionResult,
29 snapshot::{
30 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
31 ListSnapshotsResponse, SnapshotInfo,
32 },
33 },
34 query::{
35 geospatial::GeoQueryRequest,
36 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
37 },
38 replication::ReplicationMode,
39 web::api_v1::AppState,
40 },
41 store::{EventStore, EventTypeInfo, StreamInfo},
42};
43use axum::{
44 Json, Router,
45 extract::{Path, Query, State, WebSocketUpgrade},
46 response::{IntoResponse, Response},
47 routing::{get, post, put},
48};
49use serde::Deserialize;
50use std::sync::Arc;
51use tower_http::{
52 cors::{Any, CorsLayer},
53 trace::TraceLayer,
54};
55
/// Shared, thread-safe handle to the event store, used as the router's state.
type SharedStore = Arc<EventStore>;
57
/// Blocks until the follower acknowledges the current leader WAL offset,
/// when a WAL shipper is configured and running in a synchronous mode.
///
/// No-ops when: no shipper is installed, the shipper is in `Async` mode, or
/// there is nothing to replicate yet (leader offset 0). On ACK timeout the
/// write is NOT rolled back — it already succeeded locally — we only warn.
async fn await_replication_ack(state: &AppState) {
    let shipper_guard = state.wal_shipper.read().await;
    if let Some(ref shipper) = *shipper_guard {
        let mode = shipper.replication_mode();
        if mode == ReplicationMode::Async {
            return;
        }

        let target_offset = shipper.current_leader_offset();
        if target_offset == 0 {
            return;
        }

        // Clone the Arc and release the read lock BEFORE awaiting the ACK,
        // so other writers are not blocked behind this wait.
        let shipper = Arc::clone(shipper);
        drop(shipper_guard);

        // Time how long we stall on follower confirmation (Prometheus histogram).
        let timer = state
            .store
            .metrics()
            .replication_ack_wait_seconds
            .start_timer();
        let acked = shipper.wait_for_ack(target_offset).await;
        timer.observe_duration();

        if !acked {
            tracing::warn!(
                "Replication ACK timeout in {} mode (offset {}). \
                 Write succeeded locally but follower confirmation pending.",
                mode,
                target_offset,
            );
        }
    }
}
98
99pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
100 let app = Router::new()
101 .route("/health", get(health))
102 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
104 .route("/api/v1/events/batch", post(ingest_events_batch))
105 .route("/api/v1/events/query", get(query_events))
106 .route("/api/v1/events/{event_id}", get(get_event_by_id))
107 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
110 .route("/api/v1/event-types", get(list_event_types))
111 .route("/api/v1/entities/duplicates", get(detect_duplicates))
112 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
113 .route(
114 "/api/v1/entities/{entity_id}/snapshot",
115 get(get_entity_snapshot),
116 )
117 .route("/api/v1/stats", get(get_stats))
118 .route("/api/v1/analytics/frequency", get(analytics_frequency))
120 .route("/api/v1/analytics/summary", get(analytics_summary))
121 .route("/api/v1/analytics/correlation", get(analytics_correlation))
122 .route("/api/v1/snapshots", post(create_snapshot))
124 .route("/api/v1/snapshots", get(list_snapshots))
125 .route(
126 "/api/v1/snapshots/{entity_id}/latest",
127 get(get_latest_snapshot),
128 )
129 .route("/api/v1/compaction/trigger", post(trigger_compaction))
131 .route("/api/v1/compaction/stats", get(compaction_stats))
132 .route("/api/v1/schemas", post(register_schema))
134 .route("/api/v1/schemas", get(list_subjects))
135 .route("/api/v1/schemas/{subject}", get(get_schema))
136 .route(
137 "/api/v1/schemas/{subject}/versions",
138 get(list_schema_versions),
139 )
140 .route("/api/v1/schemas/validate", post(validate_event_schema))
141 .route(
142 "/api/v1/schemas/{subject}/compatibility",
143 put(set_compatibility_mode),
144 )
145 .route("/api/v1/replay", post(start_replay))
147 .route("/api/v1/replay", get(list_replays))
148 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
149 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
150 .route(
151 "/api/v1/replay/{replay_id}",
152 axum::routing::delete(delete_replay),
153 )
154 .route("/api/v1/pipelines", post(register_pipeline))
156 .route("/api/v1/pipelines", get(list_pipelines))
157 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
158 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
159 .route(
160 "/api/v1/pipelines/{pipeline_id}",
161 axum::routing::delete(remove_pipeline),
162 )
163 .route(
164 "/api/v1/pipelines/{pipeline_id}/stats",
165 get(get_pipeline_stats),
166 )
167 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
168 .route("/api/v1/projections", get(list_projections))
170 .route("/api/v1/projections/{name}", get(get_projection))
171 .route(
172 "/api/v1/projections/{name}",
173 axum::routing::delete(delete_projection),
174 )
175 .route(
176 "/api/v1/projections/{name}/state",
177 get(get_projection_state_summary),
178 )
179 .route("/api/v1/projections/{name}/reset", post(reset_projection))
180 .route(
181 "/api/v1/projections/{name}/{entity_id}/state",
182 get(get_projection_state),
183 )
184 .route(
185 "/api/v1/projections/{name}/{entity_id}/state",
186 post(save_projection_state),
187 )
188 .route(
189 "/api/v1/projections/{name}/{entity_id}/state",
190 put(save_projection_state),
191 )
192 .route(
193 "/api/v1/projections/{name}/bulk",
194 post(bulk_get_projection_states),
195 )
196 .route(
197 "/api/v1/projections/{name}/bulk/save",
198 post(bulk_save_projection_states),
199 )
200 .route("/api/v1/webhooks", post(register_webhook))
202 .route("/api/v1/webhooks", get(list_webhooks))
203 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
204 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
205 .route(
206 "/api/v1/webhooks/{webhook_id}",
207 axum::routing::delete(delete_webhook),
208 )
209 .route(
210 "/api/v1/webhooks/{webhook_id}/deliveries",
211 get(list_webhook_deliveries),
212 )
213 .route("/api/v1/graphql", post(graphql_query))
215 .route("/api/v1/geospatial/query", post(geo_query))
216 .route("/api/v1/geospatial/stats", get(geo_stats))
217 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
218 .route(
219 "/api/v1/schema-evolution/history/{event_type}",
220 get(schema_evolution_history),
221 )
222 .route(
223 "/api/v1/schema-evolution/schema/{event_type}",
224 get(schema_evolution_schema),
225 )
226 .route(
227 "/api/v1/schema-evolution/stats",
228 get(schema_evolution_stats),
229 )
230 .layer(
231 CorsLayer::new()
232 .allow_origin(Any)
233 .allow_methods(Any)
234 .allow_headers(Any),
235 )
236 .layer(TraceLayer::new_for_http())
237 .with_state(store);
238
239 let listener = tokio::net::TcpListener::bind(addr).await?;
240 axum::serve(listener, app).await?;
241
242 Ok(())
243}
244
245pub async fn health() -> impl IntoResponse {
246 Json(serde_json::json!({
247 "status": "healthy",
248 "service": "allsource-core",
249 "version": env!("CARGO_PKG_VERSION")
250 }))
251}
252
253pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
255 let metrics = store.metrics();
256
257 match metrics.encode() {
258 Ok(encoded) => Response::builder()
259 .status(200)
260 .header("Content-Type", "text/plain; version=0.0.4")
261 .body(encoded)
262 .unwrap()
263 .into_response(),
264 Err(e) => Response::builder()
265 .status(500)
266 .body(format!("Error encoding metrics: {e}"))
267 .unwrap()
268 .into_response(),
269 }
270}
271
272pub async fn ingest_event(
273 State(store): State<SharedStore>,
274 Json(req): Json<IngestEventRequest>,
275) -> Result<Json<IngestEventResponse>> {
276 let expected_version = req.expected_version;
277
278 let event = Event::from_strings(
280 req.event_type,
281 req.entity_id,
282 "default".to_string(),
283 req.payload,
284 req.metadata,
285 )?;
286
287 let event_id = event.id;
288 let timestamp = event.timestamp;
289
290 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
291
292 tracing::info!("Event ingested: {}", event_id);
293
294 Ok(Json(IngestEventResponse {
295 event_id,
296 timestamp,
297 version: Some(new_version),
298 }))
299}
300
301pub async fn ingest_event_v1(
305 State(state): State<AppState>,
306 Json(req): Json<IngestEventRequest>,
307) -> Result<Json<IngestEventResponse>> {
308 let expected_version = req.expected_version;
309
310 let event = Event::from_strings(
311 req.event_type,
312 req.entity_id,
313 "default".to_string(),
314 req.payload,
315 req.metadata,
316 )?;
317
318 let event_id = event.id;
319 let timestamp = event.timestamp;
320
321 let new_version = state
322 .store
323 .ingest_with_expected_version(&event, expected_version)?;
324
325 await_replication_ack(&state).await;
327
328 tracing::info!("Event ingested: {}", event_id);
329
330 Ok(Json(IngestEventResponse {
331 event_id,
332 timestamp,
333 version: Some(new_version),
334 }))
335}
336
337pub async fn ingest_events_batch(
342 State(store): State<SharedStore>,
343 Json(req): Json<IngestEventsBatchRequest>,
344) -> Result<Json<IngestEventsBatchResponse>> {
345 let total = req.events.len();
346 let mut ingested_events = Vec::with_capacity(total);
347
348 for event_req in req.events {
349 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
350 let expected_version = event_req.expected_version;
351
352 let event = Event::from_strings(
353 event_req.event_type,
354 event_req.entity_id,
355 tenant_id,
356 event_req.payload,
357 event_req.metadata,
358 )?;
359
360 let event_id = event.id;
361 let timestamp = event.timestamp;
362
363 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
364
365 ingested_events.push(IngestEventResponse {
366 event_id,
367 timestamp,
368 version: Some(new_version),
369 });
370 }
371
372 let ingested = ingested_events.len();
373 tracing::info!("Batch ingested {} events", ingested);
374
375 Ok(Json(IngestEventsBatchResponse {
376 total,
377 ingested,
378 events: ingested_events,
379 }))
380}
381
382pub async fn ingest_events_batch_v1(
386 State(state): State<AppState>,
387 Json(req): Json<IngestEventsBatchRequest>,
388) -> Result<Json<IngestEventsBatchResponse>> {
389 let total = req.events.len();
390 let mut ingested_events = Vec::with_capacity(total);
391
392 for event_req in req.events {
393 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
394 let expected_version = event_req.expected_version;
395
396 let event = Event::from_strings(
397 event_req.event_type,
398 event_req.entity_id,
399 tenant_id,
400 event_req.payload,
401 event_req.metadata,
402 )?;
403
404 let event_id = event.id;
405 let timestamp = event.timestamp;
406
407 let new_version = state
408 .store
409 .ingest_with_expected_version(&event, expected_version)?;
410
411 ingested_events.push(IngestEventResponse {
412 event_id,
413 timestamp,
414 version: Some(new_version),
415 });
416 }
417
418 await_replication_ack(&state).await;
420
421 let ingested = ingested_events.len();
422 tracing::info!("Batch ingested {} events", ingested);
423
424 Ok(Json(IngestEventsBatchResponse {
425 total,
426 ingested,
427 events: ingested_events,
428 }))
429}
430
431pub async fn query_events(
432 State(store): State<SharedStore>,
433 Query(req): Query<QueryEventsRequest>,
434) -> Result<Json<QueryEventsResponse>> {
435 let requested_limit = req.limit;
436 let queried_entity_id = req.entity_id.clone();
437
438 let unlimited_req = QueryEventsRequest {
440 entity_id: req.entity_id,
441 event_type: req.event_type,
442 tenant_id: req.tenant_id,
443 as_of: req.as_of,
444 since: req.since,
445 until: req.until,
446 limit: None,
447 event_type_prefix: req.event_type_prefix,
448 payload_filter: req.payload_filter,
449 };
450 let all_events = store.query(&unlimited_req)?;
451 let total_count = all_events.len();
452
453 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
455 all_events.into_iter().take(limit).collect()
456 } else {
457 all_events
458 };
459
460 let count = limited_events.len();
461 let has_more = count < total_count;
462 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
463
464 let entity_version = queried_entity_id
466 .as_deref()
467 .map(|eid| store.get_entity_version(eid));
468
469 tracing::debug!("Query returned {} events (total: {})", count, total_count);
470
471 Ok(Json(QueryEventsResponse {
472 events,
473 count,
474 total_count,
475 has_more,
476 entity_version,
477 }))
478}
479
480pub async fn list_entities(
481 State(store): State<SharedStore>,
482 Query(req): Query<ListEntitiesRequest>,
483) -> Result<Json<ListEntitiesResponse>> {
484 use std::collections::HashMap;
485
486 let query_req = QueryEventsRequest {
488 entity_id: None,
489 event_type: None,
490 tenant_id: None,
491 as_of: None,
492 since: None,
493 until: None,
494 limit: None,
495 event_type_prefix: req.event_type_prefix,
496 payload_filter: req.payload_filter,
497 };
498 let events = store.query(&query_req)?;
499
500 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
502 for event in &events {
503 entity_map
504 .entry(event.entity_id().to_string())
505 .or_default()
506 .push(event);
507 }
508
509 let mut summaries: Vec<EntitySummary> = entity_map
511 .into_iter()
512 .map(|(entity_id, events)| {
513 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
514 EntitySummary {
515 entity_id,
516 event_count: events.len(),
517 last_event_type: last.event_type_str().to_string(),
518 last_event_at: last.timestamp(),
519 }
520 })
521 .collect();
522 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
523
524 let total = summaries.len();
525
526 let offset = req.offset.unwrap_or(0);
528 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
529 let summaries = if let Some(limit) = req.limit {
530 let has_more = summaries.len() > limit;
531 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
532 return Ok(Json(ListEntitiesResponse {
533 entities: truncated,
534 total,
535 has_more,
536 }));
537 } else {
538 summaries
539 };
540
541 Ok(Json(ListEntitiesResponse {
542 entities: summaries,
543 total,
544 has_more: false,
545 }))
546}
547
/// Detects groups of entities whose latest events share identical values for
/// the comma-separated `group_by` payload fields.
///
/// Pipeline: query events by `event_type_prefix` → keep only the newest event
/// per entity → key each entity by the JSON-serialized group-by values →
/// report keys shared by more than one entity, largest groups first, with
/// optional offset/limit pagination.
pub async fn detect_duplicates(
    State(store): State<SharedStore>,
    Query(req): Query<DetectDuplicatesRequest>,
) -> Result<Json<DetectDuplicatesResponse>> {
    use std::collections::HashMap;

    // Comma-separated field list, whitespace-trimmed.
    let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();

    let query_req = QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        event_type_prefix: Some(req.event_type_prefix),
        payload_filter: None,
    };
    let events = store.query(&query_req)?;

    // Latest event per entity (by timestamp).
    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
    for event in &events {
        let eid = event.entity_id().to_string();
        entity_latest
            .entry(eid)
            .and_modify(|existing| {
                if event.timestamp() > existing.timestamp() {
                    *existing = event;
                }
            })
            .or_insert(event);
    }

    // Group entities by the serialized JSON of their group-by field values.
    // NOTE: fields are inserted in `group_by` order, so the string key is
    // stable for a given request; missing fields contribute JSON null.
    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
    for (entity_id, event) in &entity_latest {
        let payload = event.payload();
        let mut key_parts = serde_json::Map::new();
        for field in &group_by_fields {
            let value = payload
                .get(*field)
                .cloned()
                .unwrap_or(serde_json::Value::Null);
            key_parts.insert((*field).to_string(), value);
        }
        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
        groups.entry(key_str).or_default().push(entity_id.clone());
    }

    // Only keys shared by 2+ entities count as duplicates; entity ids are
    // sorted for deterministic output, and the key is parsed back to JSON.
    let mut duplicate_groups: Vec<DuplicateGroup> = groups
        .into_iter()
        .filter(|(_, ids)| ids.len() > 1)
        .map(|(key_str, mut ids)| {
            ids.sort();
            let key: serde_json::Value =
                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
            let count = ids.len();
            DuplicateGroup {
                key,
                entity_ids: ids,
                count,
            }
        })
        .collect();

    // Largest groups first. Ties keep HashMap iteration order (unspecified).
    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));

    let total = duplicate_groups.len();

    // Pagination: offset first, then optional limit with has_more.
    let offset = req.offset.unwrap_or(0);
    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();

    if let Some(limit) = req.limit {
        let has_more = duplicate_groups.len() > limit;
        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
        return Ok(Json(DetectDuplicatesResponse {
            duplicates: truncated,
            total,
            has_more,
        }));
    }

    Ok(Json(DetectDuplicatesResponse {
        duplicates: duplicate_groups,
        total,
        has_more: false,
    }))
}
643
/// Query parameters for [`get_entity_state`].
#[derive(Deserialize)]
pub struct EntityStateParams {
    // Optional point-in-time: reconstruct state as of this instant.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
648
649pub async fn get_entity_state(
650 State(store): State<SharedStore>,
651 Path(entity_id): Path<String>,
652 Query(params): Query<EntityStateParams>,
653) -> Result<Json<serde_json::Value>> {
654 let state = store.reconstruct_state(&entity_id, params.as_of)?;
655
656 tracing::info!("State reconstructed for entity: {}", entity_id);
657
658 Ok(Json(state))
659}
660
661pub async fn get_entity_snapshot(
662 State(store): State<SharedStore>,
663 Path(entity_id): Path<String>,
664) -> Result<Json<serde_json::Value>> {
665 let snapshot = store.get_snapshot(&entity_id)?;
666
667 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
668
669 Ok(Json(snapshot))
670}
671
672pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
673 let stats = store.stats();
674 Json(stats)
675}
676
/// Pagination query parameters for [`list_streams`].
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    // Max number of streams to return (applied after offset).
    pub limit: Option<usize>,
    // Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
686
/// Response body for [`list_streams`]; `total` is the pre-pagination count.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    pub streams: Vec<StreamInfo>,
    pub total: usize,
}
693
694pub async fn list_streams(
695 State(store): State<SharedStore>,
696 Query(params): Query<ListStreamsParams>,
697) -> Json<ListStreamsResponse> {
698 let mut streams = store.list_streams();
699 let total = streams.len();
700
701 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
703
704 if let Some(offset) = params.offset {
706 if offset < streams.len() {
707 streams = streams[offset..].to_vec();
708 } else {
709 streams = vec![];
710 }
711 }
712
713 if let Some(limit) = params.limit {
714 streams.truncate(limit);
715 }
716
717 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
718
719 Json(ListStreamsResponse { streams, total })
720}
721
/// Pagination query parameters for [`list_event_types`].
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    // Max number of event types to return (applied after offset).
    pub limit: Option<usize>,
    // Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
731
/// Response body for [`list_event_types`]; `total` is the pre-pagination count.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    pub event_types: Vec<EventTypeInfo>,
    pub total: usize,
}
738
739pub async fn list_event_types(
740 State(store): State<SharedStore>,
741 Query(params): Query<ListEventTypesParams>,
742) -> Json<ListEventTypesResponse> {
743 let mut event_types = store.list_event_types();
744 let total = event_types.len();
745
746 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
748
749 if let Some(offset) = params.offset {
751 if offset < event_types.len() {
752 event_types = event_types[offset..].to_vec();
753 } else {
754 event_types = vec![];
755 }
756 }
757
758 if let Some(limit) = params.limit {
759 event_types.truncate(limit);
760 }
761
762 tracing::debug!(
763 "Listed {} event types (total: {})",
764 event_types.len(),
765 total
766 );
767
768 Json(ListEventTypesResponse { event_types, total })
769}
770
/// Query parameters for [`events_websocket`].
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    // When present, the socket is attached to a named consumer session.
    pub consumer_id: Option<String>,
}
776
777pub async fn events_websocket(
778 ws: WebSocketUpgrade,
779 State(store): State<SharedStore>,
780 Query(params): Query<WebSocketParams>,
781) -> Response {
782 let websocket_manager = store.websocket_manager();
783
784 ws.on_upgrade(move |socket| async move {
785 if let Some(consumer_id) = params.consumer_id {
786 websocket_manager
787 .handle_socket_with_consumer(socket, consumer_id, store)
788 .await;
789 } else {
790 websocket_manager.handle_socket(socket).await;
791 }
792 })
793}
794
795pub async fn analytics_frequency(
797 State(store): State<SharedStore>,
798 Query(req): Query<EventFrequencyRequest>,
799) -> Result<Json<EventFrequencyResponse>> {
800 let response = AnalyticsEngine::event_frequency(&store, &req)?;
801
802 tracing::debug!(
803 "Frequency analysis returned {} buckets",
804 response.buckets.len()
805 );
806
807 Ok(Json(response))
808}
809
810pub async fn analytics_summary(
812 State(store): State<SharedStore>,
813 Query(req): Query<StatsSummaryRequest>,
814) -> Result<Json<StatsSummaryResponse>> {
815 let response = AnalyticsEngine::stats_summary(&store, &req)?;
816
817 tracing::debug!(
818 "Stats summary: {} events across {} entities",
819 response.total_events,
820 response.unique_entities
821 );
822
823 Ok(Json(response))
824}
825
826pub async fn analytics_correlation(
828 State(store): State<SharedStore>,
829 Query(req): Query<CorrelationRequest>,
830) -> Result<Json<CorrelationResponse>> {
831 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
832
833 tracing::debug!(
834 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
835 response.correlated_pairs,
836 response.total_a,
837 response.correlation_percentage
838 );
839
840 Ok(Json(response))
841}
842
843pub async fn create_snapshot(
845 State(store): State<SharedStore>,
846 Json(req): Json<CreateSnapshotRequest>,
847) -> Result<Json<CreateSnapshotResponse>> {
848 store.create_snapshot(&req.entity_id)?;
849
850 let snapshot_manager = store.snapshot_manager();
851 let snapshot = snapshot_manager
852 .get_latest_snapshot(&req.entity_id)
853 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
854
855 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
856
857 Ok(Json(CreateSnapshotResponse {
858 snapshot_id: snapshot.id,
859 entity_id: snapshot.entity_id,
860 created_at: snapshot.created_at,
861 event_count: snapshot.event_count,
862 size_bytes: snapshot.metadata.size_bytes,
863 }))
864}
865
866pub async fn list_snapshots(
868 State(store): State<SharedStore>,
869 Query(req): Query<ListSnapshotsRequest>,
870) -> Result<Json<ListSnapshotsResponse>> {
871 let snapshot_manager = store.snapshot_manager();
872
873 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
874 snapshot_manager
875 .get_all_snapshots(&entity_id)
876 .into_iter()
877 .map(SnapshotInfo::from)
878 .collect()
879 } else {
880 let entities = snapshot_manager.list_entities();
882 entities
883 .iter()
884 .flat_map(|entity_id| {
885 snapshot_manager
886 .get_all_snapshots(entity_id)
887 .into_iter()
888 .map(SnapshotInfo::from)
889 })
890 .collect()
891 };
892
893 let total = snapshots.len();
894
895 tracing::debug!("Listed {} snapshots", total);
896
897 Ok(Json(ListSnapshotsResponse { snapshots, total }))
898}
899
900pub async fn get_latest_snapshot(
902 State(store): State<SharedStore>,
903 Path(entity_id): Path<String>,
904) -> Result<Json<serde_json::Value>> {
905 let snapshot_manager = store.snapshot_manager();
906
907 let snapshot = snapshot_manager
908 .get_latest_snapshot(&entity_id)
909 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
910
911 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
912
913 Ok(Json(serde_json::json!({
914 "snapshot_id": snapshot.id,
915 "entity_id": snapshot.entity_id,
916 "created_at": snapshot.created_at,
917 "as_of": snapshot.as_of,
918 "event_count": snapshot.event_count,
919 "size_bytes": snapshot.metadata.size_bytes,
920 "snapshot_type": snapshot.metadata.snapshot_type,
921 "state": snapshot.state
922 })))
923}
924
925pub async fn trigger_compaction(
927 State(store): State<SharedStore>,
928) -> Result<Json<CompactionResult>> {
929 let compaction_manager = store.compaction_manager().ok_or_else(|| {
930 crate::error::AllSourceError::InternalError(
931 "Compaction not enabled (no Parquet storage)".to_string(),
932 )
933 })?;
934
935 tracing::info!("📦 Manual compaction triggered via API");
936
937 let result = compaction_manager.compact_now()?;
938
939 Ok(Json(result))
940}
941
942pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
944 let compaction_manager = store.compaction_manager().ok_or_else(|| {
945 crate::error::AllSourceError::InternalError(
946 "Compaction not enabled (no Parquet storage)".to_string(),
947 )
948 })?;
949
950 let stats = compaction_manager.stats();
951 let config = compaction_manager.config();
952
953 Ok(Json(serde_json::json!({
954 "stats": stats,
955 "config": {
956 "min_files_to_compact": config.min_files_to_compact,
957 "target_file_size": config.target_file_size,
958 "max_file_size": config.max_file_size,
959 "small_file_threshold": config.small_file_threshold,
960 "compaction_interval_seconds": config.compaction_interval_seconds,
961 "auto_compact": config.auto_compact,
962 "strategy": config.strategy
963 }
964 })))
965}
966
967pub async fn register_schema(
969 State(store): State<SharedStore>,
970 Json(req): Json<RegisterSchemaRequest>,
971) -> Result<Json<RegisterSchemaResponse>> {
972 let schema_registry = store.schema_registry();
973
974 let response =
975 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
976
977 tracing::info!(
978 "📋 Schema registered: v{} for '{}'",
979 response.version,
980 response.subject
981 );
982
983 Ok(Json(response))
984}
985
/// Query parameters for [`get_schema`].
#[derive(Deserialize)]
pub struct GetSchemaParams {
    // Specific schema version; latest when omitted.
    version: Option<u32>,
}
991
992pub async fn get_schema(
993 State(store): State<SharedStore>,
994 Path(subject): Path<String>,
995 Query(params): Query<GetSchemaParams>,
996) -> Result<Json<serde_json::Value>> {
997 let schema_registry = store.schema_registry();
998
999 let schema = schema_registry.get_schema(&subject, params.version)?;
1000
1001 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1002
1003 Ok(Json(serde_json::json!({
1004 "id": schema.id,
1005 "subject": schema.subject,
1006 "version": schema.version,
1007 "schema": schema.schema,
1008 "created_at": schema.created_at,
1009 "description": schema.description,
1010 "tags": schema.tags
1011 })))
1012}
1013
1014pub async fn list_schema_versions(
1016 State(store): State<SharedStore>,
1017 Path(subject): Path<String>,
1018) -> Result<Json<serde_json::Value>> {
1019 let schema_registry = store.schema_registry();
1020
1021 let versions = schema_registry.list_versions(&subject)?;
1022
1023 Ok(Json(serde_json::json!({
1024 "subject": subject,
1025 "versions": versions
1026 })))
1027}
1028
1029pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1031 let schema_registry = store.schema_registry();
1032
1033 let subjects = schema_registry.list_subjects();
1034
1035 Json(serde_json::json!({
1036 "subjects": subjects,
1037 "total": subjects.len()
1038 }))
1039}
1040
1041pub async fn validate_event_schema(
1043 State(store): State<SharedStore>,
1044 Json(req): Json<ValidateEventRequest>,
1045) -> Result<Json<ValidateEventResponse>> {
1046 let schema_registry = store.schema_registry();
1047
1048 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1049
1050 if response.valid {
1051 tracing::debug!(
1052 "✅ Event validated against schema '{}' v{}",
1053 req.subject,
1054 response.schema_version
1055 );
1056 } else {
1057 tracing::warn!(
1058 "❌ Event validation failed for '{}': {:?}",
1059 req.subject,
1060 response.errors
1061 );
1062 }
1063
1064 Ok(Json(response))
1065}
1066
/// Request body for [`set_compatibility_mode`].
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    // Desired schema-compatibility mode for the subject.
    compatibility: CompatibilityMode,
}
1072
1073pub async fn set_compatibility_mode(
1074 State(store): State<SharedStore>,
1075 Path(subject): Path<String>,
1076 Json(req): Json<SetCompatibilityRequest>,
1077) -> Json<serde_json::Value> {
1078 let schema_registry = store.schema_registry();
1079
1080 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1081
1082 tracing::info!(
1083 "🔧 Set compatibility mode for '{}' to {:?}",
1084 subject,
1085 req.compatibility
1086 );
1087
1088 Json(serde_json::json!({
1089 "subject": subject,
1090 "compatibility": req.compatibility
1091 }))
1092}
1093
1094pub async fn start_replay(
1096 State(store): State<SharedStore>,
1097 Json(req): Json<StartReplayRequest>,
1098) -> Result<Json<StartReplayResponse>> {
1099 let replay_manager = store.replay_manager();
1100
1101 let response = replay_manager.start_replay(store, req)?;
1102
1103 tracing::info!(
1104 "🔄 Started replay {} with {} events",
1105 response.replay_id,
1106 response.total_events
1107 );
1108
1109 Ok(Json(response))
1110}
1111
1112pub async fn get_replay_progress(
1114 State(store): State<SharedStore>,
1115 Path(replay_id): Path<uuid::Uuid>,
1116) -> Result<Json<ReplayProgress>> {
1117 let replay_manager = store.replay_manager();
1118
1119 let progress = replay_manager.get_progress(replay_id)?;
1120
1121 Ok(Json(progress))
1122}
1123
1124pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1126 let replay_manager = store.replay_manager();
1127
1128 let replays = replay_manager.list_replays();
1129
1130 Json(serde_json::json!({
1131 "replays": replays,
1132 "total": replays.len()
1133 }))
1134}
1135
1136pub async fn cancel_replay(
1138 State(store): State<SharedStore>,
1139 Path(replay_id): Path<uuid::Uuid>,
1140) -> Result<Json<serde_json::Value>> {
1141 let replay_manager = store.replay_manager();
1142
1143 replay_manager.cancel_replay(replay_id)?;
1144
1145 tracing::info!("🛑 Cancelled replay {}", replay_id);
1146
1147 Ok(Json(serde_json::json!({
1148 "replay_id": replay_id,
1149 "status": "cancelled"
1150 })))
1151}
1152
1153pub async fn delete_replay(
1155 State(store): State<SharedStore>,
1156 Path(replay_id): Path<uuid::Uuid>,
1157) -> Result<Json<serde_json::Value>> {
1158 let replay_manager = store.replay_manager();
1159
1160 let deleted = replay_manager.delete_replay(replay_id)?;
1161
1162 if deleted {
1163 tracing::info!("🗑️ Deleted replay {}", replay_id);
1164 }
1165
1166 Ok(Json(serde_json::json!({
1167 "replay_id": replay_id,
1168 "deleted": deleted
1169 })))
1170}
1171
1172pub async fn register_pipeline(
1174 State(store): State<SharedStore>,
1175 Json(config): Json<PipelineConfig>,
1176) -> Result<Json<serde_json::Value>> {
1177 let pipeline_manager = store.pipeline_manager();
1178
1179 let pipeline_id = pipeline_manager.register(config.clone());
1180
1181 tracing::info!(
1182 "🔀 Pipeline registered: {} (name: {})",
1183 pipeline_id,
1184 config.name
1185 );
1186
1187 Ok(Json(serde_json::json!({
1188 "pipeline_id": pipeline_id,
1189 "name": config.name,
1190 "enabled": config.enabled
1191 })))
1192}
1193
1194pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1196 let pipeline_manager = store.pipeline_manager();
1197
1198 let pipelines = pipeline_manager.list();
1199
1200 tracing::debug!("Listed {} pipelines", pipelines.len());
1201
1202 Json(serde_json::json!({
1203 "pipelines": pipelines,
1204 "total": pipelines.len()
1205 }))
1206}
1207
1208pub async fn get_pipeline(
1210 State(store): State<SharedStore>,
1211 Path(pipeline_id): Path<uuid::Uuid>,
1212) -> Result<Json<PipelineConfig>> {
1213 let pipeline_manager = store.pipeline_manager();
1214
1215 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1216 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1217 })?;
1218
1219 Ok(Json(pipeline.config().clone()))
1220}
1221
1222pub async fn remove_pipeline(
1224 State(store): State<SharedStore>,
1225 Path(pipeline_id): Path<uuid::Uuid>,
1226) -> Result<Json<serde_json::Value>> {
1227 let pipeline_manager = store.pipeline_manager();
1228
1229 let removed = pipeline_manager.remove(pipeline_id);
1230
1231 if removed {
1232 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1233 }
1234
1235 Ok(Json(serde_json::json!({
1236 "pipeline_id": pipeline_id,
1237 "removed": removed
1238 })))
1239}
1240
1241pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1243 let pipeline_manager = store.pipeline_manager();
1244
1245 let stats = pipeline_manager.all_stats();
1246
1247 Json(serde_json::json!({
1248 "stats": stats,
1249 "total": stats.len()
1250 }))
1251}
1252
1253pub async fn get_pipeline_stats(
1255 State(store): State<SharedStore>,
1256 Path(pipeline_id): Path<uuid::Uuid>,
1257) -> Result<Json<PipelineStats>> {
1258 let pipeline_manager = store.pipeline_manager();
1259
1260 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1261 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1262 })?;
1263
1264 Ok(Json(pipeline.stats()))
1265}
1266
1267pub async fn reset_pipeline(
1269 State(store): State<SharedStore>,
1270 Path(pipeline_id): Path<uuid::Uuid>,
1271) -> Result<Json<serde_json::Value>> {
1272 let pipeline_manager = store.pipeline_manager();
1273
1274 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1275 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1276 })?;
1277
1278 pipeline.reset();
1279
1280 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1281
1282 Ok(Json(serde_json::json!({
1283 "pipeline_id": pipeline_id,
1284 "reset": true
1285 })))
1286}
1287
1288pub async fn get_event_by_id(
1294 State(store): State<SharedStore>,
1295 Path(event_id): Path<uuid::Uuid>,
1296) -> Result<Json<serde_json::Value>> {
1297 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1298 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1299 })?;
1300
1301 let dto = EventDto::from(&event);
1302
1303 tracing::debug!("Event retrieved by ID: {}", event_id);
1304
1305 Ok(Json(serde_json::json!({
1306 "event": dto,
1307 "found": true
1308 })))
1309}
1310
1311pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1317 let projection_manager = store.projection_manager();
1318 let status_map = store.projection_status();
1319
1320 let projections: Vec<serde_json::Value> = projection_manager
1321 .list_projections()
1322 .iter()
1323 .map(|(name, projection)| {
1324 let status = status_map
1325 .get(name)
1326 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1327 serde_json::json!({
1328 "name": name,
1329 "type": format!("{:?}", projection.name()),
1330 "status": status,
1331 })
1332 })
1333 .collect();
1334
1335 tracing::debug!("Listed {} projections", projections.len());
1336
1337 Json(serde_json::json!({
1338 "projections": projections,
1339 "total": projections.len()
1340 }))
1341}
1342
1343pub async fn get_projection(
1345 State(store): State<SharedStore>,
1346 Path(name): Path<String>,
1347) -> Result<Json<serde_json::Value>> {
1348 let projection_manager = store.projection_manager();
1349
1350 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1351 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1352 })?;
1353
1354 Ok(Json(serde_json::json!({
1355 "name": projection.name(),
1356 "found": true
1357 })))
1358}
1359
1360pub async fn get_projection_state(
1365 State(store): State<SharedStore>,
1366 Path((name, entity_id)): Path<(String, String)>,
1367) -> Result<Json<serde_json::Value>> {
1368 let projection_manager = store.projection_manager();
1369
1370 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1371 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1372 })?;
1373
1374 let state = projection.get_state(&entity_id);
1375
1376 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1377
1378 Ok(Json(serde_json::json!({
1379 "projection": name,
1380 "entity_id": entity_id,
1381 "state": state,
1382 "found": state.is_some()
1383 })))
1384}
1385
1386pub async fn delete_projection(
1391 State(store): State<SharedStore>,
1392 Path(name): Path<String>,
1393) -> Result<Json<serde_json::Value>> {
1394 let projection_manager = store.projection_manager();
1395
1396 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1397 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1398 })?;
1399
1400 projection.clear();
1401
1402 let cache = store.projection_state_cache();
1404 let prefix = format!("{name}:");
1405 let keys_to_remove: Vec<String> = cache
1406 .iter()
1407 .filter(|entry| entry.key().starts_with(&prefix))
1408 .map(|entry| entry.key().clone())
1409 .collect();
1410 for key in keys_to_remove {
1411 cache.remove(&key);
1412 }
1413
1414 tracing::info!("Projection deleted (cleared): {}", name);
1415
1416 Ok(Json(serde_json::json!({
1417 "projection": name,
1418 "deleted": true
1419 })))
1420}
1421
1422pub async fn get_projection_state_summary(
1426 State(store): State<SharedStore>,
1427 Path(name): Path<String>,
1428) -> Result<Json<serde_json::Value>> {
1429 let projection_manager = store.projection_manager();
1430
1431 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1432 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1433 })?;
1434
1435 let cache = store.projection_state_cache();
1437 let prefix = format!("{name}:");
1438 let states: Vec<serde_json::Value> = cache
1439 .iter()
1440 .filter(|entry| entry.key().starts_with(&prefix))
1441 .map(|entry| {
1442 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1443 serde_json::json!({
1444 "entity_id": entity_id,
1445 "state": entry.value().clone()
1446 })
1447 })
1448 .collect();
1449
1450 let total = states.len();
1451
1452 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1453
1454 Ok(Json(serde_json::json!({
1455 "projection": name,
1456 "states": states,
1457 "total": total
1458 })))
1459}
1460
1461pub async fn reset_projection(
1465 State(store): State<SharedStore>,
1466 Path(name): Path<String>,
1467) -> Result<Json<serde_json::Value>> {
1468 let reprocessed = store.reset_projection(&name)?;
1469
1470 tracing::info!(
1471 "Projection reset: {} ({} events reprocessed)",
1472 name,
1473 reprocessed
1474 );
1475
1476 Ok(Json(serde_json::json!({
1477 "projection": name,
1478 "reset": true,
1479 "events_reprocessed": reprocessed
1480 })))
1481}
1482
1483pub async fn pause_projection(
1487 State(store): State<SharedStore>,
1488 Path(name): Path<String>,
1489) -> Result<Json<serde_json::Value>> {
1490 let projection_manager = store.projection_manager();
1491
1492 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1494 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1495 })?;
1496
1497 store
1498 .projection_status()
1499 .insert(name.clone(), "paused".to_string());
1500
1501 tracing::info!("Projection paused: {}", name);
1502
1503 Ok(Json(serde_json::json!({
1504 "projection": name,
1505 "status": "paused"
1506 })))
1507}
1508
1509pub async fn start_projection(
1513 State(store): State<SharedStore>,
1514 Path(name): Path<String>,
1515) -> Result<Json<serde_json::Value>> {
1516 let projection_manager = store.projection_manager();
1517
1518 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1520 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1521 })?;
1522
1523 store
1524 .projection_status()
1525 .insert(name.clone(), "running".to_string());
1526
1527 tracing::info!("Projection started: {}", name);
1528
1529 Ok(Json(serde_json::json!({
1530 "projection": name,
1531 "status": "running"
1532 })))
1533}
1534
/// Request body for `save_projection_state`: the raw JSON state to cache for
/// one projection/entity pair.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    // Arbitrary JSON blob; stored verbatim in the projection state cache.
    pub state: serde_json::Value,
}
1540
1541pub async fn save_projection_state(
1546 State(store): State<SharedStore>,
1547 Path((name, entity_id)): Path<(String, String)>,
1548 Json(req): Json<SaveProjectionStateRequest>,
1549) -> Result<Json<serde_json::Value>> {
1550 let projection_cache = store.projection_state_cache();
1551
1552 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1554
1555 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1556
1557 Ok(Json(serde_json::json!({
1558 "projection": name,
1559 "entity_id": entity_id,
1560 "saved": true
1561 })))
1562}
1563
/// Request body for `bulk_get_projection_states`: the entity ids whose cached
/// states should be fetched in one round trip.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    pub entity_ids: Vec<String>,
}
1571
/// Request body for `bulk_save_projection_states`: one state item per entity.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    pub states: Vec<BulkSaveStateItem>,
}
1579
/// One entity/state pair within a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    pub entity_id: String,
    // Arbitrary JSON blob; stored verbatim in the projection state cache.
    pub state: serde_json::Value,
}
1585
1586pub async fn bulk_get_projection_states(
1587 State(store): State<SharedStore>,
1588 Path(name): Path<String>,
1589 Json(req): Json<BulkGetStateRequest>,
1590) -> Result<Json<serde_json::Value>> {
1591 let projection_manager = store.projection_manager();
1592
1593 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1594 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1595 })?;
1596
1597 let states: Vec<serde_json::Value> = req
1598 .entity_ids
1599 .iter()
1600 .map(|entity_id| {
1601 let state = projection.get_state(entity_id);
1602 serde_json::json!({
1603 "entity_id": entity_id,
1604 "state": state,
1605 "found": state.is_some()
1606 })
1607 })
1608 .collect();
1609
1610 tracing::debug!(
1611 "Bulk projection state retrieved: {} entities from {}",
1612 states.len(),
1613 name
1614 );
1615
1616 Ok(Json(serde_json::json!({
1617 "projection": name,
1618 "states": states,
1619 "total": states.len()
1620 })))
1621}
1622
1623pub async fn bulk_save_projection_states(
1628 State(store): State<SharedStore>,
1629 Path(name): Path<String>,
1630 Json(req): Json<BulkSaveStateRequest>,
1631) -> Result<Json<serde_json::Value>> {
1632 let projection_cache = store.projection_state_cache();
1633
1634 let mut saved_count = 0;
1635 for item in &req.states {
1636 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1637 saved_count += 1;
1638 }
1639
1640 tracing::info!(
1641 "Bulk projection state saved: {} entities for {}",
1642 saved_count,
1643 name
1644 );
1645
1646 Ok(Json(serde_json::json!({
1647 "projection": name,
1648 "saved": saved_count,
1649 "total": req.states.len()
1650 })))
1651}
1652
/// Query-string parameters for `list_webhooks`.
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    // Optional tenant filter; without it the handler returns an empty list.
    pub tenant_id: Option<String>,
}
1662
1663pub async fn register_webhook(
1665 State(store): State<SharedStore>,
1666 Json(req): Json<RegisterWebhookRequest>,
1667) -> Json<serde_json::Value> {
1668 let registry = store.webhook_registry();
1669 let webhook = registry.register(req);
1670
1671 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1672
1673 Json(serde_json::json!({
1674 "webhook": webhook,
1675 "created": true
1676 }))
1677}
1678
1679pub async fn list_webhooks(
1681 State(store): State<SharedStore>,
1682 Query(params): Query<ListWebhooksParams>,
1683) -> Json<serde_json::Value> {
1684 let registry = store.webhook_registry();
1685
1686 let webhooks = if let Some(tenant_id) = params.tenant_id {
1687 registry.list_by_tenant(&tenant_id)
1688 } else {
1689 vec![]
1691 };
1692
1693 let total = webhooks.len();
1694
1695 Json(serde_json::json!({
1696 "webhooks": webhooks,
1697 "total": total
1698 }))
1699}
1700
1701pub async fn get_webhook(
1703 State(store): State<SharedStore>,
1704 Path(webhook_id): Path<uuid::Uuid>,
1705) -> Result<Json<serde_json::Value>> {
1706 let registry = store.webhook_registry();
1707
1708 let webhook = registry.get(webhook_id).ok_or_else(|| {
1709 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1710 })?;
1711
1712 Ok(Json(serde_json::json!({
1713 "webhook": webhook,
1714 "found": true
1715 })))
1716}
1717
1718pub async fn update_webhook(
1720 State(store): State<SharedStore>,
1721 Path(webhook_id): Path<uuid::Uuid>,
1722 Json(req): Json<UpdateWebhookRequest>,
1723) -> Result<Json<serde_json::Value>> {
1724 let registry = store.webhook_registry();
1725
1726 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1727 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1728 })?;
1729
1730 tracing::info!("Webhook updated: {}", webhook_id);
1731
1732 Ok(Json(serde_json::json!({
1733 "webhook": webhook,
1734 "updated": true
1735 })))
1736}
1737
1738pub async fn delete_webhook(
1740 State(store): State<SharedStore>,
1741 Path(webhook_id): Path<uuid::Uuid>,
1742) -> Result<Json<serde_json::Value>> {
1743 let registry = store.webhook_registry();
1744
1745 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1746 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1747 })?;
1748
1749 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1750
1751 Ok(Json(serde_json::json!({
1752 "webhook_id": webhook_id,
1753 "deleted": true
1754 })))
1755}
1756
/// Query-string parameters for `list_webhook_deliveries`.
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    // Maximum number of delivery records to return; defaults to 50.
    pub limit: Option<usize>,
}
1762
1763pub async fn list_webhook_deliveries(
1765 State(store): State<SharedStore>,
1766 Path(webhook_id): Path<uuid::Uuid>,
1767 Query(params): Query<ListDeliveriesParams>,
1768) -> Result<Json<serde_json::Value>> {
1769 let registry = store.webhook_registry();
1770
1771 registry.get(webhook_id).ok_or_else(|| {
1773 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1774 })?;
1775
1776 let limit = params.limit.unwrap_or(50);
1777 let deliveries = registry.get_deliveries(webhook_id, limit);
1778 let total = deliveries.len();
1779
1780 Ok(Json(serde_json::json!({
1781 "webhook_id": webhook_id,
1782 "deliveries": deliveries,
1783 "total": total
1784 })))
1785}
1786
/// POST handler: executes an EventQL query over a snapshot of all events.
///
/// Query errors surface as `InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();
    let response = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1807
/// Minimal GraphQL endpoint over the event store.
///
/// Parses the posted query into top-level fields and resolves each one in
/// order against the store. Per-field failures are accumulated in the
/// GraphQL `errors` array rather than aborting the request; only a parse
/// failure short-circuits with an errors-only response.
///
/// Supported top-level fields: `events`, `event`, `projections`, `stats`,
/// and `__schema` (introspection).
pub async fn graphql_query(
    State(store): State<SharedStore>,
    Json(req): Json<GraphQLRequest>,
) -> Json<serde_json::Value> {
    // A query that doesn't parse yields no data at all — just the error.
    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
        Ok(f) => f,
        Err(e) => {
            return Json(
                serde_json::to_value(GraphQLResponse {
                    data: None,
                    errors: vec![GraphQLError { message: e }],
                })
                .unwrap(),
            );
        }
    };

    // Resolved field values and accumulated per-field errors.
    let mut data = serde_json::Map::new();
    let mut errors = Vec::new();

    for field in &fields {
        match field.name.as_str() {
            // events(entity_id, event_type, tenant_id, limit): filtered list.
            "events" => {
                let request = crate::application::dto::QueryEventsRequest {
                    entity_id: field.arguments.get("entity_id").cloned(),
                    event_type: field.arguments.get("event_type").cloned(),
                    tenant_id: field.arguments.get("tenant_id").cloned(),
                    // A non-numeric limit argument is silently ignored.
                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
                    as_of: None,
                    since: None,
                    until: None,
                    event_type_prefix: None,
                    payload_filter: None,
                };
                match store.query(&request) {
                    Ok(events) => {
                        // Project each event down to the selected sub-fields.
                        let json_events: Vec<serde_json::Value> = events
                            .iter()
                            .map(|e| {
                                crate::infrastructure::query::graphql::event_to_json(
                                    e,
                                    &field.fields,
                                )
                            })
                            .collect();
                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
                    }
                    Err(e) => errors.push(GraphQLError {
                        message: format!("events query failed: {e}"),
                    }),
                }
            }
            // event(id): single-event lookup by UUID. A missing event maps to
            // JSON null (not an error), matching GraphQL nullability norms.
            "event" => {
                if let Some(id_str) = field.arguments.get("id") {
                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
                        match store.get_event_by_id(&id) {
                            Ok(Some(event)) => {
                                data.insert(
                                    "event".to_string(),
                                    crate::infrastructure::query::graphql::event_to_json(
                                        &event,
                                        &field.fields,
                                    ),
                                );
                            }
                            Ok(None) => {
                                data.insert("event".to_string(), serde_json::Value::Null);
                            }
                            Err(e) => errors.push(GraphQLError {
                                message: format!("event lookup failed: {e}"),
                            }),
                        }
                    } else {
                        errors.push(GraphQLError {
                            message: format!("Invalid UUID: {id_str}"),
                        });
                    }
                } else {
                    errors.push(GraphQLError {
                        message: "event query requires 'id' argument".to_string(),
                    });
                }
            }
            // projections: just the registered projection names.
            "projections" => {
                let pm = store.projection_manager();
                let names: Vec<serde_json::Value> = pm
                    .list_projections()
                    .iter()
                    .map(|(name, _)| serde_json::Value::String(name.clone()))
                    .collect();
                data.insert("projections".to_string(), serde_json::Value::Array(names));
            }
            // stats: store-wide counters.
            "stats" => {
                let stats = store.stats();
                data.insert(
                    "stats".to_string(),
                    serde_json::json!({
                        "total_events": stats.total_events,
                        "total_entities": stats.total_entities,
                        "total_event_types": stats.total_event_types,
                    }),
                );
            }
            // __schema: static introspection document.
            "__schema" => {
                data.insert(
                    "__schema".to_string(),
                    crate::infrastructure::query::graphql::introspection_schema(),
                );
            }
            // Unknown fields become errors but don't fail the request.
            other => {
                errors.push(GraphQLError {
                    message: format!("Unknown field: {other}"),
                });
            }
        }
    }

    Json(
        serde_json::to_value(GraphQLResponse {
            data: Some(serde_json::Value::Object(data)),
            errors,
        })
        .unwrap(),
    )
}
1934
1935pub async fn geo_query(
1937 State(store): State<SharedStore>,
1938 Json(req): Json<GeoQueryRequest>,
1939) -> Json<serde_json::Value> {
1940 let events = store.snapshot_events();
1941 let geo_index = store.geo_index();
1942 let results =
1943 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1944 let total = results.len();
1945 Json(serde_json::json!({
1946 "results": results,
1947 "total": total,
1948 }))
1949}
1950
1951pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1953 let stats = store.geo_index().stats();
1954 Json(serde_json::json!(stats))
1955}
1956
1957pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1959 let stats = store.exactly_once().stats();
1960 Json(serde_json::json!(stats))
1961}
1962
1963pub async fn schema_evolution_history(
1965 State(store): State<SharedStore>,
1966 Path(event_type): Path<String>,
1967) -> Json<serde_json::Value> {
1968 let mgr = store.schema_evolution();
1969 let history = mgr.get_history(&event_type);
1970 let version = mgr.get_version(&event_type);
1971 Json(serde_json::json!({
1972 "event_type": event_type,
1973 "current_version": version,
1974 "history": history,
1975 }))
1976}
1977
1978pub async fn schema_evolution_schema(
1980 State(store): State<SharedStore>,
1981 Path(event_type): Path<String>,
1982) -> Json<serde_json::Value> {
1983 let mgr = store.schema_evolution();
1984 if let Some(schema) = mgr.get_schema(&event_type) {
1985 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1986 Json(serde_json::json!({
1987 "event_type": event_type,
1988 "version": mgr.get_version(&event_type),
1989 "inferred_schema": schema,
1990 "json_schema": json_schema,
1991 }))
1992 } else {
1993 Json(serde_json::json!({
1994 "event_type": event_type,
1995 "error": "No schema inferred for this event type"
1996 }))
1997 }
1998}
1999
2000pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2002 let stats = store.schema_evolution().stats();
2003 let event_types = store.schema_evolution().list_event_types();
2004 Json(serde_json::json!({
2005 "stats": stats,
2006 "tracked_event_types": event_types,
2007 }))
2008}
2009
/// Serves a sync "pull" for embedded clients: returns server-side events
/// newer than the client's version vector, packaged as `ReplicatedEvent`s
/// with synthesized HLC timestamps.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Lower time bound for the query: the *minimum* physical timestamp across
    // the client's version vector. This is deliberately conservative — no
    // region's events can be missed, at the cost of possibly re-sending
    // events the client already has. An empty vector means no bound (full pull).
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let events = store.query(&crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Synthesize HLC timestamps from wall-clock millis: events sharing the
    // same millisecond get an incrementing logical component so ordering
    // within that millisecond is preserved. Assumes `events` is sorted by
    // timestamp — TODO confirm query ordering guarantee.
    let mut replicated = Vec::with_capacity(events.len());
    let mut last_ms = 0u64;
    let mut logical = 0u32;

    for event in &events {
        let event_ms = event.timestamp().timestamp_millis() as u64;
        if event_ms == last_ms {
            logical += 1;
        } else {
            last_ms = event_ms;
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            // Node id 0 / origin "server": this side is the authoritative origin.
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    // NOTE(review): the response version vector is left empty — presumably the
    // client derives its new vector from the delivered events; confirm against
    // the embedded sync client implementation.
    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2078
/// Accepts a batch of replicated events pushed from an embedded client.
///
/// Fields are pulled out of the opaque `event_data` JSON with lenient
/// defaults. Events failing domain validation are counted as skipped; a
/// storage failure aborts the whole request with an error.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    // Reads a string field from the payload, falling back to a default.
    let str_field = |data: &serde_json::Value, key: &str, default: &str| {
        data.get(key)
            .and_then(|v| v.as_str())
            .unwrap_or(default)
            .to_string()
    };

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;
        let event_type = str_field(event_data, "event_type", "unknown");
        let entity_id = str_field(event_data, "entity_id", "unknown");
        let tenant_id = str_field(event_data, "tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or(serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        if let Ok(domain_event) =
            Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
        {
            store.ingest(&domain_event)?;
            accepted += 1;
        } else {
            skipped += 1;
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2130
2131pub async fn register_consumer(
2137 State(store): State<SharedStore>,
2138 Json(req): Json<RegisterConsumerRequest>,
2139) -> Result<Json<ConsumerResponse>> {
2140 let consumer = store
2141 .consumer_registry()
2142 .register(&req.consumer_id, &req.event_type_filters);
2143
2144 Ok(Json(ConsumerResponse {
2145 consumer_id: consumer.consumer_id,
2146 event_type_filters: consumer.event_type_filters,
2147 cursor_position: consumer.cursor_position,
2148 }))
2149}
2150
2151pub async fn get_consumer(
2153 State(store): State<SharedStore>,
2154 Path(consumer_id): Path<String>,
2155) -> Result<Json<ConsumerResponse>> {
2156 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2157
2158 Ok(Json(ConsumerResponse {
2159 consumer_id: consumer.consumer_id,
2160 event_type_filters: consumer.event_type_filters,
2161 cursor_position: consumer.cursor_position,
2162 }))
2163}
2164
/// Query-string parameters for `poll_consumer_events`.
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    // Maximum number of events returned per poll; defaults to 100.
    pub limit: Option<usize>,
}
2170
2171pub async fn poll_consumer_events(
2172 State(store): State<SharedStore>,
2173 Path(consumer_id): Path<String>,
2174 Query(query): Query<ConsumerPollQuery>,
2175) -> Result<Json<ConsumerEventsResponse>> {
2176 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2177 let offset = consumer.cursor_position.unwrap_or(0);
2178 let limit = query.limit.unwrap_or(100);
2179
2180 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2181 let count = events.len();
2182
2183 let consumer_events: Vec<ConsumerEventDto> = events
2184 .into_iter()
2185 .map(|(position, event)| ConsumerEventDto {
2186 position,
2187 event: EventDto::from(&event),
2188 })
2189 .collect();
2190
2191 Ok(Json(ConsumerEventsResponse {
2192 events: consumer_events,
2193 count,
2194 }))
2195}
2196
2197pub async fn ack_consumer(
2199 State(store): State<SharedStore>,
2200 Path(consumer_id): Path<String>,
2201 Json(req): Json<AckRequest>,
2202) -> Result<Json<serde_json::Value>> {
2203 let max_offset = store.total_events() as u64;
2204
2205 store
2206 .consumer_registry()
2207 .ack(&consumer_id, req.position, max_offset)
2208 .map_err(crate::error::AllSourceError::InvalidInput)?;
2209
2210 Ok(Json(serde_json::json!({
2211 "status": "ok",
2212 "consumer_id": consumer_id,
2213 "position": req.position,
2214 })))
2215}
2216
2217#[cfg(test)]
2218mod tests {
2219 use super::*;
2220 use crate::{domain::entities::Event, store::EventStore};
2221
2222 fn create_test_store() -> Arc<EventStore> {
2223 Arc::new(EventStore::new())
2224 }
2225
2226 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2227 Event::from_strings(
2228 event_type.to_string(),
2229 entity_id.to_string(),
2230 "test-stream".to_string(),
2231 serde_json::json!({
2232 "name": "Test",
2233 "value": 42
2234 }),
2235 None,
2236 )
2237 .unwrap()
2238 }
2239
2240 #[tokio::test]
2241 async fn test_query_events_has_more_and_total_count() {
2242 let store = create_test_store();
2243
2244 for i in 0..50 {
2246 store
2247 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2248 .unwrap();
2249 }
2250
2251 let req = QueryEventsRequest {
2253 entity_id: None,
2254 event_type: None,
2255 tenant_id: None,
2256 as_of: None,
2257 since: None,
2258 until: None,
2259 limit: Some(10),
2260 event_type_prefix: None,
2261 payload_filter: None,
2262 };
2263
2264 let requested_limit = req.limit;
2265 let unlimited_req = QueryEventsRequest {
2266 limit: None,
2267 ..QueryEventsRequest {
2268 entity_id: req.entity_id,
2269 event_type: req.event_type,
2270 tenant_id: req.tenant_id,
2271 as_of: req.as_of,
2272 since: req.since,
2273 until: req.until,
2274 limit: None,
2275 event_type_prefix: req.event_type_prefix,
2276 payload_filter: req.payload_filter,
2277 }
2278 };
2279 let all_events = store.query(&unlimited_req).unwrap();
2280 let total_count = all_events.len();
2281 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2282 all_events.into_iter().take(limit).collect()
2283 } else {
2284 all_events
2285 };
2286 let count = limited_events.len();
2287 let has_more = count < total_count;
2288
2289 assert_eq!(count, 10);
2290 assert_eq!(total_count, 50);
2291 assert!(has_more);
2292 }
2293
2294 #[tokio::test]
2295 async fn test_query_events_no_more_results() {
2296 let store = create_test_store();
2297
2298 for i in 0..5 {
2300 store
2301 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2302 .unwrap();
2303 }
2304
2305 let all_events = store
2307 .query(&QueryEventsRequest {
2308 entity_id: None,
2309 event_type: None,
2310 tenant_id: None,
2311 as_of: None,
2312 since: None,
2313 until: None,
2314 limit: None,
2315 event_type_prefix: None,
2316 payload_filter: None,
2317 })
2318 .unwrap();
2319 let total_count = all_events.len();
2320 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2321 let count = limited_events.len();
2322 let has_more = count < total_count;
2323
2324 assert_eq!(count, 5);
2325 assert_eq!(total_count, 5);
2326 assert!(!has_more);
2327 }
2328
2329 #[tokio::test]
2330 async fn test_list_entities_by_type_prefix() {
2331 let store = create_test_store();
2332
2333 store
2335 .ingest(&create_test_event("idx-1", "index.created"))
2336 .unwrap();
2337 store
2338 .ingest(&create_test_event("idx-1", "index.updated"))
2339 .unwrap();
2340 store
2341 .ingest(&create_test_event("idx-2", "index.created"))
2342 .unwrap();
2343 store
2344 .ingest(&create_test_event("idx-3", "index.created"))
2345 .unwrap();
2346 store
2348 .ingest(&create_test_event("trade-1", "trade.created"))
2349 .unwrap();
2350 store
2351 .ingest(&create_test_event("trade-2", "trade.created"))
2352 .unwrap();
2353
2354 let req = ListEntitiesRequest {
2356 event_type_prefix: Some("index.".to_string()),
2357 payload_filter: None,
2358 limit: None,
2359 offset: None,
2360 };
2361 let query_req = QueryEventsRequest {
2362 entity_id: None,
2363 event_type: None,
2364 tenant_id: None,
2365 as_of: None,
2366 since: None,
2367 until: None,
2368 limit: None,
2369 event_type_prefix: req.event_type_prefix,
2370 payload_filter: req.payload_filter,
2371 };
2372 let events = store.query(&query_req).unwrap();
2373
2374 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2376 std::collections::HashMap::new();
2377 for event in &events {
2378 entity_map
2379 .entry(event.entity_id().to_string())
2380 .or_default()
2381 .push(event);
2382 }
2383
2384 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2387 assert_eq!(entity_map["idx-3"].len(), 1);
2388 }
2389
2390 fn create_test_event_with_payload(
2391 entity_id: &str,
2392 event_type: &str,
2393 payload: serde_json::Value,
2394 ) -> Event {
2395 Event::from_strings(
2396 event_type.to_string(),
2397 entity_id.to_string(),
2398 "test-stream".to_string(),
2399 payload,
2400 None,
2401 )
2402 .unwrap()
2403 }
2404
2405 #[tokio::test]
2406 async fn test_detect_duplicates_by_payload_fields() {
2407 let store = create_test_store();
2408
2409 store
2411 .ingest(&create_test_event_with_payload(
2412 "idx-1",
2413 "index.created",
2414 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2415 ))
2416 .unwrap();
2417 store
2418 .ingest(&create_test_event_with_payload(
2419 "idx-2",
2420 "index.created",
2421 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2422 ))
2423 .unwrap();
2424 store
2425 .ingest(&create_test_event_with_payload(
2426 "idx-3",
2427 "index.created",
2428 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2429 ))
2430 .unwrap();
2431 store
2432 .ingest(&create_test_event_with_payload(
2433 "idx-4",
2434 "index.created",
2435 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2436 ))
2437 .unwrap();
2438 store
2439 .ingest(&create_test_event_with_payload(
2440 "idx-5",
2441 "index.created",
2442 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2443 ))
2444 .unwrap();
2445
2446 let query_req = QueryEventsRequest {
2448 entity_id: None,
2449 event_type: None,
2450 tenant_id: None,
2451 as_of: None,
2452 since: None,
2453 until: None,
2454 limit: None,
2455 event_type_prefix: Some("index.".to_string()),
2456 payload_filter: None,
2457 };
2458 let events = store.query(&query_req).unwrap();
2459
2460 let group_by_fields = vec!["name"];
2462 let mut entity_latest: std::collections::HashMap<String, &Event> =
2463 std::collections::HashMap::new();
2464 for event in &events {
2465 let eid = event.entity_id().to_string();
2466 entity_latest
2467 .entry(eid)
2468 .and_modify(|existing| {
2469 if event.timestamp() > existing.timestamp() {
2470 *existing = event;
2471 }
2472 })
2473 .or_insert(event);
2474 }
2475
2476 let mut groups: std::collections::HashMap<String, Vec<String>> =
2477 std::collections::HashMap::new();
2478 for (entity_id, event) in &entity_latest {
2479 let payload = event.payload();
2480 let mut key_parts = serde_json::Map::new();
2481 for field in &group_by_fields {
2482 let value = payload
2483 .get(*field)
2484 .cloned()
2485 .unwrap_or(serde_json::Value::Null);
2486 key_parts.insert((*field).to_string(), value);
2487 }
2488 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2489 groups.entry(key_str).or_default().push(entity_id.clone());
2490 }
2491
2492 let duplicate_groups: Vec<_> = groups
2493 .into_iter()
2494 .filter(|(_, ids)| ids.len() > 1)
2495 .collect();
2496
2497 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2499 assert_eq!(ids.len(), 2);
2500 }
2501 }
2502
2503 #[tokio::test]
2504 async fn test_detect_duplicates_no_duplicates() {
2505 let store = create_test_store();
2506
2507 store
2509 .ingest(&create_test_event_with_payload(
2510 "idx-1",
2511 "index.created",
2512 serde_json::json!({"name": "A"}),
2513 ))
2514 .unwrap();
2515 store
2516 .ingest(&create_test_event_with_payload(
2517 "idx-2",
2518 "index.created",
2519 serde_json::json!({"name": "B"}),
2520 ))
2521 .unwrap();
2522
2523 let query_req = QueryEventsRequest {
2524 entity_id: None,
2525 event_type: None,
2526 tenant_id: None,
2527 as_of: None,
2528 since: None,
2529 until: None,
2530 limit: None,
2531 event_type_prefix: Some("index.".to_string()),
2532 payload_filter: None,
2533 };
2534 let events = store.query(&query_req).unwrap();
2535
2536 let mut entity_latest: std::collections::HashMap<String, &Event> =
2537 std::collections::HashMap::new();
2538 for event in &events {
2539 entity_latest
2540 .entry(event.entity_id().to_string())
2541 .or_insert(event);
2542 }
2543
2544 let mut groups: std::collections::HashMap<String, Vec<String>> =
2545 std::collections::HashMap::new();
2546 for (entity_id, event) in &entity_latest {
2547 let key_str =
2548 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2549 .unwrap();
2550 groups.entry(key_str).or_default().push(entity_id.clone());
2551 }
2552
2553 let duplicate_groups: Vec<_> = groups
2554 .into_iter()
2555 .filter(|(_, ids)| ids.len() > 1)
2556 .collect();
2557
2558 assert_eq!(duplicate_groups.len(), 0); }
2560
2561 #[tokio::test]
2562 async fn test_detect_duplicates_multi_field_group_by() {
2563 let store = create_test_store();
2564
2565 store
2567 .ingest(&create_test_event_with_payload(
2568 "idx-1",
2569 "index.created",
2570 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2571 ))
2572 .unwrap();
2573 store
2574 .ingest(&create_test_event_with_payload(
2575 "idx-2",
2576 "index.created",
2577 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2578 ))
2579 .unwrap();
2580 store
2582 .ingest(&create_test_event_with_payload(
2583 "idx-3",
2584 "index.created",
2585 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2586 ))
2587 .unwrap();
2588
2589 let query_req = QueryEventsRequest {
2590 entity_id: None,
2591 event_type: None,
2592 tenant_id: None,
2593 as_of: None,
2594 since: None,
2595 until: None,
2596 limit: None,
2597 event_type_prefix: Some("index.".to_string()),
2598 payload_filter: None,
2599 };
2600 let events = store.query(&query_req).unwrap();
2601
2602 let group_by_fields = vec!["name", "user_id"];
2603 let mut entity_latest: std::collections::HashMap<String, &Event> =
2604 std::collections::HashMap::new();
2605 for event in &events {
2606 entity_latest
2607 .entry(event.entity_id().to_string())
2608 .and_modify(|existing| {
2609 if event.timestamp() > existing.timestamp() {
2610 *existing = event;
2611 }
2612 })
2613 .or_insert(event);
2614 }
2615
2616 let mut groups: std::collections::HashMap<String, Vec<String>> =
2617 std::collections::HashMap::new();
2618 for (entity_id, event) in &entity_latest {
2619 let payload = event.payload();
2620 let mut key_parts = serde_json::Map::new();
2621 for field in &group_by_fields {
2622 let value = payload
2623 .get(*field)
2624 .cloned()
2625 .unwrap_or(serde_json::Value::Null);
2626 key_parts.insert((*field).to_string(), value);
2627 }
2628 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2629 groups.entry(key_str).or_default().push(entity_id.clone());
2630 }
2631
2632 let duplicate_groups: Vec<_> = groups
2633 .into_iter()
2634 .filter(|(_, ids)| ids.len() > 1)
2635 .collect();
2636
2637 assert_eq!(duplicate_groups.len(), 1);
2639 let (_, ref ids) = duplicate_groups[0];
2640 assert_eq!(ids.len(), 2);
2641 let mut sorted_ids = ids.clone();
2642 sorted_ids.sort();
2643 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2644 }
2645
2646 #[tokio::test]
2647 async fn test_projection_state_cache() {
2648 let store = create_test_store();
2649
2650 let cache = store.projection_state_cache();
2652 cache.insert(
2653 "entity_snapshots:user-123".to_string(),
2654 serde_json::json!({"name": "Test User", "age": 30}),
2655 );
2656
2657 let state = cache.get("entity_snapshots:user-123");
2659 assert!(state.is_some());
2660 let state = state.unwrap();
2661 assert_eq!(state["name"], "Test User");
2662 assert_eq!(state["age"], 30);
2663 }
2664
2665 #[tokio::test]
2666 async fn test_projection_manager_list_projections() {
2667 let store = create_test_store();
2668
2669 let projection_manager = store.projection_manager();
2671 let projections = projection_manager.list_projections();
2672
2673 assert!(projections.len() >= 2);
2675
2676 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2677 assert!(names.contains(&"entity_snapshots"));
2678 assert!(names.contains(&"event_counters"));
2679 }
2680
2681 #[tokio::test]
2682 async fn test_projection_state_after_event_ingestion() {
2683 let store = create_test_store();
2684
2685 let event = create_test_event("user-456", "user.created");
2687 store.ingest(&event).unwrap();
2688
2689 let projection_manager = store.projection_manager();
2691 let snapshot_projection = projection_manager
2692 .get_projection("entity_snapshots")
2693 .unwrap();
2694
2695 let state = snapshot_projection.get_state("user-456");
2696 assert!(state.is_some());
2697 let state = state.unwrap();
2698 assert_eq!(state["name"], "Test");
2699 assert_eq!(state["value"], 42);
2700 }
2701
2702 #[tokio::test]
2703 async fn test_projection_state_cache_multiple_entities() {
2704 let store = create_test_store();
2705 let cache = store.projection_state_cache();
2706
2707 for i in 0..10 {
2709 cache.insert(
2710 format!("entity_snapshots:entity-{i}"),
2711 serde_json::json!({"id": i, "status": "active"}),
2712 );
2713 }
2714
2715 assert_eq!(cache.len(), 10);
2717
2718 for i in 0..10 {
2720 let key = format!("entity_snapshots:entity-{i}");
2721 let state = cache.get(&key);
2722 assert!(state.is_some());
2723 assert_eq!(state.unwrap()["id"], i);
2724 }
2725 }
2726
2727 #[tokio::test]
2728 async fn test_projection_state_update() {
2729 let store = create_test_store();
2730 let cache = store.projection_state_cache();
2731
2732 cache.insert(
2734 "entity_snapshots:user-789".to_string(),
2735 serde_json::json!({"balance": 100}),
2736 );
2737
2738 cache.insert(
2740 "entity_snapshots:user-789".to_string(),
2741 serde_json::json!({"balance": 150}),
2742 );
2743
2744 let state = cache.get("entity_snapshots:user-789").unwrap();
2746 assert_eq!(state["balance"], 150);
2747 }
2748
2749 #[tokio::test]
2750 async fn test_event_counter_projection() {
2751 let store = create_test_store();
2752
2753 store
2755 .ingest(&create_test_event("user-1", "user.created"))
2756 .unwrap();
2757 store
2758 .ingest(&create_test_event("user-2", "user.created"))
2759 .unwrap();
2760 store
2761 .ingest(&create_test_event("user-1", "user.updated"))
2762 .unwrap();
2763
2764 let projection_manager = store.projection_manager();
2766 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2767
2768 let created_state = counter_projection.get_state("user.created");
2770 assert!(created_state.is_some());
2771 assert_eq!(created_state.unwrap()["count"], 2);
2772
2773 let updated_state = counter_projection.get_state("user.updated");
2774 assert!(updated_state.is_some());
2775 assert_eq!(updated_state.unwrap()["count"], 1);
2776 }
2777
2778 #[tokio::test]
2779 async fn test_projection_state_cache_key_format() {
2780 let store = create_test_store();
2781 let cache = store.projection_state_cache();
2782
2783 let key = "orders:order-12345".to_string();
2785 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2786
2787 let state = cache.get(&key).unwrap();
2788 assert_eq!(state["total"], 99.99);
2789 }
2790
2791 #[tokio::test]
2792 async fn test_projection_state_cache_removal() {
2793 let store = create_test_store();
2794 let cache = store.projection_state_cache();
2795
2796 cache.insert(
2798 "test:entity-1".to_string(),
2799 serde_json::json!({"data": "value"}),
2800 );
2801 assert_eq!(cache.len(), 1);
2802
2803 cache.remove("test:entity-1");
2804 assert_eq!(cache.len(), 0);
2805 assert!(cache.get("test:entity-1").is_none());
2806 }
2807
2808 #[tokio::test]
2809 async fn test_get_nonexistent_projection() {
2810 let store = create_test_store();
2811 let projection_manager = store.projection_manager();
2812
2813 let projection = projection_manager.get_projection("nonexistent_projection");
2815 assert!(projection.is_none());
2816 }
2817
2818 #[tokio::test]
2819 async fn test_get_nonexistent_entity_state() {
2820 let store = create_test_store();
2821 let projection_manager = store.projection_manager();
2822
2823 let snapshot_projection = projection_manager
2825 .get_projection("entity_snapshots")
2826 .unwrap();
2827 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2828 assert!(state.is_none());
2829 }
2830
2831 #[tokio::test]
2832 async fn test_projection_state_cache_concurrent_access() {
2833 let store = create_test_store();
2834 let cache = store.projection_state_cache();
2835
2836 let handles: Vec<_> = (0..10)
2838 .map(|i| {
2839 let cache_clone = cache.clone();
2840 tokio::spawn(async move {
2841 cache_clone.insert(
2842 format!("concurrent:entity-{i}"),
2843 serde_json::json!({"thread": i}),
2844 );
2845 })
2846 })
2847 .collect();
2848
2849 for handle in handles {
2850 handle.await.unwrap();
2851 }
2852
2853 assert_eq!(cache.len(), 10);
2855 }
2856
2857 #[tokio::test]
2858 async fn test_projection_state_large_payload() {
2859 let store = create_test_store();
2860 let cache = store.projection_state_cache();
2861
2862 let large_array: Vec<serde_json::Value> = (0..1000)
2864 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2865 .collect();
2866
2867 cache.insert(
2868 "large:entity-1".to_string(),
2869 serde_json::json!({"items": large_array}),
2870 );
2871
2872 let state = cache.get("large:entity-1").unwrap();
2873 let items = state["items"].as_array().unwrap();
2874 assert_eq!(items.len(), 1000);
2875 }
2876
2877 #[tokio::test]
2878 async fn test_projection_state_complex_json() {
2879 let store = create_test_store();
2880 let cache = store.projection_state_cache();
2881
2882 let complex_state = serde_json::json!({
2884 "user": {
2885 "id": "user-123",
2886 "profile": {
2887 "name": "John Doe",
2888 "email": "john@example.com",
2889 "settings": {
2890 "theme": "dark",
2891 "notifications": true
2892 }
2893 },
2894 "roles": ["admin", "user"],
2895 "metadata": {
2896 "created_at": "2025-01-01T00:00:00Z",
2897 "last_login": null
2898 }
2899 }
2900 });
2901
2902 cache.insert("complex:user-123".to_string(), complex_state);
2903
2904 let state = cache.get("complex:user-123").unwrap();
2905 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2906 assert_eq!(state["user"]["roles"][0], "admin");
2907 assert!(state["user"]["metadata"]["last_login"].is_null());
2908 }
2909
2910 #[tokio::test]
2911 async fn test_projection_state_cache_iteration() {
2912 let store = create_test_store();
2913 let cache = store.projection_state_cache();
2914
2915 for i in 0..5 {
2917 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2918 }
2919
2920 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2922 assert_eq!(entries.len(), 5);
2923 }
2924
2925 #[tokio::test]
2926 async fn test_projection_manager_get_entity_snapshots() {
2927 let store = create_test_store();
2928 let projection_manager = store.projection_manager();
2929
2930 let projection = projection_manager.get_projection("entity_snapshots");
2932 assert!(projection.is_some());
2933 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2934 }
2935
2936 #[tokio::test]
2937 async fn test_projection_manager_get_event_counters() {
2938 let store = create_test_store();
2939 let projection_manager = store.projection_manager();
2940
2941 let projection = projection_manager.get_projection("event_counters");
2943 assert!(projection.is_some());
2944 assert_eq!(projection.unwrap().name(), "event_counters");
2945 }
2946
2947 #[tokio::test]
2948 async fn test_projection_state_cache_overwrite() {
2949 let store = create_test_store();
2950 let cache = store.projection_state_cache();
2951
2952 cache.insert(
2954 "overwrite:entity-1".to_string(),
2955 serde_json::json!({"version": 1}),
2956 );
2957
2958 cache.insert(
2960 "overwrite:entity-1".to_string(),
2961 serde_json::json!({"version": 2}),
2962 );
2963
2964 cache.insert(
2966 "overwrite:entity-1".to_string(),
2967 serde_json::json!({"version": 3}),
2968 );
2969
2970 let state = cache.get("overwrite:entity-1").unwrap();
2971 assert_eq!(state["version"], 3);
2972
2973 assert_eq!(cache.len(), 1);
2975 }
2976
2977 #[tokio::test]
2978 async fn test_projection_state_multiple_projections() {
2979 let store = create_test_store();
2980 let cache = store.projection_state_cache();
2981
2982 cache.insert(
2984 "entity_snapshots:user-1".to_string(),
2985 serde_json::json!({"name": "Alice"}),
2986 );
2987 cache.insert(
2988 "event_counters:user.created".to_string(),
2989 serde_json::json!({"count": 5}),
2990 );
2991 cache.insert(
2992 "custom_projection:order-1".to_string(),
2993 serde_json::json!({"total": 150.0}),
2994 );
2995
2996 assert_eq!(
2998 cache.get("entity_snapshots:user-1").unwrap()["name"],
2999 "Alice"
3000 );
3001 assert_eq!(
3002 cache.get("event_counters:user.created").unwrap()["count"],
3003 5
3004 );
3005 assert_eq!(
3006 cache.get("custom_projection:order-1").unwrap()["total"],
3007 150.0
3008 );
3009 }
3010
3011 #[tokio::test]
3012 async fn test_bulk_projection_state_access() {
3013 let store = create_test_store();
3014
3015 for i in 0..5 {
3017 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3018 store.ingest(&event).unwrap();
3019 }
3020
3021 let projection_manager = store.projection_manager();
3023 let snapshot_projection = projection_manager
3024 .get_projection("entity_snapshots")
3025 .unwrap();
3026
3027 for i in 0..5 {
3029 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3030 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3031 }
3032 }
3033
3034 #[tokio::test]
3035 async fn test_bulk_save_projection_states() {
3036 let store = create_test_store();
3037 let cache = store.projection_state_cache();
3038
3039 let states = vec![
3041 BulkSaveStateItem {
3042 entity_id: "bulk-entity-1".to_string(),
3043 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3044 },
3045 BulkSaveStateItem {
3046 entity_id: "bulk-entity-2".to_string(),
3047 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3048 },
3049 BulkSaveStateItem {
3050 entity_id: "bulk-entity-3".to_string(),
3051 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3052 },
3053 ];
3054
3055 let projection_name = "test_projection";
3056
3057 for item in &states {
3059 cache.insert(
3060 format!("{projection_name}:{}", item.entity_id),
3061 item.state.clone(),
3062 );
3063 }
3064
3065 assert_eq!(cache.len(), 3);
3067
3068 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3069 assert_eq!(state1["name"], "Entity 1");
3070 assert_eq!(state1["value"], 100);
3071
3072 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3073 assert_eq!(state2["name"], "Entity 2");
3074 assert_eq!(state2["value"], 200);
3075
3076 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3077 assert_eq!(state3["name"], "Entity 3");
3078 assert_eq!(state3["value"], 300);
3079 }
3080
3081 #[tokio::test]
3082 async fn test_bulk_save_empty_states() {
3083 let store = create_test_store();
3084 let cache = store.projection_state_cache();
3085
3086 cache.clear();
3088
3089 let states: Vec<BulkSaveStateItem> = vec![];
3091 assert_eq!(states.len(), 0);
3092
3093 assert_eq!(cache.len(), 0);
3095 }
3096
3097 #[tokio::test]
3098 async fn test_bulk_save_overwrites_existing() {
3099 let store = create_test_store();
3100 let cache = store.projection_state_cache();
3101
3102 cache.insert(
3104 "test:entity-1".to_string(),
3105 serde_json::json!({"version": 1, "data": "initial"}),
3106 );
3107
3108 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3110 cache.insert("test:entity-1".to_string(), new_state);
3111
3112 let state = cache.get("test:entity-1").unwrap();
3114 assert_eq!(state["version"], 2);
3115 assert_eq!(state["data"], "updated");
3116 }
3117
3118 #[tokio::test]
3119 async fn test_bulk_save_high_volume() {
3120 let store = create_test_store();
3121 let cache = store.projection_state_cache();
3122
3123 for i in 0..1000 {
3125 cache.insert(
3126 format!("volume_test:entity-{i}"),
3127 serde_json::json!({"index": i, "status": "active"}),
3128 );
3129 }
3130
3131 assert_eq!(cache.len(), 1000);
3133
3134 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3136 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3137 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3138 }
3139
3140 #[tokio::test]
3141 async fn test_bulk_save_different_projections() {
3142 let store = create_test_store();
3143 let cache = store.projection_state_cache();
3144
3145 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3147
3148 for proj in &projections {
3149 for i in 0..5 {
3150 cache.insert(
3151 format!("{proj}:entity-{i}"),
3152 serde_json::json!({"projection": proj, "id": i}),
3153 );
3154 }
3155 }
3156
3157 assert_eq!(cache.len(), 15);
3159
3160 for proj in &projections {
3162 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3163 assert_eq!(state["projection"], *proj);
3164 }
3165 }
3166}