1use crate::{
2 application::{
3 dto::{
4 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
5 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
6 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
7 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
8 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
9 },
10 services::{
11 analytics::{
12 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
13 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
14 },
15 pipeline::{PipelineConfig, PipelineStats},
16 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
17 schema::{
18 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
19 ValidateEventRequest, ValidateEventResponse,
20 },
21 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
22 },
23 },
24 domain::entities::Event,
25 error::Result,
26 infrastructure::{
27 persistence::{
28 compaction::CompactionResult,
29 snapshot::{
30 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
31 ListSnapshotsResponse, SnapshotInfo,
32 },
33 },
34 query::{
35 eventql::EventQLRequest,
36 geospatial::GeoQueryRequest,
37 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
38 },
39 replication::ReplicationMode,
40 web::api_v1::AppState,
41 },
42 store::{EventStore, EventTypeInfo, StreamInfo},
43};
44use axum::{
45 Json, Router,
46 extract::{Path, Query, State, WebSocketUpgrade},
47 response::{IntoResponse, Response},
48 routing::{get, post, put},
49};
50use serde::Deserialize;
51use std::sync::Arc;
52use tower_http::{
53 cors::{Any, CorsLayer},
54 trace::TraceLayer,
55};
56
/// Shared, thread-safe handle to the event store, cloned into every handler.
type SharedStore = Arc<EventStore>;
58
/// Block until the follower acknowledges replication up to the current
/// leader WAL offset, when a synchronous replication mode is active.
///
/// No-op when no WAL shipper is configured, when the shipper runs in
/// `ReplicationMode::Async`, or when the leader offset is still 0 (nothing
/// shipped yet). On ACK timeout the local write is NOT rolled back: a
/// warning is logged and the write stands while follower confirmation is
/// still pending.
async fn await_replication_ack(state: &AppState) {
    let shipper_guard = state.wal_shipper.read().await;
    if let Some(ref shipper) = *shipper_guard {
        let mode = shipper.replication_mode();
        if mode == ReplicationMode::Async {
            // Fire-and-forget mode: never wait for followers.
            return;
        }

        let target_offset = shipper.current_leader_offset();
        if target_offset == 0 {
            // Nothing has been shipped yet, so there is nothing to ACK.
            return;
        }

        // Clone the Arc and release the read lock BEFORE awaiting the ACK,
        // so concurrent writers are not blocked for the wait duration.
        let shipper = Arc::clone(shipper);
        drop(shipper_guard);

        // Export time spent waiting for the follower ACK as a Prometheus
        // histogram observation.
        let timer = state
            .store
            .metrics()
            .replication_ack_wait_seconds
            .start_timer();
        let acked = shipper.wait_for_ack(target_offset).await;
        timer.observe_duration();

        if !acked {
            tracing::warn!(
                "Replication ACK timeout in {} mode (offset {}). \
                Write succeeded locally but follower confirmation pending.",
                mode,
                target_offset,
            );
        }
    }
}
99
/// Build the HTTP router, bind `addr`, and serve until the process exits.
///
/// All routes share the same `SharedStore` state; CORS is fully permissive
/// and requests are traced via tower-http's `TraceLayer`.
pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
    let app = Router::new()
        // Health and metrics
        .route("/health", get(health))
        .route("/metrics", get(prometheus_metrics))
        // Event ingestion and querying
        .route("/api/v1/events", post(ingest_event))
        .route("/api/v1/events/batch", post(ingest_events_batch))
        .route("/api/v1/events/query", get(query_events))
        .route("/api/v1/events/{event_id}", get(get_event_by_id))
        .route("/api/v1/events/stream", get(events_websocket))
        // Stream and event-type catalogs
        .route("/api/v1/streams", get(list_streams))
        .route("/api/v1/event-types", get(list_event_types))
        // Entity inspection
        .route("/api/v1/entities/duplicates", get(detect_duplicates))
        .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
        .route(
            "/api/v1/entities/{entity_id}/snapshot",
            get(get_entity_snapshot),
        )
        // Statistics and analytics
        .route("/api/v1/stats", get(get_stats))
        .route("/api/v1/analytics/frequency", get(analytics_frequency))
        .route("/api/v1/analytics/summary", get(analytics_summary))
        .route("/api/v1/analytics/correlation", get(analytics_correlation))
        // Snapshots
        .route("/api/v1/snapshots", post(create_snapshot))
        .route("/api/v1/snapshots", get(list_snapshots))
        .route(
            "/api/v1/snapshots/{entity_id}/latest",
            get(get_latest_snapshot),
        )
        // Compaction
        .route("/api/v1/compaction/trigger", post(trigger_compaction))
        .route("/api/v1/compaction/stats", get(compaction_stats))
        // Schema registry
        .route("/api/v1/schemas", post(register_schema))
        .route("/api/v1/schemas", get(list_subjects))
        .route("/api/v1/schemas/{subject}", get(get_schema))
        .route(
            "/api/v1/schemas/{subject}/versions",
            get(list_schema_versions),
        )
        .route("/api/v1/schemas/validate", post(validate_event_schema))
        .route(
            "/api/v1/schemas/{subject}/compatibility",
            put(set_compatibility_mode),
        )
        // Replay lifecycle
        .route("/api/v1/replay", post(start_replay))
        .route("/api/v1/replay", get(list_replays))
        .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
        .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
        .route(
            "/api/v1/replay/{replay_id}",
            axum::routing::delete(delete_replay),
        )
        // Pipelines
        .route("/api/v1/pipelines", post(register_pipeline))
        .route("/api/v1/pipelines", get(list_pipelines))
        .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
        .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
        .route(
            "/api/v1/pipelines/{pipeline_id}",
            axum::routing::delete(remove_pipeline),
        )
        .route(
            "/api/v1/pipelines/{pipeline_id}/stats",
            get(get_pipeline_stats),
        )
        .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
        // Projections (read + write of per-entity projection state)
        .route("/api/v1/projections", get(list_projections))
        .route("/api/v1/projections/{name}", get(get_projection))
        .route(
            "/api/v1/projections/{name}",
            axum::routing::delete(delete_projection),
        )
        .route(
            "/api/v1/projections/{name}/state",
            get(get_projection_state_summary),
        )
        .route("/api/v1/projections/{name}/reset", post(reset_projection))
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            get(get_projection_state),
        )
        // POST and PUT share the same save handler.
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            post(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/{entity_id}/state",
            put(save_projection_state),
        )
        .route(
            "/api/v1/projections/{name}/bulk",
            post(bulk_get_projection_states),
        )
        .route(
            "/api/v1/projections/{name}/bulk/save",
            post(bulk_save_projection_states),
        )
        // Webhooks
        .route("/api/v1/webhooks", post(register_webhook))
        .route("/api/v1/webhooks", get(list_webhooks))
        .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
        .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
        .route(
            "/api/v1/webhooks/{webhook_id}",
            axum::routing::delete(delete_webhook),
        )
        .route(
            "/api/v1/webhooks/{webhook_id}/deliveries",
            get(list_webhook_deliveries),
        )
        // Query languages and geospatial
        .route("/api/v1/eventql", post(eventql_query))
        .route("/api/v1/graphql", post(graphql_query))
        .route("/api/v1/geospatial/query", post(geo_query))
        .route("/api/v1/geospatial/stats", get(geo_stats))
        // Exactly-once and schema-evolution diagnostics
        .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
        .route(
            "/api/v1/schema-evolution/history/{event_type}",
            get(schema_evolution_history),
        )
        .route(
            "/api/v1/schema-evolution/schema/{event_type}",
            get(schema_evolution_schema),
        )
        .route(
            "/api/v1/schema-evolution/stats",
            get(schema_evolution_stats),
        )
        // NOTE(review): CORS allows any origin/method/header — confirm this
        // is intended before exposing the service publicly.
        .layer(
            CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any),
        )
        .layer(TraceLayer::new_for_http())
        .with_state(store);

    let listener = tokio::net::TcpListener::bind(addr).await?;
    axum::serve(listener, app).await?;

    Ok(())
}
246
247pub async fn health() -> impl IntoResponse {
248 Json(serde_json::json!({
249 "status": "healthy",
250 "service": "allsource-core",
251 "version": env!("CARGO_PKG_VERSION")
252 }))
253}
254
255pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
257 let metrics = store.metrics();
258
259 match metrics.encode() {
260 Ok(encoded) => Response::builder()
261 .status(200)
262 .header("Content-Type", "text/plain; version=0.0.4")
263 .body(encoded)
264 .unwrap()
265 .into_response(),
266 Err(e) => Response::builder()
267 .status(500)
268 .body(format!("Error encoding metrics: {e}"))
269 .unwrap()
270 .into_response(),
271 }
272}
273
274pub async fn ingest_event(
275 State(store): State<SharedStore>,
276 Json(req): Json<IngestEventRequest>,
277) -> Result<Json<IngestEventResponse>> {
278 let expected_version = req.expected_version;
279
280 let event = Event::from_strings(
282 req.event_type,
283 req.entity_id,
284 "default".to_string(),
285 req.payload,
286 req.metadata,
287 )?;
288
289 let event_id = event.id;
290 let timestamp = event.timestamp;
291
292 let new_version = store.ingest_with_expected_version(event, expected_version)?;
293
294 tracing::info!("Event ingested: {}", event_id);
295
296 Ok(Json(IngestEventResponse {
297 event_id,
298 timestamp,
299 version: Some(new_version),
300 }))
301}
302
303pub async fn ingest_event_v1(
307 State(state): State<AppState>,
308 Json(req): Json<IngestEventRequest>,
309) -> Result<Json<IngestEventResponse>> {
310 let expected_version = req.expected_version;
311
312 let event = Event::from_strings(
313 req.event_type,
314 req.entity_id,
315 "default".to_string(),
316 req.payload,
317 req.metadata,
318 )?;
319
320 let event_id = event.id;
321 let timestamp = event.timestamp;
322
323 let new_version = state
324 .store
325 .ingest_with_expected_version(event, expected_version)?;
326
327 await_replication_ack(&state).await;
329
330 tracing::info!("Event ingested: {}", event_id);
331
332 Ok(Json(IngestEventResponse {
333 event_id,
334 timestamp,
335 version: Some(new_version),
336 }))
337}
338
339pub async fn ingest_events_batch(
344 State(store): State<SharedStore>,
345 Json(req): Json<IngestEventsBatchRequest>,
346) -> Result<Json<IngestEventsBatchResponse>> {
347 let total = req.events.len();
348 let mut ingested_events = Vec::with_capacity(total);
349
350 for event_req in req.events {
351 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
352 let expected_version = event_req.expected_version;
353
354 let event = Event::from_strings(
355 event_req.event_type,
356 event_req.entity_id,
357 tenant_id,
358 event_req.payload,
359 event_req.metadata,
360 )?;
361
362 let event_id = event.id;
363 let timestamp = event.timestamp;
364
365 let new_version = store.ingest_with_expected_version(event, expected_version)?;
366
367 ingested_events.push(IngestEventResponse {
368 event_id,
369 timestamp,
370 version: Some(new_version),
371 });
372 }
373
374 let ingested = ingested_events.len();
375 tracing::info!("Batch ingested {} events", ingested);
376
377 Ok(Json(IngestEventsBatchResponse {
378 total,
379 ingested,
380 events: ingested_events,
381 }))
382}
383
384pub async fn ingest_events_batch_v1(
388 State(state): State<AppState>,
389 Json(req): Json<IngestEventsBatchRequest>,
390) -> Result<Json<IngestEventsBatchResponse>> {
391 let total = req.events.len();
392 let mut ingested_events = Vec::with_capacity(total);
393
394 for event_req in req.events {
395 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
396 let expected_version = event_req.expected_version;
397
398 let event = Event::from_strings(
399 event_req.event_type,
400 event_req.entity_id,
401 tenant_id,
402 event_req.payload,
403 event_req.metadata,
404 )?;
405
406 let event_id = event.id;
407 let timestamp = event.timestamp;
408
409 let new_version = state
410 .store
411 .ingest_with_expected_version(event, expected_version)?;
412
413 ingested_events.push(IngestEventResponse {
414 event_id,
415 timestamp,
416 version: Some(new_version),
417 });
418 }
419
420 await_replication_ack(&state).await;
422
423 let ingested = ingested_events.len();
424 tracing::info!("Batch ingested {} events", ingested);
425
426 Ok(Json(IngestEventsBatchResponse {
427 total,
428 ingested,
429 events: ingested_events,
430 }))
431}
432
433pub async fn query_events(
434 State(store): State<SharedStore>,
435 Query(req): Query<QueryEventsRequest>,
436) -> Result<Json<QueryEventsResponse>> {
437 let requested_limit = req.limit;
438 let queried_entity_id = req.entity_id.clone();
439
440 let unlimited_req = QueryEventsRequest {
442 entity_id: req.entity_id,
443 event_type: req.event_type,
444 tenant_id: req.tenant_id,
445 as_of: req.as_of,
446 since: req.since,
447 until: req.until,
448 limit: None,
449 event_type_prefix: req.event_type_prefix,
450 payload_filter: req.payload_filter,
451 };
452 let all_events = store.query(unlimited_req)?;
453 let total_count = all_events.len();
454
455 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
457 all_events.into_iter().take(limit).collect()
458 } else {
459 all_events
460 };
461
462 let count = limited_events.len();
463 let has_more = count < total_count;
464 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
465
466 let entity_version = queried_entity_id
468 .as_deref()
469 .map(|eid| store.get_entity_version(eid));
470
471 tracing::debug!("Query returned {} events (total: {})", count, total_count);
472
473 Ok(Json(QueryEventsResponse {
474 events,
475 count,
476 total_count,
477 has_more,
478 entity_version,
479 }))
480}
481
482pub async fn list_entities(
483 State(store): State<SharedStore>,
484 Query(req): Query<ListEntitiesRequest>,
485) -> Result<Json<ListEntitiesResponse>> {
486 use std::collections::HashMap;
487
488 let query_req = QueryEventsRequest {
490 entity_id: None,
491 event_type: None,
492 tenant_id: None,
493 as_of: None,
494 since: None,
495 until: None,
496 limit: None,
497 event_type_prefix: req.event_type_prefix,
498 payload_filter: req.payload_filter,
499 };
500 let events = store.query(query_req)?;
501
502 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
504 for event in &events {
505 entity_map
506 .entry(event.entity_id().to_string())
507 .or_default()
508 .push(event);
509 }
510
511 let mut summaries: Vec<EntitySummary> = entity_map
513 .into_iter()
514 .map(|(entity_id, events)| {
515 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
516 EntitySummary {
517 entity_id,
518 event_count: events.len(),
519 last_event_type: last.event_type_str().to_string(),
520 last_event_at: last.timestamp(),
521 }
522 })
523 .collect();
524 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
525
526 let total = summaries.len();
527
528 let offset = req.offset.unwrap_or(0);
530 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
531 let summaries = if let Some(limit) = req.limit {
532 let has_more = summaries.len() > limit;
533 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
534 return Ok(Json(ListEntitiesResponse {
535 entities: truncated,
536 total,
537 has_more,
538 }));
539 } else {
540 summaries
541 };
542
543 Ok(Json(ListEntitiesResponse {
544 entities: summaries,
545 total,
546 has_more: false,
547 }))
548}
549
/// Detect groups of entities whose latest payloads share identical values
/// for the comma-separated fields in `req.group_by`.
///
/// Only groups with more than one entity are returned, sorted by group
/// size (descending) and paginated via optional `offset`/`limit`.
/// `total` counts duplicate groups before pagination.
pub async fn detect_duplicates(
    State(store): State<SharedStore>,
    Query(req): Query<DetectDuplicatesRequest>,
) -> Result<Json<DetectDuplicatesResponse>> {
    use std::collections::HashMap;

    // Fields to group on, e.g. "email, name" -> ["email", "name"].
    let group_by_fields: Vec<&str> = req.group_by.split(',').map(|s| s.trim()).collect();

    // Pull every event whose type matches the requested prefix.
    let query_req = QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since: None,
        until: None,
        limit: None,
        event_type_prefix: Some(req.event_type_prefix),
        payload_filter: None,
    };
    let events = store.query(query_req)?;

    // Keep only the most recent event per entity; earlier payloads are
    // superseded for duplicate detection.
    let mut entity_latest: HashMap<String, &Event> = HashMap::new();
    for event in &events {
        let eid = event.entity_id().to_string();
        entity_latest
            .entry(eid)
            .and_modify(|existing| {
                if event.timestamp() > existing.timestamp() {
                    *existing = event;
                }
            })
            .or_insert(event);
    }

    // Build the grouping key as the serialized JSON of {field: value} so it
    // can serve as a HashMap key; missing fields become JSON null.
    let mut groups: HashMap<String, Vec<String>> = HashMap::new();
    for (entity_id, event) in &entity_latest {
        let payload = event.payload();
        let mut key_parts = serde_json::Map::new();
        for field in &group_by_fields {
            let value = payload
                .get(*field)
                .cloned()
                .unwrap_or(serde_json::Value::Null);
            key_parts.insert(field.to_string(), value);
        }
        let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
        groups.entry(key_str).or_default().push(entity_id.clone());
    }

    // A "duplicate group" requires at least two entities sharing the key.
    let mut duplicate_groups: Vec<DuplicateGroup> = groups
        .into_iter()
        .filter(|(_, ids)| ids.len() > 1)
        .map(|(key_str, mut ids)| {
            // Deterministic ordering of entity ids within a group.
            ids.sort();
            // Round-trip the serialized key back into a JSON value for the
            // response body.
            let key: serde_json::Value =
                serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
            let count = ids.len();
            DuplicateGroup {
                key,
                entity_ids: ids,
                count,
            }
        })
        .collect();

    // Largest groups first.
    duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));

    let total = duplicate_groups.len();

    // Pagination: skip `offset` groups, then optionally truncate to `limit`.
    let offset = req.offset.unwrap_or(0);
    let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();

    if let Some(limit) = req.limit {
        let has_more = duplicate_groups.len() > limit;
        let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
        return Ok(Json(DetectDuplicatesResponse {
            duplicates: truncated,
            total,
            has_more,
        }));
    }

    Ok(Json(DetectDuplicatesResponse {
        duplicates: duplicate_groups,
        total,
        has_more: false,
    }))
}
645
/// Query parameters for `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional point-in-time cutoff for state reconstruction.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
650
651pub async fn get_entity_state(
652 State(store): State<SharedStore>,
653 Path(entity_id): Path<String>,
654 Query(params): Query<EntityStateParams>,
655) -> Result<Json<serde_json::Value>> {
656 let state = store.reconstruct_state(&entity_id, params.as_of)?;
657
658 tracing::info!("State reconstructed for entity: {}", entity_id);
659
660 Ok(Json(state))
661}
662
663pub async fn get_entity_snapshot(
664 State(store): State<SharedStore>,
665 Path(entity_id): Path<String>,
666) -> Result<Json<serde_json::Value>> {
667 let snapshot = store.get_snapshot(&entity_id)?;
668
669 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
670
671 Ok(Json(snapshot))
672}
673
674pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
675 let stats = store.stats();
676 Json(stats)
677}
678
/// Query parameters for `list_streams`.
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return after the offset is applied.
    pub limit: Option<usize>,
    /// Number of streams to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
