1use crate::application::dto::{
2 EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
3};
4use crate::application::services::analytics::{
5 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
6 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
7};
8use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
9use crate::application::services::replay::{
10 ReplayProgress, StartReplayRequest, StartReplayResponse,
11};
12use crate::application::services::schema::{
13 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
14 ValidateEventResponse,
15};
16use crate::domain::entities::Event;
17use crate::error::Result;
18use crate::infrastructure::persistence::compaction::CompactionResult;
19use crate::infrastructure::persistence::snapshot::{
20 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
21 SnapshotInfo,
22};
23use crate::store::EventStore;
24use axum::{
25 extract::{Path, Query, State, WebSocketUpgrade},
26 response::{IntoResponse, Response},
27 routing::{get, post, put},
28 Json, Router,
29};
30use serde::Deserialize;
31use std::sync::Arc;
32use tower_http::cors::{Any, CorsLayer};
33use tower_http::trace::TraceLayer;
34
/// Reference-counted handle to the [`EventStore`]; installed as the router
/// state in `serve` and extracted by handlers through `State<SharedStore>`.
type SharedStore = Arc<EventStore>;
36
37pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
38 let app = Router::new()
39 .route("/health", get(health))
40 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
42 .route("/api/v1/events/query", get(query_events))
43 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
45 .route(
46 "/api/v1/entities/:entity_id/snapshot",
47 get(get_entity_snapshot),
48 )
49 .route("/api/v1/stats", get(get_stats))
50 .route("/api/v1/analytics/frequency", get(analytics_frequency))
52 .route("/api/v1/analytics/summary", get(analytics_summary))
53 .route("/api/v1/analytics/correlation", get(analytics_correlation))
54 .route("/api/v1/snapshots", post(create_snapshot))
56 .route("/api/v1/snapshots", get(list_snapshots))
57 .route(
58 "/api/v1/snapshots/:entity_id/latest",
59 get(get_latest_snapshot),
60 )
61 .route("/api/v1/compaction/trigger", post(trigger_compaction))
63 .route("/api/v1/compaction/stats", get(compaction_stats))
64 .route("/api/v1/schemas", post(register_schema))
66 .route("/api/v1/schemas", get(list_subjects))
67 .route("/api/v1/schemas/:subject", get(get_schema))
68 .route(
69 "/api/v1/schemas/:subject/versions",
70 get(list_schema_versions),
71 )
72 .route("/api/v1/schemas/validate", post(validate_event_schema))
73 .route(
74 "/api/v1/schemas/:subject/compatibility",
75 put(set_compatibility_mode),
76 )
77 .route("/api/v1/replay", post(start_replay))
79 .route("/api/v1/replay", get(list_replays))
80 .route("/api/v1/replay/:replay_id", get(get_replay_progress))
81 .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
82 .route(
83 "/api/v1/replay/:replay_id",
84 axum::routing::delete(delete_replay),
85 )
86 .route("/api/v1/pipelines", post(register_pipeline))
88 .route("/api/v1/pipelines", get(list_pipelines))
89 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
90 .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
91 .route(
92 "/api/v1/pipelines/:pipeline_id",
93 axum::routing::delete(remove_pipeline),
94 )
95 .route(
96 "/api/v1/pipelines/:pipeline_id/stats",
97 get(get_pipeline_stats),
98 )
99 .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
100 .route("/api/v1/projections", get(list_projections))
102 .route("/api/v1/projections/:name", get(get_projection))
103 .route(
104 "/api/v1/projections/:name/:entity_id/state",
105 get(get_projection_state),
106 )
107 .route(
108 "/api/v1/projections/:name/:entity_id/state",
109 put(save_projection_state),
110 )
111 .route(
112 "/api/v1/projections/:name/bulk",
113 post(bulk_get_projection_states),
114 )
115 .layer(
116 CorsLayer::new()
117 .allow_origin(Any)
118 .allow_methods(Any)
119 .allow_headers(Any),
120 )
121 .layer(TraceLayer::new_for_http())
122 .with_state(store);
123
124 let listener = tokio::net::TcpListener::bind(addr).await?;
125 axum::serve(listener, app).await?;
126
127 Ok(())
128}
129
130pub async fn health() -> impl IntoResponse {
131 Json(serde_json::json!({
132 "status": "healthy",
133 "service": "allsource-core",
134 "version": env!("CARGO_PKG_VERSION")
135 }))
136}
137
138pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
140 let metrics = store.metrics();
141
142 match metrics.encode() {
143 Ok(encoded) => Response::builder()
144 .status(200)
145 .header("Content-Type", "text/plain; version=0.0.4")
146 .body(encoded)
147 .unwrap()
148 .into_response(),
149 Err(e) => Response::builder()
150 .status(500)
151 .body(format!("Error encoding metrics: {}", e))
152 .unwrap()
153 .into_response(),
154 }
155}
156
157pub async fn ingest_event(
158 State(store): State<SharedStore>,
159 Json(req): Json<IngestEventRequest>,
160) -> Result<Json<IngestEventResponse>> {
161 let event = Event::from_strings(
163 req.event_type,
164 req.entity_id,
165 "default".to_string(),
166 req.payload,
167 req.metadata,
168 )?;
169
170 let event_id = event.id;
171 let timestamp = event.timestamp;
172
173 store.ingest(event)?;
174
175 tracing::info!("Event ingested: {}", event_id);
176
177 Ok(Json(IngestEventResponse {
178 event_id,
179 timestamp,
180 }))
181}
182
183pub async fn query_events(
184 State(store): State<SharedStore>,
185 Query(req): Query<QueryEventsRequest>,
186) -> Result<Json<QueryEventsResponse>> {
187 let domain_events = store.query(req)?;
188 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
189 let count = events.len();
190
191 tracing::debug!("Query returned {} events", count);
192
193 Ok(Json(QueryEventsResponse { events, count }))
194}
195
/// Query-string parameters accepted by `get_entity_state`.
#[derive(Deserialize)]
pub struct EntityStateParams {
    /// Optional UTC timestamp forwarded to `store.reconstruct_state`.
    as_of: Option<chrono::DateTime<chrono::Utc>>,
}
200
201pub async fn get_entity_state(
202 State(store): State<SharedStore>,
203 Path(entity_id): Path<String>,
204 Query(params): Query<EntityStateParams>,
205) -> Result<Json<serde_json::Value>> {
206 let state = store.reconstruct_state(&entity_id, params.as_of)?;
207
208 tracing::info!("State reconstructed for entity: {}", entity_id);
209
210 Ok(Json(state))
211}
212
213pub async fn get_entity_snapshot(
214 State(store): State<SharedStore>,
215 Path(entity_id): Path<String>,
216) -> Result<Json<serde_json::Value>> {
217 let snapshot = store.get_snapshot(&entity_id)?;
218
219 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
220
221 Ok(Json(snapshot))
222}
223
224pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
225 let stats = store.stats();
226 Json(stats)
227}
228
229pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
231 let websocket_manager = store.websocket_manager();
232
233 ws.on_upgrade(move |socket| async move {
234 websocket_manager.handle_socket(socket).await;
235 })
236}
237
238pub async fn analytics_frequency(
240 State(store): State<SharedStore>,
241 Query(req): Query<EventFrequencyRequest>,
242) -> Result<Json<EventFrequencyResponse>> {
243 let response = AnalyticsEngine::event_frequency(&store, req)?;
244
245 tracing::debug!(
246 "Frequency analysis returned {} buckets",
247 response.buckets.len()
248 );
249
250 Ok(Json(response))
251}
252
253pub async fn analytics_summary(
255 State(store): State<SharedStore>,
256 Query(req): Query<StatsSummaryRequest>,
257) -> Result<Json<StatsSummaryResponse>> {
258 let response = AnalyticsEngine::stats_summary(&store, req)?;
259
260 tracing::debug!(
261 "Stats summary: {} events across {} entities",
262 response.total_events,
263 response.unique_entities
264 );
265
266 Ok(Json(response))
267}
268
269pub async fn analytics_correlation(
271 State(store): State<SharedStore>,
272 Query(req): Query<CorrelationRequest>,
273) -> Result<Json<CorrelationResponse>> {
274 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
275
276 tracing::debug!(
277 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
278 response.correlated_pairs,
279 response.total_a,
280 response.correlation_percentage
281 );
282
283 Ok(Json(response))
284}
285
286pub async fn create_snapshot(
288 State(store): State<SharedStore>,
289 Json(req): Json<CreateSnapshotRequest>,
290) -> Result<Json<CreateSnapshotResponse>> {
291 store.create_snapshot(&req.entity_id)?;
292
293 let snapshot_manager = store.snapshot_manager();
294 let snapshot = snapshot_manager
295 .get_latest_snapshot(&req.entity_id)
296 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
297
298 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
299
300 Ok(Json(CreateSnapshotResponse {
301 snapshot_id: snapshot.id,
302 entity_id: snapshot.entity_id,
303 created_at: snapshot.created_at,
304 event_count: snapshot.event_count,
305 size_bytes: snapshot.metadata.size_bytes,
306 }))
307}
308
309pub async fn list_snapshots(
311 State(store): State<SharedStore>,
312 Query(req): Query<ListSnapshotsRequest>,
313) -> Result<Json<ListSnapshotsResponse>> {
314 let snapshot_manager = store.snapshot_manager();
315
316 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
317 snapshot_manager
318 .get_all_snapshots(&entity_id)
319 .into_iter()
320 .map(SnapshotInfo::from)
321 .collect()
322 } else {
323 let entities = snapshot_manager.list_entities();
325 entities
326 .iter()
327 .flat_map(|entity_id| {
328 snapshot_manager
329 .get_all_snapshots(entity_id)
330 .into_iter()
331 .map(SnapshotInfo::from)
332 })
333 .collect()
334 };
335
336 let total = snapshots.len();
337
338 tracing::debug!("Listed {} snapshots", total);
339
340 Ok(Json(ListSnapshotsResponse { snapshots, total }))
341}
342
343pub async fn get_latest_snapshot(
345 State(store): State<SharedStore>,
346 Path(entity_id): Path<String>,
347) -> Result<Json<serde_json::Value>> {
348 let snapshot_manager = store.snapshot_manager();
349
350 let snapshot = snapshot_manager
351 .get_latest_snapshot(&entity_id)
352 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
353
354 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
355
356 Ok(Json(serde_json::json!({
357 "snapshot_id": snapshot.id,
358 "entity_id": snapshot.entity_id,
359 "created_at": snapshot.created_at,
360 "as_of": snapshot.as_of,
361 "event_count": snapshot.event_count,
362 "size_bytes": snapshot.metadata.size_bytes,
363 "snapshot_type": snapshot.metadata.snapshot_type,
364 "state": snapshot.state
365 })))
366}
367
368pub async fn trigger_compaction(
370 State(store): State<SharedStore>,
371) -> Result<Json<CompactionResult>> {
372 let compaction_manager = store.compaction_manager().ok_or_else(|| {
373 crate::error::AllSourceError::InternalError(
374 "Compaction not enabled (no Parquet storage)".to_string(),
375 )
376 })?;
377
378 tracing::info!("📦 Manual compaction triggered via API");
379
380 let result = compaction_manager.compact_now()?;
381
382 Ok(Json(result))
383}
384
385pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
387 let compaction_manager = store.compaction_manager().ok_or_else(|| {
388 crate::error::AllSourceError::InternalError(
389 "Compaction not enabled (no Parquet storage)".to_string(),
390 )
391 })?;
392
393 let stats = compaction_manager.stats();
394 let config = compaction_manager.config();
395
396 Ok(Json(serde_json::json!({
397 "stats": stats,
398 "config": {
399 "min_files_to_compact": config.min_files_to_compact,
400 "target_file_size": config.target_file_size,
401 "max_file_size": config.max_file_size,
402 "small_file_threshold": config.small_file_threshold,
403 "compaction_interval_seconds": config.compaction_interval_seconds,
404 "auto_compact": config.auto_compact,
405 "strategy": config.strategy
406 }
407 })))
408}
409
410pub async fn register_schema(
412 State(store): State<SharedStore>,
413 Json(req): Json<RegisterSchemaRequest>,
414) -> Result<Json<RegisterSchemaResponse>> {
415 let schema_registry = store.schema_registry();
416
417 let response =
418 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
419
420 tracing::info!(
421 "📋 Schema registered: v{} for '{}'",
422 response.version,
423 response.subject
424 );
425
426 Ok(Json(response))
427}
428
/// Query-string parameters accepted by `get_schema`.
#[derive(Deserialize)]
pub struct GetSchemaParams {
    /// Optional schema version forwarded to the registry's `get_schema`.
    version: Option<u32>,
}
434
435pub async fn get_schema(
436 State(store): State<SharedStore>,
437 Path(subject): Path<String>,
438 Query(params): Query<GetSchemaParams>,
439) -> Result<Json<serde_json::Value>> {
440 let schema_registry = store.schema_registry();
441
442 let schema = schema_registry.get_schema(&subject, params.version)?;
443
444 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
445
446 Ok(Json(serde_json::json!({
447 "id": schema.id,
448 "subject": schema.subject,
449 "version": schema.version,
450 "schema": schema.schema,
451 "created_at": schema.created_at,
452 "description": schema.description,
453 "tags": schema.tags
454 })))
455}
456
457pub async fn list_schema_versions(
459 State(store): State<SharedStore>,
460 Path(subject): Path<String>,
461) -> Result<Json<serde_json::Value>> {
462 let schema_registry = store.schema_registry();
463
464 let versions = schema_registry.list_versions(&subject)?;
465
466 Ok(Json(serde_json::json!({
467 "subject": subject,
468 "versions": versions
469 })))
470}
471
472pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
474 let schema_registry = store.schema_registry();
475
476 let subjects = schema_registry.list_subjects();
477
478 Json(serde_json::json!({
479 "subjects": subjects,
480 "total": subjects.len()
481 }))
482}
483
484pub async fn validate_event_schema(
486 State(store): State<SharedStore>,
487 Json(req): Json<ValidateEventRequest>,
488) -> Result<Json<ValidateEventResponse>> {
489 let schema_registry = store.schema_registry();
490
491 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
492
493 if response.valid {
494 tracing::debug!(
495 "✅ Event validated against schema '{}' v{}",
496 req.subject,
497 response.schema_version
498 );
499 } else {
500 tracing::warn!(
501 "❌ Event validation failed for '{}': {:?}",
502 req.subject,
503 response.errors
504 );
505 }
506
507 Ok(Json(response))
508}
509
/// JSON body accepted by `set_compatibility_mode`.
#[derive(Deserialize)]
pub struct SetCompatibilityRequest {
    /// Compatibility mode to apply to the subject named in the request path.
    compatibility: CompatibilityMode,
}
515
516pub async fn set_compatibility_mode(
517 State(store): State<SharedStore>,
518 Path(subject): Path<String>,
519 Json(req): Json<SetCompatibilityRequest>,
520) -> Json<serde_json::Value> {
521 let schema_registry = store.schema_registry();
522
523 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
524
525 tracing::info!(
526 "🔧 Set compatibility mode for '{}' to {:?}",
527 subject,
528 req.compatibility
529 );
530
531 Json(serde_json::json!({
532 "subject": subject,
533 "compatibility": req.compatibility
534 }))
535}
536
537pub async fn start_replay(
539 State(store): State<SharedStore>,
540 Json(req): Json<StartReplayRequest>,
541) -> Result<Json<StartReplayResponse>> {
542 let replay_manager = store.replay_manager();
543
544 let response = replay_manager.start_replay(store, req)?;
545
546 tracing::info!(
547 "🔄 Started replay {} with {} events",
548 response.replay_id,
549 response.total_events
550 );
551
552 Ok(Json(response))
553}
554
555pub async fn get_replay_progress(
557 State(store): State<SharedStore>,
558 Path(replay_id): Path<uuid::Uuid>,
559) -> Result<Json<ReplayProgress>> {
560 let replay_manager = store.replay_manager();
561
562 let progress = replay_manager.get_progress(replay_id)?;
563
564 Ok(Json(progress))
565}
566
567pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
569 let replay_manager = store.replay_manager();
570
571 let replays = replay_manager.list_replays();
572
573 Json(serde_json::json!({
574 "replays": replays,
575 "total": replays.len()
576 }))
577}
578
579pub async fn cancel_replay(
581 State(store): State<SharedStore>,
582 Path(replay_id): Path<uuid::Uuid>,
583) -> Result<Json<serde_json::Value>> {
584 let replay_manager = store.replay_manager();
585
586 replay_manager.cancel_replay(replay_id)?;
587
588 tracing::info!("🛑 Cancelled replay {}", replay_id);
589
590 Ok(Json(serde_json::json!({
591 "replay_id": replay_id,
592 "status": "cancelled"
593 })))
594}
595
596pub async fn delete_replay(
598 State(store): State<SharedStore>,
599 Path(replay_id): Path<uuid::Uuid>,
600) -> Result<Json<serde_json::Value>> {
601 let replay_manager = store.replay_manager();
602
603 let deleted = replay_manager.delete_replay(replay_id)?;
604
605 if deleted {
606 tracing::info!("🗑️ Deleted replay {}", replay_id);
607 }
608
609 Ok(Json(serde_json::json!({
610 "replay_id": replay_id,
611 "deleted": deleted
612 })))
613}
614
615pub async fn register_pipeline(
617 State(store): State<SharedStore>,
618 Json(config): Json<PipelineConfig>,
619) -> Result<Json<serde_json::Value>> {
620 let pipeline_manager = store.pipeline_manager();
621
622 let pipeline_id = pipeline_manager.register(config.clone());
623
624 tracing::info!(
625 "🔀 Pipeline registered: {} (name: {})",
626 pipeline_id,
627 config.name
628 );
629
630 Ok(Json(serde_json::json!({
631 "pipeline_id": pipeline_id,
632 "name": config.name,
633 "enabled": config.enabled
634 })))
635}
636
637pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
639 let pipeline_manager = store.pipeline_manager();
640
641 let pipelines = pipeline_manager.list();
642
643 tracing::debug!("Listed {} pipelines", pipelines.len());
644
645 Json(serde_json::json!({
646 "pipelines": pipelines,
647 "total": pipelines.len()
648 }))
649}
650
651pub async fn get_pipeline(
653 State(store): State<SharedStore>,
654 Path(pipeline_id): Path<uuid::Uuid>,
655) -> Result<Json<PipelineConfig>> {
656 let pipeline_manager = store.pipeline_manager();
657
658 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
659 crate::error::AllSourceError::ValidationError(format!(
660 "Pipeline not found: {}",
661 pipeline_id
662 ))
663 })?;
664
665 Ok(Json(pipeline.config().clone()))
666}
667
668pub async fn remove_pipeline(
670 State(store): State<SharedStore>,
671 Path(pipeline_id): Path<uuid::Uuid>,
672) -> Result<Json<serde_json::Value>> {
673 let pipeline_manager = store.pipeline_manager();
674
675 let removed = pipeline_manager.remove(pipeline_id);
676
677 if removed {
678 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
679 }
680
681 Ok(Json(serde_json::json!({
682 "pipeline_id": pipeline_id,
683 "removed": removed
684 })))
685}
686
687pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
689 let pipeline_manager = store.pipeline_manager();
690
691 let stats = pipeline_manager.all_stats();
692
693 Json(serde_json::json!({
694 "stats": stats,
695 "total": stats.len()
696 }))
697}
698
699pub async fn get_pipeline_stats(
701 State(store): State<SharedStore>,
702 Path(pipeline_id): Path<uuid::Uuid>,
703) -> Result<Json<PipelineStats>> {
704 let pipeline_manager = store.pipeline_manager();
705
706 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
707 crate::error::AllSourceError::ValidationError(format!(
708 "Pipeline not found: {}",
709 pipeline_id
710 ))
711 })?;
712
713 Ok(Json(pipeline.stats()))
714}
715
716pub async fn reset_pipeline(
718 State(store): State<SharedStore>,
719 Path(pipeline_id): Path<uuid::Uuid>,
720) -> Result<Json<serde_json::Value>> {
721 let pipeline_manager = store.pipeline_manager();
722
723 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
724 crate::error::AllSourceError::ValidationError(format!(
725 "Pipeline not found: {}",
726 pipeline_id
727 ))
728 })?;
729
730 pipeline.reset();
731
732 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
733
734 Ok(Json(serde_json::json!({
735 "pipeline_id": pipeline_id,
736 "reset": true
737 })))
738}
739
740pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
746 let projection_manager = store.projection_manager();
747
748 let projections: Vec<serde_json::Value> = projection_manager
749 .list_projections()
750 .iter()
751 .map(|(name, projection)| {
752 serde_json::json!({
753 "name": name,
754 "type": format!("{:?}", projection.name()),
755 })
756 })
757 .collect();
758
759 tracing::debug!("Listed {} projections", projections.len());
760
761 Json(serde_json::json!({
762 "projections": projections,
763 "total": projections.len()
764 }))
765}
766
767pub async fn get_projection(
769 State(store): State<SharedStore>,
770 Path(name): Path<String>,
771) -> Result<Json<serde_json::Value>> {
772 let projection_manager = store.projection_manager();
773
774 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
775 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
776 })?;
777
778 Ok(Json(serde_json::json!({
779 "name": projection.name(),
780 "found": true
781 })))
782}
783
784pub async fn get_projection_state(
789 State(store): State<SharedStore>,
790 Path((name, entity_id)): Path<(String, String)>,
791) -> Result<Json<serde_json::Value>> {
792 let projection_manager = store.projection_manager();
793
794 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
795 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
796 })?;
797
798 let state = projection.get_state(&entity_id);
799
800 tracing::debug!("Projection state retrieved: {} / {}", name, entity_id);
801
802 Ok(Json(serde_json::json!({
803 "projection": name,
804 "entity_id": entity_id,
805 "state": state,
806 "found": state.is_some()
807 })))
808}
809
/// JSON body accepted by `save_projection_state`.
#[derive(Debug, Deserialize)]
pub struct SaveProjectionStateRequest {
    /// Arbitrary JSON state to store for the projection/entity pair.
    pub state: serde_json::Value,
}
815
816pub async fn save_projection_state(
821 State(store): State<SharedStore>,
822 Path((name, entity_id)): Path<(String, String)>,
823 Json(req): Json<SaveProjectionStateRequest>,
824) -> Result<Json<serde_json::Value>> {
825 let projection_cache = store.projection_state_cache();
826
827 projection_cache.insert(format!("{}:{}", name, entity_id), req.state.clone());
829
830 tracing::info!("Projection state saved: {} / {}", name, entity_id);
831
832 Ok(Json(serde_json::json!({
833 "projection": name,
834 "entity_id": entity_id,
835 "saved": true
836 })))
837}
838
/// JSON body accepted by `bulk_get_projection_states`.
#[derive(Debug, Deserialize)]
pub struct BulkGetStateRequest {
    /// Entity ids whose projection states should be looked up.
    pub entity_ids: Vec<String>,
}
846
847pub async fn bulk_get_projection_states(
848 State(store): State<SharedStore>,
849 Path(name): Path<String>,
850 Json(req): Json<BulkGetStateRequest>,
851) -> Result<Json<serde_json::Value>> {
852 let projection_manager = store.projection_manager();
853
854 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
855 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
856 })?;
857
858 let states: Vec<serde_json::Value> = req
859 .entity_ids
860 .iter()
861 .map(|entity_id| {
862 let state = projection.get_state(entity_id);
863 serde_json::json!({
864 "entity_id": entity_id,
865 "state": state,
866 "found": state.is_some()
867 })
868 })
869 .collect();
870
871 tracing::debug!(
872 "Bulk projection state retrieved: {} entities from {}",
873 states.len(),
874 name
875 );
876
877 Ok(Json(serde_json::json!({
878 "projection": name,
879 "states": states,
880 "total": states.len()
881 })))
882}
883
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use crate::store::EventStore;

    /// Creates a fresh store wrapped in an `Arc`, as the handlers expect.
    fn create_test_store() -> Arc<EventStore> {
        let store = EventStore::new();
        Arc::new(store)
    }

    /// Builds an event with a fixed `{"name": "Test", "value": 42}` payload.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test",
            "value": 42
        });

        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// A value written to the projection state cache can be read back.
    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        cache.insert(
            "entity_snapshots:user-123".to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        let state = cache
            .get("entity_snapshots:user-123")
            .expect("cached state should be present");
        assert_eq!(state["name"], "Test User");
        assert_eq!(state["age"], 30);
    }

    /// A new store exposes at least the two built-in projections.
    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();
        let manager = store.projection_manager();

        let projections = manager.list_projections();
        assert!(projections.len() >= 2);

        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
        for expected in ["entity_snapshots", "event_counters"] {
            assert!(names.contains(&expected));
        }
    }

    /// Ingesting an event makes its payload visible via `entity_snapshots`.
    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        store
            .ingest(create_test_event("user-456", "user.created"))
            .unwrap();

        let manager = store.projection_manager();
        let snapshots = manager.get_projection("entity_snapshots").unwrap();

        let state = snapshots
            .get_state("user-456")
            .expect("state should exist after ingestion");
        assert_eq!(state["name"], "Test");
        assert_eq!(state["value"], 42);
    }

    /// The cache keeps independent entries for distinct entity keys.
    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            cache.insert(key, serde_json::json!({"id": i, "status": "active"}));
        }

        assert_eq!(cache.len(), 10);

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            let state = cache.get(&key);
            assert!(state.is_some());
            assert_eq!(state.unwrap()["id"], i);
        }
    }

    /// Inserting under an existing key replaces the previous state.
    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();
        let key = "entity_snapshots:user-789";

        cache.insert(key.to_string(), serde_json::json!({"balance": 100}));
        cache.insert(key.to_string(), serde_json::json!({"balance": 150}));

        let state = cache.get(key).unwrap();
        assert_eq!(state["balance"], 150);
    }

    /// The `event_counters` projection counts ingested events per event type.
    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        let ingested = [
            ("user-1", "user.created"),
            ("user-2", "user.created"),
            ("user-1", "user.updated"),
        ];
        for (entity_id, event_type) in ingested {
            store
                .ingest(create_test_event(entity_id, event_type))
                .unwrap();
        }

        let manager = store.projection_manager();
        let counters = manager.get_projection("event_counters").unwrap();

        let created = counters
            .get_state("user.created")
            .expect("counter for user.created should exist");
        assert_eq!(created["count"], 2);

        let updated = counters
            .get_state("user.updated")
            .expect("counter for user.updated should exist");
        assert_eq!(updated["count"], 1);
    }
}