1#[cfg(feature = "replication")]
2use crate::infrastructure::replication::ReplicationMode;
3use crate::{
4 application::{
5 dto::{
6 AckRequest, ConsumerEventDto, ConsumerEventsResponse, ConsumerResponse,
7 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
8 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
9 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
10 QueryEventsRequest, QueryEventsResponse, RegisterConsumerRequest,
11 },
12 services::{
13 analytics::{
14 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
15 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
16 },
17 pipeline::{PipelineConfig, PipelineStats},
18 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
19 schema::{
20 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
21 ValidateEventRequest, ValidateEventResponse,
22 },
23 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
24 },
25 },
26 domain::entities::Event,
27 error::Result,
28 infrastructure::{
29 persistence::{
30 compaction::CompactionResult,
31 snapshot::{
32 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
33 ListSnapshotsResponse, SnapshotInfo,
34 },
35 },
36 query::{
37 geospatial::GeoQueryRequest,
38 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
39 },
40 security::middleware::OptionalAuth,
41 web::api_v1::AppState,
42 },
43 store::{EventStore, EventTypeInfo, StreamInfo},
44};
45use axum::{
46 Json, Router,
47 extract::{Path, Query, State, WebSocketUpgrade},
48 response::{IntoResponse, Response},
49 routing::{get, post, put},
50};
51use serde::Deserialize;
52use std::sync::Arc;
53use tower_http::{
54 cors::{Any, CorsLayer},
55 trace::TraceLayer,
56};
57
/// Shared, reference-counted handle to the [`EventStore`]; this is the axum
/// state type extracted by the handlers in this file via `State<SharedStore>`.
type SharedStore = Arc<EventStore>;
59
/// Waits for a replication ACK covering the leader's current offset.
///
/// Returns immediately when no WAL shipper is installed, when the shipper
/// runs in [`ReplicationMode::Async`], or when the current leader offset is
/// `0`. The wait is timed with the `replication_ack_wait_seconds` metric. A
/// missing ACK is only logged as a warning; nothing is returned to the caller.
#[cfg(feature = "replication")]
async fn await_replication_ack(state: &AppState) {
    // Collect everything needed inside a scope so the read guard is released
    // before the ACK wait below.
    let (shipper, mode, target_offset) = {
        let guard = state.wal_shipper.read().await;
        let Some(shipper) = &*guard else {
            return;
        };

        let mode = shipper.replication_mode();
        if mode == ReplicationMode::Async {
            return;
        }

        let target_offset = shipper.current_leader_offset();
        if target_offset == 0 {
            return;
        }

        (Arc::clone(shipper), mode, target_offset)
    };

    let ack_timer = state
        .store
        .metrics()
        .replication_ack_wait_seconds
        .start_timer();
    let acked = shipper.wait_for_ack(target_offset).await;
    ack_timer.observe_duration();

    if acked {
        return;
    }

    tracing::warn!(
        "Replication ACK timeout in {} mode (offset {}). \
         Write succeeded locally but follower confirmation pending.",
        mode,
        target_offset,
    );
}
/// No-op variant compiled when the `replication` feature is disabled, so
/// callers can await it unconditionally.
#[cfg(not(feature = "replication"))]
async fn await_replication_ack(_state: &AppState) {
}
105
106pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
107 let app = Router::new()
108 .route("/health", get(health))
109 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
111 .route("/api/v1/events/batch", post(ingest_events_batch))
112 .route("/api/v1/events/query", get(query_events))
113 .route("/api/v1/events/{event_id}", get(get_event_by_id))
114 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
117 .route("/api/v1/event-types", get(list_event_types))
118 .route("/api/v1/entities/duplicates", get(detect_duplicates))
119 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
120 .route(
121 "/api/v1/entities/{entity_id}/snapshot",
122 get(get_entity_snapshot),
123 )
124 .route("/api/v1/stats", get(get_stats))
125 .route("/api/v1/analytics/frequency", get(analytics_frequency))
127 .route("/api/v1/analytics/summary", get(analytics_summary))
128 .route("/api/v1/analytics/correlation", get(analytics_correlation))
129 .route("/api/v1/snapshots", post(create_snapshot))
131 .route("/api/v1/snapshots", get(list_snapshots))
132 .route(
133 "/api/v1/snapshots/{entity_id}/latest",
134 get(get_latest_snapshot),
135 )
136 .route("/api/v1/compaction/trigger", post(trigger_compaction))
138 .route("/api/v1/compaction/stats", get(compaction_stats))
139 .route("/api/v1/schemas", post(register_schema))
141 .route("/api/v1/schemas", get(list_subjects))
142 .route("/api/v1/schemas/{subject}", get(get_schema))
143 .route(
144 "/api/v1/schemas/{subject}/versions",
145 get(list_schema_versions),
146 )
147 .route("/api/v1/schemas/validate", post(validate_event_schema))
148 .route(
149 "/api/v1/schemas/{subject}/compatibility",
150 put(set_compatibility_mode),
151 )
152 .route("/api/v1/replay", post(start_replay))
154 .route("/api/v1/replay", get(list_replays))
155 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
156 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
157 .route(
158 "/api/v1/replay/{replay_id}",
159 axum::routing::delete(delete_replay),
160 )
161 .route("/api/v1/pipelines", post(register_pipeline))
163 .route("/api/v1/pipelines", get(list_pipelines))
164 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
165 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
166 .route(
167 "/api/v1/pipelines/{pipeline_id}",
168 axum::routing::delete(remove_pipeline),
169 )
170 .route(
171 "/api/v1/pipelines/{pipeline_id}/stats",
172 get(get_pipeline_stats),
173 )
174 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
175 .route("/api/v1/projections", get(list_projections))
177 .route("/api/v1/projections/{name}", get(get_projection))
178 .route(
179 "/api/v1/projections/{name}",
180 axum::routing::delete(delete_projection),
181 )
182 .route(
183 "/api/v1/projections/{name}/state",
184 get(get_projection_state_summary),
185 )
186 .route("/api/v1/projections/{name}/reset", post(reset_projection))
187 .route(
188 "/api/v1/projections/{name}/{entity_id}/state",
189 get(get_projection_state),
190 )
191 .route(
192 "/api/v1/projections/{name}/{entity_id}/state",
193 post(save_projection_state),
194 )
195 .route(
196 "/api/v1/projections/{name}/{entity_id}/state",
197 put(save_projection_state),
198 )
199 .route(
200 "/api/v1/projections/{name}/bulk",
201 post(bulk_get_projection_states),
202 )
203 .route(
204 "/api/v1/projections/{name}/bulk/save",
205 post(bulk_save_projection_states),
206 )
207 .route("/api/v1/webhooks", post(register_webhook))
209 .route("/api/v1/webhooks", get(list_webhooks))
210 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
211 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
212 .route(
213 "/api/v1/webhooks/{webhook_id}",
214 axum::routing::delete(delete_webhook),
215 )
216 .route(
217 "/api/v1/webhooks/{webhook_id}/deliveries",
218 get(list_webhook_deliveries),
219 )
220 .route("/api/v1/graphql", post(graphql_query))
222 .route("/api/v1/geospatial/query", post(geo_query))
223 .route("/api/v1/geospatial/stats", get(geo_stats))
224 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
225 .route(
226 "/api/v1/schema-evolution/history/{event_type}",
227 get(schema_evolution_history),
228 )
229 .route(
230 "/api/v1/schema-evolution/schema/{event_type}",
231 get(schema_evolution_schema),
232 )
233 .route(
234 "/api/v1/schema-evolution/stats",
235 get(schema_evolution_stats),
236 )
237 .layer(
238 CorsLayer::new()
239 .allow_origin(Any)
240 .allow_methods(Any)
241 .allow_headers(Any),
242 )
243 .layer(TraceLayer::new_for_http())
244 .with_state(store);
245
246 let listener = tokio::net::TcpListener::bind(addr).await?;
247 axum::serve(listener, app).await?;
248
249 Ok(())
250}
251
252pub async fn health() -> impl IntoResponse {
253 Json(serde_json::json!({
254 "status": "healthy",
255 "service": "allsource-core",
256 "version": env!("CARGO_PKG_VERSION")
257 }))
258}
259
260pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
262 let metrics = store.metrics();
263
264 match metrics.encode() {
265 Ok(encoded) => Response::builder()
266 .status(200)
267 .header("Content-Type", "text/plain; version=0.0.4")
268 .body(encoded)
269 .unwrap()
270 .into_response(),
271 Err(e) => Response::builder()
272 .status(500)
273 .body(format!("Error encoding metrics: {e}"))
274 .unwrap()
275 .into_response(),
276 }
277}
278
279pub async fn ingest_event(
280 State(store): State<SharedStore>,
281 Json(req): Json<IngestEventRequest>,
282) -> Result<Json<IngestEventResponse>> {
283 let expected_version = req.expected_version;
284
285 let event = Event::from_strings(
287 req.event_type,
288 req.entity_id,
289 "default".to_string(),
290 req.payload,
291 req.metadata,
292 )?;
293
294 let event_id = event.id;
295 let timestamp = event.timestamp;
296
297 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
298
299 tracing::info!("Event ingested: {}", event_id);
300
301 Ok(Json(IngestEventResponse {
302 event_id,
303 timestamp,
304 version: Some(new_version),
305 }))
306}
307
308pub async fn ingest_event_v1(
312 State(state): State<AppState>,
313 Json(req): Json<IngestEventRequest>,
314) -> Result<Json<IngestEventResponse>> {
315 let expected_version = req.expected_version;
316
317 let event = Event::from_strings(
318 req.event_type,
319 req.entity_id,
320 "default".to_string(),
321 req.payload,
322 req.metadata,
323 )?;
324
325 let event_id = event.id;
326 let timestamp = event.timestamp;
327
328 let new_version = state
329 .store
330 .ingest_with_expected_version(&event, expected_version)?;
331
332 await_replication_ack(&state).await;
334
335 tracing::info!("Event ingested: {}", event_id);
336
337 Ok(Json(IngestEventResponse {
338 event_id,
339 timestamp,
340 version: Some(new_version),
341 }))
342}
343
344pub async fn ingest_events_batch(
349 State(store): State<SharedStore>,
350 Json(req): Json<IngestEventsBatchRequest>,
351) -> Result<Json<IngestEventsBatchResponse>> {
352 let total = req.events.len();
353 let mut ingested_events = Vec::with_capacity(total);
354
355 for event_req in req.events {
356 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
357 let expected_version = event_req.expected_version;
358
359 let event = Event::from_strings(
360 event_req.event_type,
361 event_req.entity_id,
362 tenant_id,
363 event_req.payload,
364 event_req.metadata,
365 )?;
366
367 let event_id = event.id;
368 let timestamp = event.timestamp;
369
370 let new_version = store.ingest_with_expected_version(&event, expected_version)?;
371
372 ingested_events.push(IngestEventResponse {
373 event_id,
374 timestamp,
375 version: Some(new_version),
376 });
377 }
378
379 let ingested = ingested_events.len();
380 tracing::info!("Batch ingested {} events", ingested);
381
382 Ok(Json(IngestEventsBatchResponse {
383 total,
384 ingested,
385 events: ingested_events,
386 }))
387}
388
389pub async fn ingest_events_batch_v1(
393 State(state): State<AppState>,
394 Json(req): Json<IngestEventsBatchRequest>,
395) -> Result<Json<IngestEventsBatchResponse>> {
396 let total = req.events.len();
397 let mut ingested_events = Vec::with_capacity(total);
398
399 for event_req in req.events {
400 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
401 let expected_version = event_req.expected_version;
402
403 let event = Event::from_strings(
404 event_req.event_type,
405 event_req.entity_id,
406 tenant_id,
407 event_req.payload,
408 event_req.metadata,
409 )?;
410
411 let event_id = event.id;
412 let timestamp = event.timestamp;
413
414 let new_version = state
415 .store
416 .ingest_with_expected_version(&event, expected_version)?;
417
418 ingested_events.push(IngestEventResponse {
419 event_id,
420 timestamp,
421 version: Some(new_version),
422 });
423 }
424
425 await_replication_ack(&state).await;
427
428 let ingested = ingested_events.len();
429 tracing::info!("Batch ingested {} events", ingested);
430
431 Ok(Json(IngestEventsBatchResponse {
432 total,
433 ingested,
434 events: ingested_events,
435 }))
436}
437
438pub async fn query_events(
439 OptionalAuth(auth): OptionalAuth,
440 Query(req): Query<QueryEventsRequest>,
441 State(store): State<SharedStore>,
442) -> Result<Json<QueryEventsResponse>> {
443 let requested_limit = req.limit;
444 let queried_entity_id = req.entity_id.clone();
445
446 let enforced_tenant = auth
449 .as_ref()
450 .map(|a| a.tenant_id().to_string())
451 .or(req.tenant_id.clone());
452
453 let unlimited_req = QueryEventsRequest {
455 entity_id: req.entity_id,
456 event_type: req.event_type,
457 tenant_id: enforced_tenant,
458 as_of: req.as_of,
459 since: req.since,
460 until: req.until,
461 limit: None,
462 event_type_prefix: req.event_type_prefix,
463 payload_filter: req.payload_filter,
464 };
465 let all_events = store.query(&unlimited_req)?;
466 let total_count = all_events.len();
467
468 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
470 all_events.into_iter().take(limit).collect()
471 } else {
472 all_events
473 };
474
475 let count = limited_events.len();
476 let has_more = count < total_count;
477 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
478
479 let entity_version = queried_entity_id
481 .as_deref()
482 .map(|eid| store.get_entity_version(eid));
483
484 tracing::debug!("Query returned {} events (total: {})", count, total_count);
485
486 Ok(Json(QueryEventsResponse {
487 events,
488 count,
489 total_count,
490 has_more,
491 entity_version,
492 }))
493}
494
495pub async fn list_entities(
496 State(store): State<SharedStore>,
497 Query(req): Query<ListEntitiesRequest>,
498) -> Result<Json<ListEntitiesResponse>> {
499 use std::collections::HashMap;
500
501 let query_req = QueryEventsRequest {
503 entity_id: None,
504 event_type: None,
505 tenant_id: None,
506 as_of: None,
507 since: None,
508 until: None,
509 limit: None,
510 event_type_prefix: req.event_type_prefix,
511 payload_filter: req.payload_filter,
512 };
513 let events = store.query(&query_req)?;
514
515 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
517 for event in &events {
518 entity_map
519 .entry(event.entity_id().to_string())
520 .or_default()
521 .push(event);
522 }
523
524 let mut summaries: Vec<EntitySummary> = entity_map
526 .into_iter()
527 .map(|(entity_id, events)| {
528 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
529 EntitySummary {
530 entity_id,
531 event_count: events.len(),
532 last_event_type: last.event_type_str().to_string(),
533 last_event_at: last.timestamp(),
534 }
535 })
536 .collect();
537 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
538
539 let total = summaries.len();
540
541 let offset = req.offset.unwrap_or(0);
543 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
544 let summaries = if let Some(limit) = req.limit {
545 let has_more = summaries.len() > limit;
546 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
547 return Ok(Json(ListEntitiesResponse {
548 entities: truncated,
549 total,
550 has_more,
551 }));
552 } else {
553 summaries
554 };
555
556 Ok(Json(ListEntitiesResponse {
557 entities: summaries,
558 total,
559 has_more: false,
560 }))
561}
562
563pub async fn detect_duplicates(
564 State(store): State<SharedStore>,
565 Query(req): Query<DetectDuplicatesRequest>,
566) -> Result<Json<DetectDuplicatesResponse>> {
567 use std::collections::HashMap;
568
569 let group_by_fields: Vec<&str> = req.group_by.split(',').map(str::trim).collect();
570
571 let query_req = QueryEventsRequest {
573 entity_id: None,
574 event_type: None,
575 tenant_id: None,
576 as_of: None,
577 since: None,
578 until: None,
579 limit: None,
580 event_type_prefix: Some(req.event_type_prefix),
581 payload_filter: None,
582 };
583 let events = store.query(&query_req)?;
584
585 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
588 for event in &events {
589 let eid = event.entity_id().to_string();
590 entity_latest
591 .entry(eid)
592 .and_modify(|existing| {
593 if event.timestamp() > existing.timestamp() {
594 *existing = event;
595 }
596 })
597 .or_insert(event);
598 }
599
600 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
602 for (entity_id, event) in &entity_latest {
603 let payload = event.payload();
604 let mut key_parts = serde_json::Map::new();
605 for field in &group_by_fields {
606 let value = payload
607 .get(*field)
608 .cloned()
609 .unwrap_or(serde_json::Value::Null);
610 key_parts.insert((*field).to_string(), value);
611 }
612 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
613 groups.entry(key_str).or_default().push(entity_id.clone());
614 }
615
616 let mut duplicate_groups: Vec<DuplicateGroup> = groups
618 .into_iter()
619 .filter(|(_, ids)| ids.len() > 1)
620 .map(|(key_str, mut ids)| {
621 ids.sort();
622 let key: serde_json::Value =
623 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
624 let count = ids.len();
625 DuplicateGroup {
626 key,
627 entity_ids: ids,
628 count,
629 }
630 })
631 .collect();
632
633 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
635
636 let total = duplicate_groups.len();
637
638 let offset = req.offset.unwrap_or(0);
640 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
641
642 if let Some(limit) = req.limit {
643 let has_more = duplicate_groups.len() > limit;
644 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
645 return Ok(Json(DetectDuplicatesResponse {
646 duplicates: truncated,
647 total,
648 has_more,
649 }));
650 }
651
652 Ok(Json(DetectDuplicatesResponse {
653 duplicates: duplicate_groups,
654 total,
655 has_more: false,
656 }))
657}
658
/// Query-string parameters accepted by [`get_entity_state`].
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional timestamp forwarded to `reconstruct_state`.
    /// NOTE(review): presumably selects the state as of that instant — confirm in the store.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