688
/// Response body for `list_streams`.
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// Streams after sorting and pagination.
    pub streams: Vec<StreamInfo>,
    /// Total number of streams before pagination.
    pub total: usize,
}
695
696pub async fn list_streams(
697 State(store): State<SharedStore>,
698 Query(params): Query<ListStreamsParams>,
699) -> Json<ListStreamsResponse> {
700 let mut streams = store.list_streams();
701 let total = streams.len();
702
703 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
705
706 if let Some(offset) = params.offset {
708 if offset < streams.len() {
709 streams = streams[offset..].to_vec();
710 } else {
711 streams = vec![];
712 }
713 }
714
715 if let Some(limit) = params.limit {
716 streams.truncate(limit);
717 }
718
719 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
720
721 Json(ListStreamsResponse { streams, total })
722}
723
/// Query parameters for `list_event_types`.
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return after the offset is applied.
    pub limit: Option<usize>,
    /// Number of event types to skip from the start of the sorted list.
    pub offset: Option<usize>,
}
733
/// Response body for `list_event_types`.
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// Event types after sorting and pagination.
    pub event_types: Vec<EventTypeInfo>,
    /// Total number of event types before pagination.
    pub total: usize,
}
740
741pub async fn list_event_types(
742 State(store): State<SharedStore>,
743 Query(params): Query<ListEventTypesParams>,
744) -> Json<ListEventTypesResponse> {
745 let mut event_types = store.list_event_types();
746 let total = event_types.len();
747
748 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
750
751 if let Some(offset) = params.offset {
753 if offset < event_types.len() {
754 event_types = event_types[offset..].to_vec();
755 } else {
756 event_types = vec![];
757 }
758 }
759
760 if let Some(limit) = params.limit {
761 event_types.truncate(limit);
762 }
763
764 tracing::debug!(
765 "Listed {} event types (total: {})",
766 event_types.len(),
767 total
768 );
769
770 Json(ListEventTypesResponse { event_types, total })
771}
772
/// Query parameters for the events WebSocket endpoint.
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When set, binds the socket to a registered consumer.
    pub consumer_id: Option<String>,
}
778
779pub async fn events_websocket(
780 ws: WebSocketUpgrade,
781 State(store): State<SharedStore>,
782 Query(params): Query<WebSocketParams>,
783) -> Response {
784 let websocket_manager = store.websocket_manager();
785
786 ws.on_upgrade(move |socket| async move {
787 if let Some(consumer_id) = params.consumer_id {
788 websocket_manager
789 .handle_socket_with_consumer(socket, consumer_id, store)
790 .await;
791 } else {
792 websocket_manager.handle_socket(socket).await;
793 }
794 })
795}
796
797pub async fn analytics_frequency(
799 State(store): State<SharedStore>,
800 Query(req): Query<EventFrequencyRequest>,
801) -> Result<Json<EventFrequencyResponse>> {
802 let response = AnalyticsEngine::event_frequency(&store, req)?;
803
804 tracing::debug!(
805 "Frequency analysis returned {} buckets",
806 response.buckets.len()
807 );
808
809 Ok(Json(response))
810}
811
812pub async fn analytics_summary(
814 State(store): State<SharedStore>,
815 Query(req): Query<StatsSummaryRequest>,
816) -> Result<Json<StatsSummaryResponse>> {
817 let response = AnalyticsEngine::stats_summary(&store, req)?;
818
819 tracing::debug!(
820 "Stats summary: {} events across {} entities",
821 response.total_events,
822 response.unique_entities
823 );
824
825 Ok(Json(response))
826}
827
828pub async fn analytics_correlation(
830 State(store): State<SharedStore>,
831 Query(req): Query<CorrelationRequest>,
832) -> Result<Json<CorrelationResponse>> {
833 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
834
835 tracing::debug!(
836 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
837 response.correlated_pairs,
838 response.total_a,
839 response.correlation_percentage
840 );
841
842 Ok(Json(response))
843}
844
845pub async fn create_snapshot(
847 State(store): State<SharedStore>,
848 Json(req): Json<CreateSnapshotRequest>,
849) -> Result<Json<CreateSnapshotResponse>> {
850 store.create_snapshot(&req.entity_id)?;
851
852 let snapshot_manager = store.snapshot_manager();
853 let snapshot = snapshot_manager
854 .get_latest_snapshot(&req.entity_id)
855 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
856
857 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
858
859 Ok(Json(CreateSnapshotResponse {
860 snapshot_id: snapshot.id,
861 entity_id: snapshot.entity_id,
862 created_at: snapshot.created_at,
863 event_count: snapshot.event_count,
864 size_bytes: snapshot.metadata.size_bytes,
865 }))
866}
867
868pub async fn list_snapshots(
870 State(store): State<SharedStore>,
871 Query(req): Query<ListSnapshotsRequest>,
872) -> Result<Json<ListSnapshotsResponse>> {
873 let snapshot_manager = store.snapshot_manager();
874
875 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
876 snapshot_manager
877 .get_all_snapshots(&entity_id)
878 .into_iter()
879 .map(SnapshotInfo::from)
880 .collect()
881 } else {
882 let entities = snapshot_manager.list_entities();
884 entities
885 .iter()
886 .flat_map(|entity_id| {
887 snapshot_manager
888 .get_all_snapshots(entity_id)
889 .into_iter()
890 .map(SnapshotInfo::from)
891 })
892 .collect()
893 };
894
895 let total = snapshots.len();
896
897 tracing::debug!("Listed {} snapshots", total);
898
899 Ok(Json(ListSnapshotsResponse { snapshots, total }))
900}
901
902pub async fn get_latest_snapshot(
904 State(store): State<SharedStore>,
905 Path(entity_id): Path<String>,
906) -> Result<Json<serde_json::Value>> {
907 let snapshot_manager = store.snapshot_manager();
908
909 let snapshot = snapshot_manager
910 .get_latest_snapshot(&entity_id)
911 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
912
913 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
914
915 Ok(Json(serde_json::json!({
916 "snapshot_id": snapshot.id,
917 "entity_id": snapshot.entity_id,
918 "created_at": snapshot.created_at,
919 "as_of": snapshot.as_of,
920 "event_count": snapshot.event_count,
921 "size_bytes": snapshot.metadata.size_bytes,
922 "snapshot_type": snapshot.metadata.snapshot_type,
923 "state": snapshot.state
924 })))
925}
926
927pub async fn trigger_compaction(
929 State(store): State<SharedStore>,
930) -> Result<Json<CompactionResult>> {
931 let compaction_manager = store.compaction_manager().ok_or_else(|| {
932 crate::error::AllSourceError::InternalError(
933 "Compaction not enabled (no Parquet storage)".to_string(),
934 )
935 })?;
936
937 tracing::info!("📦 Manual compaction triggered via API");
938
939 let result = compaction_manager.compact_now()?;
940
941 Ok(Json(result))
942}
943
944pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
946 let compaction_manager = store.compaction_manager().ok_or_else(|| {
947 crate::error::AllSourceError::InternalError(
948 "Compaction not enabled (no Parquet storage)".to_string(),
949 )
950 })?;
951
952 let stats = compaction_manager.stats();
953 let config = compaction_manager.config();
954
955 Ok(Json(serde_json::json!({
956 "stats": stats,
957 "config": {
958 "min_files_to_compact": config.min_files_to_compact,
959 "target_file_size": config.target_file_size,
960 "max_file_size": config.max_file_size,
961 "small_file_threshold": config.small_file_threshold,
962 "compaction_interval_seconds": config.compaction_interval_seconds,
963 "auto_compact": config.auto_compact,
964 "strategy": config.strategy
965 }
966 })))
967}
968
969pub async fn register_schema(
971 State(store): State<SharedStore>,
972 Json(req): Json<RegisterSchemaRequest>,
973) -> Result<Json<RegisterSchemaResponse>> {
974 let schema_registry = store.schema_registry();
975
976 let response =
977 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
978
979 tracing::info!(
980 "📋 Schema registered: v{} for '{}'",
981 response.version,
982 response.subject
983 );
984
985 Ok(Json(response))
986}
987
/// Query parameters for `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Specific schema version to fetch; `None` lets the registry decide
    /// (presumably the latest version — confirm against the registry).
    version: Option<u32>,
}
993
994pub async fn get_schema(
995 State(store): State<SharedStore>,
996 Path(subject): Path<String>,
997 Query(params): Query<GetSchemaParams>,
998) -> Result<Json<serde_json::Value>> {
999 let schema_registry = store.schema_registry();
1000
1001 let schema = schema_registry.get_schema(&subject, params.version)?;
1002
1003 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1004
1005 Ok(Json(serde_json::json!({
1006 "id": schema.id,
1007 "subject": schema.subject,
1008 "version": schema.version,
1009 "schema": schema.schema,
1010 "created_at": schema.created_at,
1011 "description": schema.description,
1012 "tags": schema.tags
1013 })))
1014}
1015
1016pub async fn list_schema_versions(
1018 State(store): State<SharedStore>,
1019 Path(subject): Path<String>,
1020) -> Result<Json<serde_json::Value>> {
1021 let schema_registry = store.schema_registry();
1022
1023 let versions = schema_registry.list_versions(&subject)?;
1024
1025 Ok(Json(serde_json::json!({
1026 "subject": subject,
1027 "versions": versions
1028 })))
1029}
1030
1031pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1033 let schema_registry = store.schema_registry();
1034
1035 let subjects = schema_registry.list_subjects();
1036
1037 Json(serde_json::json!({
1038 "subjects": subjects,
1039 "total": subjects.len()
1040 }))
1041}
1042
1043pub async fn validate_event_schema(
1045 State(store): State<SharedStore>,
1046 Json(req): Json<ValidateEventRequest>,
1047) -> Result<Json<ValidateEventResponse>> {
1048 let schema_registry = store.schema_registry();
1049
1050 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1051
1052 if response.valid {
1053 tracing::debug!(
1054 "✅ Event validated against schema '{}' v{}",
1055 req.subject,
1056 response.schema_version
1057 );
1058 } else {
1059 tracing::warn!(
1060 "❌ Event validation failed for '{}': {:?}",
1061 req.subject,
1062 response.errors
1063 );
1064 }
1065
1066 Ok(Json(response))
1067}
1068
/// Request body for `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject.
    compatibility: CompatibilityMode,
}
1074
1075pub async fn set_compatibility_mode(
1076 State(store): State<SharedStore>,
1077 Path(subject): Path<String>,
1078 Json(req): Json<SetCompatibilityRequest>,
1079) -> Json<serde_json::Value> {
1080 let schema_registry = store.schema_registry();
1081
1082 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1083
1084 tracing::info!(
1085 "🔧 Set compatibility mode for '{}' to {:?}",
1086 subject,
1087 req.compatibility
1088 );
1089
1090 Json(serde_json::json!({
1091 "subject": subject,
1092 "compatibility": req.compatibility
1093 }))
1094}
1095
1096pub async fn start_replay(
1098 State(store): State<SharedStore>,
1099 Json(req): Json<StartReplayRequest>,
1100) -> Result<Json<StartReplayResponse>> {
1101 let replay_manager = store.replay_manager();
1102
1103 let response = replay_manager.start_replay(store, req)?;
1104
1105 tracing::info!(
1106 "🔄 Started replay {} with {} events",
1107 response.replay_id,
1108 response.total_events
1109 );
1110
1111 Ok(Json(response))
1112}
1113
1114pub async fn get_replay_progress(
1116 State(store): State<SharedStore>,
1117 Path(replay_id): Path<uuid::Uuid>,
1118) -> Result<Json<ReplayProgress>> {
1119 let replay_manager = store.replay_manager();
1120
1121 let progress = replay_manager.get_progress(replay_id)?;
1122
1123 Ok(Json(progress))
1124}
1125
1126pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1128 let replay_manager = store.replay_manager();
1129
1130 let replays = replay_manager.list_replays();
1131
1132 Json(serde_json::json!({
1133 "replays": replays,
1134 "total": replays.len()
1135 }))
1136}
1137
1138pub async fn cancel_replay(
1140 State(store): State<SharedStore>,
1141 Path(replay_id): Path<uuid::Uuid>,
1142) -> Result<Json<serde_json::Value>> {
1143 let replay_manager = store.replay_manager();
1144
1145 replay_manager.cancel_replay(replay_id)?;
1146
1147 tracing::info!("🛑 Cancelled replay {}", replay_id);
1148
1149 Ok(Json(serde_json::json!({
1150 "replay_id": replay_id,
1151 "status": "cancelled"
1152 })))
1153}
1154
1155pub async fn delete_replay(
1157 State(store): State<SharedStore>,
1158 Path(replay_id): Path<uuid::Uuid>,
1159) -> Result<Json<serde_json::Value>> {
1160 let replay_manager = store.replay_manager();
1161
1162 let deleted = replay_manager.delete_replay(replay_id)?;
1163
1164 if deleted {
1165 tracing::info!("🗑️ Deleted replay {}", replay_id);
1166 }
1167
1168 Ok(Json(serde_json::json!({
1169 "replay_id": replay_id,
1170 "deleted": deleted
1171 })))
1172}
1173
1174pub async fn register_pipeline(
1176 State(store): State<SharedStore>,
1177 Json(config): Json<PipelineConfig>,
1178) -> Result<Json<serde_json::Value>> {
1179 let pipeline_manager = store.pipeline_manager();
1180
1181 let pipeline_id = pipeline_manager.register(config.clone());
1182
1183 tracing::info!(
1184 "🔀 Pipeline registered: {} (name: {})",
1185 pipeline_id,
1186 config.name
1187 );
1188
1189 Ok(Json(serde_json::json!({
1190 "pipeline_id": pipeline_id,
1191 "name": config.name,
1192 "enabled": config.enabled
1193 })))
1194}
1195
1196pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1198 let pipeline_manager = store.pipeline_manager();
1199
1200 let pipelines = pipeline_manager.list();
1201
1202 tracing::debug!("Listed {} pipelines", pipelines.len());
1203
1204 Json(serde_json::json!({
1205 "pipelines": pipelines,
1206 "total": pipelines.len()
1207 }))
1208}
1209
1210pub async fn get_pipeline(
1212 State(store): State<SharedStore>,
1213 Path(pipeline_id): Path<uuid::Uuid>,
1214) -> Result<Json<PipelineConfig>> {
1215 let pipeline_manager = store.pipeline_manager();
1216
1217 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1218 crate::error::AllSourceError::ValidationError(format!(
1219 "Pipeline not found: {}",
1220 pipeline_id
1221 ))
1222 })?;
1223
1224 Ok(Json(pipeline.config().clone()))
1225}
1226
1227pub async fn remove_pipeline(
1229 State(store): State<SharedStore>,
1230 Path(pipeline_id): Path<uuid::Uuid>,
1231) -> Result<Json<serde_json::Value>> {
1232 let pipeline_manager = store.pipeline_manager();
1233
1234 let removed = pipeline_manager.remove(pipeline_id);
1235
1236 if removed {
1237 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1238 }
1239
1240 Ok(Json(serde_json::json!({
1241 "pipeline_id": pipeline_id,
1242 "removed": removed
1243 })))
1244}
1245
1246pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1248 let pipeline_manager = store.pipeline_manager();
1249
1250 let stats = pipeline_manager.all_stats();
1251
1252 Json(serde_json::json!({
1253 "stats": stats,
1254 "total": stats.len()
1255 }))
1256}
1257
1258pub async fn get_pipeline_stats(
1260 State(store): State<SharedStore>,
1261 Path(pipeline_id): Path<uuid::Uuid>,
1262) -> Result<Json<PipelineStats>> {
1263 let pipeline_manager = store.pipeline_manager();
1264
1265 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1266 crate::error::AllSourceError::ValidationError(format!(
1267 "Pipeline not found: {}",
1268 pipeline_id
1269 ))
1270 })?;
1271
1272 Ok(Json(pipeline.stats()))
1273}
1274
1275pub async fn reset_pipeline(
1277 State(store): State<SharedStore>,
1278 Path(pipeline_id): Path<uuid::Uuid>,
1279) -> Result<Json<serde_json::Value>> {
1280 let pipeline_manager = store.pipeline_manager();
1281
1282 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1283 crate::error::AllSourceError::ValidationError(format!(
1284 "Pipeline not found: {}",
1285 pipeline_id
1286 ))
1287 })?;
1288
1289 pipeline.reset();
1290
1291 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1292
1293 Ok(Json(serde_json::json!({
1294 "pipeline_id": pipeline_id,
1295 "reset": true
1296 })))
1297}
1298
1299pub async fn get_event_by_id(
1305 State(store): State<SharedStore>,
1306 Path(event_id): Path<uuid::Uuid>,
1307) -> Result<Json<serde_json::Value>> {
1308 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1309 crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1310 })?;
1311
1312 let dto = EventDto::from(&event);
1313
1314 tracing::debug!("Event retrieved by ID: {}", event_id);
1315
1316 Ok(Json(serde_json::json!({
1317 "event": dto,
1318 "found": true
1319 })))
1320}
1321
/// GET handler: list all registered projections with their run status.
///
/// Status is read from the store's projection-status map; projections with
/// no recorded entry default to "running".
pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
    let projection_manager = store.projection_manager();
    let status_map = store.projection_status();

    let projections: Vec<serde_json::Value> = projection_manager
        .list_projections()
        .iter()
        .map(|(name, projection)| {
            // Fall back to "running" when no explicit status has been set.
            let status = status_map
                .get(name)
                .map(|s| s.value().clone())
                .unwrap_or_else(|| "running".to_string());
            serde_json::json!({
                "name": name,
                // NOTE(review): this Debug-formats the projection's *name*
                // under a "type" key — it looks like a projection type was
                // intended here; confirm with API consumers before changing.
                "type": format!("{:?}", projection.name()),
                "status": status,
            })
        })
        .collect();

    tracing::debug!("Listed {} projections", projections.len());

    Json(serde_json::json!({
        "projections": projections,
        "total": projections.len()
    }))
}
1354
1355pub async fn get_projection(
1357 State(store): State<SharedStore>,
1358 Path(name): Path<String>,
1359) -> Result<Json<serde_json::Value>> {
1360 let projection_manager = store.projection_manager();
1361
1362 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1363 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1364 })?;
1365
1366 Ok(Json(serde_json::json!({
1367 "name": projection.name(),
1368 "found": true
1369 })))
1370}
1371
1372pub async fn get_projection_state(
1377 State(store): State<SharedStore>,
1378 Path((name, entity_id)): Path<(String, String)>,
1379) -> Result<Json<serde_json::Value>> {
1380 let projection_manager = store.projection_manager();
1381
1382 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1383 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1384 })?;
1385
1386 let state = projection.get_state(&entity_id);
1387
1388 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1389
1390 Ok(Json(serde_json::json!({
1391 "projection": name,
1392 "entity_id": entity_id,
1393 "state": state,
1394 "found": state.is_some()
1395 })))
1396}
1397
1398pub async fn delete_projection(
1403 State(store): State<SharedStore>,
1404 Path(name): Path<String>,
1405) -> Result<Json<serde_json::Value>> {
1406 let projection_manager = store.projection_manager();
1407
1408 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1409 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1410 })?;
1411
1412 projection.clear();
1413
1414 let cache = store.projection_state_cache();
1416 let prefix = format!("{name}:");
1417 let keys_to_remove: Vec<String> = cache
1418 .iter()
1419 .filter(|entry| entry.key().starts_with(&prefix))
1420 .map(|entry| entry.key().clone())
1421 .collect();
1422 for key in keys_to_remove {
1423 cache.remove(&key);
1424 }
1425
1426 tracing::info!("Projection deleted (cleared): {}", name);
1427
1428 Ok(Json(serde_json::json!({
1429 "projection": name,
1430 "deleted": true
1431 })))
1432}
1433
1434pub async fn get_projection_state_summary(
1438 State(store): State<SharedStore>,
1439 Path(name): Path<String>,
1440) -> Result<Json<serde_json::Value>> {
1441 let projection_manager = store.projection_manager();
1442
1443 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1444 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1445 })?;
1446
1447 let cache = store.projection_state_cache();
1449 let prefix = format!("{name}:");
1450 let states: Vec<serde_json::Value> = cache
1451 .iter()
1452 .filter(|entry| entry.key().starts_with(&prefix))
1453 .map(|entry| {
1454 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1455 serde_json::json!({
1456 "entity_id": entity_id,
1457 "state": entry.value().clone()
1458 })
1459 })
1460 .collect();
1461
1462 let total = states.len();
1463
1464 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1465
1466 Ok(Json(serde_json::json!({
1467 "projection": name,
1468 "states": states,
1469 "total": total
1470 })))
1471}
1472
1473pub async fn reset_projection(
1477 State(store): State<SharedStore>,
1478 Path(name): Path<String>,
1479) -> Result<Json<serde_json::Value>> {
1480 let reprocessed = store.reset_projection(&name)?;
1481
1482 tracing::info!(
1483 "Projection reset: {} ({} events reprocessed)",
1484 name,
1485 reprocessed
1486 );
1487
1488 Ok(Json(serde_json::json!({
1489 "projection": name,
1490 "reset": true,
1491 "events_reprocessed": reprocessed
1492 })))
1493}
1494
1495pub async fn pause_projection(
1499 State(store): State<SharedStore>,
1500 Path(name): Path<String>,
1501) -> Result<Json<serde_json::Value>> {
1502 let projection_manager = store.projection_manager();
1503
1504 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1506 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1507 })?;
1508
1509 store
1510 .projection_status()
1511 .insert(name.clone(), "paused".to_string());
1512
1513 tracing::info!("Projection paused: {}", name);
1514
1515 Ok(Json(serde_json::json!({
1516 "projection": name,
1517 "status": "paused"
1518 })))
1519}
1520
1521pub async fn start_projection(
1525 State(store): State<SharedStore>,
1526 Path(name): Path<String>,
1527) -> Result<Json<serde_json::Value>> {
1528 let projection_manager = store.projection_manager();
1529
1530 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1532 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1533 })?;
1534
1535 store
1536 .projection_status()
1537 .insert(name.clone(), "running".to_string());
1538
1539 tracing::info!("Projection started: {}", name);
1540
1541 Ok(Json(serde_json::json!({
1542 "projection": name,
1543 "status": "running"
1544 })))
1545}
1546
/// Request body for `save_projection_state`: the raw JSON state to store for
/// one (projection, entity) pair.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON blob persisted verbatim into the projection state cache.
    pub state: serde_json::Value,
}
1552
1553pub async fn save_projection_state(
1558 State(store): State<SharedStore>,
1559 Path((name, entity_id)): Path<(String, String)>,
1560 Json(req): Json<SaveProjectionStateRequest>,
1561) -> Result<Json<serde_json::Value>> {
1562 let projection_cache = store.projection_state_cache();
1563
1564 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1566
1567 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1568
1569 Ok(Json(serde_json::json!({
1570 "projection": name,
1571 "entity_id": entity_id,
1572 "saved": true
1573 })))
1574}
1575
/// Request body for `bulk_get_projection_states`: the entities to look up.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entity ids whose projection state should be returned.
    pub entity_ids: Vec<String>,
}
1583
/// Request body for `bulk_save_projection_states`: states to persist in bulk.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// One entry per entity to write.
    pub states: Vec<BulkSaveStateItem>,
}
1591
/// One (entity, state) pair inside a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity whose state is being written.
    pub entity_id: String,
    /// Arbitrary JSON blob stored verbatim.
    pub state: serde_json::Value,
}
1597
1598pub async fn bulk_get_projection_states(
1599 State(store): State<SharedStore>,
1600 Path(name): Path<String>,
1601 Json(req): Json<BulkGetStateRequest>,
1602) -> Result<Json<serde_json::Value>> {
1603 let projection_manager = store.projection_manager();
1604
1605 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1606 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1607 })?;
1608
1609 let states: Vec<serde_json::Value> = req
1610 .entity_ids
1611 .iter()
1612 .map(|entity_id| {
1613 let state = projection.get_state(entity_id);
1614 serde_json::json!({
1615 "entity_id": entity_id,
1616 "state": state,
1617 "found": state.is_some()
1618 })
1619 })
1620 .collect();
1621
1622 tracing::debug!(
1623 "Bulk projection state retrieved: {} entities from {}",
1624 states.len(),
1625 name
1626 );
1627
1628 Ok(Json(serde_json::json!({
1629 "projection": name,
1630 "states": states,
1631 "total": states.len()
1632 })))
1633}
1634
1635pub async fn bulk_save_projection_states(
1640 State(store): State<SharedStore>,
1641 Path(name): Path<String>,
1642 Json(req): Json<BulkSaveStateRequest>,
1643) -> Result<Json<serde_json::Value>> {
1644 let projection_cache = store.projection_state_cache();
1645
1646 let mut saved_count = 0;
1647 for item in &req.states {
1648 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1649 saved_count += 1;
1650 }
1651
1652 tracing::info!(
1653 "Bulk projection state saved: {} entities for {}",
1654 saved_count,
1655 name
1656 );
1657
1658 Ok(Json(serde_json::json!({
1659 "projection": name,
1660 "saved": saved_count,
1661 "total": req.states.len()
1662 })))
1663}
1664
/// Query parameters for `list_webhooks`.
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant to filter by. NOTE(review): when omitted the handler currently
    /// returns an empty list rather than all webhooks — confirm intended.
    pub tenant_id: Option<String>,
}
1674
1675pub async fn register_webhook(
1677 State(store): State<SharedStore>,
1678 Json(req): Json<RegisterWebhookRequest>,
1679) -> Json<serde_json::Value> {
1680 let registry = store.webhook_registry();
1681 let webhook = registry.register(req);
1682
1683 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1684
1685 Json(serde_json::json!({
1686 "webhook": webhook,
1687 "created": true
1688 }))
1689}
1690
/// GET handler: list webhooks for a tenant.
///
/// NOTE(review): when `tenant_id` is omitted this returns an empty list, not
/// all webhooks — confirm this is intended (the registry may lack a list-all
/// API, or the omission may be a deliberate tenancy guard).
pub async fn list_webhooks(
    State(store): State<SharedStore>,
    Query(params): Query<ListWebhooksParams>,
) -> Json<serde_json::Value> {
    let registry = store.webhook_registry();

    let webhooks = if let Some(tenant_id) = params.tenant_id {
        registry.list_by_tenant(&tenant_id)
    } else {
        // No tenant filter -> empty result (see NOTE above).
        vec![]
    };

    let total = webhooks.len();

    Json(serde_json::json!({
        "webhooks": webhooks,
        "total": total
    }))
}
1712
1713pub async fn get_webhook(
1715 State(store): State<SharedStore>,
1716 Path(webhook_id): Path<uuid::Uuid>,
1717) -> Result<Json<serde_json::Value>> {
1718 let registry = store.webhook_registry();
1719
1720 let webhook = registry.get(webhook_id).ok_or_else(|| {
1721 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1722 })?;
1723
1724 Ok(Json(serde_json::json!({
1725 "webhook": webhook,
1726 "found": true
1727 })))
1728}
1729
1730pub async fn update_webhook(
1732 State(store): State<SharedStore>,
1733 Path(webhook_id): Path<uuid::Uuid>,
1734 Json(req): Json<UpdateWebhookRequest>,
1735) -> Result<Json<serde_json::Value>> {
1736 let registry = store.webhook_registry();
1737
1738 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1739 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1740 })?;
1741
1742 tracing::info!("Webhook updated: {}", webhook_id);
1743
1744 Ok(Json(serde_json::json!({
1745 "webhook": webhook,
1746 "updated": true
1747 })))
1748}
1749
1750pub async fn delete_webhook(
1752 State(store): State<SharedStore>,
1753 Path(webhook_id): Path<uuid::Uuid>,
1754) -> Result<Json<serde_json::Value>> {
1755 let registry = store.webhook_registry();
1756
1757 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1758 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1759 })?;
1760
1761 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1762
1763 Ok(Json(serde_json::json!({
1764 "webhook_id": webhook_id,
1765 "deleted": true
1766 })))
1767}
1768
/// Query parameters for `list_webhook_deliveries`.
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Maximum number of delivery records to return (handler defaults to 50).
    pub limit: Option<usize>,
}
1774
1775pub async fn list_webhook_deliveries(
1777 State(store): State<SharedStore>,
1778 Path(webhook_id): Path<uuid::Uuid>,
1779 Query(params): Query<ListDeliveriesParams>,
1780) -> Result<Json<serde_json::Value>> {
1781 let registry = store.webhook_registry();
1782
1783 registry.get(webhook_id).ok_or_else(|| {
1785 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1786 })?;
1787
1788 let limit = params.limit.unwrap_or(50);
1789 let deliveries = registry.get_deliveries(webhook_id, limit);
1790 let total = deliveries.len();
1791
1792 Ok(Json(serde_json::json!({
1793 "webhook_id": webhook_id,
1794 "deliveries": deliveries,
1795 "total": total
1796 })))
1797}
1798
1799pub async fn eventql_query(
1805 State(store): State<SharedStore>,
1806 Json(req): Json<EventQLRequest>,
1807) -> Result<Json<serde_json::Value>> {
1808 let events = store.snapshot_events();
1809 match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1810 Ok(response) => Ok(Json(serde_json::json!({
1811 "columns": response.columns,
1812 "rows": response.rows,
1813 "row_count": response.row_count,
1814 }))),
1815 Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1816 }
1817}
1818
/// POST handler: execute a minimal GraphQL query against the store.
///
/// Supports the top-level fields `events`, `event(id:)`, `projections`,
/// `stats`, and `__schema`. Field resolution errors are accumulated into the
/// response's `errors` array rather than aborting; a parse failure returns
/// immediately with no data. Always responds 200 with a GraphQL envelope.
pub async fn graphql_query(
    State(store): State<SharedStore>,
    Json(req): Json<GraphQLRequest>,
) -> Json<serde_json::Value> {
    // Parse failure: return a data-less envelope with a single error.
    let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
        Ok(f) => f,
        Err(e) => {
            return Json(
                serde_json::to_value(GraphQLResponse {
                    data: None,
                    errors: vec![GraphQLError { message: e }],
                })
                .unwrap(),
            );
        }
    };

    let mut data = serde_json::Map::new();
    let mut errors = Vec::new();

    // Resolve each requested top-level field independently; one field's
    // failure does not prevent the others from resolving.
    for field in &fields {
        match field.name.as_str() {
            "events" => {
                // Map GraphQL arguments onto the store's query DTO; only
                // entity_id / event_type / tenant_id / limit are exposed.
                let request = crate::application::dto::QueryEventsRequest {
                    entity_id: field.arguments.get("entity_id").cloned(),
                    event_type: field.arguments.get("event_type").cloned(),
                    tenant_id: field.arguments.get("tenant_id").cloned(),
                    limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
                    as_of: None,
                    since: None,
                    until: None,
                    event_type_prefix: None,
                    payload_filter: None,
                };
                match store.query(request) {
                    Ok(events) => {
                        // Project each event down to the requested sub-fields.
                        let json_events: Vec<serde_json::Value> = events
                            .iter()
                            .map(|e| {
                                crate::infrastructure::query::graphql::event_to_json(
                                    e,
                                    &field.fields,
                                )
                            })
                            .collect();
                        data.insert("events".to_string(), serde_json::Value::Array(json_events));
                    }
                    Err(e) => errors.push(GraphQLError {
                        message: format!("events query failed: {e}"),
                    }),
                }
            }
            "event" => {
                // Single-event lookup; requires a valid UUID `id` argument.
                if let Some(id_str) = field.arguments.get("id") {
                    if let Ok(id) = uuid::Uuid::parse_str(id_str) {
                        match store.get_event_by_id(&id) {
                            Ok(Some(event)) => {
                                data.insert(
                                    "event".to_string(),
                                    crate::infrastructure::query::graphql::event_to_json(
                                        &event,
                                        &field.fields,
                                    ),
                                );
                            }
                            // Unknown id resolves to null, not an error.
                            Ok(None) => {
                                data.insert("event".to_string(), serde_json::Value::Null);
                            }
                            Err(e) => errors.push(GraphQLError {
                                message: format!("event lookup failed: {e}"),
                            }),
                        }
                    } else {
                        errors.push(GraphQLError {
                            message: format!("Invalid UUID: {id_str}"),
                        });
                    }
                } else {
                    errors.push(GraphQLError {
                        message: "event query requires 'id' argument".to_string(),
                    });
                }
            }
            "projections" => {
                // Names only — projection contents are not exposed here.
                let pm = store.projection_manager();
                let names: Vec<serde_json::Value> = pm
                    .list_projections()
                    .iter()
                    .map(|(name, _)| serde_json::Value::String(name.clone()))
                    .collect();
                data.insert("projections".to_string(), serde_json::Value::Array(names));
            }
            "stats" => {
                let stats = store.stats();
                data.insert(
                    "stats".to_string(),
                    serde_json::json!({
                        "total_events": stats.total_events,
                        "total_entities": stats.total_entities,
                        "total_event_types": stats.total_event_types,
                    }),
                );
            }
            "__schema" => {
                // Static introspection document for clients/tooling.
                data.insert(
                    "__schema".to_string(),
                    crate::infrastructure::query::graphql::introspection_schema(),
                );
            }
            other => {
                errors.push(GraphQLError {
                    message: format!("Unknown field: {other}"),
                });
            }
        }
    }

    // GraphQLResponse is a plain serializable struct; to_value cannot fail
    // for it, so unwrap here is an invariant, not error handling.
    Json(
        serde_json::to_value(GraphQLResponse {
            data: Some(serde_json::Value::Object(data)),
            errors,
        })
        .unwrap(),
    )
}
1945
1946pub async fn geo_query(
1948 State(store): State<SharedStore>,
1949 Json(req): Json<GeoQueryRequest>,
1950) -> Json<serde_json::Value> {
1951 let events = store.snapshot_events();
1952 let geo_index = store.geo_index();
1953 let results =
1954 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1955 let total = results.len();
1956 Json(serde_json::json!({
1957 "results": results,
1958 "total": total,
1959 }))
1960}
1961
1962pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1964 let stats = store.geo_index().stats();
1965 Json(serde_json::json!(stats))
1966}
1967
1968pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1970 let stats = store.exactly_once().stats();
1971 Json(serde_json::json!(stats))
1972}
1973
1974pub async fn schema_evolution_history(
1976 State(store): State<SharedStore>,
1977 Path(event_type): Path<String>,
1978) -> Json<serde_json::Value> {
1979 let mgr = store.schema_evolution();
1980 let history = mgr.get_history(&event_type);
1981 let version = mgr.get_version(&event_type);
1982 Json(serde_json::json!({
1983 "event_type": event_type,
1984 "current_version": version,
1985 "history": history,
1986 }))
1987}
1988
1989pub async fn schema_evolution_schema(
1991 State(store): State<SharedStore>,
1992 Path(event_type): Path<String>,
1993) -> Json<serde_json::Value> {
1994 let mgr = store.schema_evolution();
1995 if let Some(schema) = mgr.get_schema(&event_type) {
1996 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1997 Json(serde_json::json!({
1998 "event_type": event_type,
1999 "version": mgr.get_version(&event_type),
2000 "inferred_schema": schema,
2001 "json_schema": json_schema,
2002 }))
2003 } else {
2004 Json(serde_json::json!({
2005 "event_type": event_type,
2006 "error": "No schema inferred for this event type"
2007 }))
2008 }
2009}
2010
2011pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2013 let stats = store.schema_evolution().stats();
2014 let event_types = store.schema_evolution().list_event_types();
2015 Json(serde_json::json!({
2016 "stats": stats,
2017 "tracked_event_types": event_types,
2018 }))
2019}
2020
/// Embedded-sync pull endpoint (feature `embedded-sync`): return server
/// events to a syncing client as CRDT `ReplicatedEvent`s with synthesized
/// HLC timestamps.
///
/// NOTE(review): `since` is derived from the *minimum* physical timestamp in
/// the client's version vector — a conservative lower bound that can resend
/// events the client already holds; confirm deduplication happens client-side.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    // Oldest physical time across the client's version vector, converted to
    // a DateTime; None (e.g. empty vector) means "send everything".
    let since = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min()
        .and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let events = store.query(crate::application::dto::QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    })?;

    // Synthesize HLC timestamps: events sharing a millisecond receive an
    // incrementing logical counter so ordering stays total. This depends on
    // the iteration order of `events` — do not reorder this loop.
    let mut replicated = Vec::with_capacity(events.len());
    let mut last_ms = 0u64;
    let mut logical = 0u32;

    for event in &events {
        let event_ms = event.timestamp().timestamp_millis() as u64;
        if event_ms == last_ms {
            logical += 1;
        } else {
            last_ms = event_ms;
            logical = 0;
        }

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            // Flatten the domain event into a JSON blob the client re-parses.
            event_data: serde_json::json!({
                "event_type": event.event_type_str(),
                "entity_id": event.entity_id_str(),
                "tenant_id": event.tenant_id_str(),
                "payload": event.payload,
                "metadata": event.metadata,
            }),
        });
    }

    // The server does not echo a version vector back (yet) — empty map.
    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2089
