1use crate::{
2 application::{
3 dto::{
4 EventDto, IngestEventRequest, IngestEventResponse, IngestEventsBatchRequest,
5 IngestEventsBatchResponse, QueryEventsRequest, QueryEventsResponse,
6 },
7 services::{
8 analytics::{
9 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
10 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
11 },
12 pipeline::{PipelineConfig, PipelineStats},
13 replay::{ReplayProgress, StartReplayRequest, StartReplayResponse},
14 schema::{
15 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse,
16 ValidateEventRequest, ValidateEventResponse,
17 },
18 webhook::{RegisterWebhookRequest, UpdateWebhookRequest},
19 },
20 },
21 domain::entities::Event,
22 error::Result,
23 infrastructure::{
24 persistence::{
25 compaction::CompactionResult,
26 snapshot::{
27 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest,
28 ListSnapshotsResponse, SnapshotInfo,
29 },
30 },
31 query::{
32 eventql::EventQLRequest,
33 geospatial::GeoQueryRequest,
34 graphql::{GraphQLError, GraphQLRequest, GraphQLResponse},
35 },
36 replication::ReplicationMode,
37 web::api_v1::AppState,
38 },
39 store::{EventStore, EventTypeInfo, StreamInfo},
40};
41use axum::{
42 Json, Router,
43 extract::{Path, Query, State, WebSocketUpgrade},
44 response::{IntoResponse, Response},
45 routing::{get, post, put},
46};
47use serde::Deserialize;
48use std::sync::Arc;
49use tower_http::{
50 cors::{Any, CorsLayer},
51 trace::TraceLayer,
52};
53
54type SharedStore = Arc<EventStore>;
55
56async fn await_replication_ack(state: &AppState) {
62 let shipper_guard = state.wal_shipper.read().await;
63 if let Some(ref shipper) = *shipper_guard {
64 let mode = shipper.replication_mode();
65 if mode == ReplicationMode::Async {
66 return;
67 }
68
69 let target_offset = shipper.current_leader_offset();
70 if target_offset == 0 {
71 return;
72 }
73
74 let shipper = Arc::clone(shipper);
75 drop(shipper_guard);
77
78 let timer = state
79 .store
80 .metrics()
81 .replication_ack_wait_seconds
82 .start_timer();
83 let acked = shipper.wait_for_ack(target_offset).await;
84 timer.observe_duration();
85
86 if !acked {
87 tracing::warn!(
88 "Replication ACK timeout in {} mode (offset {}). \
89 Write succeeded locally but follower confirmation pending.",
90 mode,
91 target_offset,
92 );
93 }
94 }
95}
96
97pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
98 let app = Router::new()
99 .route("/health", get(health))
100 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
102 .route("/api/v1/events/batch", post(ingest_events_batch))
103 .route("/api/v1/events/query", get(query_events))
104 .route("/api/v1/events/{event_id}", get(get_event_by_id))
105 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/streams", get(list_streams))
108 .route("/api/v1/event-types", get(list_event_types))
109 .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
110 .route(
111 "/api/v1/entities/{entity_id}/snapshot",
112 get(get_entity_snapshot),
113 )
114 .route("/api/v1/stats", get(get_stats))
115 .route("/api/v1/analytics/frequency", get(analytics_frequency))
117 .route("/api/v1/analytics/summary", get(analytics_summary))
118 .route("/api/v1/analytics/correlation", get(analytics_correlation))
119 .route("/api/v1/snapshots", post(create_snapshot))
121 .route("/api/v1/snapshots", get(list_snapshots))
122 .route(
123 "/api/v1/snapshots/{entity_id}/latest",
124 get(get_latest_snapshot),
125 )
126 .route("/api/v1/compaction/trigger", post(trigger_compaction))
128 .route("/api/v1/compaction/stats", get(compaction_stats))
129 .route("/api/v1/schemas", post(register_schema))
131 .route("/api/v1/schemas", get(list_subjects))
132 .route("/api/v1/schemas/{subject}", get(get_schema))
133 .route(
134 "/api/v1/schemas/{subject}/versions",
135 get(list_schema_versions),
136 )
137 .route("/api/v1/schemas/validate", post(validate_event_schema))
138 .route(
139 "/api/v1/schemas/{subject}/compatibility",
140 put(set_compatibility_mode),
141 )
142 .route("/api/v1/replay", post(start_replay))
144 .route("/api/v1/replay", get(list_replays))
145 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
146 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
147 .route(
148 "/api/v1/replay/{replay_id}",
149 axum::routing::delete(delete_replay),
150 )
151 .route("/api/v1/pipelines", post(register_pipeline))
153 .route("/api/v1/pipelines", get(list_pipelines))
154 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
155 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
156 .route(
157 "/api/v1/pipelines/{pipeline_id}",
158 axum::routing::delete(remove_pipeline),
159 )
160 .route(
161 "/api/v1/pipelines/{pipeline_id}/stats",
162 get(get_pipeline_stats),
163 )
164 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
165 .route("/api/v1/projections", get(list_projections))
167 .route("/api/v1/projections/{name}", get(get_projection))
168 .route(
169 "/api/v1/projections/{name}",
170 axum::routing::delete(delete_projection),
171 )
172 .route(
173 "/api/v1/projections/{name}/state",
174 get(get_projection_state_summary),
175 )
176 .route("/api/v1/projections/{name}/reset", post(reset_projection))
177 .route(
178 "/api/v1/projections/{name}/{entity_id}/state",
179 get(get_projection_state),
180 )
181 .route(
182 "/api/v1/projections/{name}/{entity_id}/state",
183 post(save_projection_state),
184 )
185 .route(
186 "/api/v1/projections/{name}/{entity_id}/state",
187 put(save_projection_state),
188 )
189 .route(
190 "/api/v1/projections/{name}/bulk",
191 post(bulk_get_projection_states),
192 )
193 .route(
194 "/api/v1/projections/{name}/bulk/save",
195 post(bulk_save_projection_states),
196 )
197 .route("/api/v1/webhooks", post(register_webhook))
199 .route("/api/v1/webhooks", get(list_webhooks))
200 .route("/api/v1/webhooks/{webhook_id}", get(get_webhook))
201 .route("/api/v1/webhooks/{webhook_id}", put(update_webhook))
202 .route(
203 "/api/v1/webhooks/{webhook_id}",
204 axum::routing::delete(delete_webhook),
205 )
206 .route(
207 "/api/v1/webhooks/{webhook_id}/deliveries",
208 get(list_webhook_deliveries),
209 )
210 .route("/api/v1/eventql", post(eventql_query))
212 .route("/api/v1/graphql", post(graphql_query))
213 .route("/api/v1/geospatial/query", post(geo_query))
214 .route("/api/v1/geospatial/stats", get(geo_stats))
215 .route("/api/v1/exactly-once/stats", get(exactly_once_stats))
216 .route(
217 "/api/v1/schema-evolution/history/{event_type}",
218 get(schema_evolution_history),
219 )
220 .route(
221 "/api/v1/schema-evolution/schema/{event_type}",
222 get(schema_evolution_schema),
223 )
224 .route(
225 "/api/v1/schema-evolution/stats",
226 get(schema_evolution_stats),
227 )
228 .layer(
229 CorsLayer::new()
230 .allow_origin(Any)
231 .allow_methods(Any)
232 .allow_headers(Any),
233 )
234 .layer(TraceLayer::new_for_http())
235 .with_state(store);
236
237 let listener = tokio::net::TcpListener::bind(addr).await?;
238 axum::serve(listener, app).await?;
239
240 Ok(())
241}
242
243pub async fn health() -> impl IntoResponse {
244 Json(serde_json::json!({
245 "status": "healthy",
246 "service": "allsource-core",
247 "version": env!("CARGO_PKG_VERSION")
248 }))
249}
250
251pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
253 let metrics = store.metrics();
254
255 match metrics.encode() {
256 Ok(encoded) => Response::builder()
257 .status(200)
258 .header("Content-Type", "text/plain; version=0.0.4")
259 .body(encoded)
260 .unwrap()
261 .into_response(),
262 Err(e) => Response::builder()
263 .status(500)
264 .body(format!("Error encoding metrics: {e}"))
265 .unwrap()
266 .into_response(),
267 }
268}
269
270pub async fn ingest_event(
271 State(store): State<SharedStore>,
272 Json(req): Json<IngestEventRequest>,
273) -> Result<Json<IngestEventResponse>> {
274 let event = Event::from_strings(
276 req.event_type,
277 req.entity_id,
278 "default".to_string(),
279 req.payload,
280 req.metadata,
281 )?;
282
283 let event_id = event.id;
284 let timestamp = event.timestamp;
285
286 store.ingest(event)?;
287
288 tracing::info!("Event ingested: {}", event_id);
289
290 Ok(Json(IngestEventResponse {
291 event_id,
292 timestamp,
293 }))
294}
295
296pub async fn ingest_event_v1(
300 State(state): State<AppState>,
301 Json(req): Json<IngestEventRequest>,
302) -> Result<Json<IngestEventResponse>> {
303 let event = Event::from_strings(
304 req.event_type,
305 req.entity_id,
306 "default".to_string(),
307 req.payload,
308 req.metadata,
309 )?;
310
311 let event_id = event.id;
312 let timestamp = event.timestamp;
313
314 state.store.ingest(event)?;
315
316 await_replication_ack(&state).await;
318
319 tracing::info!("Event ingested: {}", event_id);
320
321 Ok(Json(IngestEventResponse {
322 event_id,
323 timestamp,
324 }))
325}
326
327pub async fn ingest_events_batch(
332 State(store): State<SharedStore>,
333 Json(req): Json<IngestEventsBatchRequest>,
334) -> Result<Json<IngestEventsBatchResponse>> {
335 let total = req.events.len();
336 let mut ingested_events = Vec::with_capacity(total);
337
338 for event_req in req.events {
339 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
340
341 let event = Event::from_strings(
342 event_req.event_type,
343 event_req.entity_id,
344 tenant_id,
345 event_req.payload,
346 event_req.metadata,
347 )?;
348
349 let event_id = event.id;
350 let timestamp = event.timestamp;
351
352 store.ingest(event)?;
353
354 ingested_events.push(IngestEventResponse {
355 event_id,
356 timestamp,
357 });
358 }
359
360 let ingested = ingested_events.len();
361 tracing::info!("Batch ingested {} events", ingested);
362
363 Ok(Json(IngestEventsBatchResponse {
364 total,
365 ingested,
366 events: ingested_events,
367 }))
368}
369
370pub async fn ingest_events_batch_v1(
374 State(state): State<AppState>,
375 Json(req): Json<IngestEventsBatchRequest>,
376) -> Result<Json<IngestEventsBatchResponse>> {
377 let total = req.events.len();
378 let mut ingested_events = Vec::with_capacity(total);
379
380 for event_req in req.events {
381 let tenant_id = event_req.tenant_id.unwrap_or_else(|| "default".to_string());
382
383 let event = Event::from_strings(
384 event_req.event_type,
385 event_req.entity_id,
386 tenant_id,
387 event_req.payload,
388 event_req.metadata,
389 )?;
390
391 let event_id = event.id;
392 let timestamp = event.timestamp;
393
394 state.store.ingest(event)?;
395
396 ingested_events.push(IngestEventResponse {
397 event_id,
398 timestamp,
399 });
400 }
401
402 await_replication_ack(&state).await;
404
405 let ingested = ingested_events.len();
406 tracing::info!("Batch ingested {} events", ingested);
407
408 Ok(Json(IngestEventsBatchResponse {
409 total,
410 ingested,
411 events: ingested_events,
412 }))
413}
414
415pub async fn query_events(
416 State(store): State<SharedStore>,
417 Query(req): Query<QueryEventsRequest>,
418) -> Result<Json<QueryEventsResponse>> {
419 let domain_events = store.query(req)?;
420 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
421 let count = events.len();
422
423 tracing::debug!("Query returned {} events", count);
424
425 Ok(Json(QueryEventsResponse { events, count }))
426}
427
428#[derive(Deserialize)]
429pub struct EntityStateParams {
430 as_of: Option<chrono::DateTime<chrono::Utc>>,
431}
432
433pub async fn get_entity_state(
434 State(store): State<SharedStore>,
435 Path(entity_id): Path<String>,
436 Query(params): Query<EntityStateParams>,
437) -> Result<Json<serde_json::Value>> {
438 let state = store.reconstruct_state(&entity_id, params.as_of)?;
439
440 tracing::info!("State reconstructed for entity: {}", entity_id);
441
442 Ok(Json(state))
443}
444
445pub async fn get_entity_snapshot(
446 State(store): State<SharedStore>,
447 Path(entity_id): Path<String>,
448) -> Result<Json<serde_json::Value>> {
449 let snapshot = store.get_snapshot(&entity_id)?;
450
451 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
452
453 Ok(Json(snapshot))
454}
455
456pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
457 let stats = store.stats();
458 Json(stats)
459}
460
461#[derive(Debug, Deserialize)]
464pub struct ListStreamsParams {
465 pub limit: Option<usize>,
467 pub offset: Option<usize>,
469}
470
471#[derive(Debug, serde::Serialize)]
473pub struct ListStreamsResponse {
474 pub streams: Vec<StreamInfo>,
475 pub total: usize,
476}
477
478pub async fn list_streams(
479 State(store): State<SharedStore>,
480 Query(params): Query<ListStreamsParams>,
481) -> Json<ListStreamsResponse> {
482 let mut streams = store.list_streams();
483 let total = streams.len();
484
485 streams.sort_by(|a, b| b.last_event_at.cmp(&a.last_event_at));
487
488 if let Some(offset) = params.offset {
490 if offset < streams.len() {
491 streams = streams[offset..].to_vec();
492 } else {
493 streams = vec![];
494 }
495 }
496
497 if let Some(limit) = params.limit {
498 streams.truncate(limit);
499 }
500
501 tracing::debug!("Listed {} streams (total: {})", streams.len(), total);
502
503 Json(ListStreamsResponse { streams, total })
504}
505
506#[derive(Debug, Deserialize)]
509pub struct ListEventTypesParams {
510 pub limit: Option<usize>,
512 pub offset: Option<usize>,
514}
515
516#[derive(Debug, serde::Serialize)]
518pub struct ListEventTypesResponse {
519 pub event_types: Vec<EventTypeInfo>,
520 pub total: usize,
521}
522
523pub async fn list_event_types(
524 State(store): State<SharedStore>,
525 Query(params): Query<ListEventTypesParams>,
526) -> Json<ListEventTypesResponse> {
527 let mut event_types = store.list_event_types();
528 let total = event_types.len();
529
530 event_types.sort_by(|a, b| b.event_count.cmp(&a.event_count));
532
533 if let Some(offset) = params.offset {
535 if offset < event_types.len() {
536 event_types = event_types[offset..].to_vec();
537 } else {
538 event_types = vec![];
539 }
540 }
541
542 if let Some(limit) = params.limit {
543 event_types.truncate(limit);
544 }
545
546 tracing::debug!(
547 "Listed {} event types (total: {})",
548 event_types.len(),
549 total
550 );
551
552 Json(ListEventTypesResponse { event_types, total })
553}
554
555pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
557 let websocket_manager = store.websocket_manager();
558
559 ws.on_upgrade(move |socket| async move {
560 websocket_manager.handle_socket(socket).await;
561 })
562}
563
564pub async fn analytics_frequency(
566 State(store): State<SharedStore>,
567 Query(req): Query<EventFrequencyRequest>,
568) -> Result<Json<EventFrequencyResponse>> {
569 let response = AnalyticsEngine::event_frequency(&store, req)?;
570
571 tracing::debug!(
572 "Frequency analysis returned {} buckets",
573 response.buckets.len()
574 );
575
576 Ok(Json(response))
577}
578
579pub async fn analytics_summary(
581 State(store): State<SharedStore>,
582 Query(req): Query<StatsSummaryRequest>,
583) -> Result<Json<StatsSummaryResponse>> {
584 let response = AnalyticsEngine::stats_summary(&store, req)?;
585
586 tracing::debug!(
587 "Stats summary: {} events across {} entities",
588 response.total_events,
589 response.unique_entities
590 );
591
592 Ok(Json(response))
593}
594
595pub async fn analytics_correlation(
597 State(store): State<SharedStore>,
598 Query(req): Query<CorrelationRequest>,
599) -> Result<Json<CorrelationResponse>> {
600 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
601
602 tracing::debug!(
603 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
604 response.correlated_pairs,
605 response.total_a,
606 response.correlation_percentage
607 );
608
609 Ok(Json(response))
610}
611
612pub async fn create_snapshot(
614 State(store): State<SharedStore>,
615 Json(req): Json<CreateSnapshotRequest>,
616) -> Result<Json<CreateSnapshotResponse>> {
617 store.create_snapshot(&req.entity_id)?;
618
619 let snapshot_manager = store.snapshot_manager();
620 let snapshot = snapshot_manager
621 .get_latest_snapshot(&req.entity_id)
622 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
623
624 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
625
626 Ok(Json(CreateSnapshotResponse {
627 snapshot_id: snapshot.id,
628 entity_id: snapshot.entity_id,
629 created_at: snapshot.created_at,
630 event_count: snapshot.event_count,
631 size_bytes: snapshot.metadata.size_bytes,
632 }))
633}
634
635pub async fn list_snapshots(
637 State(store): State<SharedStore>,
638 Query(req): Query<ListSnapshotsRequest>,
639) -> Result<Json<ListSnapshotsResponse>> {
640 let snapshot_manager = store.snapshot_manager();
641
642 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
643 snapshot_manager
644 .get_all_snapshots(&entity_id)
645 .into_iter()
646 .map(SnapshotInfo::from)
647 .collect()
648 } else {
649 let entities = snapshot_manager.list_entities();
651 entities
652 .iter()
653 .flat_map(|entity_id| {
654 snapshot_manager
655 .get_all_snapshots(entity_id)
656 .into_iter()
657 .map(SnapshotInfo::from)
658 })
659 .collect()
660 };
661
662 let total = snapshots.len();
663
664 tracing::debug!("Listed {} snapshots", total);
665
666 Ok(Json(ListSnapshotsResponse { snapshots, total }))
667}
668
669pub async fn get_latest_snapshot(
671 State(store): State<SharedStore>,
672 Path(entity_id): Path<String>,
673) -> Result<Json<serde_json::Value>> {
674 let snapshot_manager = store.snapshot_manager();
675
676 let snapshot = snapshot_manager
677 .get_latest_snapshot(&entity_id)
678 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
679
680 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
681
682 Ok(Json(serde_json::json!({
683 "snapshot_id": snapshot.id,
684 "entity_id": snapshot.entity_id,
685 "created_at": snapshot.created_at,
686 "as_of": snapshot.as_of,
687 "event_count": snapshot.event_count,
688 "size_bytes": snapshot.metadata.size_bytes,
689 "snapshot_type": snapshot.metadata.snapshot_type,
690 "state": snapshot.state
691 })))
692}
693
694pub async fn trigger_compaction(
696 State(store): State<SharedStore>,
697) -> Result<Json<CompactionResult>> {
698 let compaction_manager = store.compaction_manager().ok_or_else(|| {
699 crate::error::AllSourceError::InternalError(
700 "Compaction not enabled (no Parquet storage)".to_string(),
701 )
702 })?;
703
704 tracing::info!("📦 Manual compaction triggered via API");
705
706 let result = compaction_manager.compact_now()?;
707
708 Ok(Json(result))
709}
710
711pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
713 let compaction_manager = store.compaction_manager().ok_or_else(|| {
714 crate::error::AllSourceError::InternalError(
715 "Compaction not enabled (no Parquet storage)".to_string(),
716 )
717 })?;
718
719 let stats = compaction_manager.stats();
720 let config = compaction_manager.config();
721
722 Ok(Json(serde_json::json!({
723 "stats": stats,
724 "config": {
725 "min_files_to_compact": config.min_files_to_compact,
726 "target_file_size": config.target_file_size,
727 "max_file_size": config.max_file_size,
728 "small_file_threshold": config.small_file_threshold,
729 "compaction_interval_seconds": config.compaction_interval_seconds,
730 "auto_compact": config.auto_compact,
731 "strategy": config.strategy
732 }
733 })))
734}
735
736pub async fn register_schema(
738 State(store): State<SharedStore>,
739 Json(req): Json<RegisterSchemaRequest>,
740) -> Result<Json<RegisterSchemaResponse>> {
741 let schema_registry = store.schema_registry();
742
743 let response =
744 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
745
746 tracing::info!(
747 "📋 Schema registered: v{} for '{}'",
748 response.version,
749 response.subject
750 );
751
752 Ok(Json(response))
753}
754
755#[derive(Deserialize)]
757pub struct GetSchemaParams {
758 version: Option<u32>,
759}
760
761pub async fn get_schema(
762 State(store): State<SharedStore>,
763 Path(subject): Path<String>,
764 Query(params): Query<GetSchemaParams>,
765) -> Result<Json<serde_json::Value>> {
766 let schema_registry = store.schema_registry();
767
768 let schema = schema_registry.get_schema(&subject, params.version)?;
769
770 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
771
772 Ok(Json(serde_json::json!({
773 "id": schema.id,
774 "subject": schema.subject,
775 "version": schema.version,
776 "schema": schema.schema,
777 "created_at": schema.created_at,
778 "description": schema.description,
779 "tags": schema.tags
780 })))
781}
782
783pub async fn list_schema_versions(
785 State(store): State<SharedStore>,
786 Path(subject): Path<String>,
787) -> Result<Json<serde_json::Value>> {
788 let schema_registry = store.schema_registry();
789
790 let versions = schema_registry.list_versions(&subject)?;
791
792 Ok(Json(serde_json::json!({
793 "subject": subject,
794 "versions": versions
795 })))
796}
797
798pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
800 let schema_registry = store.schema_registry();
801
802 let subjects = schema_registry.list_subjects();
803
804 Json(serde_json::json!({
805 "subjects": subjects,
806 "total": subjects.len()
807 }))
808}
809
810pub async fn validate_event_schema(
812 State(store): State<SharedStore>,
813 Json(req): Json<ValidateEventRequest>,
814) -> Result<Json<ValidateEventResponse>> {
815 let schema_registry = store.schema_registry();
816
817 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
818
819 if response.valid {
820 tracing::debug!(
821 "✅ Event validated against schema '{}' v{}",
822 req.subject,
823 response.schema_version
824 );
825 } else {
826 tracing::warn!(
827 "❌ Event validation failed for '{}': {:?}",
828 req.subject,
829 response.errors
830 );
831 }
832
833 Ok(Json(response))
834}
835
836#[derive(Deserialize)]
838pub struct SetCompatibilityRequest {
839 compatibility: CompatibilityMode,
840}
841
842pub async fn set_compatibility_mode(
843 State(store): State<SharedStore>,
844 Path(subject): Path<String>,
845 Json(req): Json<SetCompatibilityRequest>,
846) -> Json<serde_json::Value> {
847 let schema_registry = store.schema_registry();
848
849 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
850
851 tracing::info!(
852 "🔧 Set compatibility mode for '{}' to {:?}",
853 subject,
854 req.compatibility
855 );
856
857 Json(serde_json::json!({
858 "subject": subject,
859 "compatibility": req.compatibility
860 }))
861}
862
863pub async fn start_replay(
865 State(store): State<SharedStore>,
866 Json(req): Json<StartReplayRequest>,
867) -> Result<Json<StartReplayResponse>> {
868 let replay_manager = store.replay_manager();
869
870 let response = replay_manager.start_replay(store, req)?;
871
872 tracing::info!(
873 "🔄 Started replay {} with {} events",
874 response.replay_id,
875 response.total_events
876 );
877
878 Ok(Json(response))
879}
880
881pub async fn get_replay_progress(
883 State(store): State<SharedStore>,
884 Path(replay_id): Path<uuid::Uuid>,
885) -> Result<Json<ReplayProgress>> {
886 let replay_manager = store.replay_manager();
887
888 let progress = replay_manager.get_progress(replay_id)?;
889
890 Ok(Json(progress))
891}
892
893pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
895 let replay_manager = store.replay_manager();
896
897 let replays = replay_manager.list_replays();
898
899 Json(serde_json::json!({
900 "replays": replays,
901 "total": replays.len()
902 }))
903}
904
905pub async fn cancel_replay(
907 State(store): State<SharedStore>,
908 Path(replay_id): Path<uuid::Uuid>,
909) -> Result<Json<serde_json::Value>> {
910 let replay_manager = store.replay_manager();
911
912 replay_manager.cancel_replay(replay_id)?;
913
914 tracing::info!("🛑 Cancelled replay {}", replay_id);
915
916 Ok(Json(serde_json::json!({
917 "replay_id": replay_id,
918 "status": "cancelled"
919 })))
920}
921
922pub async fn delete_replay(
924 State(store): State<SharedStore>,
925 Path(replay_id): Path<uuid::Uuid>,
926) -> Result<Json<serde_json::Value>> {
927 let replay_manager = store.replay_manager();
928
929 let deleted = replay_manager.delete_replay(replay_id)?;
930
931 if deleted {
932 tracing::info!("🗑️ Deleted replay {}", replay_id);
933 }
934
935 Ok(Json(serde_json::json!({
936 "replay_id": replay_id,
937 "deleted": deleted
938 })))
939}
940
941pub async fn register_pipeline(
943 State(store): State<SharedStore>,
944 Json(config): Json<PipelineConfig>,
945) -> Result<Json<serde_json::Value>> {
946 let pipeline_manager = store.pipeline_manager();
947
948 let pipeline_id = pipeline_manager.register(config.clone());
949
950 tracing::info!(
951 "🔀 Pipeline registered: {} (name: {})",
952 pipeline_id,
953 config.name
954 );
955
956 Ok(Json(serde_json::json!({
957 "pipeline_id": pipeline_id,
958 "name": config.name,
959 "enabled": config.enabled
960 })))
961}
962
963pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
965 let pipeline_manager = store.pipeline_manager();
966
967 let pipelines = pipeline_manager.list();
968
969 tracing::debug!("Listed {} pipelines", pipelines.len());
970
971 Json(serde_json::json!({
972 "pipelines": pipelines,
973 "total": pipelines.len()
974 }))
975}
976
977pub async fn get_pipeline(
979 State(store): State<SharedStore>,
980 Path(pipeline_id): Path<uuid::Uuid>,
981) -> Result<Json<PipelineConfig>> {
982 let pipeline_manager = store.pipeline_manager();
983
984 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
985 crate::error::AllSourceError::ValidationError(format!(
986 "Pipeline not found: {}",
987 pipeline_id
988 ))
989 })?;
990
991 Ok(Json(pipeline.config().clone()))
992}
993
994pub async fn remove_pipeline(
996 State(store): State<SharedStore>,
997 Path(pipeline_id): Path<uuid::Uuid>,
998) -> Result<Json<serde_json::Value>> {
999 let pipeline_manager = store.pipeline_manager();
1000
1001 let removed = pipeline_manager.remove(pipeline_id);
1002
1003 if removed {
1004 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
1005 }
1006
1007 Ok(Json(serde_json::json!({
1008 "pipeline_id": pipeline_id,
1009 "removed": removed
1010 })))
1011}
1012
1013pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1015 let pipeline_manager = store.pipeline_manager();
1016
1017 let stats = pipeline_manager.all_stats();
1018
1019 Json(serde_json::json!({
1020 "stats": stats,
1021 "total": stats.len()
1022 }))
1023}
1024
1025pub async fn get_pipeline_stats(
1027 State(store): State<SharedStore>,
1028 Path(pipeline_id): Path<uuid::Uuid>,
1029) -> Result<Json<PipelineStats>> {
1030 let pipeline_manager = store.pipeline_manager();
1031
1032 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1033 crate::error::AllSourceError::ValidationError(format!(
1034 "Pipeline not found: {}",
1035 pipeline_id
1036 ))
1037 })?;
1038
1039 Ok(Json(pipeline.stats()))
1040}
1041
1042pub async fn reset_pipeline(
1044 State(store): State<SharedStore>,
1045 Path(pipeline_id): Path<uuid::Uuid>,
1046) -> Result<Json<serde_json::Value>> {
1047 let pipeline_manager = store.pipeline_manager();
1048
1049 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
1050 crate::error::AllSourceError::ValidationError(format!(
1051 "Pipeline not found: {}",
1052 pipeline_id
1053 ))
1054 })?;
1055
1056 pipeline.reset();
1057
1058 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
1059
1060 Ok(Json(serde_json::json!({
1061 "pipeline_id": pipeline_id,
1062 "reset": true
1063 })))
1064}
1065
1066pub async fn get_event_by_id(
1072 State(store): State<SharedStore>,
1073 Path(event_id): Path<uuid::Uuid>,
1074) -> Result<Json<serde_json::Value>> {
1075 let event = store.get_event_by_id(&event_id)?.ok_or_else(|| {
1076 crate::error::AllSourceError::EntityNotFound(format!("Event '{}' not found", event_id))
1077 })?;
1078
1079 let dto = EventDto::from(&event);
1080
1081 tracing::debug!("Event retrieved by ID: {}", event_id);
1082
1083 Ok(Json(serde_json::json!({
1084 "event": dto,
1085 "found": true
1086 })))
1087}
1088
1089pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1095 let projection_manager = store.projection_manager();
1096
1097 let projections: Vec<serde_json::Value> = projection_manager
1098 .list_projections()
1099 .iter()
1100 .map(|(name, projection)| {
1101 serde_json::json!({
1102 "name": name,
1103 "type": format!("{:?}", projection.name()),
1104 })
1105 })
1106 .collect();
1107
1108 tracing::debug!("Listed {} projections", projections.len());
1109
1110 Json(serde_json::json!({
1111 "projections": projections,
1112 "total": projections.len()
1113 }))
1114}
1115
1116pub async fn get_projection(
1118 State(store): State<SharedStore>,
1119 Path(name): Path<String>,
1120) -> Result<Json<serde_json::Value>> {
1121 let projection_manager = store.projection_manager();
1122
1123 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1124 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1125 })?;
1126
1127 Ok(Json(serde_json::json!({
1128 "name": projection.name(),
1129 "found": true
1130 })))
1131}
1132
1133pub async fn get_projection_state(
1138 State(store): State<SharedStore>,
1139 Path((name, entity_id)): Path<(String, String)>,
1140) -> Result<Json<serde_json::Value>> {
1141 let projection_manager = store.projection_manager();
1142
1143 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1144 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1145 })?;
1146
1147 let state = projection.get_state(&entity_id);
1148
1149 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
1150
1151 Ok(Json(serde_json::json!({
1152 "projection": name,
1153 "entity_id": entity_id,
1154 "state": state,
1155 "found": state.is_some()
1156 })))
1157}
1158
1159pub async fn delete_projection(
1164 State(store): State<SharedStore>,
1165 Path(name): Path<String>,
1166) -> Result<Json<serde_json::Value>> {
1167 let projection_manager = store.projection_manager();
1168
1169 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1170 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1171 })?;
1172
1173 projection.clear();
1174
1175 let cache = store.projection_state_cache();
1177 let prefix = format!("{name}:");
1178 let keys_to_remove: Vec<String> = cache
1179 .iter()
1180 .filter(|entry| entry.key().starts_with(&prefix))
1181 .map(|entry| entry.key().clone())
1182 .collect();
1183 for key in keys_to_remove {
1184 cache.remove(&key);
1185 }
1186
1187 tracing::info!("Projection deleted (cleared): {}", name);
1188
1189 Ok(Json(serde_json::json!({
1190 "projection": name,
1191 "deleted": true
1192 })))
1193}
1194
1195pub async fn get_projection_state_summary(
1199 State(store): State<SharedStore>,
1200 Path(name): Path<String>,
1201) -> Result<Json<serde_json::Value>> {
1202 let projection_manager = store.projection_manager();
1203
1204 let _projection = projection_manager.get_projection(&name).ok_or_else(|| {
1205 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1206 })?;
1207
1208 let cache = store.projection_state_cache();
1210 let prefix = format!("{name}:");
1211 let states: Vec<serde_json::Value> = cache
1212 .iter()
1213 .filter(|entry| entry.key().starts_with(&prefix))
1214 .map(|entry| {
1215 let entity_id = entry.key().strip_prefix(&prefix).unwrap_or(entry.key());
1216 serde_json::json!({
1217 "entity_id": entity_id,
1218 "state": entry.value().clone()
1219 })
1220 })
1221 .collect();
1222
1223 let total = states.len();
1224
1225 tracing::debug!("Projection state summary: {} ({} entities)", name, total);
1226
1227 Ok(Json(serde_json::json!({
1228 "projection": name,
1229 "states": states,
1230 "total": total
1231 })))
1232}
1233
1234pub async fn reset_projection(
1238 State(store): State<SharedStore>,
1239 Path(name): Path<String>,
1240) -> Result<Json<serde_json::Value>> {
1241 let reprocessed = store.reset_projection(&name)?;
1242
1243 tracing::info!(
1244 "Projection reset: {} ({} events reprocessed)",
1245 name,
1246 reprocessed
1247 );
1248
1249 Ok(Json(serde_json::json!({
1250 "projection": name,
1251 "reset": true,
1252 "events_reprocessed": reprocessed
1253 })))
1254}
1255
1256#[derive(Debug, Deserialize)]
1258pub struct SaveProjectionStateRequest {
1259 pub state: serde_json::Value,
1260}
1261
1262pub async fn save_projection_state(
1267 State(store): State<SharedStore>,
1268 Path((name, entity_id)): Path<(String, String)>,
1269 Json(req): Json<SaveProjectionStateRequest>,
1270) -> Result<Json<serde_json::Value>> {
1271 let projection_cache = store.projection_state_cache();
1272
1273 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
1275
1276 tracing::info!("Projection state saved: {} / {}", name, entity_id);
1277
1278 Ok(Json(serde_json::json!({
1279 "projection": name,
1280 "entity_id": entity_id,
1281 "saved": true
1282 })))
1283}
1284
1285#[derive(Debug, Deserialize)]
1289pub struct BulkGetStateRequest {
1290 pub entity_ids: Vec<String>,
1291}
1292
1293#[derive(Debug, Deserialize)]
1297pub struct BulkSaveStateRequest {
1298 pub states: Vec<BulkSaveStateItem>,
1299}
1300
1301#[derive(Debug, Deserialize)]
1302pub struct BulkSaveStateItem {
1303 pub entity_id: String,
1304 pub state: serde_json::Value,
1305}
1306
1307pub async fn bulk_get_projection_states(
1308 State(store): State<SharedStore>,
1309 Path(name): Path<String>,
1310 Json(req): Json<BulkGetStateRequest>,
1311) -> Result<Json<serde_json::Value>> {
1312 let projection_manager = store.projection_manager();
1313
1314 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
1315 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
1316 })?;
1317
1318 let states: Vec<serde_json::Value> = req
1319 .entity_ids
1320 .iter()
1321 .map(|entity_id| {
1322 let state = projection.get_state(entity_id);
1323 serde_json::json!({
1324 "entity_id": entity_id,
1325 "state": state,
1326 "found": state.is_some()
1327 })
1328 })
1329 .collect();
1330
1331 tracing::debug!(
1332 "Bulk projection state retrieved: {} entities from {}",
1333 states.len(),
1334 name
1335 );
1336
1337 Ok(Json(serde_json::json!({
1338 "projection": name,
1339 "states": states,
1340 "total": states.len()
1341 })))
1342}
1343
1344pub async fn bulk_save_projection_states(
1349 State(store): State<SharedStore>,
1350 Path(name): Path<String>,
1351 Json(req): Json<BulkSaveStateRequest>,
1352) -> Result<Json<serde_json::Value>> {
1353 let projection_cache = store.projection_state_cache();
1354
1355 let mut saved_count = 0;
1356 for item in &req.states {
1357 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
1358 saved_count += 1;
1359 }
1360
1361 tracing::info!(
1362 "Bulk projection state saved: {} entities for {}",
1363 saved_count,
1364 name
1365 );
1366
1367 Ok(Json(serde_json::json!({
1368 "projection": name,
1369 "saved": saved_count,
1370 "total": req.states.len()
1371 })))
1372}
1373
1374#[derive(Debug, Deserialize)]
1380pub struct ListWebhooksParams {
1381 pub tenant_id: Option<String>,
1382}
1383
1384pub async fn register_webhook(
1386 State(store): State<SharedStore>,
1387 Json(req): Json<RegisterWebhookRequest>,
1388) -> Json<serde_json::Value> {
1389 let registry = store.webhook_registry();
1390 let webhook = registry.register(req);
1391
1392 tracing::info!("Webhook registered: {} -> {}", webhook.id, webhook.url);
1393
1394 Json(serde_json::json!({
1395 "webhook": webhook,
1396 "created": true
1397 }))
1398}
1399
1400pub async fn list_webhooks(
1402 State(store): State<SharedStore>,
1403 Query(params): Query<ListWebhooksParams>,
1404) -> Json<serde_json::Value> {
1405 let registry = store.webhook_registry();
1406
1407 let webhooks = if let Some(tenant_id) = params.tenant_id {
1408 registry.list_by_tenant(&tenant_id)
1409 } else {
1410 vec![]
1412 };
1413
1414 let total = webhooks.len();
1415
1416 Json(serde_json::json!({
1417 "webhooks": webhooks,
1418 "total": total
1419 }))
1420}
1421
1422pub async fn get_webhook(
1424 State(store): State<SharedStore>,
1425 Path(webhook_id): Path<uuid::Uuid>,
1426) -> Result<Json<serde_json::Value>> {
1427 let registry = store.webhook_registry();
1428
1429 let webhook = registry.get(webhook_id).ok_or_else(|| {
1430 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1431 })?;
1432
1433 Ok(Json(serde_json::json!({
1434 "webhook": webhook,
1435 "found": true
1436 })))
1437}
1438
1439pub async fn update_webhook(
1441 State(store): State<SharedStore>,
1442 Path(webhook_id): Path<uuid::Uuid>,
1443 Json(req): Json<UpdateWebhookRequest>,
1444) -> Result<Json<serde_json::Value>> {
1445 let registry = store.webhook_registry();
1446
1447 let webhook = registry.update(webhook_id, req).ok_or_else(|| {
1448 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1449 })?;
1450
1451 tracing::info!("Webhook updated: {}", webhook_id);
1452
1453 Ok(Json(serde_json::json!({
1454 "webhook": webhook,
1455 "updated": true
1456 })))
1457}
1458
1459pub async fn delete_webhook(
1461 State(store): State<SharedStore>,
1462 Path(webhook_id): Path<uuid::Uuid>,
1463) -> Result<Json<serde_json::Value>> {
1464 let registry = store.webhook_registry();
1465
1466 let webhook = registry.delete(webhook_id).ok_or_else(|| {
1467 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1468 })?;
1469
1470 tracing::info!("Webhook deleted: {} ({})", webhook_id, webhook.url);
1471
1472 Ok(Json(serde_json::json!({
1473 "webhook_id": webhook_id,
1474 "deleted": true
1475 })))
1476}
1477
1478#[derive(Debug, Deserialize)]
1480pub struct ListDeliveriesParams {
1481 pub limit: Option<usize>,
1482}
1483
1484pub async fn list_webhook_deliveries(
1486 State(store): State<SharedStore>,
1487 Path(webhook_id): Path<uuid::Uuid>,
1488 Query(params): Query<ListDeliveriesParams>,
1489) -> Result<Json<serde_json::Value>> {
1490 let registry = store.webhook_registry();
1491
1492 registry.get(webhook_id).ok_or_else(|| {
1494 crate::error::AllSourceError::EntityNotFound(format!("Webhook '{}' not found", webhook_id))
1495 })?;
1496
1497 let limit = params.limit.unwrap_or(50);
1498 let deliveries = registry.get_deliveries(webhook_id, limit);
1499 let total = deliveries.len();
1500
1501 Ok(Json(serde_json::json!({
1502 "webhook_id": webhook_id,
1503 "deliveries": deliveries,
1504 "total": total
1505 })))
1506}
1507
1508pub async fn eventql_query(
1514 State(store): State<SharedStore>,
1515 Json(req): Json<EventQLRequest>,
1516) -> Result<Json<serde_json::Value>> {
1517 let events = store.snapshot_events();
1518 match crate::infrastructure::query::eventql::execute_eventql(&events, &req).await {
1519 Ok(response) => Ok(Json(serde_json::json!({
1520 "columns": response.columns,
1521 "rows": response.rows,
1522 "row_count": response.row_count,
1523 }))),
1524 Err(e) => Err(crate::error::AllSourceError::InvalidQuery(e)),
1525 }
1526}
1527
1528pub async fn graphql_query(
1530 State(store): State<SharedStore>,
1531 Json(req): Json<GraphQLRequest>,
1532) -> Json<serde_json::Value> {
1533 let fields = match crate::infrastructure::query::graphql::parse_query(&req.query) {
1534 Ok(f) => f,
1535 Err(e) => {
1536 return Json(
1537 serde_json::to_value(GraphQLResponse {
1538 data: None,
1539 errors: vec![GraphQLError { message: e }],
1540 })
1541 .unwrap(),
1542 );
1543 }
1544 };
1545
1546 let mut data = serde_json::Map::new();
1547 let mut errors = Vec::new();
1548
1549 for field in &fields {
1550 match field.name.as_str() {
1551 "events" => {
1552 let request = crate::application::dto::QueryEventsRequest {
1553 entity_id: field.arguments.get("entity_id").cloned(),
1554 event_type: field.arguments.get("event_type").cloned(),
1555 tenant_id: field.arguments.get("tenant_id").cloned(),
1556 limit: field.arguments.get("limit").and_then(|l| l.parse().ok()),
1557 as_of: None,
1558 since: None,
1559 until: None,
1560 };
1561 match store.query(request) {
1562 Ok(events) => {
1563 let json_events: Vec<serde_json::Value> = events
1564 .iter()
1565 .map(|e| {
1566 crate::infrastructure::query::graphql::event_to_json(
1567 e,
1568 &field.fields,
1569 )
1570 })
1571 .collect();
1572 data.insert("events".to_string(), serde_json::Value::Array(json_events));
1573 }
1574 Err(e) => errors.push(GraphQLError {
1575 message: format!("events query failed: {e}"),
1576 }),
1577 }
1578 }
1579 "event" => {
1580 if let Some(id_str) = field.arguments.get("id") {
1581 if let Ok(id) = uuid::Uuid::parse_str(id_str) {
1582 match store.get_event_by_id(&id) {
1583 Ok(Some(event)) => {
1584 data.insert(
1585 "event".to_string(),
1586 crate::infrastructure::query::graphql::event_to_json(
1587 &event,
1588 &field.fields,
1589 ),
1590 );
1591 }
1592 Ok(None) => {
1593 data.insert("event".to_string(), serde_json::Value::Null);
1594 }
1595 Err(e) => errors.push(GraphQLError {
1596 message: format!("event lookup failed: {e}"),
1597 }),
1598 }
1599 } else {
1600 errors.push(GraphQLError {
1601 message: format!("Invalid UUID: {id_str}"),
1602 });
1603 }
1604 } else {
1605 errors.push(GraphQLError {
1606 message: "event query requires 'id' argument".to_string(),
1607 });
1608 }
1609 }
1610 "projections" => {
1611 let pm = store.projection_manager();
1612 let names: Vec<serde_json::Value> = pm
1613 .list_projections()
1614 .iter()
1615 .map(|(name, _)| serde_json::Value::String(name.clone()))
1616 .collect();
1617 data.insert("projections".to_string(), serde_json::Value::Array(names));
1618 }
1619 "stats" => {
1620 let stats = store.stats();
1621 data.insert(
1622 "stats".to_string(),
1623 serde_json::json!({
1624 "total_events": stats.total_events,
1625 "total_entities": stats.total_entities,
1626 "total_event_types": stats.total_event_types,
1627 }),
1628 );
1629 }
1630 "__schema" => {
1631 data.insert(
1632 "__schema".to_string(),
1633 crate::infrastructure::query::graphql::introspection_schema(),
1634 );
1635 }
1636 other => {
1637 errors.push(GraphQLError {
1638 message: format!("Unknown field: {other}"),
1639 });
1640 }
1641 }
1642 }
1643
1644 Json(
1645 serde_json::to_value(GraphQLResponse {
1646 data: Some(serde_json::Value::Object(data)),
1647 errors,
1648 })
1649 .unwrap(),
1650 )
1651}
1652
1653pub async fn geo_query(
1655 State(store): State<SharedStore>,
1656 Json(req): Json<GeoQueryRequest>,
1657) -> Json<serde_json::Value> {
1658 let events = store.snapshot_events();
1659 let geo_index = store.geo_index();
1660 let results =
1661 crate::infrastructure::query::geospatial::execute_geo_query(&events, &geo_index, &req);
1662 let total = results.len();
1663 Json(serde_json::json!({
1664 "results": results,
1665 "total": total,
1666 }))
1667}
1668
1669pub async fn geo_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1671 let stats = store.geo_index().stats();
1672 Json(serde_json::json!(stats))
1673}
1674
1675pub async fn exactly_once_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1677 let stats = store.exactly_once().stats();
1678 Json(serde_json::json!(stats))
1679}
1680
1681pub async fn schema_evolution_history(
1683 State(store): State<SharedStore>,
1684 Path(event_type): Path<String>,
1685) -> Json<serde_json::Value> {
1686 let mgr = store.schema_evolution();
1687 let history = mgr.get_history(&event_type);
1688 let version = mgr.get_version(&event_type);
1689 Json(serde_json::json!({
1690 "event_type": event_type,
1691 "current_version": version,
1692 "history": history,
1693 }))
1694}
1695
1696pub async fn schema_evolution_schema(
1698 State(store): State<SharedStore>,
1699 Path(event_type): Path<String>,
1700) -> Json<serde_json::Value> {
1701 let mgr = store.schema_evolution();
1702 if let Some(schema) = mgr.get_schema(&event_type) {
1703 let json_schema = crate::application::services::schema_evolution::to_json_schema(&schema);
1704 Json(serde_json::json!({
1705 "event_type": event_type,
1706 "version": mgr.get_version(&event_type),
1707 "inferred_schema": schema,
1708 "json_schema": json_schema,
1709 }))
1710 } else {
1711 Json(serde_json::json!({
1712 "event_type": event_type,
1713 "error": "No schema inferred for this event type"
1714 }))
1715 }
1716}
1717
1718pub async fn schema_evolution_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
1720 let stats = store.schema_evolution().stats();
1721 let event_types = store.schema_evolution().list_event_types();
1722 Json(serde_json::json!({
1723 "stats": stats,
1724 "tracked_event_types": event_types,
1725 }))
1726}
1727
1728#[cfg(test)]
1729mod tests {
1730 use super::*;
1731 use crate::{domain::entities::Event, store::EventStore};
1732
1733 fn create_test_store() -> Arc<EventStore> {
1734 Arc::new(EventStore::new())
1735 }
1736
1737 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
1738 Event::from_strings(
1739 event_type.to_string(),
1740 entity_id.to_string(),
1741 "test-stream".to_string(),
1742 serde_json::json!({
1743 "name": "Test",
1744 "value": 42
1745 }),
1746 None,
1747 )
1748 .unwrap()
1749 }
1750
1751 #[tokio::test]
1752 async fn test_projection_state_cache() {
1753 let store = create_test_store();
1754
1755 let cache = store.projection_state_cache();
1757 cache.insert(
1758 "entity_snapshots:user-123".to_string(),
1759 serde_json::json!({"name": "Test User", "age": 30}),
1760 );
1761
1762 let state = cache.get("entity_snapshots:user-123");
1764 assert!(state.is_some());
1765 let state = state.unwrap();
1766 assert_eq!(state["name"], "Test User");
1767 assert_eq!(state["age"], 30);
1768 }
1769
1770 #[tokio::test]
1771 async fn test_projection_manager_list_projections() {
1772 let store = create_test_store();
1773
1774 let projection_manager = store.projection_manager();
1776 let projections = projection_manager.list_projections();
1777
1778 assert!(projections.len() >= 2);
1780
1781 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
1782 assert!(names.contains(&"entity_snapshots"));
1783 assert!(names.contains(&"event_counters"));
1784 }
1785
1786 #[tokio::test]
1787 async fn test_projection_state_after_event_ingestion() {
1788 let store = create_test_store();
1789
1790 let event = create_test_event("user-456", "user.created");
1792 store.ingest(event).unwrap();
1793
1794 let projection_manager = store.projection_manager();
1796 let snapshot_projection = projection_manager
1797 .get_projection("entity_snapshots")
1798 .unwrap();
1799
1800 let state = snapshot_projection.get_state("user-456");
1801 assert!(state.is_some());
1802 let state = state.unwrap();
1803 assert_eq!(state["name"], "Test");
1804 assert_eq!(state["value"], 42);
1805 }
1806
1807 #[tokio::test]
1808 async fn test_projection_state_cache_multiple_entities() {
1809 let store = create_test_store();
1810 let cache = store.projection_state_cache();
1811
1812 for i in 0..10 {
1814 cache.insert(
1815 format!("entity_snapshots:entity-{}", i),
1816 serde_json::json!({"id": i, "status": "active"}),
1817 );
1818 }
1819
1820 assert_eq!(cache.len(), 10);
1822
1823 for i in 0..10 {
1825 let key = format!("entity_snapshots:entity-{}", i);
1826 let state = cache.get(&key);
1827 assert!(state.is_some());
1828 assert_eq!(state.unwrap()["id"], i);
1829 }
1830 }
1831
1832 #[tokio::test]
1833 async fn test_projection_state_update() {
1834 let store = create_test_store();
1835 let cache = store.projection_state_cache();
1836
1837 cache.insert(
1839 "entity_snapshots:user-789".to_string(),
1840 serde_json::json!({"balance": 100}),
1841 );
1842
1843 cache.insert(
1845 "entity_snapshots:user-789".to_string(),
1846 serde_json::json!({"balance": 150}),
1847 );
1848
1849 let state = cache.get("entity_snapshots:user-789").unwrap();
1851 assert_eq!(state["balance"], 150);
1852 }
1853
1854 #[tokio::test]
1855 async fn test_event_counter_projection() {
1856 let store = create_test_store();
1857
1858 store
1860 .ingest(create_test_event("user-1", "user.created"))
1861 .unwrap();
1862 store
1863 .ingest(create_test_event("user-2", "user.created"))
1864 .unwrap();
1865 store
1866 .ingest(create_test_event("user-1", "user.updated"))
1867 .unwrap();
1868
1869 let projection_manager = store.projection_manager();
1871 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
1872
1873 let created_state = counter_projection.get_state("user.created");
1875 assert!(created_state.is_some());
1876 assert_eq!(created_state.unwrap()["count"], 2);
1877
1878 let updated_state = counter_projection.get_state("user.updated");
1879 assert!(updated_state.is_some());
1880 assert_eq!(updated_state.unwrap()["count"], 1);
1881 }
1882
1883 #[tokio::test]
1884 async fn test_projection_state_cache_key_format() {
1885 let store = create_test_store();
1886 let cache = store.projection_state_cache();
1887
1888 let key = "orders:order-12345".to_string();
1890 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
1891
1892 let state = cache.get(&key).unwrap();
1893 assert_eq!(state["total"], 99.99);
1894 }
1895
1896 #[tokio::test]
1897 async fn test_projection_state_cache_removal() {
1898 let store = create_test_store();
1899 let cache = store.projection_state_cache();
1900
1901 cache.insert(
1903 "test:entity-1".to_string(),
1904 serde_json::json!({"data": "value"}),
1905 );
1906 assert_eq!(cache.len(), 1);
1907
1908 cache.remove("test:entity-1");
1909 assert_eq!(cache.len(), 0);
1910 assert!(cache.get("test:entity-1").is_none());
1911 }
1912
1913 #[tokio::test]
1914 async fn test_get_nonexistent_projection() {
1915 let store = create_test_store();
1916 let projection_manager = store.projection_manager();
1917
1918 let projection = projection_manager.get_projection("nonexistent_projection");
1920 assert!(projection.is_none());
1921 }
1922
1923 #[tokio::test]
1924 async fn test_get_nonexistent_entity_state() {
1925 let store = create_test_store();
1926 let projection_manager = store.projection_manager();
1927
1928 let snapshot_projection = projection_manager
1930 .get_projection("entity_snapshots")
1931 .unwrap();
1932 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
1933 assert!(state.is_none());
1934 }
1935
1936 #[tokio::test]
1937 async fn test_projection_state_cache_concurrent_access() {
1938 let store = create_test_store();
1939 let cache = store.projection_state_cache();
1940
1941 let handles: Vec<_> = (0..10)
1943 .map(|i| {
1944 let cache_clone = cache.clone();
1945 tokio::spawn(async move {
1946 cache_clone.insert(
1947 format!("concurrent:entity-{}", i),
1948 serde_json::json!({"thread": i}),
1949 );
1950 })
1951 })
1952 .collect();
1953
1954 for handle in handles {
1955 handle.await.unwrap();
1956 }
1957
1958 assert_eq!(cache.len(), 10);
1960 }
1961
1962 #[tokio::test]
1963 async fn test_projection_state_large_payload() {
1964 let store = create_test_store();
1965 let cache = store.projection_state_cache();
1966
1967 let large_array: Vec<serde_json::Value> = (0..1000)
1969 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
1970 .collect();
1971
1972 cache.insert(
1973 "large:entity-1".to_string(),
1974 serde_json::json!({"items": large_array}),
1975 );
1976
1977 let state = cache.get("large:entity-1").unwrap();
1978 let items = state["items"].as_array().unwrap();
1979 assert_eq!(items.len(), 1000);
1980 }
1981
1982 #[tokio::test]
1983 async fn test_projection_state_complex_json() {
1984 let store = create_test_store();
1985 let cache = store.projection_state_cache();
1986
1987 let complex_state = serde_json::json!({
1989 "user": {
1990 "id": "user-123",
1991 "profile": {
1992 "name": "John Doe",
1993 "email": "john@example.com",
1994 "settings": {
1995 "theme": "dark",
1996 "notifications": true
1997 }
1998 },
1999 "roles": ["admin", "user"],
2000 "metadata": {
2001 "created_at": "2025-01-01T00:00:00Z",
2002 "last_login": null
2003 }
2004 }
2005 });
2006
2007 cache.insert("complex:user-123".to_string(), complex_state);
2008
2009 let state = cache.get("complex:user-123").unwrap();
2010 assert_eq!(state["user"]["profile"]["name"], "John Doe");
2011 assert_eq!(state["user"]["roles"][0], "admin");
2012 assert!(state["user"]["metadata"]["last_login"].is_null());
2013 }
2014
2015 #[tokio::test]
2016 async fn test_projection_state_cache_iteration() {
2017 let store = create_test_store();
2018 let cache = store.projection_state_cache();
2019
2020 for i in 0..5 {
2022 cache.insert(
2023 format!("iter:entity-{}", i),
2024 serde_json::json!({"index": i}),
2025 );
2026 }
2027
2028 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
2030 assert_eq!(entries.len(), 5);
2031 }
2032
2033 #[tokio::test]
2034 async fn test_projection_manager_get_entity_snapshots() {
2035 let store = create_test_store();
2036 let projection_manager = store.projection_manager();
2037
2038 let projection = projection_manager.get_projection("entity_snapshots");
2040 assert!(projection.is_some());
2041 assert_eq!(projection.unwrap().name(), "entity_snapshots");
2042 }
2043
2044 #[tokio::test]
2045 async fn test_projection_manager_get_event_counters() {
2046 let store = create_test_store();
2047 let projection_manager = store.projection_manager();
2048
2049 let projection = projection_manager.get_projection("event_counters");
2051 assert!(projection.is_some());
2052 assert_eq!(projection.unwrap().name(), "event_counters");
2053 }
2054
2055 #[tokio::test]
2056 async fn test_projection_state_cache_overwrite() {
2057 let store = create_test_store();
2058 let cache = store.projection_state_cache();
2059
2060 cache.insert(
2062 "overwrite:entity-1".to_string(),
2063 serde_json::json!({"version": 1}),
2064 );
2065
2066 cache.insert(
2068 "overwrite:entity-1".to_string(),
2069 serde_json::json!({"version": 2}),
2070 );
2071
2072 cache.insert(
2074 "overwrite:entity-1".to_string(),
2075 serde_json::json!({"version": 3}),
2076 );
2077
2078 let state = cache.get("overwrite:entity-1").unwrap();
2079 assert_eq!(state["version"], 3);
2080
2081 assert_eq!(cache.len(), 1);
2083 }
2084
2085 #[tokio::test]
2086 async fn test_projection_state_multiple_projections() {
2087 let store = create_test_store();
2088 let cache = store.projection_state_cache();
2089
2090 cache.insert(
2092 "entity_snapshots:user-1".to_string(),
2093 serde_json::json!({"name": "Alice"}),
2094 );
2095 cache.insert(
2096 "event_counters:user.created".to_string(),
2097 serde_json::json!({"count": 5}),
2098 );
2099 cache.insert(
2100 "custom_projection:order-1".to_string(),
2101 serde_json::json!({"total": 150.0}),
2102 );
2103
2104 assert_eq!(
2106 cache.get("entity_snapshots:user-1").unwrap()["name"],
2107 "Alice"
2108 );
2109 assert_eq!(
2110 cache.get("event_counters:user.created").unwrap()["count"],
2111 5
2112 );
2113 assert_eq!(
2114 cache.get("custom_projection:order-1").unwrap()["total"],
2115 150.0
2116 );
2117 }
2118
2119 #[tokio::test]
2120 async fn test_bulk_projection_state_access() {
2121 let store = create_test_store();
2122
2123 for i in 0..5 {
2125 let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
2126 store.ingest(event).unwrap();
2127 }
2128
2129 let projection_manager = store.projection_manager();
2131 let snapshot_projection = projection_manager
2132 .get_projection("entity_snapshots")
2133 .unwrap();
2134
2135 for i in 0..5 {
2137 let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
2138 assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
2139 }
2140 }
2141
2142 #[tokio::test]
2143 async fn test_bulk_save_projection_states() {
2144 let store = create_test_store();
2145 let cache = store.projection_state_cache();
2146
2147 let states = vec![
2149 BulkSaveStateItem {
2150 entity_id: "bulk-entity-1".to_string(),
2151 state: serde_json::json!({"name": "Entity 1", "value": 100}),
2152 },
2153 BulkSaveStateItem {
2154 entity_id: "bulk-entity-2".to_string(),
2155 state: serde_json::json!({"name": "Entity 2", "value": 200}),
2156 },
2157 BulkSaveStateItem {
2158 entity_id: "bulk-entity-3".to_string(),
2159 state: serde_json::json!({"name": "Entity 3", "value": 300}),
2160 },
2161 ];
2162
2163 let projection_name = "test_projection";
2164
2165 for item in &states {
2167 cache.insert(
2168 format!("{projection_name}:{}", item.entity_id),
2169 item.state.clone(),
2170 );
2171 }
2172
2173 assert_eq!(cache.len(), 3);
2175
2176 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
2177 assert_eq!(state1["name"], "Entity 1");
2178 assert_eq!(state1["value"], 100);
2179
2180 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
2181 assert_eq!(state2["name"], "Entity 2");
2182 assert_eq!(state2["value"], 200);
2183
2184 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
2185 assert_eq!(state3["name"], "Entity 3");
2186 assert_eq!(state3["value"], 300);
2187 }
2188
2189 #[tokio::test]
2190 async fn test_bulk_save_empty_states() {
2191 let store = create_test_store();
2192 let cache = store.projection_state_cache();
2193
2194 cache.clear();
2196
2197 let states: Vec<BulkSaveStateItem> = vec![];
2199 assert_eq!(states.len(), 0);
2200
2201 assert_eq!(cache.len(), 0);
2203 }
2204
2205 #[tokio::test]
2206 async fn test_bulk_save_overwrites_existing() {
2207 let store = create_test_store();
2208 let cache = store.projection_state_cache();
2209
2210 cache.insert(
2212 "test:entity-1".to_string(),
2213 serde_json::json!({"version": 1, "data": "initial"}),
2214 );
2215
2216 let new_state = serde_json::json!({"version": 2, "data": "updated"});
2218 cache.insert("test:entity-1".to_string(), new_state);
2219
2220 let state = cache.get("test:entity-1").unwrap();
2222 assert_eq!(state["version"], 2);
2223 assert_eq!(state["data"], "updated");
2224 }
2225
2226 #[tokio::test]
2227 async fn test_bulk_save_high_volume() {
2228 let store = create_test_store();
2229 let cache = store.projection_state_cache();
2230
2231 for i in 0..1000 {
2233 cache.insert(
2234 format!("volume_test:entity-{}", i),
2235 serde_json::json!({"index": i, "status": "active"}),
2236 );
2237 }
2238
2239 assert_eq!(cache.len(), 1000);
2241
2242 assert_eq!(cache.get("volume_test:entity-0").unwrap()["index"], 0);
2244 assert_eq!(cache.get("volume_test:entity-500").unwrap()["index"], 500);
2245 assert_eq!(cache.get("volume_test:entity-999").unwrap()["index"], 999);
2246 }
2247
2248 #[tokio::test]
2249 async fn test_bulk_save_different_projections() {
2250 let store = create_test_store();
2251 let cache = store.projection_state_cache();
2252
2253 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
2255
2256 for proj in projections.iter() {
2257 for i in 0..5 {
2258 cache.insert(
2259 format!("{proj}:entity-{i}"),
2260 serde_json::json!({"projection": proj, "id": i}),
2261 );
2262 }
2263 }
2264
2265 assert_eq!(cache.len(), 15);
2267
2268 for proj in projections.iter() {
2270 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
2271 assert_eq!(state["projection"], *proj);
2272 }
2273 }
2274}