1use crate::{
2 application::{
3 dto::{
4 DetectDuplicatesRequest, DetectDuplicatesResponse, DuplicateGroup, EntitySummary,
5 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
6 IngestEventsBatchResponse, ListEntitiesRequest, ListEntitiesResponse,
7 QueryEventsRequest, QueryEventsResponse,
8 },
9 services::{
10 analytics::{
11 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
12 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
13 },
14 pipeline::{PipelineConfig, PipelineStats},
15 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
16 schema::{
17 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
18 ValidateEventRequest, ValidateEventResponse,
19 },
20 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
21 },
22 },
23 domain::entities::Event,
24 error::Result,
25 infrastructure::{
26 persistence::{
27 compaction::CompactionResult,
28 snapshot::{
29 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
30 ListSnapshotsResponse, SnapshotInfo,
31 },
32 },
33 query::{
34 eventql::EventQLRequest,
35 geospatial::GeoQueryRequest,
36 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
37 },
38 replication::ReplicationMode,
39 web::api_v1::AppState,
40 },
41 store::{EventStore, EventTypeInfo, StreamInfo},
42};
43use axum::{
44 Json, Router,
45 extract::{Path, Query, State, WebSocketUpgrade},
46 response::{IntoResponse, Response},
47 routing::{get, post, put},
48};
49use serde::Deserialize;
50use std::sync::Arc;
51use tower_http::{
52 cors::{Any, CorsLayer},
53 trace::TraceLayer,
54};
55
/// Reference-counted handle to the [`EventStore`]; used as the router state in
/// [`serve`] and extracted by the handlers via `State<SharedStore>`.
type SharedStore = Arc<EventStore>;
57
58async fn await_replication_ack(state: &AppState) {
64 let shipper_guard = state.wal_shipper.read().await;
65 if let Some(ref shipper) = *shipper_guard {
66 let mode = shipper.replication_mode();
67 if mode == ReplicationMode::Async {
68 return;
69 }
70
71 let target_offset = shipper.current_leader_offset();
72 if target_offset == 0 {
73 return;
74 }
75
76 let shipper = Arc::clone(shipper);
77 drop(shipper_guard);
79
80 let timer = state
81 .store
82 .metrics()
83 .replication_ack_wait_seconds
84 .start_timer();
85 let acked = shipper.wait_for_ack(target_offset).await;
86 timer.observe_duration();
87
88 if !acked {
89 tracing::warn!(
90 "Replication ACK timeout in {} mode (offset {}). \
91 Write succeeded locally but follower confirmation pending.",
92 mode,
93 target_offset,
94 );
95 }
96 }
97}
98
99pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
100 let app = Router::new()
101 .route("/health", get(health))
102 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
104 .route("/api/v1/events/batch", post(ingest_events_batch))
105 .route("/api/v1/events/query", get(query_events))
106 .route("/api/v1/events/{event_id}", get(get_event_by_id))
107 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
110 .route("/api/v1/event-types", get(list_event_types))
111 .route("/api/v1/entities/duplicates", get(detect_duplicates))
112 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
113 .route(
114 "/api/v1/entities/{entity_id}/snapshot",
115 get(get_entity_snapshot),
116 )
117 .route("/api/v1/stats", get(get_stats))
118 .route("/api/v1/analytics/frequency", get(analytics_frequency))
120 .route("/api/v1/analytics/summary", get(analytics_summary))
121 .route("/api/v1/analytics/correlation", get(analytics_correlation))
122 .route("/api/v1/snapshots", post(create_snapshot))
124 .route("/api/v1/snapshots", get(list_snapshots))
125 .route(
126 "/api/v1/snapshots/{entity_id}/latest",
127 get(get_latest_snapshot),
128 )
129 .route("/api/v1/compaction/trigger", post(trigger_compaction))
131 .route("/api/v1/compaction/stats", get(compaction_stats))
132 .route("/api/v1/schemas", post(register_schema))
134 .route("/api/v1/schemas", get(list_subjects))
135 .route("/api/v1/schemas/{subject}", get(get_schema))
136 .route(
137 "/api/v1/schemas/{subject}/versions",
138 get(list_schema_versions),
139 )
140 .route("/api/v1/schemas/validate", post(validate_event_schema))
141 .route(
142 "/api/v1/schemas/{subject}/compatibility",
143 put(set_compatibility_mode),
144 )
145 .route("/api/v1/replay", post(start_replay))
147 .route("/api/v1/replay", get(list_replays))
148 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
149 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
150 .route(
151 "/api/v1/replay/{replay_id}",
152 axum::routing::delete(delete_replay),
153 )
154 .route("/api/v1/pipelines", post(register_pipeline))
156 .route("/api/v1/pipelines", get(list_pipelines))
157 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
158 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
159 .route(
160 "/api/v1/pipelines/{pipeline_id}",
161 axum::routing::delete(remove_pipeline),
162 )
163 .route(
164 "/api/v1/pipelines/{pipeline_id}/stats",
165 get(get_pipeline_stats),
166 )
167 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
168 .route("/api/v1/projections", get(list_projections))
170 .route("/api/v1/projections/{name}", get(get_projection))
171 .route(
172 "/api/v1/projections/{name}",
173 axum::routing::delete(delete_projection),
174 )
175 .route(
176 "/api/v1/projections/{name}/state",
177 get(get_projection_state_summary),
178 )
179 .route("/api/v1/projections/{name}/reset", post(reset_projection))
180 .route(
181 "/api/v1/projections/{name}/{entity_id}/state",
182 get(get_projection_state),
183 )
184 .route(
185 "/api/v1/projections/{name}/{entity_id}/state",
186 post(save_projection_state),
187 )
188 .route(
189 "/api/v1/projections/{name}/{entity_id}/state",
190 put(save_projection_state),
191 )
192 .route(
193 "/api/v1/projections/{name}/bulk",
194 post(bulk_get_projection_states),
195 )
196 .route(
197 "/api/v1/projections/{name}/bulk/save",
198 post(bulk_save_projection_states),
199 )
200 .route("/api/v1/webhooks", post(register_webhook))
202 .route("/api/v1/webhooks", get(list_webhooks))
203 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
204 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
205 .route(
206 "/api/v1/webhooks/{webhook_id}",
207 axum::routing::delete(delete_webhook),
208 )
209 .route(
210 "/api/v1/webhooks/{webhook_id}/deliveries",
211 get(list_webhook_deliveries),
212 )
213 .route("/api/v1/eventql", post(eventql_query))
215 .route("/api/v1/graphql", post(graphql_query))
216 .route("/api/v1/geospatial/query", post(geo_query))
217 .route("/api/v1/geospatial/stats", get(geo_stats))
218 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
219 .route(
220 "/api/v1/schema-evolution/history/{event_type}",
221 get(schema_evolution_history),
222 )
223 .route(
224 "/api/v1/schema-evolution/schema/{event_type}",
225 get(schema_evolution_schema),
226 )
227 .route(
228 "/api/v1/schema-evolution/stats",
229 get(schema_evolution_stats),
230 )
231 .layer(
232 CorsLayer::new()
233 .allow_origin(Any)
234 .allow_methods(Any)
235 .allow_headers(Any),
236 )
237 .layer(TraceLayer::new_for_http())
238 .with_state(store);
239
240 let listener = tokio::net::TcpListener::bind(addr).await?;
241 axum::serve(listener, app).await?;
242
243 Ok(())
244}
245
246pub async fn health() -> impl IntoResponse {
247 Json(serde_json::json!({
248 "status": "healthy",
249 "service": "allsource-core",
250 "version": env!("CARGO_PKG_VERSION")
251 }))
252}
253
254pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
256 let metrics = store.metrics();
257
258 match metrics.encode() {
259 Ok(encoded) => Response::builder()
260 .status(200)
261 .header("Content-Type", "text/plain; version=0.0.4")
262 .body(encoded)
263 .unwrap()
264 .into_response(),
265 Err(e) => Response::builder()
266 .status(500)
267 .body(format!("Error encoding metrics: {e}"))
268 .unwrap()
269 .into_response(),
270 }
271}
272
273pub async fn ingest_event(
274 State(store): State<SharedStore>,
275 Json(req): Json<IngestEventRequest>,
276) -> Result<Json<IngestEventResponse>> {
277 let event = Event::from_strings(
279 req.event_type,
280 req.entity_id,
281 "default".to_string(),
282 req.payload,
283 req.metadata,
284 )?;
285
286 let event_id = event.id;
287 let timestamp = event.timestamp;
288
289 store.ingest(event)?;
290
291 tracing::info!("Event ingested: {}", event_id);
292
293 Ok(Json(IngestEventResponse {
294 event_id,
295 timestamp,
296 }))
297}
298
299pub async fn ingest_event_v1(
303 State(state): State<AppState>,
304 Json(req): Json<IngestEventRequest>,
305) -> Result<Json<IngestEventResponse>> {
306 let event = Event::from_strings(
307 req.event_type,
308 req.entity_id,
309 "default".to_string(),
310 req.payload,
311 req.metadata,
312 )?;
313
314 let event_id = event.id;
315 let timestamp = event.timestamp;
316
317 state.store.ingest(event)?;
318
319 await_replication_ack(&state).await;
321
322 tracing::info!("Event ingested: {}", event_id);
323
324 Ok(Json(IngestEventResponse {
325 event_id,
326 timestamp,
327 }))
328}
329
330pub async fn ingest_events_batch(
335 State(store): State<SharedStore>,
336 Json(req): Json<IngestEventsBatchRequest>,
337) -> Result<Json<IngestEventsBatchResponse>> {
338 let total = req.events.len();
339 let mut ingested_events = Vec::with_capacity(total);
340
341 for event_req in req.events {
342 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
343
344 let event = Event::from_strings(
345 event_req.event_type,
346 event_req.entity_id,
347 tenant_id,
348 event_req.payload,
349 event_req.metadata,
350 )?;
351
352 let event_id = event.id;
353 let timestamp = event.timestamp;
354
355 store.ingest(event)?;
356
357 ingested_events.push(IngestEventResponse {
358 event_id,
359 timestamp,
360 });
361 }
362
363 let ingested = ingested_events.len();
364 tracing::info!("Batch ingested {} events", ingested);
365
366 Ok(Json(IngestEventsBatchResponse {
367 total,
368 ingested,
369 events: ingested_events,
370 }))
371}
372
373pub async fn ingest_events_batch_v1(
377 State(state): State<AppState>,
378 Json(req): Json<IngestEventsBatchRequest>,
379) -> Result<Json<IngestEventsBatchResponse>> {
380 let total = req.events.len();
381 let mut ingested_events = Vec::with_capacity(total);
382
383 for event_req in req.events {
384 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
385
386 let event = Event::from_strings(
387 event_req.event_type,
388 event_req.entity_id,
389 tenant_id,
390 event_req.payload,
391 event_req.metadata,
392 )?;
393
394 let event_id = event.id;
395 let timestamp = event.timestamp;
396
397 state.store.ingest(event)?;
398
399 ingested_events.push(IngestEventResponse {
400 event_id,
401 timestamp,
402 });
403 }
404
405 await_replication_ack(&state).await;
407
408 let ingested = ingested_events.len();
409 tracing::info!("Batch ingested {} events", ingested);
410
411 Ok(Json(IngestEventsBatchResponse {
412 total,
413 ingested,
414 events: ingested_events,
415 }))
416}
417
418pub async fn query_events(
419 State(store): State<SharedStore>,
420 Query(req): Query<QueryEventsRequest>,
421) -> Result<Json<QueryEventsResponse>> {
422 let requested_limit = req.limit;
423
424 let unlimited_req = QueryEventsRequest {
426 entity_id: req.entity_id,
427 event_type: req.event_type,
428 tenant_id: req.tenant_id,
429 as_of: req.as_of,
430 since: req.since,
431 until: req.until,
432 limit: None,
433 event_type_prefix: req.event_type_prefix,
434 payload_filter: req.payload_filter,
435 };
436 let all_events = store.query(unlimited_req)?;
437 let total_count = all_events.len();
438
439 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
441 all_events.into_iter().take(limit).collect()
442 } else {
443 all_events
444 };
445
446 let count = limited_events.len();
447 let has_more = count < total_count;
448 let events: Vec<EventDto> = limited_events.iter().map(EventDto::from).collect();
449
450 tracing::debug!("Query returned {} events (total: {})", count, total_count);
451
452 Ok(Json(QueryEventsResponse {
453 events,
454 count,
455 total_count,
456 has_more,
457 }))
458}
459
460pub async fn list_entities(
461 State(store): State<SharedStore>,
462 Query(req): Query<ListEntitiesRequest>,
463) -> Result<Json<ListEntitiesResponse>> {
464 use std::collections::HashMap;
465
466 let query_req = QueryEventsRequest {
468 entity_id: None,
469 event_type: None,
470 tenant_id: None,
471 as_of: None,
472 since: None,
473 until: None,
474 limit: None,
475 event_type_prefix: req.event_type_prefix,
476 payload_filter: req.payload_filter,
477 };
478 let events = store.query(query_req)?;
479
480 let mut entity_map: HashMap<String, Vec<&Event>> = HashMap::new();
482 for event in &events {
483 entity_map
484 .entry(event.entity_id().to_string())
485 .or_default()
486 .push(event);
487 }
488
489 let mut summaries: Vec<EntitySummary> = entity_map
491 .into_iter()
492 .map(|(entity_id, events)| {
493 let last = events.iter().max_by_key(|e| e.timestamp()).unwrap();
494 EntitySummary {
495 entity_id,
496 event_count: events.len(),
497 last_event_type: last.event_type_str().to_string(),
498 last_event_at: last.timestamp(),
499 }
500 })
501 .collect();
502 summaries.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
503
504 let total = summaries.len();
505
506 let offset = req.offset.unwrap_or(0);
508 let summaries: Vec<EntitySummary> = summaries.into_iter().skip(offset).collect::<Vec<_>>();
509 let summaries = if let Some(limit) = req.limit {
510 let has_more = summaries.len() > limit;
511 let truncated: Vec<EntitySummary> = summaries.into_iter().take(limit).collect();
512 return Ok(Json(ListEntitiesResponse {
513 entities: truncated,
514 total,
515 has_more,
516 }));
517 } else {
518 summaries
519 };
520
521 Ok(Json(ListEntitiesResponse {
522 entities: summaries,
523 total,
524 has_more: false,
525 }))
526}
527
528pub async fn detect_duplicates(
529 State(store): State<SharedStore>,
530 Query(req): Query<DetectDuplicatesRequest>,
531) -> Result<Json<DetectDuplicatesResponse>> {
532 use std::collections::HashMap;
533
534 let group_by_fields: Vec<&str> = req.group_by.split(',').map(|s| s.trim()).collect();
535
536 let query_req = QueryEventsRequest {
538 entity_id: None,
539 event_type: None,
540 tenant_id: None,
541 as_of: None,
542 since: None,
543 until: None,
544 limit: None,
545 event_type_prefix: Some(req.event_type_prefix),
546 payload_filter: None,
547 };
548 let events = store.query(query_req)?;
549
550 let mut entity_latest: HashMap<String, &Event> = HashMap::new();
553 for event in &events {
554 let eid = event.entity_id().to_string();
555 entity_latest
556 .entry(eid)
557 .and_modify(|existing| {
558 if event.timestamp() > existing.timestamp() {
559 *existing = event;
560 }
561 })
562 .or_insert(event);
563 }
564
565 let mut groups: HashMap<String, Vec<String>> = HashMap::new();
567 for (entity_id, event) in &entity_latest {
568 let payload = event.payload();
569 let mut key_parts = serde_json::Map::new();
570 for field in &group_by_fields {
571 let value = payload
572 .get(*field)
573 .cloned()
574 .unwrap_or(serde_json::Value::Null);
575 key_parts.insert(field.to_string(), value);
576 }
577 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
578 groups.entry(key_str).or_default().push(entity_id.clone());
579 }
580
581 let mut duplicate_groups: Vec<DuplicateGroup> = groups
583 .into_iter()
584 .filter(|(_, ids)| ids.len() > 1)
585 .map(|(key_str, mut ids)| {
586 ids.sort();
587 let key: serde_json::Value =
588 serde_json::from_str(&key_str).unwrap_or(serde_json::Value::Null);
589 let count = ids.len();
590 DuplicateGroup {
591 key,
592 entity_ids: ids,
593 count,
594 }
595 })
596 .collect();
597
598 duplicate_groups.sort_by(|a, b| b.count.cmp(&a.count));
600
601 let total = duplicate_groups.len();
602
603 let offset = req.offset.unwrap_or(0);
605 let duplicate_groups: Vec<DuplicateGroup> = duplicate_groups.into_iter().skip(offset).collect();
606
607 if let Some(limit) = req.limit {
608 let has_more = duplicate_groups.len() > limit;
609 let truncated: Vec<DuplicateGroup> = duplicate_groups.into_iter().take(limit).collect();
610 return Ok(Json(DetectDuplicatesResponse {
611 duplicates: truncated,
612 total,
613 has_more,
614 }));
615 }
616
617 Ok(Json(DetectDuplicatesResponse {
618 duplicates: duplicate_groups,
619 total,
620 has_more: false,
621 }))
622}
623
/// Query-string parameters accepted by [`get_entity_state`].
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp forwarded to `reconstruct_state`; `None` when
    /// the parameter is omitted.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