/// Embedded-sync push endpoint (feature `embedded-sync`): ingest events
/// pushed by a client.
///
/// Events whose JSON cannot be converted into a domain `Event` are counted
/// as `skipped`; a store-level ingest failure aborts the request via `?`.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        // Pull the flat fields back out of the replicated event's JSON blob,
        // substituting placeholder values for anything missing.
        let event_data = &rep_event.event_data;
        let event_type = event_data
            .get("event_type")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown")
            .to_string();
        let entity_id = event_data
            .get("entity_id")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown")
            .to_string();
        let tenant_id = event_data
            .get("tenant_id")
            .and_then(|v| v.as_str())
            .unwrap_or("default")
            .to_string();
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or(serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        match Event::from_strings(event_type, entity_id, tenant_id, payload, metadata) {
            Ok(domain_event) => {
                // NOTE(review): `?` fails the whole batch on one ingest error
                // after earlier events were already stored — confirm partial
                // application is acceptable for the sync protocol.
                store.ingest(domain_event)?;
                accepted += 1;
            }
            Err(_) => {
                // Malformed events are skipped rather than failing the batch.
                skipped += 1;
            }
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2141
2142pub async fn register_consumer(
2148 State(store): State<SharedStore>,
2149 Json(req): Json<RegisterConsumerRequest>,
2150) -> Result<Json<ConsumerResponse>> {
2151 let consumer = store
2152 .consumer_registry()
2153 .register(req.consumer_id, req.event_type_filters);
2154
2155 Ok(Json(ConsumerResponse {
2156 consumer_id: consumer.consumer_id,
2157 event_type_filters: consumer.event_type_filters,
2158 cursor_position: consumer.cursor_position,
2159 }))
2160}
2161
2162pub async fn get_consumer(
2164 State(store): State<SharedStore>,
2165 Path(consumer_id): Path<String>,
2166) -> Result<Json<ConsumerResponse>> {
2167 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2168
2169 Ok(Json(ConsumerResponse {
2170 consumer_id: consumer.consumer_id,
2171 event_type_filters: consumer.event_type_filters,
2172 cursor_position: consumer.cursor_position,
2173 }))
2174}
2175
/// Query parameters for `poll_consumer_events`.
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    /// Maximum events to return per poll (handler defaults to 100).
    pub limit: Option<usize>,
}
2181
2182pub async fn poll_consumer_events(
2183 State(store): State<SharedStore>,
2184 Path(consumer_id): Path<String>,
2185 Query(query): Query<ConsumerPollQuery>,
2186) -> Result<Json<ConsumerEventsResponse>> {
2187 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2188 let offset = consumer.cursor_position.unwrap_or(0);
2189 let limit = query.limit.unwrap_or(100);
2190
2191 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2192 let count = events.len();
2193
2194 let consumer_events: Vec<ConsumerEventDto> = events
2195 .into_iter()
2196 .map(|(position, event)| ConsumerEventDto {
2197 position,
2198 event: EventDto::from(&event),
2199 })
2200 .collect();
2201
2202 Ok(Json(ConsumerEventsResponse {
2203 events: consumer_events,
2204 count,
2205 }))
2206}
2207
2208pub async fn ack_consumer(
2210 State(store): State<SharedStore>,
2211 Path(consumer_id): Path<String>,
2212 Json(req): Json<AckRequest>,
2213) -> Result<Json<serde_json::Value>> {
2214 let max_offset = store.total_events() as u64;
2215
2216 store
2217 .consumer_registry()
2218 .ack(&consumer_id, req.position, max_offset)
2219 .map_err(crate::error::AllSourceError::InvalidInput)?;
2220
2221 Ok(Json(serde_json::json!({
2222 "status": "ok",
2223 "consumer_id": consumer_id,
2224 "position": req.position,
2225 })))
2226}
2227
2228#[cfg(test)]
2229mod tests {
2230 use super::*;
2231 use crate::{domain::entities::Event, store::EventStore};
2232
2233 fn create_test_store() -> Arc<EventStore> {
2234 Arc::new(EventStore::new())
2235 }
2236
    /// Build a minimal test event for `entity_id`/`event_type` with a fixed
    /// `{name, value}` payload; the tenant id is hard-coded to "test-stream".
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            serde_json::json!({
                "name": "Test",
                "value": 42
            }),
            None,
        )
        .unwrap()
    }
