1use crate::application::dto::{
2 EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
3};
4use crate::application::services::analytics::{
5 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
6 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
7};
8use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
9use crate::application::services::replay::{
10 ReplayProgress, StartReplayRequest, StartReplayResponse,
11};
12use crate::application::services::schema::{
13 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
14 ValidateEventResponse,
15};
16use crate::domain::entities::Event;
17use crate::error::Result;
18use crate::infrastructure::persistence::compaction::CompactionResult;
19use crate::infrastructure::persistence::snapshot::{
20 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
21 SnapshotInfo,
22};
23use crate::store::EventStore;
24use axum::{
25 extract::{Path, Query, State, WebSocketUpgrade},
26 response::{IntoResponse, Response},
27 routing::{get, post, put},
28 Json, Router,
29};
30use serde::Deserialize;
31use std::sync::Arc;
32use tower_http::cors::{Any, CorsLayer};
33use tower_http::trace::TraceLayer;
34
35type SharedStore = Arc<EventStore>;
36
37pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
38 let app = Router::new()
39 .route("/health", get(health))
40 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
42 .route("/api/v1/events/query", get(query_events))
43 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/entities/{entity_id}/state", get(get_entity_state))
45 .route(
46 "/api/v1/entities/{entity_id}/snapshot",
47 get(get_entity_snapshot),
48 )
49 .route("/api/v1/stats", get(get_stats))
50 .route("/api/v1/analytics/frequency", get(analytics_frequency))
52 .route("/api/v1/analytics/summary", get(analytics_summary))
53 .route("/api/v1/analytics/correlation", get(analytics_correlation))
54 .route("/api/v1/snapshots", post(create_snapshot))
56 .route("/api/v1/snapshots", get(list_snapshots))
57 .route(
58 "/api/v1/snapshots/{entity_id}/latest",
59 get(get_latest_snapshot),
60 )
61 .route("/api/v1/compaction/trigger", post(trigger_compaction))
63 .route("/api/v1/compaction/stats", get(compaction_stats))
64 .route("/api/v1/schemas", post(register_schema))
66 .route("/api/v1/schemas", get(list_subjects))
67 .route("/api/v1/schemas/{subject}", get(get_schema))
68 .route(
69 "/api/v1/schemas/{subject}/versions",
70 get(list_schema_versions),
71 )
72 .route("/api/v1/schemas/validate", post(validate_event_schema))
73 .route(
74 "/api/v1/schemas/{subject}/compatibility",
75 put(set_compatibility_mode),
76 )
77 .route("/api/v1/replay", post(start_replay))
79 .route("/api/v1/replay", get(list_replays))
80 .route("/api/v1/replay/{replay_id}", get(get_replay_progress))
81 .route("/api/v1/replay/{replay_id}/cancel", post(cancel_replay))
82 .route(
83 "/api/v1/replay/{replay_id}",
84 axum::routing::delete(delete_replay),
85 )
86 .route("/api/v1/pipelines", post(register_pipeline))
88 .route("/api/v1/pipelines", get(list_pipelines))
89 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
90 .route("/api/v1/pipelines/{pipeline_id}", get(get_pipeline))
91 .route(
92 "/api/v1/pipelines/{pipeline_id}",
93 axum::routing::delete(remove_pipeline),
94 )
95 .route(
96 "/api/v1/pipelines/{pipeline_id}/stats",
97 get(get_pipeline_stats),
98 )
99 .route("/api/v1/pipelines/{pipeline_id}/reset", put(reset_pipeline))
100 .route("/api/v1/projections", get(list_projections))
102 .route("/api/v1/projections/{name}", get(get_projection))
103 .route(
104 "/api/v1/projections/{name}/{entity_id}/state",
105 get(get_projection_state),
106 )
107 .route(
108 "/api/v1/projections/{name}/{entity_id}/state",
109 post(save_projection_state),
110 )
111 .route(
112 "/api/v1/projections/{name}/{entity_id}/state",
113 put(save_projection_state),
114 )
115 .route(
116 "/api/v1/projections/{name}/bulk",
117 post(bulk_get_projection_states),
118 )
119 .route(
120 "/api/v1/projections/{name}/bulk/save",
121 post(bulk_save_projection_states),
122 )
123 .layer(
124 CorsLayer::new()
125 .allow_origin(Any)
126 .allow_methods(Any)
127 .allow_headers(Any),
128 )
129 .layer(TraceLayer::new_for_http())
130 .with_state(store);
131
132 let listener = tokio::net::TcpListener::bind(addr).await?;
133 axum::serve(listener, app).await?;
134
135 Ok(())
136}
137
138pub async fn health() -> impl IntoResponse {
139 Json(serde_json::json!({
140 "status": "healthy",
141 "service": "allsource-core",
142 "version": env!("CARGO_PKG_VERSION")
143 }))
144}
145
146pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
148 let metrics = store.metrics();
149
150 match metrics.encode() {
151 Ok(encoded) => Response::builder()
152 .status(200)
153 .header("Content-Type", "text/plain; version=0.0.4")
154 .body(encoded)
155 .unwrap()
156 .into_response(),
157 Err(e) => Response::builder()
158 .status(500)
159 .body(format!("Error encoding metrics: {e}"))
160 .unwrap()
161 .into_response(),
162 }
163}
164
165pub async fn ingest_event(
166 State(store): State<SharedStore>,
167 Json(req): Json<IngestEventRequest>,
168) -> Result<Json<IngestEventResponse>> {
169 let event = Event::from_strings(
171 req.event_type,
172 req.entity_id,
173 "default".to_string(),
174 req.payload,
175 req.metadata,
176 )?;
177
178 let event_id = event.id;
179 let timestamp = event.timestamp;
180
181 store.ingest(event)?;
182
183 tracing::info!("Event ingested: {}", event_id);
184
185 Ok(Json(IngestEventResponse {
186 event_id,
187 timestamp,
188 }))
189}
190
191pub async fn query_events(
192 State(store): State<SharedStore>,
193 Query(req): Query<QueryEventsRequest>,
194) -> Result<Json<QueryEventsResponse>> {
195 let domain_events = store.query(req)?;
196 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
197 let count = events.len();
198
199 tracing::debug!("Query returned {} events", count);
200
201 Ok(Json(QueryEventsResponse { events, count }))
202}
203
204#[derive(Deserialize)]
205pub struct EntityStateParams {
206 as_of: Option<chrono::DateTime<chrono::Utc>>,
207}
208
209pub async fn get_entity_state(
210 State(store): State<SharedStore>,
211 Path(entity_id): Path<String>,
212 Query(params): Query<EntityStateParams>,
213) -> Result<Json<serde_json::Value>> {
214 let state = store.reconstruct_state(&entity_id, params.as_of)?;
215
216 tracing::info!("State reconstructed for entity: {}", entity_id);
217
218 Ok(Json(state))
219}
220
221pub async fn get_entity_snapshot(
222 State(store): State<SharedStore>,
223 Path(entity_id): Path<String>,
224) -> Result<Json<serde_json::Value>> {
225 let snapshot = store.get_snapshot(&entity_id)?;
226
227 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
228
229 Ok(Json(snapshot))
230}
231
232pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
233 let stats = store.stats();
234 Json(stats)
235}
236
237pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
239 let websocket_manager = store.websocket_manager();
240
241 ws.on_upgrade(move |socket| async move {
242 websocket_manager.handle_socket(socket).await;
243 })
244}
245
246pub async fn analytics_frequency(
248 State(store): State<SharedStore>,
249 Query(req): Query<EventFrequencyRequest>,
250) -> Result<Json<EventFrequencyResponse>> {
251 let response = AnalyticsEngine::event_frequency(&store, req)?;
252
253 tracing::debug!(
254 "Frequency analysis returned {} buckets",
255 response.buckets.len()
256 );
257
258 Ok(Json(response))
259}
260
261pub async fn analytics_summary(
263 State(store): State<SharedStore>,
264 Query(req): Query<StatsSummaryRequest>,
265) -> Result<Json<StatsSummaryResponse>> {
266 let response = AnalyticsEngine::stats_summary(&store, req)?;
267
268 tracing::debug!(
269 "Stats summary: {} events across {} entities",
270 response.total_events,
271 response.unique_entities
272 );
273
274 Ok(Json(response))
275}
276
277pub async fn analytics_correlation(
279 State(store): State<SharedStore>,
280 Query(req): Query<CorrelationRequest>,
281) -> Result<Json<CorrelationResponse>> {
282 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
283
284 tracing::debug!(
285 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
286 response.correlated_pairs,
287 response.total_a,
288 response.correlation_percentage
289 );
290
291 Ok(Json(response))
292}
293
294pub async fn create_snapshot(
296 State(store): State<SharedStore>,
297 Json(req): Json<CreateSnapshotRequest>,
298) -> Result<Json<CreateSnapshotResponse>> {
299 store.create_snapshot(&req.entity_id)?;
300
301 let snapshot_manager = store.snapshot_manager();
302 let snapshot = snapshot_manager
303 .get_latest_snapshot(&req.entity_id)
304 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
305
306 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
307
308 Ok(Json(CreateSnapshotResponse {
309 snapshot_id: snapshot.id,
310 entity_id: snapshot.entity_id,
311 created_at: snapshot.created_at,
312 event_count: snapshot.event_count,
313 size_bytes: snapshot.metadata.size_bytes,
314 }))
315}
316
317pub async fn list_snapshots(
319 State(store): State<SharedStore>,
320 Query(req): Query<ListSnapshotsRequest>,
321) -> Result<Json<ListSnapshotsResponse>> {
322 let snapshot_manager = store.snapshot_manager();
323
324 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
325 snapshot_manager
326 .get_all_snapshots(&entity_id)
327 .into_iter()
328 .map(SnapshotInfo::from)
329 .collect()
330 } else {
331 let entities = snapshot_manager.list_entities();
333 entities
334 .iter()
335 .flat_map(|entity_id| {
336 snapshot_manager
337 .get_all_snapshots(entity_id)
338 .into_iter()
339 .map(SnapshotInfo::from)
340 })
341 .collect()
342 };
343
344 let total = snapshots.len();
345
346 tracing::debug!("Listed {} snapshots", total);
347
348 Ok(Json(ListSnapshotsResponse { snapshots, total }))
349}
350
351pub async fn get_latest_snapshot(
353 State(store): State<SharedStore>,
354 Path(entity_id): Path<String>,
355) -> Result<Json<serde_json::Value>> {
356 let snapshot_manager = store.snapshot_manager();
357
358 let snapshot = snapshot_manager
359 .get_latest_snapshot(&entity_id)
360 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
361
362 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
363
364 Ok(Json(serde_json::json!({
365 "snapshot_id": snapshot.id,
366 "entity_id": snapshot.entity_id,
367 "created_at": snapshot.created_at,
368 "as_of": snapshot.as_of,
369 "event_count": snapshot.event_count,
370 "size_bytes": snapshot.metadata.size_bytes,
371 "snapshot_type": snapshot.metadata.snapshot_type,
372 "state": snapshot.state
373 })))
374}
375
376pub async fn trigger_compaction(
378 State(store): State<SharedStore>,
379) -> Result<Json<CompactionResult>> {
380 let compaction_manager = store.compaction_manager().ok_or_else(|| {
381 crate::error::AllSourceError::InternalError(
382 "Compaction not enabled (no Parquet storage)".to_string(),
383 )
384 })?;
385
386 tracing::info!("📦 Manual compaction triggered via API");
387
388 let result = compaction_manager.compact_now()?;
389
390 Ok(Json(result))
391}
392
393pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
395 let compaction_manager = store.compaction_manager().ok_or_else(|| {
396 crate::error::AllSourceError::InternalError(
397 "Compaction not enabled (no Parquet storage)".to_string(),
398 )
399 })?;
400
401 let stats = compaction_manager.stats();
402 let config = compaction_manager.config();
403
404 Ok(Json(serde_json::json!({
405 "stats": stats,
406 "config": {
407 "min_files_to_compact": config.min_files_to_compact,
408 "target_file_size": config.target_file_size,
409 "max_file_size": config.max_file_size,
410 "small_file_threshold": config.small_file_threshold,
411 "compaction_interval_seconds": config.compaction_interval_seconds,
412 "auto_compact": config.auto_compact,
413 "strategy": config.strategy
414 }
415 })))
416}
417
418pub async fn register_schema(
420 State(store): State<SharedStore>,
421 Json(req): Json<RegisterSchemaRequest>,
422) -> Result<Json<RegisterSchemaResponse>> {
423 let schema_registry = store.schema_registry();
424
425 let response =
426 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
427
428 tracing::info!(
429 "📋 Schema registered: v{} for '{}'",
430 response.version,
431 response.subject
432 );
433
434 Ok(Json(response))
435}
436
437#[derive(Deserialize)]
439pub struct GetSchemaParams {
440 version: Option<u32>,
441}
442
443pub async fn get_schema(
444 State(store): State<SharedStore>,
445 Path(subject): Path<String>,
446 Query(params): Query<GetSchemaParams>,
447) -> Result<Json<serde_json::Value>> {
448 let schema_registry = store.schema_registry();
449
450 let schema = schema_registry.get_schema(&subject, params.version)?;
451
452 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
453
454 Ok(Json(serde_json::json!({
455 "id": schema.id,
456 "subject": schema.subject,
457 "version": schema.version,
458 "schema": schema.schema,
459 "created_at": schema.created_at,
460 "description": schema.description,
461 "tags": schema.tags
462 })))
463}
464
465pub async fn list_schema_versions(
467 State(store): State<SharedStore>,
468 Path(subject): Path<String>,
469) -> Result<Json<serde_json::Value>> {
470 let schema_registry = store.schema_registry();
471
472 let versions = schema_registry.list_versions(&subject)?;
473
474 Ok(Json(serde_json::json!({
475 "subject": subject,
476 "versions": versions
477 })))
478}
479
480pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
482 let schema_registry = store.schema_registry();
483
484 let subjects = schema_registry.list_subjects();
485
486 Json(serde_json::json!({
487 "subjects": subjects,
488 "total": subjects.len()
489 }))
490}
491
492pub async fn validate_event_schema(
494 State(store): State<SharedStore>,
495 Json(req): Json<ValidateEventRequest>,
496) -> Result<Json<ValidateEventResponse>> {
497 let schema_registry = store.schema_registry();
498
499 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
500
501 if response.valid {
502 tracing::debug!(
503 "✅ Event validated against schema '{}' v{}",
504 req.subject,
505 response.schema_version
506 );
507 } else {
508 tracing::warn!(
509 "❌ Event validation failed for '{}': {:?}",
510 req.subject,
511 response.errors
512 );
513 }
514
515 Ok(Json(response))
516}
517
518#[derive(Deserialize)]
520pub struct SetCompatibilityRequest {
521 compatibility: CompatibilityMode,
522}
523
524pub async fn set_compatibility_mode(
525 State(store): State<SharedStore>,
526 Path(subject): Path<String>,
527 Json(req): Json<SetCompatibilityRequest>,
528) -> Json<serde_json::Value> {
529 let schema_registry = store.schema_registry();
530
531 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
532
533 tracing::info!(
534 "🔧 Set compatibility mode for '{}' to {:?}",
535 subject,
536 req.compatibility
537 );
538
539 Json(serde_json::json!({
540 "subject": subject,
541 "compatibility": req.compatibility
542 }))
543}
544
545pub async fn start_replay(
547 State(store): State<SharedStore>,
548 Json(req): Json<StartReplayRequest>,
549) -> Result<Json<StartReplayResponse>> {
550 let replay_manager = store.replay_manager();
551
552 let response = replay_manager.start_replay(store, req)?;
553
554 tracing::info!(
555 "🔄 Started replay {} with {} events",
556 response.replay_id,
557 response.total_events
558 );
559
560 Ok(Json(response))
561}
562
563pub async fn get_replay_progress(
565 State(store): State<SharedStore>,
566 Path(replay_id): Path<uuid::Uuid>,
567) -> Result<Json<ReplayProgress>> {
568 let replay_manager = store.replay_manager();
569
570 let progress = replay_manager.get_progress(replay_id)?;
571
572 Ok(Json(progress))
573}
574
575pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
577 let replay_manager = store.replay_manager();
578
579 let replays = replay_manager.list_replays();
580
581 Json(serde_json::json!({
582 "replays": replays,
583 "total": replays.len()
584 }))
585}
586
587pub async fn cancel_replay(
589 State(store): State<SharedStore>,
590 Path(replay_id): Path<uuid::Uuid>,
591) -> Result<Json<serde_json::Value>> {
592 let replay_manager = store.replay_manager();
593
594 replay_manager.cancel_replay(replay_id)?;
595
596 tracing::info!("🛑 Cancelled replay {}", replay_id);
597
598 Ok(Json(serde_json::json!({
599 "replay_id": replay_id,
600 "status": "cancelled"
601 })))
602}
603
604pub async fn delete_replay(
606 State(store): State<SharedStore>,
607 Path(replay_id): Path<uuid::Uuid>,
608) -> Result<Json<serde_json::Value>> {
609 let replay_manager = store.replay_manager();
610
611 let deleted = replay_manager.delete_replay(replay_id)?;
612
613 if deleted {
614 tracing::info!("🗑️ Deleted replay {}", replay_id);
615 }
616
617 Ok(Json(serde_json::json!({
618 "replay_id": replay_id,
619 "deleted": deleted
620 })))
621}
622
623pub async fn register_pipeline(
625 State(store): State<SharedStore>,
626 Json(config): Json<PipelineConfig>,
627) -> Result<Json<serde_json::Value>> {
628 let pipeline_manager = store.pipeline_manager();
629
630 let pipeline_id = pipeline_manager.register(config.clone());
631
632 tracing::info!(
633 "🔀 Pipeline registered: {} (name: {})",
634 pipeline_id,
635 config.name
636 );
637
638 Ok(Json(serde_json::json!({
639 "pipeline_id": pipeline_id,
640 "name": config.name,
641 "enabled": config.enabled
642 })))
643}
644
645pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
647 let pipeline_manager = store.pipeline_manager();
648
649 let pipelines = pipeline_manager.list();
650
651 tracing::debug!("Listed {} pipelines", pipelines.len());
652
653 Json(serde_json::json!({
654 "pipelines": pipelines,
655 "total": pipelines.len()
656 }))
657}
658
659pub async fn get_pipeline(
661 State(store): State<SharedStore>,
662 Path(pipeline_id): Path<uuid::Uuid>,
663) -> Result<Json<PipelineConfig>> {
664 let pipeline_manager = store.pipeline_manager();
665
666 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
667 crate::error::AllSourceError::ValidationError(format!(
668 "Pipeline not found: {}",
669 pipeline_id
670 ))
671 })?;
672
673 Ok(Json(pipeline.config().clone()))
674}
675
676pub async fn remove_pipeline(
678 State(store): State<SharedStore>,
679 Path(pipeline_id): Path<uuid::Uuid>,
680) -> Result<Json<serde_json::Value>> {
681 let pipeline_manager = store.pipeline_manager();
682
683 let removed = pipeline_manager.remove(pipeline_id);
684
685 if removed {
686 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
687 }
688
689 Ok(Json(serde_json::json!({
690 "pipeline_id": pipeline_id,
691 "removed": removed
692 })))
693}
694
695pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
697 let pipeline_manager = store.pipeline_manager();
698
699 let stats = pipeline_manager.all_stats();
700
701 Json(serde_json::json!({
702 "stats": stats,
703 "total": stats.len()
704 }))
705}
706
707pub async fn get_pipeline_stats(
709 State(store): State<SharedStore>,
710 Path(pipeline_id): Path<uuid::Uuid>,
711) -> Result<Json<PipelineStats>> {
712 let pipeline_manager = store.pipeline_manager();
713
714 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
715 crate::error::AllSourceError::ValidationError(format!(
716 "Pipeline not found: {}",
717 pipeline_id
718 ))
719 })?;
720
721 Ok(Json(pipeline.stats()))
722}
723
724pub async fn reset_pipeline(
726 State(store): State<SharedStore>,
727 Path(pipeline_id): Path<uuid::Uuid>,
728) -> Result<Json<serde_json::Value>> {
729 let pipeline_manager = store.pipeline_manager();
730
731 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
732 crate::error::AllSourceError::ValidationError(format!(
733 "Pipeline not found: {}",
734 pipeline_id
735 ))
736 })?;
737
738 pipeline.reset();
739
740 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
741
742 Ok(Json(serde_json::json!({
743 "pipeline_id": pipeline_id,
744 "reset": true
745 })))
746}
747
748pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
754 let projection_manager = store.projection_manager();
755
756 let projections: Vec<serde_json::Value> = projection_manager
757 .list_projections()
758 .iter()
759 .map(|(name, projection)| {
760 serde_json::json!({
761 "name": name,
762 "type": format!("{:?}", projection.name()),
763 })
764 })
765 .collect();
766
767 tracing::debug!("Listed {} projections", projections.len());
768
769 Json(serde_json::json!({
770 "projections": projections,
771 "total": projections.len()
772 }))
773}
774
775pub async fn get_projection(
777 State(store): State<SharedStore>,
778 Path(name): Path<String>,
779) -> Result<Json<serde_json::Value>> {
780 let projection_manager = store.projection_manager();
781
782 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
783 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
784 })?;
785
786 Ok(Json(serde_json::json!({
787 "name": projection.name(),
788 "found": true
789 })))
790}
791
792pub async fn get_projection_state(
797 State(store): State<SharedStore>,
798 Path((name, entity_id)): Path<(String, String)>,
799) -> Result<Json<serde_json::Value>> {
800 let projection_manager = store.projection_manager();
801
802 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
803 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
804 })?;
805
806 let state = projection.get_state(&entity_id);
807
808 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
809
810 Ok(Json(serde_json::json!({
811 "projection": name,
812 "entity_id": entity_id,
813 "state": state,
814 "found": state.is_some()
815 })))
816}
817
818#[derive(Debug, Deserialize)]
820pub struct SaveProjectionStateRequest {
821 pub state: serde_json::Value,
822}
823
824pub async fn save_projection_state(
829 State(store): State<SharedStore>,
830 Path((name, entity_id)): Path<(String, String)>,
831 Json(req): Json<SaveProjectionStateRequest>,
832) -> Result<Json<serde_json::Value>> {
833 let projection_cache = store.projection_state_cache();
834
835 projection_cache.insert(format!("{name}:{entity_id}"), req.state.clone());
837
838 tracing::info!("Projection state saved: {} / {}", name, entity_id);
839
840 Ok(Json(serde_json::json!({
841 "projection": name,
842 "entity_id": entity_id,
843 "saved": true
844 })))
845}
846
847#[derive(Debug, Deserialize)]
851pub struct BulkGetStateRequest {
852 pub entity_ids: Vec<String>,
853}
854
855#[derive(Debug, Deserialize)]
859pub struct BulkSaveStateRequest {
860 pub states: Vec<BulkSaveStateItem>,
861}
862
863#[derive(Debug, Deserialize)]
864pub struct BulkSaveStateItem {
865 pub entity_id: String,
866 pub state: serde_json::Value,
867}
868
869pub async fn bulk_get_projection_states(
870 State(store): State<SharedStore>,
871 Path(name): Path<String>,
872 Json(req): Json<BulkGetStateRequest>,
873) -> Result<Json<serde_json::Value>> {
874 let projection_manager = store.projection_manager();
875
876 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
877 crate::error::AllSourceError::EntityNotFound(format!("Projection '{name}' not found"))
878 })?;
879
880 let states: Vec<serde_json::Value> = req
881 .entity_ids
882 .iter()
883 .map(|entity_id| {
884 let state = projection.get_state(entity_id);
885 serde_json::json!({
886 "entity_id": entity_id,
887 "state": state,
888 "found": state.is_some()
889 })
890 })
891 .collect();
892
893 tracing::debug!(
894 "Bulk projection state retrieved: {} entities from {}",
895 states.len(),
896 name
897 );
898
899 Ok(Json(serde_json::json!({
900 "projection": name,
901 "states": states,
902 "total": states.len()
903 })))
904}
905
906pub async fn bulk_save_projection_states(
911 State(store): State<SharedStore>,
912 Path(name): Path<String>,
913 Json(req): Json<BulkSaveStateRequest>,
914) -> Result<Json<serde_json::Value>> {
915 let projection_cache = store.projection_state_cache();
916
917 let mut saved_count = 0;
918 for item in &req.states {
919 projection_cache.insert(format!("{name}:{}", item.entity_id), item.state.clone());
920 saved_count += 1;
921 }
922
923 tracing::info!(
924 "Bulk projection state saved: {} entities for {}",
925 saved_count,
926 name
927 );
928
929 Ok(Json(serde_json::json!({
930 "projection": name,
931 "saved": saved_count,
932 "total": req.states.len()
933 })))
934}
935
936#[cfg(test)]
937mod tests {
938 use super::*;
939 use crate::domain::entities::Event;
940 use crate::store::EventStore;
941
942 fn create_test_store() -> Arc<EventStore> {
943 Arc::new(EventStore::new())
944 }
945
946 fn create_test_event(entity_id: &str, event_type: &str) -> Event {
947 Event::from_strings(
948 event_type.to_string(),
949 entity_id.to_string(),
950 "test-stream".to_string(),
951 serde_json::json!({
952 "name": "Test",
953 "value": 42
954 }),
955 None,
956 )
957 .unwrap()
958 }
959
960 #[tokio::test]
961 async fn test_projection_state_cache() {
962 let store = create_test_store();
963
964 let cache = store.projection_state_cache();
966 cache.insert(
967 "entity_snapshots:user-123".to_string(),
968 serde_json::json!({"name": "Test User", "age": 30}),
969 );
970
971 let state = cache.get("entity_snapshots:user-123");
973 assert!(state.is_some());
974 let state = state.unwrap();
975 assert_eq!(state["name"], "Test User");
976 assert_eq!(state["age"], 30);
977 }
978
979 #[tokio::test]
980 async fn test_projection_manager_list_projections() {
981 let store = create_test_store();
982
983 let projection_manager = store.projection_manager();
985 let projections = projection_manager.list_projections();
986
987 assert!(projections.len() >= 2);
989
990 let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
991 assert!(names.contains(&"entity_snapshots"));
992 assert!(names.contains(&"event_counters"));
993 }
994
995 #[tokio::test]
996 async fn test_projection_state_after_event_ingestion() {
997 let store = create_test_store();
998
999 let event = create_test_event("user-456", "user.created");
1001 store.ingest(event).unwrap();
1002
1003 let projection_manager = store.projection_manager();
1005 let snapshot_projection = projection_manager
1006 .get_projection("entity_snapshots")
1007 .unwrap();
1008
1009 let state = snapshot_projection.get_state("user-456");
1010 assert!(state.is_some());
1011 let state = state.unwrap();
1012 assert_eq!(state["name"], "Test");
1013 assert_eq!(state["value"], 42);
1014 }
1015
1016 #[tokio::test]
1017 async fn test_projection_state_cache_multiple_entities() {
1018 let store = create_test_store();
1019 let cache = store.projection_state_cache();
1020
1021 for i in 0..10 {
1023 cache.insert(
1024 format!("entity_snapshots:entity-{}", i),
1025 serde_json::json!({"id": i, "status": "active"}),
1026 );
1027 }
1028
1029 assert_eq!(cache.len(), 10);
1031
1032 for i in 0..10 {
1034 let key = format!("entity_snapshots:entity-{}", i);
1035 let state = cache.get(&key);
1036 assert!(state.is_some());
1037 assert_eq!(state.unwrap()["id"], i);
1038 }
1039 }
1040
1041 #[tokio::test]
1042 async fn test_projection_state_update() {
1043 let store = create_test_store();
1044 let cache = store.projection_state_cache();
1045
1046 cache.insert(
1048 "entity_snapshots:user-789".to_string(),
1049 serde_json::json!({"balance": 100}),
1050 );
1051
1052 cache.insert(
1054 "entity_snapshots:user-789".to_string(),
1055 serde_json::json!({"balance": 150}),
1056 );
1057
1058 let state = cache.get("entity_snapshots:user-789").unwrap();
1060 assert_eq!(state["balance"], 150);
1061 }
1062
1063 #[tokio::test]
1064 async fn test_event_counter_projection() {
1065 let store = create_test_store();
1066
1067 store
1069 .ingest(create_test_event("user-1", "user.created"))
1070 .unwrap();
1071 store
1072 .ingest(create_test_event("user-2", "user.created"))
1073 .unwrap();
1074 store
1075 .ingest(create_test_event("user-1", "user.updated"))
1076 .unwrap();
1077
1078 let projection_manager = store.projection_manager();
1080 let counter_projection = projection_manager.get_projection("event_counters").unwrap();
1081
1082 let created_state = counter_projection.get_state("user.created");
1084 assert!(created_state.is_some());
1085 assert_eq!(created_state.unwrap()["count"], 2);
1086
1087 let updated_state = counter_projection.get_state("user.updated");
1088 assert!(updated_state.is_some());
1089 assert_eq!(updated_state.unwrap()["count"], 1);
1090 }
1091
1092 #[tokio::test]
1093 async fn test_projection_state_cache_key_format() {
1094 let store = create_test_store();
1095 let cache = store.projection_state_cache();
1096
1097 let key = "orders:order-12345".to_string();
1099 cache.insert(key.clone(), serde_json::json!({"total": 99.99}));
1100
1101 let state = cache.get(&key).unwrap();
1102 assert_eq!(state["total"], 99.99);
1103 }
1104
1105 #[tokio::test]
1106 async fn test_projection_state_cache_removal() {
1107 let store = create_test_store();
1108 let cache = store.projection_state_cache();
1109
1110 cache.insert("test:entity-1".to_string(), serde_json::json!({"data": "value"}));
1112 assert_eq!(cache.len(), 1);
1113
1114 cache.remove("test:entity-1");
1115 assert_eq!(cache.len(), 0);
1116 assert!(cache.get("test:entity-1").is_none());
1117 }
1118
1119 #[tokio::test]
1120 async fn test_get_nonexistent_projection() {
1121 let store = create_test_store();
1122 let projection_manager = store.projection_manager();
1123
1124 let projection = projection_manager.get_projection("nonexistent_projection");
1126 assert!(projection.is_none());
1127 }
1128
1129 #[tokio::test]
1130 async fn test_get_nonexistent_entity_state() {
1131 let store = create_test_store();
1132 let projection_manager = store.projection_manager();
1133
1134 let snapshot_projection = projection_manager.get_projection("entity_snapshots").unwrap();
1136 let state = snapshot_projection.get_state("nonexistent-entity-xyz");
1137 assert!(state.is_none());
1138 }
1139
1140 #[tokio::test]
1141 async fn test_projection_state_cache_concurrent_access() {
1142 let store = create_test_store();
1143 let cache = store.projection_state_cache();
1144
1145 let handles: Vec<_> = (0..10)
1147 .map(|i| {
1148 let cache_clone = cache.clone();
1149 tokio::spawn(async move {
1150 cache_clone.insert(
1151 format!("concurrent:entity-{}", i),
1152 serde_json::json!({"thread": i}),
1153 );
1154 })
1155 })
1156 .collect();
1157
1158 for handle in handles {
1159 handle.await.unwrap();
1160 }
1161
1162 assert_eq!(cache.len(), 10);
1164 }
1165
1166 #[tokio::test]
1167 async fn test_projection_state_large_payload() {
1168 let store = create_test_store();
1169 let cache = store.projection_state_cache();
1170
1171 let large_array: Vec<serde_json::Value> = (0..1000)
1173 .map(|i| serde_json::json!({"item": i, "description": "test item with some padding data to increase size"}))
1174 .collect();
1175
1176 cache.insert(
1177 "large:entity-1".to_string(),
1178 serde_json::json!({"items": large_array}),
1179 );
1180
1181 let state = cache.get("large:entity-1").unwrap();
1182 let items = state["items"].as_array().unwrap();
1183 assert_eq!(items.len(), 1000);
1184 }
1185
1186 #[tokio::test]
1187 async fn test_projection_state_complex_json() {
1188 let store = create_test_store();
1189 let cache = store.projection_state_cache();
1190
1191 let complex_state = serde_json::json!({
1193 "user": {
1194 "id": "user-123",
1195 "profile": {
1196 "name": "John Doe",
1197 "email": "john@example.com",
1198 "settings": {
1199 "theme": "dark",
1200 "notifications": true
1201 }
1202 },
1203 "roles": ["admin", "user"],
1204 "metadata": {
1205 "created_at": "2025-01-01T00:00:00Z",
1206 "last_login": null
1207 }
1208 }
1209 });
1210
1211 cache.insert("complex:user-123".to_string(), complex_state);
1212
1213 let state = cache.get("complex:user-123").unwrap();
1214 assert_eq!(state["user"]["profile"]["name"], "John Doe");
1215 assert_eq!(state["user"]["roles"][0], "admin");
1216 assert!(state["user"]["metadata"]["last_login"].is_null());
1217 }
1218
1219 #[tokio::test]
1220 async fn test_projection_state_cache_iteration() {
1221 let store = create_test_store();
1222 let cache = store.projection_state_cache();
1223
1224 for i in 0..5 {
1226 cache.insert(
1227 format!("iter:entity-{}", i),
1228 serde_json::json!({"index": i}),
1229 );
1230 }
1231
1232 let entries: Vec<_> = cache.iter().map(|entry| entry.key().clone()).collect();
1234 assert_eq!(entries.len(), 5);
1235 }
1236
1237 #[tokio::test]
1238 async fn test_projection_manager_get_entity_snapshots() {
1239 let store = create_test_store();
1240 let projection_manager = store.projection_manager();
1241
1242 let projection = projection_manager.get_projection("entity_snapshots");
1244 assert!(projection.is_some());
1245 assert_eq!(projection.unwrap().name(), "entity_snapshots");
1246 }
1247
1248 #[tokio::test]
1249 async fn test_projection_manager_get_event_counters() {
1250 let store = create_test_store();
1251 let projection_manager = store.projection_manager();
1252
1253 let projection = projection_manager.get_projection("event_counters");
1255 assert!(projection.is_some());
1256 assert_eq!(projection.unwrap().name(), "event_counters");
1257 }
1258
1259 #[tokio::test]
1260 async fn test_projection_state_cache_overwrite() {
1261 let store = create_test_store();
1262 let cache = store.projection_state_cache();
1263
1264 cache.insert("overwrite:entity-1".to_string(), serde_json::json!({"version": 1}));
1266
1267 cache.insert("overwrite:entity-1".to_string(), serde_json::json!({"version": 2}));
1269
1270 cache.insert("overwrite:entity-1".to_string(), serde_json::json!({"version": 3}));
1272
1273 let state = cache.get("overwrite:entity-1").unwrap();
1274 assert_eq!(state["version"], 3);
1275
1276 assert_eq!(cache.len(), 1);
1278 }
1279
1280 #[tokio::test]
1281 async fn test_projection_state_multiple_projections() {
1282 let store = create_test_store();
1283 let cache = store.projection_state_cache();
1284
1285 cache.insert("entity_snapshots:user-1".to_string(), serde_json::json!({"name": "Alice"}));
1287 cache.insert("event_counters:user.created".to_string(), serde_json::json!({"count": 5}));
1288 cache.insert("custom_projection:order-1".to_string(), serde_json::json!({"total": 150.0}));
1289
1290 assert_eq!(cache.get("entity_snapshots:user-1").unwrap()["name"], "Alice");
1292 assert_eq!(cache.get("event_counters:user.created").unwrap()["count"], 5);
1293 assert_eq!(cache.get("custom_projection:order-1").unwrap()["total"], 150.0);
1294 }
1295
1296 #[tokio::test]
1297 async fn test_bulk_projection_state_access() {
1298 let store = create_test_store();
1299
1300 for i in 0..5 {
1302 let event = create_test_event(&format!("bulk-user-{}", i), "user.created");
1303 store.ingest(event).unwrap();
1304 }
1305
1306 let projection_manager = store.projection_manager();
1308 let snapshot_projection = projection_manager.get_projection("entity_snapshots").unwrap();
1309
1310 for i in 0..5 {
1312 let state = snapshot_projection.get_state(&format!("bulk-user-{}", i));
1313 assert!(state.is_some(), "Entity bulk-user-{} should have state", i);
1314 }
1315 }
1316
1317 #[tokio::test]
1318 async fn test_bulk_save_projection_states() {
1319 let store = create_test_store();
1320 let cache = store.projection_state_cache();
1321
1322 let states = vec![
1324 BulkSaveStateItem {
1325 entity_id: "bulk-entity-1".to_string(),
1326 state: serde_json::json!({"name": "Entity 1", "value": 100}),
1327 },
1328 BulkSaveStateItem {
1329 entity_id: "bulk-entity-2".to_string(),
1330 state: serde_json::json!({"name": "Entity 2", "value": 200}),
1331 },
1332 BulkSaveStateItem {
1333 entity_id: "bulk-entity-3".to_string(),
1334 state: serde_json::json!({"name": "Entity 3", "value": 300}),
1335 },
1336 ];
1337
1338 let projection_name = "test_projection";
1339
1340 for item in &states {
1342 cache.insert(
1343 format!("{projection_name}:{}", item.entity_id),
1344 item.state.clone(),
1345 );
1346 }
1347
1348 assert_eq!(cache.len(), 3);
1350
1351 let state1 = cache.get("test_projection:bulk-entity-1").unwrap();
1352 assert_eq!(state1["name"], "Entity 1");
1353 assert_eq!(state1["value"], 100);
1354
1355 let state2 = cache.get("test_projection:bulk-entity-2").unwrap();
1356 assert_eq!(state2["name"], "Entity 2");
1357 assert_eq!(state2["value"], 200);
1358
1359 let state3 = cache.get("test_projection:bulk-entity-3").unwrap();
1360 assert_eq!(state3["name"], "Entity 3");
1361 assert_eq!(state3["value"], 300);
1362 }
1363
1364 #[tokio::test]
1365 async fn test_bulk_save_empty_states() {
1366 let store = create_test_store();
1367 let cache = store.projection_state_cache();
1368
1369 cache.clear();
1371
1372 let states: Vec<BulkSaveStateItem> = vec![];
1374 assert_eq!(states.len(), 0);
1375
1376 assert_eq!(cache.len(), 0);
1378 }
1379
1380 #[tokio::test]
1381 async fn test_bulk_save_overwrites_existing() {
1382 let store = create_test_store();
1383 let cache = store.projection_state_cache();
1384
1385 cache.insert(
1387 "test:entity-1".to_string(),
1388 serde_json::json!({"version": 1, "data": "initial"}),
1389 );
1390
1391 let new_state = serde_json::json!({"version": 2, "data": "updated"});
1393 cache.insert("test:entity-1".to_string(), new_state);
1394
1395 let state = cache.get("test:entity-1").unwrap();
1397 assert_eq!(state["version"], 2);
1398 assert_eq!(state["data"], "updated");
1399 }
1400
1401 #[tokio::test]
1402 async fn test_bulk_save_high_volume() {
1403 let store = create_test_store();
1404 let cache = store.projection_state_cache();
1405
1406 for i in 0..1000 {
1408 cache.insert(
1409 format!("volume_test:entity-{}", i),
1410 serde_json::json!({"index": i, "status": "active"}),
1411 );
1412 }
1413
1414 assert_eq!(cache.len(), 1000);
1416
1417 assert_eq!(
1419 cache.get("volume_test:entity-0").unwrap()["index"],
1420 0
1421 );
1422 assert_eq!(
1423 cache.get("volume_test:entity-500").unwrap()["index"],
1424 500
1425 );
1426 assert_eq!(
1427 cache.get("volume_test:entity-999").unwrap()["index"],
1428 999
1429 );
1430 }
1431
1432 #[tokio::test]
1433 async fn test_bulk_save_different_projections() {
1434 let store = create_test_store();
1435 let cache = store.projection_state_cache();
1436
1437 let projections = ["entity_snapshots", "event_counters", "custom_analytics"];
1439
1440 for proj in projections.iter() {
1441 for i in 0..5 {
1442 cache.insert(
1443 format!("{proj}:entity-{i}"),
1444 serde_json::json!({"projection": proj, "id": i}),
1445 );
1446 }
1447 }
1448
1449 assert_eq!(cache.len(), 15);
1451
1452 for proj in projections.iter() {
1454 let state = cache.get(&format!("{proj}:entity-0")).unwrap();
1455 assert_eq!(state["projection"], *proj);
1456 }
1457 }
1458}