628
629pub async fn get_entity_state(
630 State(store): State<SharedStore>,
631 Path(entity_id): Path<String>,
632 Query(params): Query<EntityStateParams>,
633) -> Result<Json<serde_json::Value>> {
634 let state = store.reconstruct_state(&entity_id, params.as_of)?;
635
636 tracing::info!("State reconstructed for entity: {}", entity_id);
637
638 Ok(Json(state))
639}
640
641pub async fn get_entity_snapshot(
642 State(store): State<SharedStore>,
643 Path(entity_id): Path<String>,
644) -> Result<Json<serde_json::Value>> {
645 let snapshot = store.get_snapshot(&entity_id)?;
646
647 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
648
649 Ok(Json(snapshot))
650}
651
652pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
653 let stats = store.stats();
654 Json(stats)
655}
656
/// Query-string pagination parameters accepted by [`list_streams`].
#[derive(Debug, Deserialize)]
pub struct ListStreamsParams {
    /// Maximum number of streams to return; no cap when omitted.
    pub limit: Option<usize>,
    /// Number of streams to skip (after sorting) before the returned page.
    pub offset: Option<usize>,
}
666
/// Response body of [`list_streams`].
#[derive(Debug, serde::Serialize)]
pub struct ListStreamsResponse {
    /// The page of streams left after `offset`/`limit` were applied.
    pub streams: Vec<StreamInfo>,
    /// Number of streams before pagination.
    pub total: usize,
}
673
674pub async fn list_streams(
675 State(store): State<SharedStore>,
676 Query(params): Query<ListStreamsParams>,
677) -> Json<ListStreamsResponse> {
678 let mut streams = store.list_streams();
679 let total = streams.len();
680
681 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
683
684 if let Some(offset) = params.offset {
686 if offset < streams.len() {
687 streams = streams[offset..].to_vec();
688 } else {
689 streams = vec![];
690 }
691 }
692
693 if let Some(limit) = params.limit {
694 streams.truncate(limit);
695 }
696
697 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
698
699 Json(ListStreamsResponse { streams, total })
700}
701
/// Query-string pagination parameters accepted by [`list_event_types`].
#[derive(Debug, Deserialize)]
pub struct ListEventTypesParams {
    /// Maximum number of event types to return; no cap when omitted.
    pub limit: Option<usize>,
    /// Number of event types to skip (after sorting) before the returned page.
    pub offset: Option<usize>,
}
711
/// Response body of [`list_event_types`].
#[derive(Debug, serde::Serialize)]
pub struct ListEventTypesResponse {
    /// The page of event types left after `offset`/`limit` were applied.
    pub event_types: Vec<EventTypeInfo>,
    /// Number of event types before pagination.
    pub total: usize,
}
718
719pub async fn list_event_types(
720 State(store): State<SharedStore>,
721 Query(params): Query<ListEventTypesParams>,
722) -> Json<ListEventTypesResponse> {
723 let mut event_types = store.list_event_types();
724 let total = event_types.len();
725
726 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
728
729 if let Some(offset) = params.offset {
731 if offset < event_types.len() {
732 event_types = event_types[offset..].to_vec();
733 } else {
734 event_types = vec![];
735 }
736 }
737
738 if let Some(limit) = params.limit {
739 event_types.truncate(limit);
740 }
741
742 tracing::debug!(
743 "Listed {} event types (total: {})",
744 event_types.len(),
745 total
746 );
747
748 Json(ListEventTypesResponse { event_types, total })
749}
750
751pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
753 let websocket_manager = store.websocket_manager();
754
755 ws.on_upgrade(move |socket| async move {
756 websocket_manager.handle_socket(socket).await;
757 })
758}
759
760pub async fn analytics_frequency(
762 State(store): State<SharedStore>,
763 Query(req): Query<EventFrequencyRequest>,
764) -> Result<Json<EventFrequencyResponse>> {
765 let response = AnalyticsEngine::event_frequency(&store, req)?;
766
767 tracing::debug!(
768 "Frequency analysis returned {} buckets",
769 response.buckets.len()
770 );
771
772 Ok(Json(response))
773}
774
775pub async fn analytics_summary(
777 State(store): State<SharedStore>,
778 Query(req): Query<StatsSummaryRequest>,
779) -> Result<Json<StatsSummaryResponse>> {
780 let response = AnalyticsEngine::stats_summary(&store, req)?;
781
782 tracing::debug!(
783 "Stats summary: {} events across {} entities",
784 response.total_events,
785 response.unique_entities
786 );
787
788 Ok(Json(response))
789}
790
791pub async fn analytics_correlation(
793 State(store): State<SharedStore>,
794 Query(req): Query<CorrelationRequest>,
795) -> Result<Json<CorrelationResponse>> {
796 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
797
798 tracing::debug!(
799 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
800 response.correlated_pairs,
801 response.total_a,
802 response.correlation_percentage
803 );
804
805 Ok(Json(response))
806}
807
808pub async fn create_snapshot(
810 State(store): State<SharedStore>,
811 Json(req): Json<CreateSnapshotRequest>,
812) -> Result<Json<CreateSnapshotResponse>> {
813 store.create_snapshot(&req.entity_id)?;
814
815 let snapshot_manager = store.snapshot_manager();
816 let snapshot = snapshot_manager
817 .get_latest_snapshot(&req.entity_id)
818 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
819
820 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
821
822 Ok(Json(CreateSnapshotResponse {
823 snapshot_id: snapshot.id,
824 entity_id: snapshot.entity_id,
825 created_at: snapshot.created_at,
826 event_count: snapshot.event_count,
827 size_bytes: snapshot.metadata.size_bytes,
828 }))
829}
830
831pub async fn list_snapshots(
833 State(store): State<SharedStore>,
834 Query(req): Query<ListSnapshotsRequest>,
835) -> Result<Json<ListSnapshotsResponse>> {
836 let snapshot_manager = store.snapshot_manager();
837
838 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
839 snapshot_manager
840 .get_all_snapshots(&entity_id)
841 .into_iter()
842 .map(SnapshotInfo::from)
843 .collect()
844 } else {
845 let entities = snapshot_manager.list_entities();
847 entities
848 .iter()
849 .flat_map(|entity_id| {
850 snapshot_manager
851 .get_all_snapshots(entity_id)
852 .into_iter()
853 .map(SnapshotInfo::from)
854 })
855 .collect()
856 };
857
858 let total = snapshots.len();
859
860 tracing::debug!("Listed {} snapshots", total);
861
862 Ok(Json(ListSnapshotsResponse { snapshots, total }))
863}
864
865pub async fn get_latest_snapshot(
867 State(store): State<SharedStore>,
868 Path(entity_id): Path<String>,
869) -> Result<Json<serde_json::Value>> {
870 let snapshot_manager = store.snapshot_manager();
871
872 let snapshot = snapshot_manager
873 .get_latest_snapshot(&entity_id)
874 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
875
876 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
877
878 Ok(Json(serde_json::json!({
879 "snapshot_id": snapshot.id,
880 "entity_id": snapshot.entity_id,
881 "created_at": snapshot.created_at,
882 "as_of": snapshot.as_of,
883 "event_count": snapshot.event_count,
884 "size_bytes": snapshot.metadata.size_bytes,
885 "snapshot_type": snapshot.metadata.snapshot_type,
886 "state": snapshot.state
887 })))
888}
889
890pub async fn trigger_compaction(
892 State(store): State<SharedStore>,
893) -> Result<Json<CompactionResult>> {
894 let compaction_manager = store.compaction_manager().ok_or_else(|| {
895 crate::error::AllSourceError::InternalError(
896 "Compaction not enabled (no Parquet storage)".to_string(),
897 )
898 })?;
899
900 tracing::info!("📦 Manual compaction triggered via API");
901
902 let result = compaction_manager.compact_now()?;
903
904 Ok(Json(result))
905}
906
907pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
909 let compaction_manager = store.compaction_manager().ok_or_else(|| {
910 crate::error::AllSourceError::InternalError(
911 "Compaction not enabled (no Parquet storage)".to_string(),
912 )
913 })?;
914
915 let stats = compaction_manager.stats();
916 let config = compaction_manager.config();
917
918 Ok(Json(serde_json::json!({
919 "stats": stats,
920 "config": {
921 "min_files_to_compact": config.min_files_to_compact,
922 "target_file_size": config.target_file_size,
923 "max_file_size": config.max_file_size,
924 "small_file_threshold": config.small_file_threshold,
925 "compaction_interval_seconds": config.compaction_interval_seconds,
926 "auto_compact": config.auto_compact,
927 "strategy": config.strategy
928 }
929 })))
930}
931
932pub async fn register_schema(
934 State(store): State<SharedStore>,
935 Json(req): Json<RegisterSchemaRequest>,
936) -> Result<Json<RegisterSchemaResponse>> {
937 let schema_registry = store.schema_registry();
938
939 let response =
940 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
941
942 tracing::info!(
943 "📋 Schema registered: v{} for '{}'",
944 response.version,
945 response.subject
946 );
947
948 Ok(Json(response))
949}
950
/// Query-string parameters accepted by [`get_schema`].
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version forwarded to the registry's `get_schema`;
    /// `None` when the parameter is omitted.
    version: Option<u32>,
}
956
957pub async fn get_schema(
958 State(store): State<SharedStore>,
959 Path(subject): Path<String>,
960 Query(params): Query<GetSchemaParams>,
961) -> Result<Json<serde_json::Value>> {
962 let schema_registry = store.schema_registry();
963
964 let schema = schema_registry.get_schema(&subject, params.version)?;
965
966 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
967
968 Ok(Json(serde_json::json!({
969 "id": schema.id,
970 "subject": schema.subject,
971 "version": schema.version,
972 "schema": schema.schema,
973 "created_at": schema.created_at,
974 "description": schema.description,
975 "tags": schema.tags
976 })))
977}
978
979pub async fn list_schema_versions(
981 State(store): State<SharedStore>,
982 Path(subject): Path<String>,
983) -> Result<Json<serde_json::Value>> {
984 let schema_registry = store.schema_registry();
985
986 let versions = schema_registry.list_versions(&subject)?;
987
988 Ok(Json(serde_json::json!({
989 "subject": subject,
990 "versions": versions
991 })))
992}
993
994pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
996 let schema_registry = store.schema_registry();
997
998 let subjects = schema_registry.list_subjects();
999
1000 Json(serde_json::json!({
1001 "subjects": subjects,
1002 "total": subjects.len()
1003 }))
1004}
1005
1006pub async fn validate_event_schema(
1008 State(store): State<SharedStore>,
1009 Json(req): Json<ValidateEventRequest>,
1010) -> Result<Json<ValidateEventResponse>> {
1011 let schema_registry = store.schema_registry();
1012
1013 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
1014
1015 if response.valid {
1016 tracing::debug!(
1017 "✅ Event validated against schema '{}' v{}",
1018 req.subject,
1019 response.schema_version
1020 );
1021 } else {
1022 tracing::warn!(
1023 "❌ Event validation failed for '{}': {:?}",
1024 req.subject,
1025 response.errors
1026 );
1027 }
1028
1029 Ok(Json(response))
1030}
1031
/// JSON request body accepted by [`set_compatibility_mode`].
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
1037
1038pub async fn set_compatibility_mode(
1039 State(store): State<SharedStore>,
1040 Path(subject): Path<String>,
1041 Json(req): Json<SetCompatibilityRequest>,
1042) -> Json<serde_json::Value> {
1043 let schema_registry = store.schema_registry();
1044
1045 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
1046
1047 tracing::info!(
1048 "🔧 Set compatibility mode for '{}' to {:?}",
1049 subject,
1050 req.compatibility
1051 );
1052
1053 Json(serde_json::json!({
1054 "subject": subject,
1055 "compatibility": req.compatibility
1056 }))
1057}
1058
1059pub async fn start_replay(
1061 State(store): State<SharedStore>,
1062 Json(req): Json<StartReplayRequest>,
1063) -> Result<Json<StartReplayResponse>> {
1064 let replay_manager = store.replay_manager();
1065
1066 let response = replay_manager.start_replay(store, req)?;
1067
1068 tracing::info!(
1069 "🔄 Started replay {} with {} events",
1070 response.replay_id,
1071 response.total_events
1072 );
1073
1074 Ok(Json(response))
1075}
1076
1077pub async fn get_replay_progress(
1079 State(store): State<SharedStore>,
1080 Path(replay_id): Path<uuid::Uuid>,
1081) -> Result<Json<ReplayProgress>> {
1082 let replay_manager = store.replay_manager();
1083
1084 let progress = replay_manager.get_progress(replay_id)?;
1085
1086 Ok(Json(progress))
1087}
1088
1089pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1091 let replay_manager = store.replay_manager();
1092
1093 let replays = replay_manager.list_replays();
1094
1095 Json(serde_json::json!({
1096 "replays": replays,
1097 "total": replays.len()
1098 }))
1099}
1100
1101pub async fn cancel_replay(
1103 State(store): State<SharedStore>,
1104 Path(replay_id): Path<uuid::Uuid>,
1105) -> Result<Json<serde_json::Value>> {
1106 let replay_manager = store.replay_manager();
1107
1108 replay_manager.cancel_replay(replay_id)?;
1109
1110 tracing::info!("🛑 Cancelled replay {}", replay_id);
1111
1112 Ok(Json(serde_json::json!({
1113 "replay_id": replay_id,
1114 "status": "cancelled"
1115 })))
1116}
1117
1118pub async fn delete_replay(
1120 State(store): State<SharedStore>,
1121 Path(replay_id): Path<uuid::Uuid>,
1122) -> Result<Json<serde_json::Value>> {
1123 let replay_manager = store.replay_manager();
1124
1125 let deleted = replay_manager.delete_replay(replay_id)?;
1126
1127 if deleted {
1128 tracing::info!("🗑️ Deleted replay {}", replay_id);
1129 }
1130
1131 Ok(Json(serde_json::json!({
1132 "replay_id": replay_id,
1133 "deleted": deleted
1134 })))
1135}
1136
1137pub async fn register_pipeline(
1139 State(store): State<SharedStore>,
1140 Json(config): Json<PipelineConfig>,
1141) -> Result<Json<serde_json::Value>> {
1142 let pipeline_manager = store.pipeline_manager();
1143
1144 let pipeline_id = pipeline_manager.register(config.clone());
1145
1146 tracing::info!(
1147 "🔀 Pipeline registered: {} (name: {})",
1148 pipeline_id,
1149 config.name
1150 );
1151
1152 Ok(Json(serde_json::json!({
1153 "pipeline_id": pipeline_id,
1154 "name": config.name,
1155 "enabled": config.enabled
1156 })))
1157}
1158
1159pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1161 let pipeline_manager = store.pipeline_manager();
1162
1163 let pipelines = pipeline_manager.list();
1164
1165 tracing::debug!("Listed {} pipelines", pipelines.len());
1166
1167 Json(serde_json::json!({
1168 "pipelines": pipelines,
1169 "total": pipelines.len()
1170 }))
1171}
1172
1173pub async fn get_pipeline(
1175 State(store): State<SharedStore>,
1176 Path(pipeline_id): Path<uuid::Uuid>,
1177) -> Result<Json<PipelineConfig>> {
1178 let pipeline_manager = store.pipeline_manager();
1179
1180 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1181 crate::error::AllSourceError::ValidationError(format!(
1182 "Pipeline not found: {}",
1183 pipeline_id
1184 ))
1185 })?;
1186
1187 Ok(Json(pipeline.config().clone()))
1188}
1189
1190pub async fn remove_pipeline(
1192 State(store): State<SharedStore>,
1193 Path(pipeline_id): Path<uuid::Uuid>,
1194) -> Result<Json<serde_json::Value>> {
1195 let pipeline_manager = store.pipeline_manager();
1196
1197 let removed = pipeline_manager.remove(pipeline_id);
1198
1199 if removed {
1200 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1201 }
1202
1203 Ok(Json(serde_json::json!({
1204 "pipeline_id": pipeline_id,
1205 "removed": removed
1206 })))
1207}
1208
1209pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1211 let pipeline_manager = store.pipeline_manager();
1212
1213 let stats = pipeline_manager.all_stats();
1214
1215 Json(serde_json::json!({
1216 "stats": stats,
1217 "total": stats.len()
1218 }))
1219}
1220
1221pub async fn get_pipeline_stats(
1223 State(store): State<SharedStore>,
1224 Path(pipeline_id): Path<uuid::Uuid>,
1225) -> Result<Json<PipelineStats>> {
1226 let pipeline_manager = store.pipeline_manager();
1227
1228 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1229 crate::error::AllSourceError::ValidationError(format!(
1230 "Pipeline not found: {}",
1231 pipeline_id
1232 ))
1233 })?;
1234
1235 Ok(Json(pipeline.stats()))
1236}
1237
1238pub async fn reset_pipeline(
1240 State(store): State<SharedStore>,
1241 Path(pipeline_id): Path<uuid::Uuid>,
1242) -> Result<Json<serde_json::Value>> {
1243 let pipeline_manager = store.pipeline_manager();
1244
1245 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1246 crate::error::AllSourceError::ValidationError(format!(
1247 "Pipeline not found: {}",
1248 pipeline_id
1249 ))
1250 })?;
1251
1252 pipeline.reset();
1253
1254 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1255
1256 Ok(Json(serde_json::json!({
1257 "pipeline_id": pipeline_id,
1258 "reset": true
1259 })))
1260}
1261
1262pub async fn get_event_by_id(
1268 State(store): State<SharedStore>,
1269 Path(event_id): Path<uuid::Uuid>,
1270) -> Result<Json<serde_json::Value>> {
1271 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1272 crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1273 })?;
1274
1275 let dto = EventDto::from(&event);
1276
1277 tracing::debug!("Event retrieved by ID: {}", event_id);
1278
1279 Ok(Json(serde_json::json!({
1280 "event": dto,
1281 "found": true
1282 })))
1283}
1284
1285pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1291 let projection_manager = store.projection_manager();
1292 let status_map = store.projection_status();
1293
1294 let projections: Vec<serde_json::Value> = projection_manager
1295 .list_projections()
1296 .iter()
1297 .map(|(name, projection)| {
1298 let status = status_map
1299 .get(name)
1300 .map(|s| s.value().clone())
1301 .unwrap_or_else(|| "running".to_string());
1302 serde_json::json!({
1303 "name": name,
1304 "type": format!("{:?}", projection.name()),
1305 "status": status,
1306 })
1307 })
1308 .collect();
1309
1310 tracing::debug!("Listed {} projections", projections.len());
1311
1312 Json(serde_json::json!({
1313 "projections": projections,
1314 "total": projections.len()
1315 }))
1316}
1317
1318pub async fn get_projection(
1320 State(store): State<SharedStore>,
1321 Path(name): Path<String>,
1322) -> Result<Json<serde_json::Value>> {
1323 let projection_manager = store.projection_manager();
1324
1325 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1326 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1327 })?;
1328
1329 Ok(Json(serde_json::json!({
1330 "name": projection.name(),
1331 "found": true
1332 })))
1333}
1334
1335pub async fn get_projection_state(
1340 State(store): State<SharedStore>,
1341 Path((name, entity_id)): Path<(String, String)>,
1342) -> Result<Json<serde_json::Value>> {
1343 let projection_manager = store.projection_manager();
1344
1345 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1346 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1347 })?;
1348
1349 let state = projection.get_state(&entity_id);
1350
1351 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1352
1353 Ok(Json(serde_json::json!({
1354 "projection": name,
1355 "entity_id": entity_id,
1356 "state": state,
1357 "found": state.is_some()
1358 })))
1359}
1360
1361pub async fn delete_projection(
1366 State(store): State<SharedStore>,
1367 Path(name): Path<String>,
1368) -> Result<Json<serde_json::Value>> {
1369 let projection_manager = store.projection_manager();
1370
1371 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1372 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1373 })?;
1374
1375 projection.clear();
1376
1377 let cache = store.projection_state_cache();
1379 let prefix = format!("{name}:");
1380 let keys_to_remove: Vec<String> = cache
1381 .iter()
1382 .filter(|entry| entry.key().starts_with(&prefix))
1383 .map(|entry| entry.key().clone())
1384 .collect();
1385 for key in keys_to_remove {
1386 cache.remove(&key);
1387 }
1388
1389 tracing::info!("Projection deleted (cleared): {}", name);
1390
1391 Ok(Json(serde_json::json!({
1392 "projection": name,
1393 "deleted": true
1394 })))
1395}
1396
1397pub async fn get_projection_state_summary(
1401 State(store): State<SharedStore>,
1402 Path(name): Path<String>,
1403) -> Result<Json<serde_json::Value>> {
1404 let projection_manager = store.projection_manager();
1405
1406 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1407 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1408 })?;
1409
1410 let cache = store.projection_state_cache();
1412 let prefix = format!("{name}:");
1413 let states: Vec<serde_json::Value> = cache
1414 .iter()
1415 .filter(|entry| entry.key().starts_with(&prefix))
1416 .map(|entry| {
1417 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1418 serde_json::json!({
1419 "entity_id": entity_id,
1420 "state": entry.value().clone()
1421 })
1422 })
1423 .collect();
1424
1425 let total = states.len();
1426
1427 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1428
1429 Ok(Json(serde_json::json!({
1430 "projection": name,
1431 "states": states,
1432 "total": total
1433 })))
1434}
1435
1436pub async fn reset_projection(
1440 State(store): State<SharedStore>,
1441 Path(name): Path<String>,
1442) -> Result<Json<serde_json::Value>> {
1443 let reprocessed = store.reset_projection(&name)?;
1444
1445 tracing::info!(
1446 "Projection reset: {} ({} events reprocessed)",
1447 name,
1448 reprocessed
1449 );
1450
1451 Ok(Json(serde_json::json!({
1452 "projection": name,
1453 "reset": true,
1454 "events_reprocessed": reprocessed
1455 })))
1456}
1457
1458pub async fn pause_projection(
1462 State(store): State<SharedStore>,
1463 Path(name): Path<String>,
1464) -> Result<Json<serde_json::Value>> {
1465 let projection_manager = store.projection_manager();
1466
1467 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1469 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1470 })?;
1471
1472 store
1473 .projection_status()
1474 .insert(name.clone(), "paused".to_string());
1475
1476 tracing::info!("Projection paused: {}", name);
1477
1478 Ok(Json(serde_json::json!({
1479 "projection": name,
1480 "status": "paused"
1481 })))
1482}
1483
1484pub async fn start_projection(
1488 State(store): State<SharedStore>,
1489 Path(name): Path<String>,
1490) -> Result<Json<serde_json::Value>> {
1491 let projection_manager = store.projection_manager();
1492
1493 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1495 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1496 })?;
1497
1498 store
1499 .projection_status()
1500 .insert(name.clone(), "running".to_string());
1501
1502 tracing::info!("Projection started: {}", name);
1503
1504 Ok(Json(serde_json::json!({
1505 "projection": name,
1506 "status": "running"
1507 })))
1508}
1509
1510#[derive(Debug, Deserialize)]
1512pub struct SaveProjectionStateRequest {
1513 pub state: serde_json::Value,
1514}
1515
1516pub async fn save_projection_state(
1521 State(store): State<SharedStore>,
1522 Path((name, entity_id)): Path<(String, String)>,
1523 Json(req): Json<SaveProjectionStateRequest>,
1524) -> Result<Json<serde_json::Value>> {
1525 let projection_cache = store.projection_state_cache();
1526
1527 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1529
1530 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1531
1532 Ok(Json(serde_json::json!({
1533 "projection": name,
1534 "entity_id": entity_id,
1535 "saved": true
1536 })))
1537}
1538
1539#[derive(Debug, Deserialize)]
1543pub struct BulkGetStateRequest {
1544 pub entity_ids: Vec<String>,
1545}
1546
1547#[derive(Debug, Deserialize)]
1551pub struct BulkSaveStateRequest {
1552 pub states: Vec<BulkSaveStateItem>,
1553}
1554
1555#[derive(Debug, Deserialize)]
1556pub struct BulkSaveStateItem {
1557 pub entity_id: String,
1558 pub state: serde_json::Value,
1559}
1560
1561pub async fn bulk_get_projection_states(
1562 State(store): State<SharedStore>,
1563 Path(name): Path<String>,
1564 Json(req): Json<BulkGetStateRequest>,
1565) -> Result<Json<serde_json::Value>> {
1566 let projection_manager = store.projection_manager();
1567
1568 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1569 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1570 })?;
1571
1572 let states: Vec<serde_json::Value> = req
1573 .entity_ids
1574 .iter()
1575 .map(|entity_id| {
1576 let state = projection.get_state(entity_id);
1577 serde_json::json!({
1578 "entity_id": entity_id,
1579 "state": state,
1580 "found": state.is_some()
1581 })
1582 })
1583 .collect();
1584
1585 tracing::debug!(
1586 "Bulk projection state retrieved: {} entities from {}",
1587 states.len(),
1588 name
1589 );
1590
1591 Ok(Json(serde_json::json!({
1592 "projection": name,
1593 "states": states,
1594 "total": states.len()
1595 })))
1596}
1597
1598pub async fn bulk_save_projection_states(
1603 State(store): State<SharedStore>,
1604 Path(name): Path<String>,
1605 Json(req): Json<BulkSaveStateRequest>,
1606) -> Result<Json<serde_json::Value>> {
1607 let projection_cache = store.projection_state_cache();
1608
1609 let mut saved_count = 0;
1610 for item in &req.states {
1611 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1612 saved_count += 1;
1613 }
1614
1615 tracing::info!(
1616 "Bulk projection state saved: {} entities for {}",
1617 saved_count,
1618 name
1619 );
1620
1621 Ok(Json(serde_json::json!({
1622 "projection": name,
1623 "saved": saved_count,
1624 "total": req.states.len()
1625 })))
1626}
1627
1628#[derive(Debug, Deserialize)]
1634pub struct ListWebhooksParams {
1635 pub tenant_id: Option<String>,
1636}
1637
1638pub async fn register_webhook(
1640 State(store): State<SharedStore>,
1641 Json(req): Json<RegisterWebhookRequest>,
1642) -> Json<serde_json::Value> {
1643 let registry = store.webhook_registry();
1644 let webhook = registry.register(req);
1645
1646 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1647
1648 Json(serde_json::json!({
1649 "webhook": webhook,
1650 "created": true
1651 }))
1652}
1653
1654pub async fn list_webhooks(
1656 State(store): State<SharedStore>,
1657 Query(params): Query<ListWebhooksParams>,
1658) -> Json<serde_json::Value> {
1659 let registry = store.webhook_registry();
1660
1661 let webhooks = if let Some(tenant_id) = params.tenant_id {
1662 registry.list_by_tenant(&tenant_id)
1663 } else {
1664 vec![]
1666 };
1667
1668 let total = webhooks.len();
1669
1670 Json(serde_json::json!({
1671 "webhooks": webhooks,
1672 "total": total
1673 }))
1674}
1675
1676pub async fn get_webhook(
1678 State(store): State<SharedStore>,
1679 Path(webhook_id): Path<uuid::Uuid>,
1680) -> Result<Json<serde_json::Value>> {
1681 let registry = store.webhook_registry();
1682
1683 let webhook = registry.get(webhook_id).ok_or_else(|| {
1684 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1685 })?;
1686
1687 Ok(Json(serde_json::json!({
1688 "webhook": webhook,
1689 "found": true
1690 })))
1691}
1692
1693pub async fn update_webhook(
1695 State(store): State<SharedStore>,
1696 Path(webhook_id): Path<uuid::Uuid>,
1697 Json(req): Json<UpdateWebhookRequest>,
1698) -> Result<Json<serde_json::Value>> {
1699 let registry = store.webhook_registry();
1700
1701 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1702 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1703 })?;
1704
1705 tracing::info!("Webhook updated: {}", webhook_id);
1706
1707 Ok(Json(serde_json::json!({
1708 "webhook": webhook,
1709 "updated": true
1710 })))
1711}
1712
1713pub async fn delete_webhook(
1715 State(store): State<SharedStore>,
1716 Path(webhook_id): Path<uuid::Uuid>,
1717) -> Result<Json<serde_json::Value>> {
1718 let registry = store.webhook_registry();
1719
1720 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1721 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1722 })?;
1723
1724 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1725
1726 Ok(Json(serde_json::json!({
1727 "webhook_id": webhook_id,
1728 "deleted": true
1729 })))
1730}
1731
1732#[derive(Debug, Deserialize)]
1734pub struct ListDeliveriesParams {
1735 pub limit: Option<usize>,
1736}
1737
1738pub async fn list_webhook_deliveries(
1740 State(store): State<SharedStore>,
1741 Path(webhook_id): Path<uuid::Uuid>,
1742 Query(params): Query<ListDeliveriesParams>,
1743) -> Result<Json<serde_json::Value>> {
1744 let registry = store.webhook_registry();
1745
1746 registry.get(webhook_id).ok_or_else(|| {
1748 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1749 })?;
1750
1751 let limit = params.limit.unwrap_or(50);
1752 let deliveries = registry.get_deliveries(webhook_id, limit);
1753 let total = deliveries.len();
1754
1755 Ok(Json(serde_json::json!({
1756 "webhook_id": webhook_id,
1757 "deliveries": deliveries,
1758 "total": total
1759 })))
1760}
1761
1762pub async fn eventql_query(
1768 State(store): State<SharedStore>,
1769 Json(req): Json<EventQLRequest>,
1770) -> Result<Json<serde_json::Value>> {
1771 let events = store.snapshot_events();
1772 match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1773 Ok(response) => Ok(Json(serde_json::json!({
1774 "columns": response.columns,
1775 "rows": response.rows,
1776 "row_count": response.row_count,
1777 }))),
1778 Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1779 }
1780}
1781
1782pub async fn graphql_query(
1784 State(store): State<SharedStore>,
1785 Json(req): Json<GraphQLRequest>,
1786) -> Json<serde_json::Value> {
1787 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1788 Ok(f) => f,
1789 Err(e) => {
1790 return Json(
1791 serde_json::to_value(GraphQLResponse {
1792 data: None,
1793 errors: vec![GraphQLError { message: e }],
1794 })
1795 .unwrap(),
1796 );
1797 }
1798 };
1799
1800 let mut data = serde_json::Map::new();
1801 let mut errors = Vec::new();
1802
1803 for field in &fields {
1804 match field.name.as_str() {
1805 "events" => {
1806 let request = crate::application::dto::QueryEventsRequest {
1807 entity_id: field.arguments.get("entity_id").cloned(),
1808 event_type: field.arguments.get("event_type").cloned(),
1809 tenant_id: field.arguments.get("tenant_id").cloned(),
1810 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1811 as_of: None,
1812 since: None,
1813 until: None,
1814 event_type_prefix: None,
1815 payload_filter: None,
1816 };
1817 match store.query(request) {
1818 Ok(events) => {
1819 let json_events: Vec<serde_json::Value> = events
1820 .iter()
1821 .map(|e| {
1822 crate::infrastructure::query::graphql::event_to_json(
1823 e,
1824 &field.fields,
1825 )
1826 })
1827 .collect();
1828 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1829 }
1830 Err(e) => errors.push(GraphQLError {
1831 message: format!("events query failed: {e}"),
1832 }),
1833 }
1834 }
1835 "event" => {
1836 if let Some(id_str) = field.arguments.get("id") {
1837 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1838 match store.get_event_by_id(&id) {
1839 Ok(Some(event)) => {
1840 data.insert(
1841 "event".to_string(),
1842 crate::infrastructure::query::graphql::event_to_json(
1843 &event,
1844 &field.fields,
1845 ),
1846 );
1847 }
1848 Ok(None) => {
1849 data.insert("event".to_string(), serde_json::Value::Null);
1850 }
1851 Err(e) => errors.push(GraphQLError {
1852 message: format!("event lookup failed: {e}"),
1853 }),
1854 }
1855 } else {
1856 errors.push(GraphQLError {
1857 message: format!("Invalid UUID: {id_str}"),
1858 });
1859 }
1860 } else {
1861 errors.push(GraphQLError {
1862 message: "event query requires 'id' argument".to_string(),
1863 });
1864 }
1865 }
1866 "projections" => {
1867 let pm = store.projection_manager();
1868 let names: Vec<serde_json::Value> = pm
1869 .list_projections()
1870 .iter()
1871 .map(|(name, _)| serde_json::Value::String(name.clone()))
1872 .collect();
1873 data.insert("projections".to_string(), serde_json::Value::Array(names));
1874 }
1875 "stats" => {
1876 let stats = store.stats();
1877 data.insert(
1878 "stats".to_string(),
1879 serde_json::json!({
1880 "total_events": stats.total_events,
1881 "total_entities": stats.total_entities,
1882 "total_event_types": stats.total_event_types,
1883 }),
1884 );
1885 }
1886 "__schema" => {
1887 data.insert(
1888 "__schema".to_string(),
1889 crate::infrastructure::query::graphql::introspection_schema(),
1890 );
1891 }
1892 other => {
1893 errors.push(GraphQLError {
1894 message: format!("Unknown field: {other}"),
1895 });
1896 }
1897 }
1898 }
1899
1900 Json(
1901 serde_json::to_value(GraphQLResponse {
1902 data: Some(serde_json::Value::Object(data)),
1903 errors,
1904 })
1905 .unwrap(),
1906 )
1907}
1908
1909pub async fn geo_query(
1911 State(store): State<SharedStore>,
1912 Json(req): Json<GeoQueryRequest>,
1913) -> Json<serde_json::Value> {
1914 let events = store.snapshot_events();
1915 let geo_index = store.geo_index();
1916 let results =
1917 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1918 let total = results.len();
1919 Json(serde_json::json!({
1920 "results": results,
1921 "total": total,
1922 }))
1923}
1924
1925pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1927 let stats = store.geo_index().stats();
1928 Json(serde_json::json!(stats))
1929}
1930
1931pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1933 let stats = store.exactly_once().stats();
1934 Json(serde_json::json!(stats))
1935}
1936
1937pub async fn schema_evolution_history(
1939 State(store): State<SharedStore>,
1940 Path(event_type): Path<String>,
1941) -> Json<serde_json::Value> {
1942 let mgr = store.schema_evolution();
1943 let history = mgr.get_history(&event_type);
1944 let version = mgr.get_version(&event_type);
1945 Json(serde_json::json!({
1946 "event_type": event_type,
1947 "current_version": version,
1948 "history": history,
1949 }))
1950}
1951
1952pub async fn schema_evolution_schema(
1954 State(store): State<SharedStore>,
1955 Path(event_type): Path<String>,
1956) -> Json<serde_json::Value> {
1957 let mgr = store.schema_evolution();
1958 if let Some(schema) = mgr.get_schema(&event_type) {
1959 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1960 Json(serde_json::json!({
1961 "event_type": event_type,
1962 "version": mgr.get_version(&event_type),
1963 "inferred_schema": schema,
1964 "json_schema": json_schema,
1965 }))
1966 } else {
1967 Json(serde_json::json!({
1968 "event_type": event_type,
1969 "error": "No schema inferred for this event type"
1970 }))
1971 }
1972}
1973
1974pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1976 let stats = store.schema_evolution().stats();
1977 let event_types = store.schema_evolution().list_event_types();
1978 Json(serde_json::json!({
1979 "stats": stats,
1980 "tracked_event_types": event_types,
1981 }))
1982}
1983
/// Serves a sync pull: returns stored events wrapped as `ReplicatedEvent`s.
///
/// The lower time bound for the query is the smallest `physical_ms` found in
/// the request's version vector; with an empty vector no bound is applied.
/// Each returned event gets an HLC timestamp built from its own timestamp in
/// milliseconds plus a logical counter that increases while consecutive
/// events share the same millisecond. The response's version vector is
/// always empty.
///
/// # Errors
///
/// Propagates any error from `EventStore::query`.
#[cfg(feature = "embedded-sync")]
pub async fn sync_pull_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPullRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPullResponse>> {
    use crate::infrastructure::cluster::{crdt::ReplicatedEvent, hlc::HlcTimestamp};

    let store = &state.store;

    let earliest_ms = request
        .version_vector
        .values()
        .map(|ts| ts.physical_ms)
        .min();
    let since = match earliest_ms {
        Some(ms) => chrono::DateTime::from_timestamp_millis(ms as i64),
        None => None,
    };

    let query = QueryEventsRequest {
        entity_id: None,
        event_type: None,
        tenant_id: None,
        as_of: None,
        since,
        until: None,
        limit: None,
        event_type_prefix: None,
        payload_filter: None,
    };
    let events = store.query(query)?;

    let mut replicated = Vec::with_capacity(events.len());

    // Millisecond of the previously emitted event and the logical counter
    // within that millisecond.
    let mut prev_ms = 0u64;
    let mut logical = 0u32;

    for event in events.iter() {
        let event_ms = event.timestamp().timestamp_millis() as u64;
        logical = if event_ms == prev_ms { logical + 1 } else { 0 };
        prev_ms = event_ms;

        let event_data = serde_json::json!({
            "event_type": event.event_type_str(),
            "entity_id": event.entity_id_str(),
            "tenant_id": event.tenant_id_str(),
            "payload": event.payload,
            "metadata": event.metadata,
        });

        replicated.push(ReplicatedEvent {
            event_id: event.id().to_string(),
            hlc_timestamp: HlcTimestamp::new(event_ms, logical, 0),
            origin_region: "server".to_string(),
            event_data,
        });
    }

    Ok(Json(crate::embedded::sync_types::SyncPullResponse {
        events: replicated,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2052
/// Serves a sync push: ingests the events sent by a client.
///
/// Missing string fields fall back to `"unknown"` (`event_type`,
/// `entity_id`) or `"default"` (`tenant_id`); a missing payload becomes an
/// empty JSON object. Events that `Event::from_strings` rejects are counted
/// as `skipped`. The response's version vector is always empty.
///
/// # Errors
///
/// Propagates the first error from `EventStore::ingest`, which ends the
/// handler before the remaining events are processed.
#[cfg(feature = "embedded-sync")]
pub async fn sync_push_handler(
    State(state): State<AppState>,
    Json(request): Json<crate::embedded::sync_types::SyncPushRequest>,
) -> Result<Json<crate::embedded::sync_types::SyncPushResponse>> {
    /// Reads `key` from `data` as a string, falling back to `default`.
    fn string_field(data: &serde_json::Value, key: &str, default: &str) -> String {
        match data.get(key).and_then(|v| v.as_str()) {
            Some(text) => text.to_string(),
            None => default.to_string(),
        }
    }

    let store = &state.store;

    let mut accepted = 0usize;
    let mut skipped = 0usize;

    for incoming in request.events.iter() {
        let data = &incoming.event_data;

        let event_type = string_field(data, "event_type", "unknown");
        let entity_id = string_field(data, "entity_id", "unknown");
        let tenant_id = string_field(data, "tenant_id", "default");
        let payload = match data.get("payload") {
            Some(value) => value.clone(),
            None => serde_json::json!({}),
        };
        let metadata = data.get("metadata").cloned();

        let built = Event::from_strings(event_type, entity_id, tenant_id, payload, metadata);
        if let Ok(domain_event) = built {
            store.ingest(domain_event)?;
            accepted += 1;
        } else {
            skipped += 1;
        }
    }

    Ok(Json(crate::embedded::sync_types::SyncPushResponse {
        accepted,
        skipped,
        version_vector: std::collections::BTreeMap::new(),
    }))
}
2104
2105#[cfg(test)]
2106mod tests {
2107 use super::*;
2108 use crate::{domain::entities::Event, store::EventStore};
2109
2110 fn create_test_store() -> Arc<EventStore> {
2111 Arc::new(EventStore::new())
2112 }
2113
2114 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
2115 Event::from_strings(
2116 event_type.to_string(),
2117 entity_id.to_string(),
2118 "test-stream".to_string(),
2119 serde_json::json!({
2120 "name": "Test",
2121 "value": 42
2122 }),
2123 None,
2124 )
2125 .unwrap()
2126 }
2127
2128 #[tokio::test]
2129 async fn test_query_events_has_more_and_total_count() {
2130 let store = create_test_store();
2131
2132 for i in 0..50 {
2134 store
2135 .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2136 .unwrap();
2137 }
2138
2139 let req = QueryEventsRequest {
2141 entity_id: None,
2142 event_type: None,
2143 tenant_id: None,
2144 as_of: None,
2145 since: None,
2146 until: None,
2147 limit: Some(10),
2148 event_type_prefix: None,
2149 payload_filter: None,
2150 };
2151
2152 let requested_limit = req.limit;
2153 let unlimited_req = QueryEventsRequest {
2154 limit: None,
2155 ..QueryEventsRequest {
2156 entity_id: req.entity_id,
2157 event_type: req.event_type,
2158 tenant_id: req.tenant_id,
2159 as_of: req.as_of,
2160 since: req.since,
2161 until: req.until,
2162 limit: None,
2163 event_type_prefix: req.event_type_prefix,
2164 payload_filter: req.payload_filter,
2165 }
2166 };
2167 let all_events = store.query(unlimited_req).unwrap();
2168 let total_count = all_events.len();
2169 let limited_events: Vec<Event> = if let Some(limit) = requested_limit {
2170 all_events.into_iter().take(limit).collect()
2171 } else {
2172 all_events
2173 };
2174 let count = limited_events.len();
2175 let has_more = count < total_count;
2176
2177 assert_eq!(count, 10);
2178 assert_eq!(total_count, 50);
2179 assert!(has_more);
2180 }
2181
2182 #[tokio::test]
2183 async fn test_query_events_no_more_results() {
2184 let store = create_test_store();
2185
2186 for i in 0..5 {
2188 store
2189 .ingest(create_test_event(&format!("entity-{}", i), "user.created"))
2190 .unwrap();
2191 }
2192
2193 let all_events = store
2195 .query(QueryEventsRequest {
2196 entity_id: None,
2197 event_type: None,
2198 tenant_id: None,
2199 as_of: None,
2200 since: None,
2201 until: None,
2202 limit: None,
2203 event_type_prefix: None,
2204 payload_filter: None,
2205 })
2206 .unwrap();
2207 let total_count = all_events.len();
2208 let limited_events: Vec<Event> = all_events.into_iter().take(100).collect();
2209 let count = limited_events.len();
2210 let has_more = count < total_count;
2211
2212 assert_eq!(count, 5);
2213 assert_eq!(total_count, 5);
2214 assert!(!has_more);
2215 }
2216
2217 #[tokio::test]
2218 async fn test_list_entities_by_type_prefix() {
2219 let store = create_test_store();
2220
2221 store
2223 .ingest(create_test_event("idx-1", "index.created"))
2224 .unwrap();
2225 store
2226 .ingest(create_test_event("idx-1", "index.updated"))
2227 .unwrap();
2228 store
2229 .ingest(create_test_event("idx-2", "index.created"))
2230 .unwrap();
2231 store
2232 .ingest(create_test_event("idx-3", "index.created"))
2233 .unwrap();
2234 store
2236 .ingest(create_test_event("trade-1", "trade.created"))
2237 .unwrap();
2238 store
2239 .ingest(create_test_event("trade-2", "trade.created"))
2240 .unwrap();
2241
2242 let req = ListEntitiesRequest {
2244 event_type_prefix: Some("index.".to_string()),
2245 payload_filter: None,
2246 limit: None,
2247 offset: None,
2248 };
2249 let query_req = QueryEventsRequest {
2250 entity_id: None,
2251 event_type: None,
2252 tenant_id: None,
2253 as_of: None,
2254 since: None,
2255 until: None,
2256 limit: None,
2257 event_type_prefix: req.event_type_prefix,
2258 payload_filter: req.payload_filter,
2259 };
2260 let events = store.query(query_req).unwrap();
2261
2262 let mut entity_map: std::collections::HashMap<String, Vec<&Event>> =
2264 std::collections::HashMap::new();
2265 for event in &events {
2266 entity_map
2267 .entry(event.entity_id().to_string())
2268 .or_default()
2269 .push(event);
2270 }
2271
2272 assert_eq!(entity_map.len(), 3); assert_eq!(entity_map["idx-1"].len(), 2); assert_eq!(entity_map["idx-2"].len(), 1);
2275 assert_eq!(entity_map["idx-3"].len(), 1);
2276 }
2277
2278 fn create_test_event_with_payload(
2279 entity_id: &str,
2280 event_type: &str,
2281 payload: serde_json::Value,
2282 ) -> Event {
2283 Event::from_strings(
2284 event_type.to_string(),
2285 entity_id.to_string(),
2286 "test-stream".to_string(),
2287 payload,
2288 None,
2289 )
2290 .unwrap()
2291 }
2292
2293 #[tokio::test]
2294 async fn test_detect_duplicates_by_payload_fields() {
2295 let store = create_test_store();
2296
2297 store
2299 .ingest(create_test_event_with_payload(
2300 "idx-1",
2301 "index.created",
2302 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2303 ))
2304 .unwrap();
2305 store
2306 .ingest(create_test_event_with_payload(
2307 "idx-2",
2308 "index.created",
2309 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2310 ))
2311 .unwrap();
2312 store
2313 .ingest(create_test_event_with_payload(
2314 "idx-3",
2315 "index.created",
2316 serde_json::json!({"name": "NASDAQ", "user_id": "alice"}),
2317 ))
2318 .unwrap();
2319 store
2320 .ingest(create_test_event_with_payload(
2321 "idx-4",
2322 "index.created",
2323 serde_json::json!({"name": "NASDAQ", "user_id": "carol"}),
2324 ))
2325 .unwrap();
2326 store
2327 .ingest(create_test_event_with_payload(
2328 "idx-5",
2329 "index.created",
2330 serde_json::json!({"name": "DAX", "user_id": "dave"}),
2331 ))
2332 .unwrap();
2333
2334 let query_req = QueryEventsRequest {
2336 entity_id: None,
2337 event_type: None,
2338 tenant_id: None,
2339 as_of: None,
2340 since: None,
2341 until: None,
2342 limit: None,
2343 event_type_prefix: Some("index.".to_string()),
2344 payload_filter: None,
2345 };
2346 let events = store.query(query_req).unwrap();
2347
2348 let group_by_fields = vec!["name"];
2350 let mut entity_latest: std::collections::HashMap<String, &Event> =
2351 std::collections::HashMap::new();
2352 for event in &events {
2353 let eid = event.entity_id().to_string();
2354 entity_latest
2355 .entry(eid)
2356 .and_modify(|existing| {
2357 if event.timestamp() > existing.timestamp() {
2358 *existing = event;
2359 }
2360 })
2361 .or_insert(event);
2362 }
2363
2364 let mut groups: std::collections::HashMap<String, Vec<String>> =
2365 std::collections::HashMap::new();
2366 for (entity_id, event) in &entity_latest {
2367 let payload = event.payload();
2368 let mut key_parts = serde_json::Map::new();
2369 for field in &group_by_fields {
2370 let value = payload
2371 .get(*field)
2372 .cloned()
2373 .unwrap_or(serde_json::Value::Null);
2374 key_parts.insert(field.to_string(), value);
2375 }
2376 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2377 groups.entry(key_str).or_default().push(entity_id.clone());
2378 }
2379
2380 let duplicate_groups: Vec<_> = groups
2381 .into_iter()
2382 .filter(|(_, ids)| ids.len() > 1)
2383 .collect();
2384
2385 assert_eq!(duplicate_groups.len(), 2); for (_, ids) in &duplicate_groups {
2387 assert_eq!(ids.len(), 2);
2388 }
2389 }
2390
2391 #[tokio::test]
2392 async fn test_detect_duplicates_no_duplicates() {
2393 let store = create_test_store();
2394
2395 store
2397 .ingest(create_test_event_with_payload(
2398 "idx-1",
2399 "index.created",
2400 serde_json::json!({"name": "A"}),
2401 ))
2402 .unwrap();
2403 store
2404 .ingest(create_test_event_with_payload(
2405 "idx-2",
2406 "index.created",
2407 serde_json::json!({"name": "B"}),
2408 ))
2409 .unwrap();
2410
2411 let query_req = QueryEventsRequest {
2412 entity_id: None,
2413 event_type: None,
2414 tenant_id: None,
2415 as_of: None,
2416 since: None,
2417 until: None,
2418 limit: None,
2419 event_type_prefix: Some("index.".to_string()),
2420 payload_filter: None,
2421 };
2422 let events = store.query(query_req).unwrap();
2423
2424 let mut entity_latest: std::collections::HashMap<String, &Event> =
2425 std::collections::HashMap::new();
2426 for event in &events {
2427 entity_latest
2428 .entry(event.entity_id().to_string())
2429 .or_insert(event);
2430 }
2431
2432 let mut groups: std::collections::HashMap<String, Vec<String>> =
2433 std::collections::HashMap::new();
2434 for (entity_id, event) in &entity_latest {
2435 let key_str =
2436 serde_json::to_string(&serde_json::json!({"name": event.payload().get("name")}))
2437 .unwrap();
2438 groups.entry(key_str).or_default().push(entity_id.clone());
2439 }
2440
2441 let duplicate_groups: Vec<_> = groups
2442 .into_iter()
2443 .filter(|(_, ids)| ids.len() > 1)
2444 .collect();
2445
2446 assert_eq!(duplicate_groups.len(), 0); }
2448
2449 #[tokio::test]
2450 async fn test_detect_duplicates_multi_field_group_by() {
2451 let store = create_test_store();
2452
2453 store
2455 .ingest(create_test_event_with_payload(
2456 "idx-1",
2457 "index.created",
2458 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2459 ))
2460 .unwrap();
2461 store
2462 .ingest(create_test_event_with_payload(
2463 "idx-2",
2464 "index.created",
2465 serde_json::json!({"name": "S&P 500", "user_id": "alice"}),
2466 ))
2467 .unwrap();
2468 store
2470 .ingest(create_test_event_with_payload(
2471 "idx-3",
2472 "index.created",
2473 serde_json::json!({"name": "S&P 500", "user_id": "bob"}),
2474 ))
2475 .unwrap();
2476
2477 let query_req = QueryEventsRequest {
2478 entity_id: None,
2479 event_type: None,
2480 tenant_id: None,
2481 as_of: None,
2482 since: None,
2483 until: None,
2484 limit: None,
2485 event_type_prefix: Some("index.".to_string()),
2486 payload_filter: None,
2487 };
2488 let events = store.query(query_req).unwrap();
2489
2490 let group_by_fields = vec!["name", "user_id"];
2491 let mut entity_latest: std::collections::HashMap<String, &Event> =
2492 std::collections::HashMap::new();
2493 for event in &events {
2494 entity_latest
2495 .entry(event.entity_id().to_string())
2496 .and_modify(|existing| {
2497 if event.timestamp() > existing.timestamp() {
2498 *existing = event;
2499 }
2500 })
2501 .or_insert(event);
2502 }
2503
2504 let mut groups: std::collections::HashMap<String, Vec<String>> =
2505 std::collections::HashMap::new();
2506 for (entity_id, event) in &entity_latest {
2507 let payload = event.payload();
2508 let mut key_parts = serde_json::Map::new();
2509 for field in &group_by_fields {
2510 let value = payload
2511 .get(*field)
2512 .cloned()
2513 .unwrap_or(serde_json::Value::Null);
2514 key_parts.insert(field.to_string(), value);
2515 }
2516 let key_str = serde_json::to_string(&key_parts).unwrap_or_default();
2517 groups.entry(key_str).or_default().push(entity_id.clone());
2518 }
2519
2520 let duplicate_groups: Vec<_> = groups
2521 .into_iter()
2522 .filter(|(_, ids)| ids.len() > 1)
2523 .collect();
2524
2525 assert_eq!(duplicate_groups.len(), 1);
2527 let (_, ref ids) = duplicate_groups[0];
2528 assert_eq!(ids.len(), 2);
2529 let mut sorted_ids = ids.clone();
2530 sorted_ids.sort();
2531 assert_eq!(sorted_ids, vec!["idx-1", "idx-2"]);
2532 }
2533
2534 #[tokio::test]
2535 async fn test_projection_state_cache() {
2536 let store = create_test_store();
2537
2538 let cache = store.projection_state_cache();
2540 cache.insert(
2541 "entity_snapshots:user-123".to_string(),
2542 serde_json::json!({"name": "Test User", "age": 30}),
2543 );
2544
2545 let state = cache.get("entity_snapshots:user-123");
2547 assert!(state.is_some());
2548 let state = state.unwrap();
2549 assert_eq!(state["name"], "Test User");
2550 assert_eq!(state["age"], 30);
2551 }
2552
2553 #[tokio::test]
2554 async fn test_projection_manager_list_projections() {
2555 let store = create_test_store();
2556
2557 let projection_manager = store.projection_manager();
2559 let projections = projection_manager.list_projections();
2560
2561 assert!(projections.len() >= 2);
2563
2564 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
2565 assert!(names.contains(&"entity_snapshots"));
2566 assert!(names.contains(&"event_counters"));
2567 }
2568
2569 #[tokio::test]
2570 async fn test_projection_state_after_event_ingestion() {
2571 let store = create_test_store();
2572
2573 let event = create_test_event("user-456", "user.created");
2575 store.ingest(event).unwrap();
2576
2577 let projection_manager = store.projection_manager();
2579 let snapshot_projection = projection_manager
2580 .get_projection("entity_snapshots")
2581 .unwrap();
2582
2583 let state = snapshot_projection.get_state("user-456");
2584 assert!(state.is_some());
2585 let state = state.unwrap();
2586 assert_eq!(state["name"], "Test");
2587 assert_eq!(state["value"], 42);
2588 }
2589
2590 #[tokio::test]
2591 async fn test_projection_state_cache_multiple_entities() {
2592 let store = create_test_store();
2593 let cache = store.projection_state_cache();
2594
2595 for i in 0..10 {
2597 cache.insert(
2598 format!("entity_snapshots:entity-{}", i),
2599 serde_json::json!({"id": i, "status": "active"}),
2600 );
2601 }
2602
2603 assert_eq!(cache.len(), 10);
2605
2606 for i in 0..10 {
2608 let key = format!("entity_snapshots:entity-{}", i);
2609 let state = cache.get(&key);
2610 assert!(state.is_some());
2611 assert_eq!(state.unwrap()["id"], i);
2612 }
2613 }
2614
2615 #[tokio::test]
2616 async fn test_projection_state_update() {
2617 let store = create_test_store();
2618 let cache = store.projection_state_cache();
2619
2620 cache.insert(
2622 "entity_snapshots:user-789".to_string(),
2623 serde_json::json!({"balance": 100}),
2624 );
2625
2626 cache.insert(
2628 "entity_snapshots:user-789".to_string(),
2629 serde_json::json!({"balance": 150}),
2630 );
2631
2632 let state = cache.get("entity_snapshots:user-789").unwrap();
2634 assert_eq!(state["balance"], 150);
2635 }
2636
2637 #[tokio::test]
2638 async fn test_event_counter_projection() {
2639 let store = create_test_store();
2640
2641 store
2643 .ingest(create_test_event("user-1", "user.created"))
2644 .unwrap();
2645 store
2646 .ingest(create_test_event("user-2", "user.created"))
2647 .unwrap();
2648 store
2649 .ingest(create_test_event("user-1", "user.updated"))
2650 .unwrap();
2651
2652 let projection_manager = store.projection_manager();
2654 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
2655
2656 let created_state = counter_projection.get_state("user.created");
2658 assert!(created_state.is_some());
2659 assert_eq!(created_state.unwrap()["count"], 2);
2660
2661 let updated_state = counter_projection.get_state("user.updated");
2662 assert!(updated_state.is_some());
2663 assert_eq!(updated_state.unwrap()["count"], 1);
2664 }
2665
2666 #[tokio::test]
2667 async fn test_projection_state_cache_key_format() {
2668 let store = create_test_store();
2669 let cache = store.projection_state_cache();
2670
2671 let key = "orders:order-12345".to_string();
2673 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
2674
2675 let state = cache.get(&key).unwrap();
2676 assert_eq!(state["total"], 99.99);
2677 }
2678
2679 #[tokio::test]
2680 async fn test_projection_state_cache_removal() {
2681 let store = create_test_store();
2682 let cache = store.projection_state_cache();
2683
2684 cache.insert(
2686 "test:entity-1".to_string(),
2687 serde_json::json!({"data": "value"}),
2688 );
2689 assert_eq!(cache.len(), 1);
2690
2691 cache.remove("test:entity-1");
2692 assert_eq!(cache.len(), 0);
2693 assert!(cache.get("test:entity-1").is_none());
2694 }
2695
2696 #[tokio::test]
2697 async fn test_get_nonexistent_projection() {
2698 let store = create_test_store();
2699 let projection_manager = store.projection_manager();
2700
2701 let projection = projection_manager.get_projection("nonexistent_projection");
2703 assert!(projection.is_none());
2704 }
2705
2706 #[tokio::test]
2707 async fn test_get_nonexistent_entity_state() {
2708 let store = create_test_store();
2709 let projection_manager = store.projection_manager();
2710
2711 let snapshot_projection = projection_manager
2713 .get_projection("entity_snapshots")
2714 .unwrap();
2715 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
2716 assert!(state.is_none());
2717 }
2718
2719 #[tokio::test]
2720 async fn test_projection_state_cache_concurrent_access() {
2721 let store = create_test_store();
2722 let cache = store.projection_state_cache();
2723
2724 let handles: Vec<_> = (0..10)
2726 .map(|i| {
2727 let cache_clone = cache.clone();
2728 tokio::spawn(async move {
2729 cache_clone.insert(
2730 format!("concurrent:entity-{}", i),
2731 serde_json::json!({"thread": i}),
2732 );
2733 })
2734 })
2735 .collect();
2736
2737 for handle in handles {
2738 handle.await.unwrap();
2739 }
2740
2741 assert_eq!(cache.len(), 10);
2743 }
2744
2745 #[tokio::test]
2746 async fn test_projection_state_large_payload() {
2747 let store = create_test_store();
2748 let cache = store.projection_state_cache();
2749
2750 let large_array: Vec<serde_json::Value> = (0..1000)
2752 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
2753 .collect();
2754
2755 cache.insert(
2756 "large:entity-1".to_string(),
2757 serde_json::json!({"items": large_array}),
2758 );
2759
2760 let state = cache.get("large:entity-1").unwrap();
2761 let items = state["items"].as_array().unwrap();
2762 assert_eq!(items.len(), 1000);
2763 }
2764
2765 #[tokio::test]
2766 async fn test_projection_state_complex_json() {
2767 let store = create_test_store();
2768 let cache = store.projection_state_cache();
2769
2770 let complex_state = serde_json::json!({
2772 "user": {
2773 "id": "user-123",
2774 "profile": {
2775 "name": "John Doe",
2776 "email": "john@example.com",
2777 "settings": {
2778 "theme": "dark",
2779 "notifications": true
2780 }
2781 },
2782 "roles": ["admin", "user"],
2783 "metadata": {
2784 "created_at": "2025-01-01T00:00:00Z",
2785 "last_login": null
2786 }
2787 }
2788 });
2789
2790 cache.insert("complex:user-123".to_string(), complex_state);
2791
2792 let state = cache.get("complex:user-123").unwrap();
2793 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2794 assert_eq!(state["user"]["roles"][0], "admin");
2795 assert!(state["user"]["metadata"]["last_login"].is_null());
2796 }
2797
2798 #[tokio::test]
2799 async fn test_projection_state_cache_iteration() {
2800 let store = create_test_store();
2801 let cache = store.projection_state_cache();
2802
2803 for i in 0..5 {
2805 cache.insert(
2806 format!("iter:entity-{}", i),
2807 serde_json::json!({"index": i}),
2808 );
2809 }
2810
2811 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2813 assert_eq!(entries.len(), 5);
2814 }
2815
2816 #[tokio::test]
2817 async fn test_projection_manager_get_entity_snapshots() {
2818 let store = create_test_store();
2819 let projection_manager = store.projection_manager();
2820
2821 let projection = projection_manager.get_projection("entity_snapshots");
2823 assert!(projection.is_some());
2824 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2825 }
2826
2827 #[tokio::test]
2828 async fn test_projection_manager_get_event_counters() {
2829 let store = create_test_store();
2830 let projection_manager = store.projection_manager();
2831
2832 let projection = projection_manager.get_projection("event_counters");
2834 assert!(projection.is_some());
2835 assert_eq!(projection.unwrap().name(), "event_counters");
2836 }
2837
2838 #[tokio::test]
2839 async fn test_projection_state_cache_overwrite() {
2840 let store = create_test_store();
2841 let cache = store.projection_state_cache();
2842
2843 cache.insert(
2845 "overwrite:entity-1".to_string(),
2846 serde_json::json!({"version": 1}),
2847 );
2848
2849 cache.insert(
2851 "overwrite:entity-1".to_string(),
2852 serde_json::json!({"version": 2}),
2853 );
2854
2855 cache.insert(
2857 "overwrite:entity-1".to_string(),
2858 serde_json::json!({"version": 3}),
2859 );
2860
2861 let state = cache.get("overwrite:entity-1").unwrap();
2862 assert_eq!(state["version"], 3);
2863
2864 assert_eq!(cache.len(), 1);
2866 }
2867
2868 #[tokio::test]
2869 async fn test_projection_state_multiple_projections() {
2870 let store = create_test_store();
2871 let cache = store.projection_state_cache();
2872
2873 cache.insert(
2875 "entity_snapshots:user-1".to_string(),
2876 serde_json::json!({"name": "Alice"}),
2877 );
2878 cache.insert(
2879 "event_counters:user.created".to_string(),
2880 serde_json::json!({"count": 5}),
2881 );
2882 cache.insert(
2883 "custom_projection:order-1".to_string(),
2884 serde_json::json!({"total": 150.0}),
2885 );
2886
2887 assert_eq!(
2889 cache.get("entity_snapshots:user-1").unwrap()["name"],
2890 "Alice"
2891 );
2892 assert_eq!(
2893 cache.get("event_counters:user.created").unwrap()["count"],
2894 5
2895 );
2896 assert_eq!(
2897 cache.get("custom_projection:order-1").unwrap()["total"],
2898 150.0
2899 );
2900 }
2901
2902 #[tokio::test]
2903 async fn test_bulk_projection_state_access() {
2904 let store = create_test_store();
2905
2906 for i in 0..5 {
2908 let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
2909 store.ingest(event).unwrap();
2910 }
2911
2912 let projection_manager = store.projection_manager();
2914 let snapshot_projection = projection_manager
2915 .get_projection("entity_snapshots")
2916 .unwrap();
2917
2918 for i in 0..5 {
2920 let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
2921 assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
2922 }
2923 }
2924
2925 #[tokio::test]
2926 async fn test_bulk_save_projection_states() {
2927 let store = create_test_store();
2928 let cache = store.projection_state_cache();
2929
2930 let states = vec![
2932 BulkSaveStateItem {
2933 entity_id: "bulk-entity-1".to_string(),
2934 state: serde_json::json!({"name": "Entity 1", "value": 100}),
2935 },
2936 BulkSaveStateItem {
2937 entity_id: "bulk-entity-2".to_string(),
2938 state: serde_json::json!({"name": "Entity 2", "value": 200}),
2939 },
2940 BulkSaveStateItem {
2941 entity_id: "bulk-entity-3".to_string(),
2942 state: serde_json::json!({"name": "Entity 3", "value": 300}),
2943 },
2944 ];
2945
2946 let projection_name = "test_projection";
2947
2948 for item in &states {
2950 cache.insert(
2951 format!("{projection_name}:{}", item.entity_id),
2952 item.state.clone(),
2953 );
2954 }
2955
2956 assert_eq!(cache.len(), 3);
2958
2959 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
2960 assert_eq!(state1["name"], "Entity 1");
2961 assert_eq!(state1["value"], 100);
2962
2963 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
2964 assert_eq!(state2["name"], "Entity 2");
2965 assert_eq!(state2["value"], 200);
2966
2967 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
2968 assert_eq!(state3["name"], "Entity 3");
2969 assert_eq!(state3["value"], 300);
2970 }
2971
2972 #[tokio::test]
2973 async fn test_bulk_save_empty_states() {
2974 let store = create_test_store();
2975 let cache = store.projection_state_cache();
2976
2977 cache.clear();
2979
2980 let states: Vec<BulkSaveStateItem> = vec![];
2982 assert_eq!(states.len(), 0);
2983
2984 assert_eq!(cache.len(), 0);
2986 }
2987
2988 #[tokio::test]
2989 async fn test_bulk_save_overwrites_existing() {
2990 let store = create_test_store();
2991 let cache = store.projection_state_cache();
2992
2993 cache.insert(
2995 "test:entity-1".to_string(),
2996 serde_json::json!({"version": 1, "data": "initial"}),
2997 );
2998
2999 let new_state = serde_json::json!({"version": 2, "data": "updated"});
3001 cache.insert("test:entity-1".to_string(), new_state);
3002
3003 let state = cache.get("test:entity-1").unwrap();
3005 assert_eq!(state["version"], 2);
3006 assert_eq!(state["data"], "updated");
3007 }
3008
3009 #[tokio::test]
3010 async fn test_bulk_save_high_volume() {
3011 let store = create_test_store();
3012 let cache = store.projection_state_cache();
3013
3014 for i in 0..1000 {
3016 cache.insert(
3017 format!("volume_test:entity-{}", i),
3018 serde_json::json!({"index": i, "status": "active"}),
3019 );
3020 }
3021
3022 assert_eq!(cache.len(), 1000);
3024
3025 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
3027 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
3028 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
3029 }
3030
3031 #[tokio::test]
3032 async fn test_bulk_save_different_projections() {
3033 let store = create_test_store();
3034 let cache = store.projection_state_cache();
3035
3036 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
3038
3039 for proj in projections.iter() {
3040 for i in 0..5 {
3041 cache.insert(
3042 format!("{proj}:entity-{i}"),
3043 serde_json::json!({"projection": proj, "id": i}),
3044 );
3045 }
3046 }
3047
3048 assert_eq!(cache.len(), 15);
3050
3051 for proj in projections.iter() {
3053 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
3054 assert_eq!(state["projection"], *proj);
3055 }
3056 }
3057}