2250
2251 #[tokio::test]
2252 async fn test_query_events_has_more_and_total_count() {
2253 let store = create_test_store();
2254
2255 for i in 0..50 {
2257 store
2258 .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2259 .unwrap();
2260 }
2261
2262 let req = QueryEventsRequest {
2264 entity_id: None,
2265 event_type: None,
2266 tenant_id: None,
2267 as_of: None,
2268 since: None,
2269 until: None,
2270 limit: Some(10),
2271 event_type_prefix: None,
2272 payload_filter: None,
2273 };
2274
2275 let requested_limit = req.limit;
2276 let unlimited_req = QueryEventsRequest {
2277 limit: None,
2278 ..QueryEventsRequest {
2279 entity_id: req.entity_id,
2280 event_type: req.event_type,
2281 tenant_id: req.tenant_id,
2282 as_of: req.as_of,
2283 since: req.since,
2284 until: req.until,
2285 limit: None,
2286 event_type_prefix: req.event_type_prefix,
2287 payload_filter: req.payload_filter,
2288 }
2289 };
2290 let all_events = store.query(unlimited_req).unwrap();
2291 let total_count = all_events.len();
2292 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2293 all_events.into_iter().take(limit).collect()
2294 } else {
2295 all_events
2296 };
2297 let count = limited_events.len();
2298 let has_more = count < total_count;
2299
2300 assert_eq!(count, 10);
2301 assert_eq!(total_count, 50);
2302 assert!(has_more);
2303 }
2304
2305 #[tokio::test]
2306 async fn test_query_events_no_more_results() {
2307 let store = create_test_store();
2308
2309 for i in 0..5 {
2311 store
2312 .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2313 .unwrap();
2314 }
2315
2316 let all_events = store
2318 .query(QueryEventsRequest {
2319 entity_id: None,
2320 event_type: None,
2321 tenant_id: None,
2322 as_of: None,
2323 since: None,
2324 until: None,
2325 limit: None,
2326 event_type_prefix: None,
2327 payload_filter: None,
2328 })
2329 .unwrap();
2330 let total_count = all_events.len();
2331 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2332 let count = limited_events.len();
2333 let has_more = count < total_count;
2334
2335 assert_eq!(count, 5);
2336 assert_eq!(total_count, 5);
2337 assert!(!has_more);
2338 }
2339
2340 #[tokio::test]
2341 async fn test_list_entities_by_type_prefix() {
2342 let store = create_test_store();
2343
2344 store
2346 .ingest(create_test_event("idx-1", "index.created"))
2347 .unwrap();
2348 store
2349 .ingest(create_test_event("idx-1", "index.updated"))
2350 .unwrap();
2351 store
2352 .ingest(create_test_event("idx-2", "index.created"))
2353 .unwrap();
2354 store
2355 .ingest(create_test_event("idx-3", "index.created"))
2356 .unwrap();
2357 store
2359 .ingest(create_test_event("trade-1", "trade.created"))
2360 .unwrap();
2361 store
2362 .ingest(create_test_event("trade-2", "trade.created"))
2363 .unwrap();
2364
2365 let req = ListEntitiesRequest {
2367 event_type_prefix: Some("index.".to_string()),
2368 payload_filter: None,
2369 limit: None,
2370 offset: None,
2371 };
2372 let query_req = QueryEventsRequest {
2373 entity_id: None,
2374 event_type: None,
2375 tenant_id: None,
2376 as_of: None,
2377 since: None,
2378 until: None,
2379 limit: None,
2380 event_type_prefix: req.event_type_prefix,
2381 payload_filter: req.payload_filter,
2382 };
2383 let events = store.query(query_req).unwrap();
2384
2385 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2387 std::collections::HashMap::new();
2388 for event in &events {
2389 entity_map
2390 .entry(event.entity_id().to_string())
2391 .or_default()
2392 .push(event);
2393 }
2394
2395 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2398 assert_eq!(entity_map["idx-3"].len(), 1);
2399 }
2400
    /// Like `create_test_event`, but with a caller-supplied payload; the
    /// tenant id is hard-coded to "test-stream".
    fn create_test_event_with_payload(
        entity_id: &str,
        event_type: &str,
        payload: serde_json::Value,
    ) -> Event {
        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            payload,
            None,
        )
        .unwrap()
    }