663
664pub async fn get_entity_state(
665 State(store): State<SharedStore>,
666 Path(entity_id): Path<String>,
667 Query(params): Query<EntityStateParams>,
668) -> Result<Json<serde_json::Value>> {
669 let state = store.reconstruct_state(&entity_id, params.as_of)?;
670
671 tracing::info!("State reconstructed for entity: {}", entity_id);
672
673 Ok(Json(state))
674}
675
676pub async fn get_entity_snapshot(
677 State(store): State<SharedStore>,
678 Path(entity_id): Path<String>,
679) -> Result<Json<serde_json::Value>> {
680 let snapshot = store.get_snapshot(&entity_id)?;
681
682 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
683
684 Ok(Json(snapshot))
685}
686
687pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
688 let stats = store.stats();
689 Json(stats)
690}
691
/// Pagination parameters accepted by [`list_streams`].
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of streams (after sorting) to skip before the returned page.
    pub offset: Option<usize>,
}
701
/// Response body produced by [`list_streams`].
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The requested page of streams.
    pub streams: Vec<StreamInfo>,
    /// Number of streams before `offset`/`limit` were applied.
    pub total: usize,
}
708
709pub async fn list_streams(
710 State(store): State<SharedStore>,
711 Query(params): Query<ListStreamsParams>,
712) -> Json<ListStreamsResponse> {
713 let mut streams = store.list_streams();
714 let total = streams.len();
715
716 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
718
719 if let Some(offset) = params.offset {
721 if offset < streams.len() {
722 streams = streams[offset..].to_vec();
723 } else {
724 streams = vec![];
725 }
726 }
727
728 if let Some(limit) = params.limit {
729 streams.truncate(limit);
730 }
731
732 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
733
734 Json(ListStreamsResponse { streams, total })
735}
736
/// Pagination parameters accepted by [`list_event_types`].
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return; no cap when absent.
    pub limit: Option<usize>,
    /// Number of event types (after sorting) to skip before the returned page.
    pub offset: Option<usize>,
}
746
/// Response body produced by [`list_event_types`].
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The requested page of event types.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types before `offset`/`limit` were applied.
    pub total: usize,
}
753
754pub async fn list_event_types(
755 State(store): State<SharedStore>,
756 Query(params): Query<ListEventTypesParams>,
757) -> Json<ListEventTypesResponse> {
758 let mut event_types = store.list_event_types();
759 let total = event_types.len();
760
761 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
763
764 if let Some(offset) = params.offset {
766 if offset < event_types.len() {
767 event_types = event_types[offset..].to_vec();
768 } else {
769 event_types = vec![];
770 }
771 }
772
773 if let Some(limit) = params.limit {
774 event_types.truncate(limit);
775 }
776
777 tracing::debug!(
778 "Listed {} event types (total: {})",
779 event_types.len(),
780 total
781 );
782
783 Json(ListEventTypesResponse { event_types, total })
784}
785
/// Query-string parameters accepted by [`events_websocket`].
#[derive(Debug, Deserialize)]
pub struct WebSocketParams {
    /// When present, the socket is handled via `handle_socket_with_consumer`
    /// with this id; otherwise via `handle_socket`.
    pub consumer_id: Option<String>,
}
791
792pub async fn events_websocket(
793 ws: WebSocketUpgrade,
794 State(store): State<SharedStore>,
795 Query(params): Query<WebSocketParams>,
796) -> Response {
797 let websocket_manager = store.websocket_manager();
798
799 ws.on_upgrade(move |socket| async move {
800 if let Some(consumer_id) = params.consumer_id {
801 websocket_manager
802 .handle_socket_with_consumer(socket, consumer_id, store)
803 .await;
804 } else {
805 websocket_manager.handle_socket(socket).await;
806 }
807 })
808}
809
810pub async fn analytics_frequency(
812 State(store): State<SharedStore>,
813 Query(req): Query<EventFrequencyRequest>,
814) -> Result<Json<EventFrequencyResponse>> {
815 let response = AnalyticsEngine::event_frequency(&store, &req)?;
816
817 tracing::debug!(
818 "Frequency analysis returned {} buckets",
819 response.buckets.len()
820 );
821
822 Ok(Json(response))
823}
824
825pub async fn analytics_summary(
827 State(store): State<SharedStore>,
828 Query(req): Query<StatsSummaryRequest>,
829) -> Result<Json<StatsSummaryResponse>> {
830 let response = AnalyticsEngine::stats_summary(&store, &req)?;
831
832 tracing::debug!(
833 "Stats summary: {} events across {} entities",
834 response.total_events,
835 response.unique_entities
836 );
837
838 Ok(Json(response))
839}
840
841pub async fn analytics_correlation(
843 State(store): State<SharedStore>,
844 Query(req): Query<CorrelationRequest>,
845) -> Result<Json<CorrelationResponse>> {
846 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
847
848 tracing::debug!(
849 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
850 response.correlated_pairs,
851 response.total_a,
852 response.correlation_percentage
853 );
854
855 Ok(Json(response))
856}
857
858pub async fn create_snapshot(
860 State(store): State<SharedStore>,
861 Json(req): Json<CreateSnapshotRequest>,
862) -> Result<Json<CreateSnapshotResponse>> {
863 store.create_snapshot(&req.entity_id)?;
864
865 let snapshot_manager = store.snapshot_manager();
866 let snapshot = snapshot_manager
867 .get_latest_snapshot(&req.entity_id)
868 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
869
870 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
871
872 Ok(Json(CreateSnapshotResponse {
873 snapshot_id: snapshot.id,
874 entity_id: snapshot.entity_id,
875 created_at: snapshot.created_at,
876 event_count: snapshot.event_count,
877 size_bytes: snapshot.metadata.size_bytes,
878 }))
879}
880
881pub async fn list_snapshots(
883 State(store): State<SharedStore>,
884 Query(req): Query<ListSnapshotsRequest>,
885) -> Result<Json<ListSnapshotsResponse>> {
886 let snapshot_manager = store.snapshot_manager();
887
888 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
889 snapshot_manager
890 .get_all_snapshots(&entity_id)
891 .into_iter()
892 .map(SnapshotInfo::from)
893 .collect()
894 } else {
895 let entities = snapshot_manager.list_entities();
897 entities
898 .iter()
899 .flat_map(|entity_id| {
900 snapshot_manager
901 .get_all_snapshots(entity_id)
902 .into_iter()
903 .map(SnapshotInfo::from)
904 })
905 .collect()
906 };
907
908 let total = snapshots.len();
909
910 tracing::debug!("Listed {} snapshots", total);
911
912 Ok(Json(ListSnapshotsResponse { snapshots, total }))
913}
914
915pub async fn get_latest_snapshot(
917 State(store): State<SharedStore>,
918 Path(entity_id): Path<String>,
919) -> Result<Json<serde_json::Value>> {
920 let snapshot_manager = store.snapshot_manager();
921
922 let snapshot = snapshot_manager
923 .get_latest_snapshot(&entity_id)
924 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
925
926 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
927
928 Ok(Json(serde_json::json!({
929 "snapshot_id": snapshot.id,
930 "entity_id": snapshot.entity_id,
931 "created_at": snapshot.created_at,
932 "as_of": snapshot.as_of,
933 "event_count": snapshot.event_count,
934 "size_bytes": snapshot.metadata.size_bytes,
935 "snapshot_type": snapshot.metadata.snapshot_type,
936 "state": snapshot.state
937 })))
938}
939
940pub async fn trigger_compaction(
942 State(store): State<SharedStore>,
943) -> Result<Json<CompactionResult>> {
944 let compaction_manager = store.compaction_manager().ok_or_else(|| {
945 crate::error::AllSourceError::InternalError(
946 "Compaction not enabled (no Parquet storage)".to_string(),
947 )
948 })?;
949
950 tracing::info!("📦 Manual compaction triggered via API");
951
952 let result = compaction_manager.compact_now()?;
953
954 Ok(Json(result))
955}
956
957pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
959 let compaction_manager = store.compaction_manager().ok_or_else(|| {
960 crate::error::AllSourceError::InternalError(
961 "Compaction not enabled (no Parquet storage)".to_string(),
962 )
963 })?;
964
965 let stats = compaction_manager.stats();
966 let config = compaction_manager.config();
967
968 Ok(Json(serde_json::json!({
969 "stats": stats,
970 "config": {
971 "min_files_to_compact": config.min_files_to_compact,
972 "target_file_size": config.target_file_size,
973 "max_file_size": config.max_file_size,
974 "small_file_threshold": config.small_file_threshold,
975 "compaction_interval_seconds": config.compaction_interval_seconds,
976 "auto_compact": config.auto_compact,
977 "strategy": config.strategy
978 }
979 })))
980}
981
982pub async fn register_schema(
984 State(store): State<SharedStore>,
985 Json(req): Json<RegisterSchemaRequest>,
986) -> Result<Json<RegisterSchemaResponse>> {
987 let schema_registry = store.schema_registry();
988
989 let response =
990 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
991
992 tracing::info!(
993 "📋 Schema registered: v{} for '{}'",
994 response.version,
995 response.subject
996 );
997
998 Ok(Json(response))
999}
1000
/// Query-string parameters accepted by [`get_schema`].
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version forwarded to the registry's `get_schema`.
    /// NOTE(review): presumably the latest version is used when absent — confirm in the registry.
    version: Option<u32>,
}
1006
1007pub async fn get_schema(
1008 State(store): State<SharedStore>,
1009 Path(subject): Path<String>,
1010 Query(params): Query<GetSchemaParams>,
1011) -> Result<Json<serde_json::Value>> {
1012 let schema_registry = store.schema_registry();
1013
1014 let schema = schema_registry.get_schema(&subject, params.version)?;
1015
1016 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
1017
1018 Ok(Json(serde_json::json!({
1019 "id": schema.id,
1020 "subject": schema.subject,
1021 "version": schema.version,
1022 "schema": schema.schema,
1023 "created_at": schema.created_at,
1024 "description": schema.description,
1025 "tags": schema.tags
1026 })))
1027}
1028
1029pub async fn list_schema_versions(
1031 State(store): State<SharedStore>,
1032 Path(subject): Path<String>,
1033) -> Result<Json<serde_json::Value>> {
1034 let schema_registry = store.schema_registry();
1035
1036 let versions = schema_registry.list_versions(&subject)?;
1037
1038 Ok(Json(serde_json::json!({
1039 "subject": subject,
1040 "versions": versions
1041 })))
1042}
1043
1044pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1046 let schema_registry = store.schema_registry();
1047
1048 let subjects = schema_registry.list_subjects();
1049
1050 Json(serde_json::json!({
1051 "subjects": subjects,
1052 "total": subjects.len()
1053 }))
1054}
1055
1056pub async fn validate_event_schema(
1058 State(store): State<SharedStore>,
1059 Json(req): Json<ValidateEventRequest>,
1060) -> Result<Json<ValidateEventResponse>> {
1061 let schema_registry = store.schema_registry();
1062
1063 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1064
1065 if response.valid {
1066 tracing::debug!(
1067 "✅ Event validated against schema '{}' v{}",
1068 req.subject,
1069 response.schema_version
1070 );
1071 } else {
1072 tracing::warn!(
1073 "❌ Event validation failed for '{}': {:?}",
1074 req.subject,
1075 response.errors
1076 );
1077 }
1078
1079 Ok(Json(response))
1080}
1081
/// JSON body accepted by [`set_compatibility_mode`].
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
1087
1088pub async fn set_compatibility_mode(
1089 State(store): State<SharedStore>,
1090 Path(subject): Path<String>,
1091 Json(req): Json<SetCompatibilityRequest>,
1092) -> Json<serde_json::Value> {
1093 let schema_registry = store.schema_registry();
1094
1095 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1096
1097 tracing::info!(
1098 "🔧 Set compatibility mode for '{}' to {:?}",
1099 subject,
1100 req.compatibility
1101 );
1102
1103 Json(serde_json::json!({
1104 "subject": subject,
1105 "compatibility": req.compatibility
1106 }))
1107}
1108
1109pub async fn start_replay(
1111 State(store): State<SharedStore>,
1112 Json(req): Json<StartReplayRequest>,
1113) -> Result<Json<StartReplayResponse>> {
1114 let replay_manager = store.replay_manager();
1115
1116 let response = replay_manager.start_replay(store, req)?;
1117
1118 tracing::info!(
1119 "🔄 Started replay {} with {} events",
1120 response.replay_id,
1121 response.total_events
1122 );
1123
1124 Ok(Json(response))
1125}
1126
1127pub async fn get_replay_progress(
1129 State(store): State<SharedStore>,
1130 Path(replay_id): Path<uuid::Uuid>,
1131) -> Result<Json<ReplayProgress>> {
1132 let replay_manager = store.replay_manager();
1133
1134 let progress = replay_manager.get_progress(replay_id)?;
1135
1136 Ok(Json(progress))
1137}
1138
1139pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1141 let replay_manager = store.replay_manager();
1142
1143 let replays = replay_manager.list_replays();
1144
1145 Json(serde_json::json!({
1146 "replays": replays,
1147 "total": replays.len()
1148 }))
1149}
1150
1151pub async fn cancel_replay(
1153 State(store): State<SharedStore>,
1154 Path(replay_id): Path<uuid::Uuid>,
1155) -> Result<Json<serde_json::Value>> {
1156 let replay_manager = store.replay_manager();
1157
1158 replay_manager.cancel_replay(replay_id)?;
1159
1160 tracing::info!("🛑 Cancelled replay {}", replay_id);
1161
1162 Ok(Json(serde_json::json!({
1163 "replay_id": replay_id,
1164 "status": "cancelled"
1165 })))
1166}
1167
1168pub async fn delete_replay(
1170 State(store): State<SharedStore>,
1171 Path(replay_id): Path<uuid::Uuid>,
1172) -> Result<Json<serde_json::Value>> {
1173 let replay_manager = store.replay_manager();
1174
1175 let deleted = replay_manager.delete_replay(replay_id)?;
1176
1177 if deleted {
1178 tracing::info!("🗑️ Deleted replay {}", replay_id);
1179 }
1180
1181 Ok(Json(serde_json::json!({
1182 "replay_id": replay_id,
1183 "deleted": deleted
1184 })))
1185}
1186
1187pub async fn register_pipeline(
1189 State(store): State<SharedStore>,
1190 Json(config): Json<PipelineConfig>,
1191) -> Result<Json<serde_json::Value>> {
1192 let pipeline_manager = store.pipeline_manager();
1193
1194 let pipeline_id = pipeline_manager.register(config.clone());
1195
1196 tracing::info!(
1197 "🔀 Pipeline registered: {} (name: {})",
1198 pipeline_id,
1199 config.name
1200 );
1201
1202 Ok(Json(serde_json::json!({
1203 "pipeline_id": pipeline_id,
1204 "name": config.name,
1205 "enabled": config.enabled
1206 })))
1207}
1208
1209pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1211 let pipeline_manager = store.pipeline_manager();
1212
1213 let pipelines = pipeline_manager.list();
1214
1215 tracing::debug!("Listed {} pipelines", pipelines.len());
1216
1217 Json(serde_json::json!({
1218 "pipelines": pipelines,
1219 "total": pipelines.len()
1220 }))
1221}
1222
1223pub async fn get_pipeline(
1225 State(store): State<SharedStore>,
1226 Path(pipeline_id): Path<uuid::Uuid>,
1227) -> Result<Json<PipelineConfig>> {
1228 let pipeline_manager = store.pipeline_manager();
1229
1230 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1231 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1232 })?;
1233
1234 Ok(Json(pipeline.config().clone()))
1235}
1236
1237pub async fn remove_pipeline(
1239 State(store): State<SharedStore>,
1240 Path(pipeline_id): Path<uuid::Uuid>,
1241) -> Result<Json<serde_json::Value>> {
1242 let pipeline_manager = store.pipeline_manager();
1243
1244 let removed = pipeline_manager.remove(pipeline_id);
1245
1246 if removed {
1247 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1248 }
1249
1250 Ok(Json(serde_json::json!({
1251 "pipeline_id": pipeline_id,
1252 "removed": removed
1253 })))
1254}
1255
1256pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1258 let pipeline_manager = store.pipeline_manager();
1259
1260 let stats = pipeline_manager.all_stats();
1261
1262 Json(serde_json::json!({
1263 "stats": stats,
1264 "total": stats.len()
1265 }))
1266}
1267
1268pub async fn get_pipeline_stats(
1270 State(store): State<SharedStore>,
1271 Path(pipeline_id): Path<uuid::Uuid>,
1272) -> Result<Json<PipelineStats>> {
1273 let pipeline_manager = store.pipeline_manager();
1274
1275 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1276 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1277 })?;
1278
1279 Ok(Json(pipeline.stats()))
1280}
1281
1282pub async fn reset_pipeline(
1284 State(store): State<SharedStore>,
1285 Path(pipeline_id): Path<uuid::Uuid>,
1286) -> Result<Json<serde_json::Value>> {
1287 let pipeline_manager = store.pipeline_manager();
1288
1289 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1290 crate::error::AllSourceError::ValidationError(format!("Pipeline not found: {pipeline_id}"))
1291 })?;
1292
1293 pipeline.reset();
1294
1295 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1296
1297 Ok(Json(serde_json::json!({
1298 "pipeline_id": pipeline_id,
1299 "reset": true
1300 })))
1301}
1302
1303pub async fn get_event_by_id(
1309 State(store): State<SharedStore>,
1310 Path(event_id): Path<uuid::Uuid>,
1311) -> Result<Json<serde_json::Value>> {
1312 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1313 crate::error::AllSourceError::EntityNotFound(format!("Event '{event_id}' not found"))
1314 })?;
1315
1316 let dto = EventDto::from(&event);
1317
1318 tracing::debug!("Event retrieved by ID: {}", event_id);
1319
1320 Ok(Json(serde_json::json!({
1321 "event": dto,
1322 "found": true
1323 })))
1324}
1325
1326pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1332 let projection_manager = store.projection_manager();
1333 let status_map = store.projection_status();
1334
1335 let projections: Vec<serde_json::Value> = projection_manager
1336 .list_projections()
1337 .iter()
1338 .map(|(name, projection)| {
1339 let status = status_map
1340 .get(name)
1341 .map_or_else(|| "running".to_string(), |s| s.value().clone());
1342 serde_json::json!({
1343 "name": name,
1344 "type": format!("{:?}", projection.name()),
1345 "status": status,
1346 })
1347 })
1348 .collect();
1349
1350 tracing::debug!("Listed {} projections", projections.len());
1351
1352 Json(serde_json::json!({
1353 "projections": projections,
1354 "total": projections.len()
1355 }))
1356}
1357
1358pub async fn get_projection(
1360 State(store): State<SharedStore>,
1361 Path(name): Path<String>,
1362) -> Result<Json<serde_json::Value>> {
1363 let projection_manager = store.projection_manager();
1364
1365 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1366 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1367 })?;
1368
1369 Ok(Json(serde_json::json!({
1370 "name": projection.name(),
1371 "found": true
1372 })))
1373}
1374
1375pub async fn get_projection_state(
1380 State(store): State<SharedStore>,
1381 Path((name, entity_id)): Path<(String, String)>,
1382) -> Result<Json<serde_json::Value>> {
1383 let projection_manager = store.projection_manager();
1384
1385 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1386 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1387 })?;
1388
1389 let state = projection.get_state(&entity_id);
1390
1391 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1392
1393 Ok(Json(serde_json::json!({
1394 "projection": name,
1395 "entity_id": entity_id,
1396 "state": state,
1397 "found": state.is_some()
1398 })))
1399}
1400
1401pub async fn delete_projection(
1406 State(store): State<SharedStore>,
1407 Path(name): Path<String>,
1408) -> Result<Json<serde_json::Value>> {
1409 let projection_manager = store.projection_manager();
1410
1411 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1412 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1413 })?;
1414
1415 projection.clear();
1416
1417 let cache = store.projection_state_cache();
1419 let prefix = format!("{name}:");
1420 let keys_to_remove: Vec<String> = cache
1421 .iter()
1422 .filter(|entry| entry.key().starts_with(&prefix))
1423 .map(|entry| entry.key().clone())
1424 .collect();
1425 for key in keys_to_remove {
1426 cache.remove(&key);
1427 }
1428
1429 tracing::info!("Projection deleted (cleared): {}", name);
1430
1431 Ok(Json(serde_json::json!({
1432 "projection": name,
1433 "deleted": true
1434 })))
1435}
1436
1437pub async fn get_projection_state_summary(
1441 State(store): State<SharedStore>,
1442 Path(name): Path<String>,
1443) -> Result<Json<serde_json::Value>> {
1444 let projection_manager = store.projection_manager();
1445
1446 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1447 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1448 })?;
1449
1450 let cache = store.projection_state_cache();
1452 let prefix = format!("{name}:");
1453 let states: Vec<serde_json::Value> = cache
1454 .iter()
1455 .filter(|entry| entry.key().starts_with(&prefix))
1456 .map(|entry| {
1457 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1458 serde_json::json!({
1459 "entity_id": entity_id,
1460 "state": entry.value().clone()
1461 })
1462 })
1463 .collect();
1464
1465 let total = states.len();
1466
1467 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1468
1469 Ok(Json(serde_json::json!({
1470 "projection": name,
1471 "states": states,
1472 "total": total
1473 })))
1474}
1475
1476pub async fn reset_projection(
1480 State(store): State<SharedStore>,
1481 Path(name): Path<String>,
1482) -> Result<Json<serde_json::Value>> {
1483 let reprocessed = store.reset_projection(&name)?;
1484
1485 tracing::info!(
1486 "Projection reset: {} ({} events reprocessed)",
1487 name,
1488 reprocessed
1489 );
1490
1491 Ok(Json(serde_json::json!({
1492 "projection": name,
1493 "reset": true,
1494 "events_reprocessed": reprocessed
1495 })))
1496}
1497
1498pub async fn pause_projection(
1502 State(store): State<SharedStore>,
1503 Path(name): Path<String>,
1504) -> Result<Json<serde_json::Value>> {
1505 let projection_manager = store.projection_manager();
1506
1507 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1509 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1510 })?;
1511
1512 store
1513 .projection_status()
1514 .insert(name.clone(), "paused".to_string());
1515
1516 tracing::info!("Projection paused: {}", name);
1517
1518 Ok(Json(serde_json::json!({
1519 "projection": name,
1520 "status": "paused"
1521 })))
1522}
1523
1524pub async fn start_projection(
1528 State(store): State<SharedStore>,
1529 Path(name): Path<String>,
1530) -> Result<Json<serde_json::Value>> {
1531 let projection_manager = store.projection_manager();
1532
1533 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1535 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1536 })?;
1537
1538 store
1539 .projection_status()
1540 .insert(name.clone(), "running".to_string());
1541
1542 tracing::info!("Projection started: {}", name);
1543
1544 Ok(Json(serde_json::json!({
1545 "projection": name,
1546 "status": "running"
1547 })))
1548}
1549
/// JSON request body accepted by `save_projection_state`.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON value stored in the projection state cache for the
    /// entity named in the request path.
    pub state: serde_json::Value,
}
1555
1556pub async fn save_projection_state(
1561 State(store): State<SharedStore>,
1562 Path((name, entity_id)): Path<(String, String)>,
1563 Json(req): Json<SaveProjectionStateRequest>,
1564) -> Result<Json<serde_json::Value>> {
1565 let projection_cache = store.projection_state_cache();
1566
1567 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1569
1570 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1571
1572 Ok(Json(serde_json::json!({
1573 "projection": name,
1574 "entity_id": entity_id,
1575 "saved": true
1576 })))
1577}
1578
/// JSON request body accepted by `bulk_get_projection_states`.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entity ids whose projection state should be looked up; the handler
    /// returns one result entry per id, in the same order.
    pub entity_ids: Vec<String>,
}
1586
/// JSON request body accepted by `bulk_save_projection_states`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateRequest {
    /// The entity/state pairs to write into the projection state cache.
    pub states: Vec<BulkSaveStateItem>,
}
1594
/// One entry of a `BulkSaveStateRequest`.
#[derive(Debug, Deserialize)]
pub struct BulkSaveStateItem {
    /// Entity the state belongs to; used as the suffix of the cache key.
    pub entity_id: String,
    /// Arbitrary JSON value stored as that entity's state.
    pub state: serde_json::Value,
}
1600
1601pub async fn bulk_get_projection_states(
1602 State(store): State<SharedStore>,
1603 Path(name): Path<String>,
1604 Json(req): Json<BulkGetStateRequest>,
1605) -> Result<Json<serde_json::Value>> {
1606 let projection_manager = store.projection_manager();
1607
1608 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1609 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1610 })?;
1611
1612 let states: Vec<serde_json::Value> = req
1613 .entity_ids
1614 .iter()
1615 .map(|entity_id| {
1616 let state = projection.get_state(entity_id);
1617 serde_json::json!({
1618 "entity_id": entity_id,
1619 "state": state,
1620 "found": state.is_some()
1621 })
1622 })
1623 .collect();
1624
1625 tracing::debug!(
1626 "Bulk projection state retrieved: {} entities from {}",
1627 states.len(),
1628 name
1629 );
1630
1631 Ok(Json(serde_json::json!({
1632 "projection": name,
1633 "states": states,
1634 "total": states.len()
1635 })))
1636}
1637
1638pub async fn bulk_save_projection_states(
1643 State(store): State<SharedStore>,
1644 Path(name): Path<String>,
1645 Json(req): Json<BulkSaveStateRequest>,
1646) -> Result<Json<serde_json::Value>> {
1647 let projection_cache = store.projection_state_cache();
1648
1649 let mut saved_count = 0;
1650 for item in &req.states {
1651 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1652 saved_count += 1;
1653 }
1654
1655 tracing::info!(
1656 "Bulk projection state saved: {} entities for {}",
1657 saved_count,
1658 name
1659 );
1660
1661 Ok(Json(serde_json::json!({
1662 "projection": name,
1663 "saved": saved_count,
1664 "total": req.states.len()
1665 })))
1666}
1667
/// Query-string parameters accepted by `list_webhooks`.
#[derive(Debug, Deserialize)]
pub struct ListWebhooksParams {
    /// Tenant whose webhooks should be listed. When absent, `list_webhooks`
    /// returns an empty list.
    pub tenant_id: Option<String>,
}
1677
1678pub async fn register_webhook(
1680 State(store): State<SharedStore>,
1681 Json(req): Json<RegisterWebhookRequest>,
1682) -> Json<serde_json::Value> {
1683 let registry = store.webhook_registry();
1684 let webhook = registry.register(req);
1685
1686 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1687
1688 Json(serde_json::json!({
1689 "webhook": webhook,
1690 "created": true
1691 }))
1692}
1693
1694pub async fn list_webhooks(
1696 State(store): State<SharedStore>,
1697 Query(params): Query<ListWebhooksParams>,
1698) -> Json<serde_json::Value> {
1699 let registry = store.webhook_registry();
1700
1701 let webhooks = if let Some(tenant_id) = params.tenant_id {
1702 registry.list_by_tenant(&tenant_id)
1703 } else {
1704 vec![]
1706 };
1707
1708 let total = webhooks.len();
1709
1710 Json(serde_json::json!({
1711 "webhooks": webhooks,
1712 "total": total
1713 }))
1714}
1715
1716pub async fn get_webhook(
1718 State(store): State<SharedStore>,
1719 Path(webhook_id): Path<uuid::Uuid>,
1720) -> Result<Json<serde_json::Value>> {
1721 let registry = store.webhook_registry();
1722
1723 let webhook = registry.get(webhook_id).ok_or_else(|| {
1724 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1725 })?;
1726
1727 Ok(Json(serde_json::json!({
1728 "webhook": webhook,
1729 "found": true
1730 })))
1731}
1732
1733pub async fn update_webhook(
1735 State(store): State<SharedStore>,
1736 Path(webhook_id): Path<uuid::Uuid>,
1737 Json(req): Json<UpdateWebhookRequest>,
1738) -> Result<Json<serde_json::Value>> {
1739 let registry = store.webhook_registry();
1740
1741 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1742 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1743 })?;
1744
1745 tracing::info!("Webhook updated: {}", webhook_id);
1746
1747 Ok(Json(serde_json::json!({
1748 "webhook": webhook,
1749 "updated": true
1750 })))
1751}
1752
1753pub async fn delete_webhook(
1755 State(store): State<SharedStore>,
1756 Path(webhook_id): Path<uuid::Uuid>,
1757) -> Result<Json<serde_json::Value>> {
1758 let registry = store.webhook_registry();
1759
1760 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1761 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1762 })?;
1763
1764 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1765
1766 Ok(Json(serde_json::json!({
1767 "webhook_id": webhook_id,
1768 "deleted": true
1769 })))
1770}
1771
/// Query-string parameters accepted by `list_webhook_deliveries`.
#[derive(Debug, Deserialize)]
pub struct ListDeliveriesParams {
    /// Limit passed to the registry's `get_deliveries`; the handler falls
    /// back to 50 when it is not supplied.
    pub limit: Option<usize>,
}
1777
1778pub async fn list_webhook_deliveries(
1780 State(store): State<SharedStore>,
1781 Path(webhook_id): Path<uuid::Uuid>,
1782 Query(params): Query<ListDeliveriesParams>,
1783) -> Result<Json<serde_json::Value>> {
1784 let registry = store.webhook_registry();
1785
1786 registry.get(webhook_id).ok_or_else(|| {
1788 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{webhook_id}' not found"))
1789 })?;
1790
1791 let limit = params.limit.unwrap_or(50);
1792 let deliveries = registry.get_deliveries(webhook_id, limit);
1793 let total = deliveries.len();
1794
1795 Ok(Json(serde_json::json!({
1796 "webhook_id": webhook_id,
1797 "deliveries": deliveries,
1798 "total": total
1799 })))
1800}
1801
/// Runs an EventQL request against a snapshot of the store's events.
///
/// Only compiled with the `analytics` feature. On success the response
/// carries the `columns`, `rows`, and `row_count` of the query result.
///
/// # Errors
///
/// Wraps a failure reported by `execute_eventql` in
/// `AllSourceError::InvalidQuery`.
#[cfg(feature = "analytics")]
pub async fn eventql_query(
    State(store): State<SharedStore>,
    Json(req): Json<crate::infrastructure::query::eventql::EventQLRequest>,
) -> Result<Json<serde_json::Value>> {
    let events = store.snapshot_events();

    let response = crate::infrastructure::query::eventql::execute_eventql(&events, &req)
        .await
        .map_err(crate::error::AllSourceError::InvalidQuery)?;

    Ok(Json(serde_json::json!({
        "columns": response.columns,
        "rows": response.rows,
        "row_count": response.row_count,
    })))
}
1822
1823pub async fn graphql_query(
1825 State(store): State<SharedStore>,
1826 Json(req): Json<GraphQLRequest>,
1827) -> Json<serde_json::Value> {
1828 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1829 Ok(f) => f,
1830 Err(e) => {
1831 return Json(
1832 serde_json::to_value(GraphQLResponse {
1833 data: None,
1834 errors: vec![GraphQLError { message: e }],
1835 })
1836 .unwrap(),
1837 );
1838 }
1839 };
1840
1841 let mut data = serde_json::Map::new();
1842 let mut errors = Vec::new();
1843
1844 for field in &fields {
1845 match field.name.as_str() {
1846 "events" => {
1847 let request = crate::application::dto::QueryEventsRequest {
1848 entity_id: field.arguments.get("entity_id").cloned(),
1849 event_type: field.arguments.get("event_type").cloned(),
1850 tenant_id: field.arguments.get("tenant_id").cloned(),
1851 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1852 as_of: None,
1853 since: None,
1854 until: None,
1855 event_type_prefix: None,
1856 payload_filter: None,
1857 };
1858 match store.query(&request) {
1859 Ok(events) => {
1860 let json_events: Vec<serde_json::Value> = events
1861 .iter()
1862 .map(|e| {
1863 crate::infrastructure::query::graphql::event_to_json(
1864 e,
1865 &field.fields,
1866 )
1867 })
1868 .collect();
1869 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1870 }
1871 Err(e) => errors.push(GraphQLError {
1872 message: format!("events query failed: {e}"),
1873 }),
1874 }
1875 }
1876 "event" => {
1877 if let Some(id_str) = field.arguments.get("id") {
1878 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1879 match store.get_event_by_id(&id) {
1880 Ok(Some(event)) => {
1881 data.insert(
1882 "event".to_string(),
1883 crate::infrastructure::query::graphql::event_to_json(
1884 &event,
1885 &field.fields,
1886 ),
1887 );
1888 }
1889 Ok(None) => {
1890 data.insert("event".to_string(), serde_json::Value::Null);
1891 }
1892 Err(e) => errors.push(GraphQLError {
1893 message: format!("event lookup failed: {e}"),
1894 }),
1895 }
1896 } else {
1897 errors.push(GraphQLError {
1898 message: format!("Invalid UUID: {id_str}"),
1899 });
1900 }
1901 } else {
1902 errors.push(GraphQLError {
1903 message: "event query requires 'id' argument".to_string(),
1904 });
1905 }
1906 }
1907 "projections" => {
1908 let pm = store.projection_manager();
1909 let names: Vec<serde_json::Value> = pm
1910 .list_projections()
1911 .iter()
1912 .map(|(name, _)| serde_json::Value::String(name.clone()))
1913 .collect();
1914 data.insert("projections".to_string(), serde_json::Value::Array(names));
1915 }
1916 "stats" => {
1917 let stats = store.stats();
1918 data.insert(
1919 "stats".to_string(),
1920 serde_json::json!({
1921 "total_events": stats.total_events,
1922 "total_entities": stats.total_entities,
1923 "total_event_types": stats.total_event_types,
1924 }),
1925 );
1926 }
1927 "__schema" => {
1928 data.insert(
1929 "__schema".to_string(),
1930 crate::infrastructure::query::graphql::introspection_schema(),
1931 );
1932 }
1933 other => {
1934 errors.push(GraphQLError {
1935 message: format!("Unknown field: {other}"),
1936 });
1937 }
1938 }
1939 }
1940
1941 Json(
1942 serde_json::to_value(GraphQLResponse {
1943 data: Some(serde_json::Value::Object(data)),
1944 errors,
1945 })
1946 .unwrap(),
1947 )
1948}
1949
1950pub async fn geo_query(
1952 State(store): State<SharedStore>,
1953 Json(req): Json<GeoQueryRequest>,
1954) -> Json<serde_json::Value> {
1955 let events = store.snapshot_events();
1956 let geo_index = store.geo_index();
1957 let results =
1958 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1959 let total = results.len();
1960 Json(serde_json::json!({
1961 "results": results,
1962 "total": total,
1963 }))
1964}
1965
1966pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1968 let stats = store.geo_index().stats();
1969 Json(serde_json::json!(stats))
1970}
1971
1972pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1974 let stats = store.exactly_once().stats();
1975 Json(serde_json::json!(stats))
1976}
1977
1978pub async fn schema_evolution_history(
1980 State(store): State<SharedStore>,
1981 Path(event_type): Path<String>,
1982) -> Json<serde_json::Value> {
1983 let mgr = store.schema_evolution();
1984 let history = mgr.get_history(&event_type);
1985 let version = mgr.get_version(&event_type);
1986 Json(serde_json::json!({
1987 "event_type": event_type,
1988 "current_version": version,
1989 "history": history,
1990 }))
1991}
1992
1993pub async fn schema_evolution_schema(
1995 State(store): State<SharedStore>,
1996 Path(event_type): Path<String>,
1997) -> Json<serde_json::Value> {
1998 let mgr = store.schema_evolution();
1999 if let Some(schema) = mgr.get_schema(&event_type) {
2000 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
2001 Json(serde_json::json!({
2002 "event_type": event_type,
2003 "version": mgr.get_version(&event_type),
2004 "inferred_schema": schema,
2005 "json_schema": json_schema,
2006 }))
2007 } else {
2008 Json(serde_json::json!({
2009 "event_type": event_type,
2010 "error": "No schema inferred for this event type"
2011 }))
2012 }
2013}
2014
2015pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
2017 let stats = store.schema_evolution().stats();
2018 let event_types = store.schema_evolution().list_event_types();
2019 Json(serde_json::json!({
2020 "stats": stats,
2021 "tracked_event_types": event_types,
2022 }))
2023}
2024
/// Serves a sync pull: returns store events as `ReplicatedEvent`s.
///
/// Only compiled with the `embedded-sync` feature.
///
/// The lower time bound of the query is the smallest `physical_ms` found in
/// the request's version vector; an empty vector means no lower bound. Each
/// returned event gets an `HlcTimestamp` built from its millisecond
/// timestamp, a counter that increments while consecutive events share the
/// same millisecond, and a constant `0` as the third argument. The response
/// always carries an empty version vector.
///
/// # Errors
///
/// Propagates errors from `store.query`.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    let oldest_known_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since =
        oldest_known_ms.and_then(|ms| chrono::DateTime::from_timestamp_millis(ms as i64));

    let pull_query = QueryEventsRequest {
        since,
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    };
    let events = store.query(&pull_query)?;

    // Running state for the tie-breaking counter: it restarts at zero each
    // time the millisecond value changes from the previous event's.
    let mut previous_ms = 0u64;
    let mut tie_breaker = 0u32;

    let replicated: Vec<ReplicatedEvent> = events
        .iter()
        .map(|event| {
            let event_ms = event.timestamp().timestamp_millis() as u64;
            if event_ms != previous_ms {
                previous_ms = event_ms;
                tie_breaker = 0;
            } else {
                tie_breaker += 1;
            }

            ReplicatedEvent {
                event_id: event.id().to_string(),
                hlc_timestamp: HlcTimestamp::new(event_ms, tie_breaker, 0),
                origin_region: "server".to_string(),
                event_data: serde_json::json!({
                    "event_type": event.event_type_str(),
                    "entity_id": event.entity_id_str(),
                    "tenant_id": event.tenant_id_str(),
                    "payload": event.payload,
                    "metadata": event.metadata,
                }),
            }
        })
        .collect();

    let response = crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    };
    Ok(Json(response))
}
2093
/// Serves a sync push: ingests the replicated events sent by a client.
///
/// Only compiled with the `embedded-sync` feature.
///
/// Each incoming event's `event_data` is read field by field. Missing or
/// non-string `event_type` / `entity_id` fall back to `"unknown"`, a missing
/// `tenant_id` to `"default"`, and a missing `payload` to an empty object.
/// Events that `Event::from_strings` rejects are counted as `skipped`; the
/// rest are ingested and counted as `accepted`. The response always carries
/// an empty version vector.
///
/// # Errors
///
/// An ingest failure aborts the request immediately via `?`; events ingested
/// earlier in the same request are not undone by this handler.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    /// Reads `key` from `data` as a string, or returns `fallback` when the
    /// key is absent or not a string.
    fn text_or(data: &serde_json::Value, key: &str, fallback: &str) -> String {
        data.get(key)
            .and_then(serde_json::Value::as_str)
            .unwrap_or(fallback)
            .to_string()
    }

    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for rep_event in &request.events {
        let event_data = &rep_event.event_data;

        let event_type = text_or(event_data, "event_type", "unknown");
        let entity_id = text_or(event_data, "entity_id", "unknown");
        let tenant_id = text_or(event_data, "tenant_id", "default");
        let payload = event_data
            .get("payload")
            .cloned()
            .unwrap_or(serde_json::json!({}));
        let metadata = event_data.get("metadata").cloned();

        if let Ok(domain_event) =
            Event::from_strings(event_type, entity_id, tenant_id, payload, metadata)
        {
            store.ingest(&domain_event)?;
            accepted += 1;
        } else {
            // Events that fail domain validation are counted, not reported.
            skipped += 1;
        }
    }

    let response = crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    };
    Ok(Json(response))
}
2145
2146pub async fn register_consumer(
2152 State(store): State<SharedStore>,
2153 Json(req): Json<RegisterConsumerRequest>,
2154) -> Result<Json<ConsumerResponse>> {
2155 let consumer = store
2156 .consumer_registry()
2157 .register(&req.consumer_id, &req.event_type_filters);
2158
2159 Ok(Json(ConsumerResponse {
2160 consumer_id: consumer.consumer_id,
2161 event_type_filters: consumer.event_type_filters,
2162 cursor_position: consumer.cursor_position,
2163 }))
2164}
2165
2166pub async fn get_consumer(
2168 State(store): State<SharedStore>,
2169 Path(consumer_id): Path<String>,
2170) -> Result<Json<ConsumerResponse>> {
2171 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2172
2173 Ok(Json(ConsumerResponse {
2174 consumer_id: consumer.consumer_id,
2175 event_type_filters: consumer.event_type_filters,
2176 cursor_position: consumer.cursor_position,
2177 }))
2178}
2179
/// Query-string parameters accepted by `poll_consumer_events`.
#[derive(Debug, Deserialize)]
pub struct ConsumerPollQuery {
    /// Limit passed to the store's `events_after_offset`; the handler falls
    /// back to 100 when it is not supplied.
    pub limit: Option<usize>,
}
2185
2186pub async fn poll_consumer_events(
2187 State(store): State<SharedStore>,
2188 Path(consumer_id): Path<String>,
2189 Query(query): Query<ConsumerPollQuery>,
2190) -> Result<Json<ConsumerEventsResponse>> {
2191 let consumer = store.consumer_registry().get_or_create(&consumer_id);
2192 let offset = consumer.cursor_position.unwrap_or(0);
2193 let limit = query.limit.unwrap_or(100);
2194
2195 let events = store.events_after_offset(offset, &consumer.event_type_filters, limit);
2196 let count = events.len();
2197
2198 let consumer_events: Vec<ConsumerEventDto> = events
2199 .into_iter()
2200 .map(|(position, event)| ConsumerEventDto {
2201 position,
2202 event: EventDto::from(&event),
2203 })
2204 .collect();
2205
2206 Ok(Json(ConsumerEventsResponse {
2207 events: consumer_events,
2208 count,
2209 }))
2210}
2211
2212pub async fn ack_consumer(
2214 State(store): State<SharedStore>,
2215 Path(consumer_id): Path<String>,
2216 Json(req): Json<AckRequest>,
2217) -> Result<Json<serde_json::Value>> {
2218 let max_offset = store.total_events() as u64;
2219
2220 store
2221 .consumer_registry()
2222 .ack(&consumer_id, req.position, max_offset)
2223 .map_err(crate::error::AllSourceError::InvalidInput)?;
2224
2225 Ok(Json(serde_json::json!({
2226 "status": "ok",
2227 "consumer_id": consumer_id,
2228 "position": req.position,
2229 })))
2230}
2231
2232#[cfg(test)]
2233mod tests {
2234 use super::*;
2235 use crate::{domain::entities::Event, store::EventStore};
2236
2237 fn create_test_store() -> Arc<EventStore> {
2238 Arc::new(EventStore::new())
2239 }
2240
2241 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2242 Event::from_strings(
2243 event_type.to_string(),
2244 entity_id.to_string(),
2245 "test-stream".to_string(),
2246 serde_json::json!({
2247 "name": "Test",
2248 "value": 42
2249 }),
2250 None,
2251 )
2252 .unwrap()
2253 }
2254
2255 #[tokio::test]
2256 async fn test_query_events_has_more_and_total_count() {
2257 let store = create_test_store();
2258
2259 for i in 0..50 {
2261 store
2262 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2263 .unwrap();
2264 }
2265
2266 let req = QueryEventsRequest {
2268 entity_id: None,
2269 event_type: None,
2270 tenant_id: None,
2271 as_of: None,
2272 since: None,
2273 until: None,
2274 limit: Some(10),
2275 event_type_prefix: None,
2276 payload_filter: None,
2277 };
2278
2279 let requested_limit = req.limit;
2280 let unlimited_req = QueryEventsRequest {
2281 limit: None,
2282 ..QueryEventsRequest {
2283 entity_id: req.entity_id,
2284 event_type: req.event_type,
2285 tenant_id: req.tenant_id,
2286 as_of: req.as_of,
2287 since: req.since,
2288 until: req.until,
2289 limit: None,
2290 event_type_prefix: req.event_type_prefix,
2291 payload_filter: req.payload_filter,
2292 }
2293 };
2294 let all_events = store.query(&unlimited_req).unwrap();
2295 let total_count = all_events.len();
2296 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2297 all_events.into_iter().take(limit).collect()
2298 } else {
2299 all_events
2300 };
2301 let count = limited_events.len();
2302 let has_more = count < total_count;
2303
2304 assert_eq!(count, 10);
2305 assert_eq!(total_count, 50);
2306 assert!(has_more);
2307 }
2308
2309 #[tokio::test]
2310 async fn test_query_events_no_more_results() {
2311 let store = create_test_store();
2312
2313 for i in 0..5 {
2315 store
2316 .ingest(&create_test_event(&format!("entity-{i}"), "user.created"))
2317 .unwrap();
2318 }
2319
2320 let all_events = store
2322 .query(&QueryEventsRequest {
2323 entity_id: None,
2324 event_type: None,
2325 tenant_id: None,
2326 as_of: None,
2327 since: None,
2328 until: None,
2329 limit: None,
2330 event_type_prefix: None,
2331 payload_filter: None,
2332 })
2333 .unwrap();
2334 let total_count = all_events.len();
2335 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2336 let count = limited_events.len();
2337 let has_more = count < total_count;
2338
2339 assert_eq!(count, 5);
2340 assert_eq!(total_count, 5);
2341 assert!(!has_more);
2342 }
2343
2344 #[tokio::test]
2345 async fn test_list_entities_by_type_prefix() {
2346 let store = create_test_store();
2347
2348 store
2350 .ingest(&create_test_event("idx-1", "index.created"))
2351 .unwrap();
2352 store
2353 .ingest(&create_test_event("idx-1", "index.updated"))
2354 .unwrap();
2355 store
2356 .ingest(&create_test_event("idx-2", "index.created"))
2357 .unwrap();
2358 store
2359 .ingest(&create_test_event("idx-3", "index.created"))
2360 .unwrap();
2361 store
2363 .ingest(&create_test_event("trade-1", "trade.created"))
2364 .unwrap();
2365 store
2366 .ingest(&create_test_event("trade-2", "trade.created"))
2367 .unwrap();
2368
2369 let req = ListEntitiesRequest {
2371 event_type_prefix: Some("index.".to_string()),
2372 payload_filter: None,
2373 limit: None,
2374 offset: None,
2375 };
2376 let query_req = QueryEventsRequest {
2377 entity_id: None,
2378 event_type: None,
2379 tenant_id: None,
2380 as_of: None,
2381 since: None,
2382 until: None,
2383 limit: None,
2384 event_type_prefix: req.event_type_prefix,
2385 payload_filter: req.payload_filter,
2386 };
2387 let events = store.query(&query_req).unwrap();
2388
2389 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2391 std::collections::HashMap::new();
2392 for event in &events {
2393 entity_map
2394 .entry(event.entity_id().to_string())
2395 .or_default()
2396 .push(event);
2397 }
2398
2399 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2402 assert_eq!(entity_map["idx-3"].len(), 1);
2403 }
2404
2405 fn create_test_event_with_payload(
2406 entity_id: &str,
2407 event_type: &str,
2408 payload: serde_json::Value,
2409 ) -> Event {
2410 Event::from_strings(
2411 event_type.to_string(),
2412 entity_id.to_string(),
2413 "test-stream".to_string(),
2414 payload,
2415 None,
2416 )
2417 .unwrap()
2418 }
2419
2420 #[tokio::test]
2421 async fn test_detect_duplicates_by_payload_fields() {
2422 let store = create_test_store();
2423
2424 store
2426 .ingest(&create_test_event_with_payload(
2427 "idx-1",
2428 "index.created",
2429 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2430 ))
2431 .unwrap();
2432 store
2433 .ingest(&create_test_event_with_payload(
2434 "idx-2",
2435 "index.created",
2436 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2437 ))
2438 .unwrap();
2439 store
2440 .ingest(&create_test_event_with_payload(
2441 "idx-3",
2442 "index.created",
2443 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2444 ))
2445 .unwrap();
2446 store
2447 .ingest(&create_test_event_with_payload(
2448 "idx-4",
2449 "index.created",
2450 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2451 ))
2452 .unwrap();
2453 store
2454 .ingest(&create_test_event_with_payload(
2455 "idx-5",
2456 "index.created",
2457 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2458 ))
2459 .unwrap();
2460
2461 let query_req = QueryEventsRequest {
2463 entity_id: None,
2464 event_type: None,
2465 tenant_id: None,
2466 as_of: None,
2467 since: None,
2468 until: None,
2469 limit: None,
2470 event_type_prefix: Some("index.".to_string()),
2471 payload_filter: None,
2472 };
2473 let events = store.query(&query_req).unwrap();
2474
2475 let group_by_fields = vec!["name"];
2477 let mut entity_latest: std::collections::HashMap<String, &Event> =
2478 std::collections::HashMap::new();
2479 for event in &events {
2480 let eid = event.entity_id().to_string();
2481 entity_latest
2482 .entry(eid)
2483 .and_modify(|existing| {
2484 if event.timestamp() > existing.timestamp() {
2485 *existing = event;
2486 }
2487 })
2488 .or_insert(event);
2489 }
2490
2491 let mut groups: std::collections::HashMap<String, Vec<String>> =
2492 std::collections::HashMap::new();
2493 for (entity_id, event) in &entity_latest {
2494 let payload = event.payload();
2495 let mut key_parts = serde_json::Map::new();
2496 for field in &group_by_fields {
2497 let value = payload
2498 .get(*field)
2499 .cloned()
2500 .unwrap_or(serde_json::Value::Null);
2501 key_parts.insert((*field).to_string(), value);
2502 }
2503 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2504 groups.entry(key_str).or_default().push(entity_id.clone());
2505 }
2506
2507 let duplicate_groups: Vec<_> = groups
2508 .into_iter()
2509 .filter(|(_, ids)| ids.len() > 1)
2510 .collect();
2511
2512 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2514 assert_eq!(ids.len(), 2);
2515 }
2516 }
2517
2518 #[tokio::test]
2519 async fn test_detect_duplicates_no_duplicates() {
2520 let store = create_test_store();
2521
2522 store
2524 .ingest(&create_test_event_with_payload(
2525 "idx-1",
2526 "index.created",
2527 serde_json::json!({"name": "A"}),
2528 ))
2529 .unwrap();
2530 store
2531 .ingest(&create_test_event_with_payload(
2532 "idx-2",
2533 "index.created",
2534 serde_json::json!({"name": "B"}),
2535 ))
2536 .unwrap();
2537
2538 let query_req = QueryEventsRequest {
2539 entity_id: None,
2540 event_type: None,
2541 tenant_id: None,
2542 as_of: None,
2543 since: None,
2544 until: None,
2545 limit: None,
2546 event_type_prefix: Some("index.".to_string()),
2547 payload_filter: None,
2548 };
2549 let events = store.query(&query_req).unwrap();
2550
2551 let mut entity_latest: std::collections::HashMap<String, &Event> =
2552 std::collections::HashMap::new();
2553 for event in &events {
2554 entity_latest
2555 .entry(event.entity_id().to_string())
2556 .or_insert(event);
2557 }
2558
2559 let mut groups: std::collections::HashMap<String, Vec<String>> =
2560 std::collections::HashMap::new();
2561 for (entity_id, event) in &entity_latest {
2562 let key_str =
2563 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2564 .unwrap();
2565 groups.entry(key_str).or_default().push(entity_id.clone());
2566 }
2567
2568 let duplicate_groups: Vec<_> = groups
2569 .into_iter()
2570 .filter(|(_, ids)| ids.len() > 1)
2571 .collect();
2572
2573 assert_eq!(duplicate_groups.len(), 0); }
2575
2576 #[tokio::test]
2577 async fn test_detect_duplicates_multi_field_group_by() {
2578 let store = create_test_store();
2579
2580 store
2582 .ingest(&create_test_event_with_payload(
2583 "idx-1",
2584 "index.created",
2585 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2586 ))
2587 .unwrap();
2588 store
2589 .ingest(&create_test_event_with_payload(
2590 "idx-2",
2591 "index.created",
2592 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2593 ))
2594 .unwrap();
2595 store
2597 .ingest(&create_test_event_with_payload(
2598 "idx-3",
2599 "index.created",
2600 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2601 ))
2602 .unwrap();
2603
2604 let query_req = QueryEventsRequest {
2605 entity_id: None,
2606 event_type: None,
2607 tenant_id: None,
2608 as_of: None,
2609 since: None,
2610 until: None,
2611 limit: None,
2612 event_type_prefix: Some("index.".to_string()),
2613 payload_filter: None,
2614 };
2615 let events = store.query(&query_req).unwrap();
2616
2617 let group_by_fields = vec!["name", "user_id"];
2618 let mut entity_latest: std::collections::HashMap<String, &Event> =
2619 std::collections::HashMap::new();
2620 for event in &events {
2621 entity_latest
2622 .entry(event.entity_id().to_string())
2623 .and_modify(|existing| {
2624 if event.timestamp() > existing.timestamp() {
2625 *existing = event;
2626 }
2627 })
2628 .or_insert(event);
2629 }
2630
2631 let mut groups: std::collections::HashMap<String, Vec<String>> =
2632 std::collections::HashMap::new();
2633 for (entity_id, event) in &entity_latest {
2634 let payload = event.payload();
2635 let mut key_parts = serde_json::Map::new();
2636 for field in &group_by_fields {
2637 let value = payload
2638 .get(*field)
2639 .cloned()
2640 .unwrap_or(serde_json::Value::Null);
2641 key_parts.insert((*field).to_string(), value);
2642 }
2643 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2644 groups.entry(key_str).or_default().push(entity_id.clone());
2645 }
2646
2647 let duplicate_groups: Vec<_> = groups
2648 .into_iter()
2649 .filter(|(_, ids)| ids.len() > 1)
2650 .collect();
2651
2652 assert_eq!(duplicate_groups.len(), 1);
2654 let (_, ref ids) = duplicate_groups[0];
2655 assert_eq!(ids.len(), 2);
2656 let mut sorted_ids = ids.clone();
2657 sorted_ids.sort();
2658 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2659 }
2660
2661 #[tokio::test]
2662 async fn test_projection_state_cache() {
2663 let store = create_test_store();
2664
2665 let cache = store.projection_state_cache();
2667 cache.insert(
2668 "entity_snapshots:user-123".to_string(),
2669 serde_json::json!({"name": "Test User", "age": 30}),
2670 );
2671
2672 let state = cache.get("entity_snapshots:user-123");
2674 assert!(state.is_some());
2675 let state = state.unwrap();
2676 assert_eq!(state["name"], "Test User");
2677 assert_eq!(state["age"], 30);
2678 }
2679
2680 #[tokio::test]
2681 async fn test_projection_manager_list_projections() {
2682 let store = create_test_store();
2683
2684 let projection_manager = store.projection_manager();
2686 let projections = projection_manager.list_projections();
2687
2688 assert!(projections.len() >= 2);
2690
2691 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2692 assert!(names.contains(&"entity_snapshots"));
2693 assert!(names.contains(&"event_counters"));
2694 }
2695
2696 #[tokio::test]
2697 async fn test_projection_state_after_event_ingestion() {
2698 let store = create_test_store();
2699
2700 let event = create_test_event("user-456", "user.created");
2702 store.ingest(&event).unwrap();
2703
2704 let projection_manager = store.projection_manager();
2706 let snapshot_projection = projection_manager
2707 .get_projection("entity_snapshots")
2708 .unwrap();
2709
2710 let state = snapshot_projection.get_state("user-456");
2711 assert!(state.is_some());
2712 let state = state.unwrap();
2713 assert_eq!(state["name"], "Test");
2714 assert_eq!(state["value"], 42);
2715 }
2716
2717 #[tokio::test]
2718 async fn test_projection_state_cache_multiple_entities() {
2719 let store = create_test_store();
2720 let cache = store.projection_state_cache();
2721
2722 for i in 0..10 {
2724 cache.insert(
2725 format!("entity_snapshots:entity-{i}"),
2726 serde_json::json!({"id": i, "status": "active"}),
2727 );
2728 }
2729
2730 assert_eq!(cache.len(), 10);
2732
2733 for i in 0..10 {
2735 let key = format!("entity_snapshots:entity-{i}");
2736 let state = cache.get(&key);
2737 assert!(state.is_some());
2738 assert_eq!(state.unwrap()["id"], i);
2739 }
2740 }
2741
2742 #[tokio::test]
2743 async fn test_projection_state_update() {
2744 let store = create_test_store();
2745 let cache = store.projection_state_cache();
2746
2747 cache.insert(
2749 "entity_snapshots:user-789".to_string(),
2750 serde_json::json!({"balance": 100}),
2751 );
2752
2753 cache.insert(
2755 "entity_snapshots:user-789".to_string(),
2756 serde_json::json!({"balance": 150}),
2757 );
2758
2759 let state = cache.get("entity_snapshots:user-789").unwrap();
2761 assert_eq!(state["balance"], 150);
2762 }
2763
2764 #[tokio::test]
2765 async fn test_event_counter_projection() {
2766 let store = create_test_store();
2767
2768 store
2770 .ingest(&create_test_event("user-1", "user.created"))
2771 .unwrap();
2772 store
2773 .ingest(&create_test_event("user-2", "user.created"))
2774 .unwrap();
2775 store
2776 .ingest(&create_test_event("user-1", "user.updated"))
2777 .unwrap();
2778
2779 let projection_manager = store.projection_manager();
2781 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2782
2783 let created_state = counter_projection.get_state("user.created");
2785 assert!(created_state.is_some());
2786 assert_eq!(created_state.unwrap()["count"], 2);
2787
2788 let updated_state = counter_projection.get_state("user.updated");
2789 assert!(updated_state.is_some());
2790 assert_eq!(updated_state.unwrap()["count"], 1);
2791 }
2792
2793 #[tokio::test]
2794 async fn test_projection_state_cache_key_format() {
2795 let store = create_test_store();
2796 let cache = store.projection_state_cache();
2797
2798 let key = "orders:order-12345".to_string();
2800 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2801
2802 let state = cache.get(&key).unwrap();
2803 assert_eq!(state["total"], 99.99);
2804 }
2805
2806 #[tokio::test]
2807 async fn test_projection_state_cache_removal() {
2808 let store = create_test_store();
2809 let cache = store.projection_state_cache();
2810
2811 cache.insert(
2813 "test:entity-1".to_string(),
2814 serde_json::json!({"data": "value"}),
2815 );
2816 assert_eq!(cache.len(), 1);
2817
2818 cache.remove("test:entity-1");
2819 assert_eq!(cache.len(), 0);
2820 assert!(cache.get("test:entity-1").is_none());
2821 }
2822
2823 #[tokio::test]
2824 async fn test_get_nonexistent_projection() {
2825 let store = create_test_store();
2826 let projection_manager = store.projection_manager();
2827
2828 let projection = projection_manager.get_projection("nonexistent_projection");
2830 assert!(projection.is_none());
2831 }
2832
2833 #[tokio::test]
2834 async fn test_get_nonexistent_entity_state() {
2835 let store = create_test_store();
2836 let projection_manager = store.projection_manager();
2837
2838 let snapshot_projection = projection_manager
2840 .get_projection("entity_snapshots")
2841 .unwrap();
2842 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2843 assert!(state.is_none());
2844 }
2845
2846 #[tokio::test]
2847 async fn test_projection_state_cache_concurrent_access() {
2848 let store = create_test_store();
2849 let cache = store.projection_state_cache();
2850
2851 let handles: Vec<_> = (0..10)
2853 .map(|i| {
2854 let cache_clone = cache.clone();
2855 tokio::spawn(async move {
2856 cache_clone.insert(
2857 format!("concurrent:entity-{i}"),
2858 serde_json::json!({"thread": i}),
2859 );
2860 })
2861 })
2862 .collect();
2863
2864 for handle in handles {
2865 handle.await.unwrap();
2866 }
2867
2868 assert_eq!(cache.len(), 10);
2870 }
2871
2872 #[tokio::test]
2873 async fn test_projection_state_large_payload() {
2874 let store = create_test_store();
2875 let cache = store.projection_state_cache();
2876
2877 let large_array: Vec<serde_json::Value> = (0..1000)
2879 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2880 .collect();
2881
2882 cache.insert(
2883 "large:entity-1".to_string(),
2884 serde_json::json!({"items": large_array}),
2885 );
2886
2887 let state = cache.get("large:entity-1").unwrap();
2888 let items = state["items"].as_array().unwrap();
2889 assert_eq!(items.len(), 1000);
2890 }
2891
2892 #[tokio::test]
2893 async fn test_projection_state_complex_json() {
2894 let store = create_test_store();
2895 let cache = store.projection_state_cache();
2896
2897 let complex_state = serde_json::json!({
2899 "user": {
2900 "id": "user-123",
2901 "profile": {
2902 "name": "John Doe",
2903 "email": "john@example.com",
2904 "settings": {
2905 "theme": "dark",
2906 "notifications": true
2907 }
2908 },
2909 "roles": ["admin", "user"],
2910 "metadata": {
2911 "created_at": "2025-01-01T00:00:00Z",
2912 "last_login": null
2913 }
2914 }
2915 });
2916
2917 cache.insert("complex:user-123".to_string(), complex_state);
2918
2919 let state = cache.get("complex:user-123").unwrap();
2920 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2921 assert_eq!(state["user"]["roles"][0], "admin");
2922 assert!(state["user"]["metadata"]["last_login"].is_null());
2923 }
2924
2925 #[tokio::test]
2926 async fn test_projection_state_cache_iteration() {
2927 let store = create_test_store();
2928 let cache = store.projection_state_cache();
2929
2930 for i in 0..5 {
2932 cache.insert(format!("iter:entity-{i}"), serde_json::json!({"index": i}));
2933 }
2934
2935 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2937 assert_eq!(entries.len(), 5);
2938 }
2939
2940 #[tokio::test]
2941 async fn test_projection_manager_get_entity_snapshots() {
2942 let store = create_test_store();
2943 let projection_manager = store.projection_manager();
2944
2945 let projection = projection_manager.get_projection("entity_snapshots");
2947 assert!(projection.is_some());
2948 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2949 }
2950
2951 #[tokio::test]
2952 async fn test_projection_manager_get_event_counters() {
2953 let store = create_test_store();
2954 let projection_manager = store.projection_manager();
2955
2956 let projection = projection_manager.get_projection("event_counters");
2958 assert!(projection.is_some());
2959 assert_eq!(projection.unwrap().name(), "event_counters");
2960 }
2961
2962 #[tokio::test]
2963 async fn test_projection_state_cache_overwrite() {
2964 let store = create_test_store();
2965 let cache = store.projection_state_cache();
2966
2967 cache.insert(
2969 "overwrite:entity-1".to_string(),
2970 serde_json::json!({"version": 1}),
2971 );
2972
2973 cache.insert(
2975 "overwrite:entity-1".to_string(),
2976 serde_json::json!({"version": 2}),
2977 );
2978
2979 cache.insert(
2981 "overwrite:entity-1".to_string(),
2982 serde_json::json!({"version": 3}),
2983 );
2984
2985 let state = cache.get("overwrite:entity-1").unwrap();
2986 assert_eq!(state["version"], 3);
2987
2988 assert_eq!(cache.len(), 1);
2990 }
2991
2992 #[tokio::test]
2993 async fn test_projection_state_multiple_projections() {
2994 let store = create_test_store();
2995 let cache = store.projection_state_cache();
2996
2997 cache.insert(
2999 "entity_snapshots:user-1".to_string(),
3000 serde_json::json!({"name": "Alice"}),
3001 );
3002 cache.insert(
3003 "event_counters:user.created".to_string(),
3004 serde_json::json!({"count": 5}),
3005 );
3006 cache.insert(
3007 "custom_projection:order-1".to_string(),
3008 serde_json::json!({"total": 150.0}),
3009 );
3010
3011 assert_eq!(
3013 cache.get("entity_snapshots:user-1").unwrap()["name"],
3014 "Alice"
3015 );
3016 assert_eq!(
3017 cache.get("event_counters:user.created").unwrap()["count"],
3018 5
3019 );
3020 assert_eq!(
3021 cache.get("custom_projection:order-1").unwrap()["total"],
3022 150.0
3023 );
3024 }
3025
3026 #[tokio::test]
3027 async fn test_bulk_projection_state_access() {
3028 let store = create_test_store();
3029
3030 for i in 0..5 {
3032 let event = create_test_event(&format!("bulk-user-{i}"), "user.created");
3033 store.ingest(&event).unwrap();
3034 }
3035
3036 let projection_manager = store.projection_manager();
3038 let snapshot_projection = projection_manager
3039 .get_projection("entity_snapshots")
3040 .unwrap();
3041
3042 for i in 0..5 {
3044 let state = snapshot_projection.get_state(&format!("bulk-user-{i}"));
3045 assert!(state.is_some(), "Entity bulk-user-{i} should have state");
3046 }
3047 }
3048
3049 #[tokio::test]
3050 async fn test_bulk_save_projection_states() {
3051 let store = create_test_store();
3052 let cache = store.projection_state_cache();
3053
3054 let states = vec![
3056 BulkSaveStateItem {
3057 entity_id: "bulk-entity-1".to_string(),
3058 state: serde_json::json!({"name": "Entity 1", "value": 100}),
3059 },
3060 BulkSaveStateItem {
3061 entity_id: "bulk-entity-2".to_string(),
3062 state: serde_json::json!({"name": "Entity 2", "value": 200}),
3063 },
3064 BulkSaveStateItem {
3065 entity_id: "bulk-entity-3".to_string(),
3066 state: serde_json::json!({"name": "Entity 3", "value": 300}),
3067 },
3068 ];
3069
3070 let projection_name = "test_projection";
3071
3072 for item in &states {
3074 cache.insert(
3075 format!("{projection_name}:{}", item.entity_id),
3076 item.state.clone(),
3077 );
3078 }
3079
3080 assert_eq!(cache.len(), 3);
3082
3083 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
3084 assert_eq!(state1["name"], "Entity 1");
3085 assert_eq!(state1["value"], 100);
3086
3087 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
3088 assert_eq!(state2["name"], "Entity 2");
3089 assert_eq!(state2["value"], 200);
3090
3091 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
3092 assert_eq!(state3["name"], "Entity 3");
3093 assert_eq!(state3["value"], 300);
3094 }
3095
3096 #[tokio::test]
3097 async fn test_bulk_save_empty_states() {
3098 let store = create_test_store();
3099 let cache = store.projection_state_cache();
3100
3101 cache.clear();
3103
3104 let states: Vec<BulkSaveStateItem> = vec![];
3106 assert_eq!(states.len(), 0);
3107
3108 assert_eq!(cache.len(), 0);
3110 }
3111
3112 #[tokio::test]
3113 async fn test_bulk_save_overwrites_existing() {
3114 let store = create_test_store();
3115 let cache = store.projection_state_cache();
3116
3117 cache.insert(
3119 "test:entity-1".to_string(),
3120 serde_json::json!({"version": 1, "data": "initial"}),
3121 );
3122
3123 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3125 cache.insert("test:entity-1".to_string(), new_state);
3126
3127 let state = cache.get("test:entity-1").unwrap();
3129 assert_eq!(state["version"], 2);
3130 assert_eq!(state["data"], "updated");
3131 }
3132
3133 #[tokio::test]
3134 async fn test_bulk_save_high_volume() {
3135 let store = create_test_store();
3136 let cache = store.projection_state_cache();
3137
3138 for i in 0..1000 {
3140 cache.insert(
3141 format!("volume_test:entity-{i}"),
3142 serde_json::json!({"index": i, "status": "active"}),
3143 );
3144 }
3145
3146 assert_eq!(cache.len(), 1000);
3148
3149 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3151 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3152 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3153 }
3154
3155 #[tokio::test]
3156 async fn test_bulk_save_different_projections() {
3157 let store = create_test_store();
3158 let cache = store.projection_state_cache();
3159
3160 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3162
3163 for proj in &projections {
3164 for i in 0..5 {
3165 cache.insert(
3166 format!("{proj}:entity-{i}"),
3167 serde_json::json!({"projection": proj, "id": i}),
3168 );
3169 }
3170 }
3171
3172 assert_eq!(cache.len(), 15);
3174
3175 for proj in &projections {
3177 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3178 assert_eq!(state["projection"], *proj);
3179 }
3180 }
3181}