2415
2416 #[tokio::test]
2417 async fn test_detect_duplicates_by_payload_fields() {
2418 let store = create_test_store();
2419
2420 store
2422 .ingest(create_test_event_with_payload(
2423 "idx-1",
2424 "index.created",
2425 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2426 ))
2427 .unwrap();
2428 store
2429 .ingest(create_test_event_with_payload(
2430 "idx-2",
2431 "index.created",
2432 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2433 ))
2434 .unwrap();
2435 store
2436 .ingest(create_test_event_with_payload(
2437 "idx-3",
2438 "index.created",
2439 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2440 ))
2441 .unwrap();
2442 store
2443 .ingest(create_test_event_with_payload(
2444 "idx-4",
2445 "index.created",
2446 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2447 ))
2448 .unwrap();
2449 store
2450 .ingest(create_test_event_with_payload(
2451 "idx-5",
2452 "index.created",
2453 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2454 ))
2455 .unwrap();
2456
2457 let query_req = QueryEventsRequest {
2459 entity_id: None,
2460 event_type: None,
2461 tenant_id: None,
2462 as_of: None,
2463 since: None,
2464 until: None,
2465 limit: None,
2466 event_type_prefix: Some("index.".to_string()),
2467 payload_filter: None,
2468 };
2469 let events = store.query(query_req).unwrap();
2470
2471 let group_by_fields = vec!["name"];
2473 let mut entity_latest: std::collections::HashMap<String, &Event> =
2474 std::collections::HashMap::new();
2475 for event in &events {
2476 let eid = event.entity_id().to_string();
2477 entity_latest
2478 .entry(eid)
2479 .and_modify(|existing| {
2480 if event.timestamp() > existing.timestamp() {
2481 *existing = event;
2482 }
2483 })
2484 .or_insert(event);
2485 }
2486
2487 let mut groups: std::collections::HashMap<String, Vec<String>> =
2488 std::collections::HashMap::new();
2489 for (entity_id, event) in &entity_latest {
2490 let payload = event.payload();
2491 let mut key_parts = serde_json::Map::new();
2492 for field in &group_by_fields {
2493 let value = payload
2494 .get(*field)
2495 .cloned()
2496 .unwrap_or(serde_json::Value::Null);
2497 key_parts.insert(field.to_string(), value);
2498 }
2499 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2500 groups.entry(key_str).or_default().push(entity_id.clone());
2501 }
2502
2503 let duplicate_groups: Vec<_> = groups
2504 .into_iter()
2505 .filter(|(_, ids)| ids.len() > 1)
2506 .collect();
2507
2508 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2510 assert_eq!(ids.len(), 2);
2511 }
2512 }
2513
2514 #[tokio::test]
2515 async fn test_detect_duplicates_no_duplicates() {
2516 let store = create_test_store();
2517
2518 store
2520 .ingest(create_test_event_with_payload(
2521 "idx-1",
2522 "index.created",
2523 serde_json::json!({"name": "A"}),
2524 ))
2525 .unwrap();
2526 store
2527 .ingest(create_test_event_with_payload(
2528 "idx-2",
2529 "index.created",
2530 serde_json::json!({"name": "B"}),
2531 ))
2532 .unwrap();
2533
2534 let query_req = QueryEventsRequest {
2535 entity_id: None,
2536 event_type: None,
2537 tenant_id: None,
2538 as_of: None,
2539 since: None,
2540 until: None,
2541 limit: None,
2542 event_type_prefix: Some("index.".to_string()),
2543 payload_filter: None,
2544 };
2545 let events = store.query(query_req).unwrap();
2546
2547 let mut entity_latest: std::collections::HashMap<String, &Event> =
2548 std::collections::HashMap::new();
2549 for event in &events {
2550 entity_latest
2551 .entry(event.entity_id().to_string())
2552 .or_insert(event);
2553 }
2554
2555 let mut groups: std::collections::HashMap<String, Vec<String>> =
2556 std::collections::HashMap::new();
2557 for (entity_id, event) in &entity_latest {
2558 let key_str =
2559 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2560 .unwrap();
2561 groups.entry(key_str).or_default().push(entity_id.clone());
2562 }
2563
2564 let duplicate_groups: Vec<_> = groups
2565 .into_iter()
2566 .filter(|(_, ids)| ids.len() > 1)
2567 .collect();
2568
2569 assert_eq!(duplicate_groups.len(), 0); }
2571
2572 #[tokio::test]
2573 async fn test_detect_duplicates_multi_field_group_by() {
2574 let store = create_test_store();
2575
2576 store
2578 .ingest(create_test_event_with_payload(
2579 "idx-1",
2580 "index.created",
2581 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2582 ))
2583 .unwrap();
2584 store
2585 .ingest(create_test_event_with_payload(
2586 "idx-2",
2587 "index.created",
2588 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2589 ))
2590 .unwrap();
2591 store
2593 .ingest(create_test_event_with_payload(
2594 "idx-3",
2595 "index.created",
2596 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2597 ))
2598 .unwrap();
2599
2600 let query_req = QueryEventsRequest {
2601 entity_id: None,
2602 event_type: None,
2603 tenant_id: None,
2604 as_of: None,
2605 since: None,
2606 until: None,
2607 limit: None,
2608 event_type_prefix: Some("index.".to_string()),
2609 payload_filter: None,
2610 };
2611 let events = store.query(query_req).unwrap();
2612
2613 let group_by_fields = vec!["name", "user_id"];
2614 let mut entity_latest: std::collections::HashMap<String, &Event> =
2615 std::collections::HashMap::new();
2616 for event in &events {
2617 entity_latest
2618 .entry(event.entity_id().to_string())
2619 .and_modify(|existing| {
2620 if event.timestamp() > existing.timestamp() {
2621 *existing = event;
2622 }
2623 })
2624 .or_insert(event);
2625 }
2626
2627 let mut groups: std::collections::HashMap<String, Vec<String>> =
2628 std::collections::HashMap::new();
2629 for (entity_id, event) in &entity_latest {
2630 let payload = event.payload();
2631 let mut key_parts = serde_json::Map::new();
2632 for field in &group_by_fields {
2633 let value = payload
2634 .get(*field)
2635 .cloned()
2636 .unwrap_or(serde_json::Value::Null);
2637 key_parts.insert(field.to_string(), value);
2638 }
2639 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2640 groups.entry(key_str).or_default().push(entity_id.clone());
2641 }
2642
2643 let duplicate_groups: Vec<_> = groups
2644 .into_iter()
2645 .filter(|(_, ids)| ids.len() > 1)
2646 .collect();
2647
2648 assert_eq!(duplicate_groups.len(), 1);
2650 let (_, ref ids) = duplicate_groups[0];
2651 assert_eq!(ids.len(), 2);
2652 let mut sorted_ids = ids.clone();
2653 sorted_ids.sort();
2654 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2655 }
2656
2657 #[tokio::test]
2658 async fn test_projection_state_cache() {
2659 let store = create_test_store();
2660
2661 let cache = store.projection_state_cache();
2663 cache.insert(
2664 "entity_snapshots:user-123".to_string(),
2665 serde_json::json!({"name": "Test User", "age": 30}),
2666 );
2667
2668 let state = cache.get("entity_snapshots:user-123");
2670 assert!(state.is_some());
2671 let state = state.unwrap();
2672 assert_eq!(state["name"], "Test User");
2673 assert_eq!(state["age"], 30);
2674 }
2675
2676 #[tokio::test]
2677 async fn test_projection_manager_list_projections() {
2678 let store = create_test_store();
2679
2680 let projection_manager = store.projection_manager();
2682 let projections = projection_manager.list_projections();
2683
2684 assert!(projections.len() >= 2);
2686
2687 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2688 assert!(names.contains(&"entity_snapshots"));
2689 assert!(names.contains(&"event_counters"));
2690 }
2691
2692 #[tokio::test]
2693 async fn test_projection_state_after_event_ingestion() {
2694 let store = create_test_store();
2695
2696 let event = create_test_event("user-456", "user.created");
2698 store.ingest(event).unwrap();
2699
2700 let projection_manager = store.projection_manager();
2702 let snapshot_projection = projection_manager
2703 .get_projection("entity_snapshots")
2704 .unwrap();
2705
2706 let state = snapshot_projection.get_state("user-456");
2707 assert!(state.is_some());
2708 let state = state.unwrap();
2709 assert_eq!(state["name"], "Test");
2710 assert_eq!(state["value"], 42);
2711 }
2712
2713 #[tokio::test]
2714 async fn test_projection_state_cache_multiple_entities() {
2715 let store = create_test_store();
2716 let cache = store.projection_state_cache();
2717
2718 for i in 0..10 {
2720 cache.insert(
2721 format!("entity_snapshots:entity-{}", i),
2722 serde_json::json!({"id": i, "status": "active"}),
2723 );
2724 }
2725
2726 assert_eq!(cache.len(), 10);
2728
2729 for i in 0..10 {
2731 let key = format!("entity_snapshots:entity-{}", i);
2732 let state = cache.get(&key);
2733 assert!(state.is_some());
2734 assert_eq!(state.unwrap()["id"], i);
2735 }
2736 }
2737
2738 #[tokio::test]
2739 async fn test_projection_state_update() {
2740 let store = create_test_store();
2741 let cache = store.projection_state_cache();
2742
2743 cache.insert(
2745 "entity_snapshots:user-789".to_string(),
2746 serde_json::json!({"balance": 100}),
2747 );
2748
2749 cache.insert(
2751 "entity_snapshots:user-789".to_string(),
2752 serde_json::json!({"balance": 150}),
2753 );
2754
2755 let state = cache.get("entity_snapshots:user-789").unwrap();
2757 assert_eq!(state["balance"], 150);
2758 }
2759
2760 #[tokio::test]
2761 async fn test_event_counter_projection() {
2762 let store = create_test_store();
2763
2764 store
2766 .ingest(create_test_event("user-1", "user.created"))
2767 .unwrap();
2768 store
2769 .ingest(create_test_event("user-2", "user.created"))
2770 .unwrap();
2771 store
2772 .ingest(create_test_event("user-1", "user.updated"))
2773 .unwrap();
2774
2775 let projection_manager = store.projection_manager();
2777 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2778
2779 let created_state = counter_projection.get_state("user.created");
2781 assert!(created_state.is_some());
2782 assert_eq!(created_state.unwrap()["count"], 2);
2783
2784 let updated_state = counter_projection.get_state("user.updated");
2785 assert!(updated_state.is_some());
2786 assert_eq!(updated_state.unwrap()["count"], 1);
2787 }
2788
2789 #[tokio::test]
2790 async fn test_projection_state_cache_key_format() {
2791 let store = create_test_store();
2792 let cache = store.projection_state_cache();
2793
2794 let key = "orders:order-12345".to_string();
2796 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2797
2798 let state = cache.get(&key).unwrap();
2799 assert_eq!(state["total"], 99.99);
2800 }
2801
2802 #[tokio::test]
2803 async fn test_projection_state_cache_removal() {
2804 let store = create_test_store();
2805 let cache = store.projection_state_cache();
2806
2807 cache.insert(
2809 "test:entity-1".to_string(),
2810 serde_json::json!({"data": "value"}),
2811 );
2812 assert_eq!(cache.len(), 1);
2813
2814 cache.remove("test:entity-1");
2815 assert_eq!(cache.len(), 0);
2816 assert!(cache.get("test:entity-1").is_none());
2817 }
2818
2819 #[tokio::test]
2820 async fn test_get_nonexistent_projection() {
2821 let store = create_test_store();
2822 let projection_manager = store.projection_manager();
2823
2824 let projection = projection_manager.get_projection("nonexistent_projection");
2826 assert!(projection.is_none());
2827 }
2828
2829 #[tokio::test]
2830 async fn test_get_nonexistent_entity_state() {
2831 let store = create_test_store();
2832 let projection_manager = store.projection_manager();
2833
2834 let snapshot_projection = projection_manager
2836 .get_projection("entity_snapshots")
2837 .unwrap();
2838 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2839 assert!(state.is_none());
2840 }
2841
2842 #[tokio::test]
2843 async fn test_projection_state_cache_concurrent_access() {
2844 let store = create_test_store();
2845 let cache = store.projection_state_cache();
2846
2847 let handles: Vec<_> = (0..10)
2849 .map(|i| {
2850 let cache_clone = cache.clone();
2851 tokio::spawn(async move {
2852 cache_clone.insert(
2853 format!("concurrent:entity-{}", i),
2854 serde_json::json!({"thread": i}),
2855 );
2856 })
2857 })
2858 .collect();
2859
2860 for handle in handles {
2861 handle.await.unwrap();
2862 }
2863
2864 assert_eq!(cache.len(), 10);
2866 }
2867
2868 #[tokio::test]
2869 async fn test_projection_state_large_payload() {
2870 let store = create_test_store();
2871 let cache = store.projection_state_cache();
2872
2873 let large_array: Vec<serde_json::Value> = (0..1000)
2875 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2876 .collect();
2877
2878 cache.insert(
2879 "large:entity-1".to_string(),
2880 serde_json::json!({"items": large_array}),
2881 );
2882
2883 let state = cache.get("large:entity-1").unwrap();
2884 let items = state["items"].as_array().unwrap();
2885 assert_eq!(items.len(), 1000);
2886 }
2887
2888 #[tokio::test]
2889 async fn test_projection_state_complex_json() {
2890 let store = create_test_store();
2891 let cache = store.projection_state_cache();
2892
2893 let complex_state = serde_json::json!({
2895 "user": {
2896 "id": "user-123",
2897 "profile": {
2898 "name": "John Doe",
2899 "email": "john@example.com",
2900 "settings": {
2901 "theme": "dark",
2902 "notifications": true
2903 }
2904 },
2905 "roles": ["admin", "user"],
2906 "metadata": {
2907 "created_at": "2025-01-01T00:00:00Z",
2908 "last_login": null
2909 }
2910 }
2911 });
2912
2913 cache.insert("complex:user-123".to_string(), complex_state);
2914
2915 let state = cache.get("complex:user-123").unwrap();
2916 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2917 assert_eq!(state["user"]["roles"][0], "admin");
2918 assert!(state["user"]["metadata"]["last_login"].is_null());
2919 }
2920
2921 #[tokio::test]
2922 async fn test_projection_state_cache_iteration() {
2923 let store = create_test_store();
2924 let cache = store.projection_state_cache();
2925
2926 for i in 0..5 {
2928 cache.insert(
2929 format!("iter:entity-{}", i),
2930 serde_json::json!({"index": i}),
2931 );
2932 }
2933
2934 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2936 assert_eq!(entries.len(), 5);
2937 }
2938
2939 #[tokio::test]
2940 async fn test_projection_manager_get_entity_snapshots() {
2941 let store = create_test_store();
2942 let projection_manager = store.projection_manager();
2943
2944 let projection = projection_manager.get_projection("entity_snapshots");
2946 assert!(projection.is_some());
2947 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2948 }
2949
2950 #[tokio::test]
2951 async fn test_projection_manager_get_event_counters() {
2952 let store = create_test_store();
2953 let projection_manager = store.projection_manager();
2954
2955 let projection = projection_manager.get_projection("event_counters");
2957 assert!(projection.is_some());
2958 assert_eq!(projection.unwrap().name(), "event_counters");
2959 }
2960
2961 #[tokio::test]
2962 async fn test_projection_state_cache_overwrite() {
2963 let store = create_test_store();
2964 let cache = store.projection_state_cache();
2965
2966 cache.insert(
2968 "overwrite:entity-1".to_string(),
2969 serde_json::json!({"version": 1}),
2970 );
2971
2972 cache.insert(
2974 "overwrite:entity-1".to_string(),
2975 serde_json::json!({"version": 2}),
2976 );
2977
2978 cache.insert(
2980 "overwrite:entity-1".to_string(),
2981 serde_json::json!({"version": 3}),
2982 );
2983
2984 let state = cache.get("overwrite:entity-1").unwrap();
2985 assert_eq!(state["version"], 3);
2986
2987 assert_eq!(cache.len(), 1);
2989 }
2990
2991 #[tokio::test]
2992 async fn test_projection_state_multiple_projections() {
2993 let store = create_test_store();
2994 let cache = store.projection_state_cache();
2995
2996 cache.insert(
2998 "entity_snapshots:user-1".to_string(),
2999 serde_json::json!({"name": "Alice"}),
3000 );
3001 cache.insert(
3002 "event_counters:user.created".to_string(),
3003 serde_json::json!({"count": 5}),
3004 );
3005 cache.insert(
3006 "custom_projection:order-1".to_string(),
3007 serde_json::json!({"total": 150.0}),
3008 );
3009
3010 assert_eq!(
3012 cache.get("entity_snapshots:user-1").unwrap()["name"],
3013 "Alice"
3014 );
3015 assert_eq!(
3016 cache.get("event_counters:user.created").unwrap()["count"],
3017 5
3018 );
3019 assert_eq!(
3020 cache.get("custom_projection:order-1").unwrap()["total"],
3021 150.0
3022 );
3023 }
3024
3025 #[tokio::test]
3026 async fn test_bulk_projection_state_access() {
3027 let store = create_test_store();
3028
3029 for i in 0..5 {
3031 let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
3032 store.ingest(event).unwrap();
3033 }
3034
3035 let projection_manager = store.projection_manager();
3037 let snapshot_projection = projection_manager
3038 .get_projection("entity_snapshots")
3039 .unwrap();
3040
3041 for i in 0..5 {
3043 let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
3044 assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
3045 }
3046 }
3047
3048 #[tokio::test]
3049 async fn test_bulk_save_projection_states() {
3050 let store = create_test_store();
3051 let cache = store.projection_state_cache();
3052
3053 let states = vec![
3055 BulkSaveStateItem {
3056 entity_id: "bulk-entity-1".to_string(),
3057 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3058 },
3059 BulkSaveStateItem {
3060 entity_id: "bulk-entity-2".to_string(),
3061 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3062 },
3063 BulkSaveStateItem {
3064 entity_id: "bulk-entity-3".to_string(),
3065 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3066 },
3067 ];
3068
3069 let projection_name = "test_projection";
3070
3071 for item in &states {
3073 cache.insert(
3074 format!("{projection_name}:{}", item.entity_id),
3075 item.state.clone(),
3076 );
3077 }
3078
3079 assert_eq!(cache.len(), 3);
3081
3082 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3083 assert_eq!(state1["name"], "Entity 1");
3084 assert_eq!(state1["value"], 100);
3085
3086 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3087 assert_eq!(state2["name"], "Entity 2");
3088 assert_eq!(state2["value"], 200);
3089
3090 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3091 assert_eq!(state3["name"], "Entity 3");
3092 assert_eq!(state3["value"], 300);
3093 }
3094
3095 #[tokio::test]
3096 async fn test_bulk_save_empty_states() {
3097 let store = create_test_store();
3098 let cache = store.projection_state_cache();
3099
3100 cache.clear();
3102
3103 let states: Vec<BulkSaveStateItem> = vec![];
3105 assert_eq!(states.len(), 0);
3106
3107 assert_eq!(cache.len(), 0);
3109 }
3110
3111 #[tokio::test]
3112 async fn test_bulk_save_overwrites_existing() {
3113 let store = create_test_store();
3114 let cache = store.projection_state_cache();
3115
3116 cache.insert(
3118 "test:entity-1".to_string(),
3119 serde_json::json!({"version": 1, "data": "initial"}),
3120 );
3121
3122 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3124 cache.insert("test:entity-1".to_string(), new_state);
3125
3126 let state = cache.get("test:entity-1").unwrap();
3128 assert_eq!(state["version"], 2);
3129 assert_eq!(state["data"], "updated");
3130 }
3131
3132 #[tokio::test]
3133 async fn test_bulk_save_high_volume() {
3134 let store = create_test_store();
3135 let cache = store.projection_state_cache();
3136
3137 for i in 0..1000 {
3139 cache.insert(
3140 format!("volume_test:entity-{}", i),
3141 serde_json::json!({"index": i, "status": "active"}),
3142 );
3143 }
3144
3145 assert_eq!(cache.len(), 1000);
3147
3148 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3150 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3151 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3152 }
3153
3154 #[tokio::test]
3155 async fn test_bulk_save_different_projections() {
3156 let store = create_test_store();
3157 let cache = store.projection_state_cache();
3158
3159 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3161
3162 for proj in projections.iter() {
3163 for i in 0..5 {
3164 cache.insert(
3165 format!("{proj}:entity-{i}"),
3166 serde_json::json!({"projection": proj, "id": i}),
3167 );
3168 }
3169 }
3170
3171 assert_eq!(cache.len(), 15);
3173
3174 for proj in projections.iter() {
3176 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3177 assert_eq!(state["projection"], *proj);
3178 }
3179 }
3180}