1use crate::application::services::analytics::{
2 AnalyticsEngine, CorrelationRequest, CorrelationResponse, EventFrequencyRequest,
3 EventFrequencyResponse, StatsSummaryRequest, StatsSummaryResponse,
4};
5use crate::application::dto::{
6 EventDto, IngestEventRequest, IngestEventResponse, QueryEventsRequest, QueryEventsResponse,
7};
8use crate::infrastructure::persistence::compaction::CompactionResult;
9use crate::domain::entities::Event;
10use crate::error::Result;
11use crate::application::services::pipeline::{PipelineConfig, PipelineStats};
12use crate::application::services::replay::{ReplayProgress, StartReplayRequest, StartReplayResponse};
13use crate::application::services::schema::{
14 CompatibilityMode, RegisterSchemaRequest, RegisterSchemaResponse, ValidateEventRequest,
15 ValidateEventResponse,
16};
17use crate::infrastructure::persistence::snapshot::{
18 CreateSnapshotRequest, CreateSnapshotResponse, ListSnapshotsRequest, ListSnapshotsResponse,
19 SnapshotInfo,
20};
21use crate::store::EventStore;
22use axum::{
23 extract::{Path, Query, State, WebSocketUpgrade},
24 response::{IntoResponse, Response},
25 routing::{get, post, put},
26 Json, Router,
27};
28use serde::Deserialize;
29use std::sync::Arc;
30use tower_http::cors::{Any, CorsLayer};
31use tower_http::trace::TraceLayer;
32
33type SharedStore = Arc<EventStore>;
34
35pub async fn serve(store: SharedStore, addr: &str) -> anyhow::Result<()> {
36 let app = Router::new()
37 .route("/health", get(health))
38 .route("/metrics", get(prometheus_metrics)) .route("/api/v1/events", post(ingest_event))
40 .route("/api/v1/events/query", get(query_events))
41 .route("/api/v1/events/stream", get(events_websocket)) .route("/api/v1/entities/:entity_id/state", get(get_entity_state))
43 .route(
44 "/api/v1/entities/:entity_id/snapshot",
45 get(get_entity_snapshot),
46 )
47 .route("/api/v1/stats", get(get_stats))
48 .route("/api/v1/analytics/frequency", get(analytics_frequency))
50 .route("/api/v1/analytics/summary", get(analytics_summary))
51 .route("/api/v1/analytics/correlation", get(analytics_correlation))
52 .route("/api/v1/snapshots", post(create_snapshot))
54 .route("/api/v1/snapshots", get(list_snapshots))
55 .route(
56 "/api/v1/snapshots/:entity_id/latest",
57 get(get_latest_snapshot),
58 )
59 .route("/api/v1/compaction/trigger", post(trigger_compaction))
61 .route("/api/v1/compaction/stats", get(compaction_stats))
62 .route("/api/v1/schemas", post(register_schema))
64 .route("/api/v1/schemas", get(list_subjects))
65 .route("/api/v1/schemas/:subject", get(get_schema))
66 .route(
67 "/api/v1/schemas/:subject/versions",
68 get(list_schema_versions),
69 )
70 .route("/api/v1/schemas/validate", post(validate_event_schema))
71 .route(
72 "/api/v1/schemas/:subject/compatibility",
73 put(set_compatibility_mode),
74 )
75 .route("/api/v1/replay", post(start_replay))
77 .route("/api/v1/replay", get(list_replays))
78 .route("/api/v1/replay/:replay_id", get(get_replay_progress))
79 .route("/api/v1/replay/:replay_id/cancel", post(cancel_replay))
80 .route(
81 "/api/v1/replay/:replay_id",
82 axum::routing::delete(delete_replay),
83 )
84 .route("/api/v1/pipelines", post(register_pipeline))
86 .route("/api/v1/pipelines", get(list_pipelines))
87 .route("/api/v1/pipelines/stats", get(all_pipeline_stats))
88 .route("/api/v1/pipelines/:pipeline_id", get(get_pipeline))
89 .route(
90 "/api/v1/pipelines/:pipeline_id",
91 axum::routing::delete(remove_pipeline),
92 )
93 .route(
94 "/api/v1/pipelines/:pipeline_id/stats",
95 get(get_pipeline_stats),
96 )
97 .route("/api/v1/pipelines/:pipeline_id/reset", put(reset_pipeline))
98 .route("/api/v1/projections", get(list_projections))
100 .route("/api/v1/projections/:name", get(get_projection))
101 .route(
102 "/api/v1/projections/:name/:entity_id/state",
103 get(get_projection_state),
104 )
105 .route(
106 "/api/v1/projections/:name/:entity_id/state",
107 put(save_projection_state),
108 )
109 .route(
110 "/api/v1/projections/:name/bulk",
111 post(bulk_get_projection_states),
112 )
113 .layer(
114 CorsLayer::new()
115 .allow_origin(Any)
116 .allow_methods(Any)
117 .allow_headers(Any),
118 )
119 .layer(TraceLayer::new_for_http())
120 .with_state(store);
121
122 let listener = tokio::net::TcpListener::bind(addr).await?;
123 axum::serve(listener, app).await?;
124
125 Ok(())
126}
127
128pub async fn health() -> impl IntoResponse {
129 Json(serde_json::json!({
130 "status": "healthy",
131 "service": "allsource-core",
132 "version": env!("CARGO_PKG_VERSION")
133 }))
134}
135
136pub async fn prometheus_metrics(State(store): State<SharedStore>) -> impl IntoResponse {
138 let metrics = store.metrics();
139
140 match metrics.encode() {
141 Ok(encoded) => Response::builder()
142 .status(200)
143 .header("Content-Type", "text/plain; version=0.0.4")
144 .body(encoded)
145 .unwrap()
146 .into_response(),
147 Err(e) => Response::builder()
148 .status(500)
149 .body(format!("Error encoding metrics: {}", e))
150 .unwrap()
151 .into_response(),
152 }
153}
154
155pub async fn ingest_event(
156 State(store): State<SharedStore>,
157 Json(req): Json<IngestEventRequest>,
158) -> Result<Json<IngestEventResponse>> {
159 let event = Event::from_strings(
161 req.event_type,
162 req.entity_id,
163 "default".to_string(),
164 req.payload,
165 req.metadata,
166 )?;
167
168 let event_id = event.id;
169 let timestamp = event.timestamp;
170
171 store.ingest(event)?;
172
173 tracing::info!("Event ingested: {}", event_id);
174
175 Ok(Json(IngestEventResponse {
176 event_id,
177 timestamp,
178 }))
179}
180
181pub async fn query_events(
182 State(store): State<SharedStore>,
183 Query(req): Query<QueryEventsRequest>,
184) -> Result<Json<QueryEventsResponse>> {
185 let domain_events = store.query(req)?;
186 let events: Vec<EventDto> = domain_events.iter().map(EventDto::from).collect();
187 let count = events.len();
188
189 tracing::debug!("Query returned {} events", count);
190
191 Ok(Json(QueryEventsResponse { events, count }))
192}
193
194#[derive(Deserialize)]
195pub struct EntityStateParams {
196 as_of: Option<chrono::DateTime<chrono::Utc>>,
197}
198
199pub async fn get_entity_state(
200 State(store): State<SharedStore>,
201 Path(entity_id): Path<String>,
202 Query(params): Query<EntityStateParams>,
203) -> Result<Json<serde_json::Value>> {
204 let state = store.reconstruct_state(&entity_id, params.as_of)?;
205
206 tracing::info!("State reconstructed for entity: {}", entity_id);
207
208 Ok(Json(state))
209}
210
211pub async fn get_entity_snapshot(
212 State(store): State<SharedStore>,
213 Path(entity_id): Path<String>,
214) -> Result<Json<serde_json::Value>> {
215 let snapshot = store.get_snapshot(&entity_id)?;
216
217 tracing::debug!("Snapshot retrieved for entity: {}", entity_id);
218
219 Ok(Json(snapshot))
220}
221
222pub async fn get_stats(State(store): State<SharedStore>) -> impl IntoResponse {
223 let stats = store.stats();
224 Json(stats)
225}
226
227pub async fn events_websocket(ws: WebSocketUpgrade, State(store): State<SharedStore>) -> Response {
229 let websocket_manager = store.websocket_manager();
230
231 ws.on_upgrade(move |socket| async move {
232 websocket_manager.handle_socket(socket).await;
233 })
234}
235
236pub async fn analytics_frequency(
238 State(store): State<SharedStore>,
239 Query(req): Query<EventFrequencyRequest>,
240) -> Result<Json<EventFrequencyResponse>> {
241 let response = AnalyticsEngine::event_frequency(&store, req)?;
242
243 tracing::debug!(
244 "Frequency analysis returned {} buckets",
245 response.buckets.len()
246 );
247
248 Ok(Json(response))
249}
250
251pub async fn analytics_summary(
253 State(store): State<SharedStore>,
254 Query(req): Query<StatsSummaryRequest>,
255) -> Result<Json<StatsSummaryResponse>> {
256 let response = AnalyticsEngine::stats_summary(&store, req)?;
257
258 tracing::debug!(
259 "Stats summary: {} events across {} entities",
260 response.total_events,
261 response.unique_entities
262 );
263
264 Ok(Json(response))
265}
266
267pub async fn analytics_correlation(
269 State(store): State<SharedStore>,
270 Query(req): Query<CorrelationRequest>,
271) -> Result<Json<CorrelationResponse>> {
272 let response = AnalyticsEngine::analyze_correlation(&store, req)?;
273
274 tracing::debug!(
275 "Correlation analysis: {}/{} correlated pairs ({:.2}%)",
276 response.correlated_pairs,
277 response.total_a,
278 response.correlation_percentage
279 );
280
281 Ok(Json(response))
282}
283
284pub async fn create_snapshot(
286 State(store): State<SharedStore>,
287 Json(req): Json<CreateSnapshotRequest>,
288) -> Result<Json<CreateSnapshotResponse>> {
289 store.create_snapshot(&req.entity_id)?;
290
291 let snapshot_manager = store.snapshot_manager();
292 let snapshot = snapshot_manager
293 .get_latest_snapshot(&req.entity_id)
294 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(req.entity_id.clone()))?;
295
296 tracing::info!("📸 Created snapshot for entity: {}", req.entity_id);
297
298 Ok(Json(CreateSnapshotResponse {
299 snapshot_id: snapshot.id,
300 entity_id: snapshot.entity_id,
301 created_at: snapshot.created_at,
302 event_count: snapshot.event_count,
303 size_bytes: snapshot.metadata.size_bytes,
304 }))
305}
306
307pub async fn list_snapshots(
309 State(store): State<SharedStore>,
310 Query(req): Query<ListSnapshotsRequest>,
311) -> Result<Json<ListSnapshotsResponse>> {
312 let snapshot_manager = store.snapshot_manager();
313
314 let snapshots: Vec<SnapshotInfo> = if let Some(entity_id) = req.entity_id {
315 snapshot_manager
316 .get_all_snapshots(&entity_id)
317 .into_iter()
318 .map(SnapshotInfo::from)
319 .collect()
320 } else {
321 let entities = snapshot_manager.list_entities();
323 entities
324 .iter()
325 .flat_map(|entity_id| {
326 snapshot_manager
327 .get_all_snapshots(entity_id)
328 .into_iter()
329 .map(SnapshotInfo::from)
330 })
331 .collect()
332 };
333
334 let total = snapshots.len();
335
336 tracing::debug!("Listed {} snapshots", total);
337
338 Ok(Json(ListSnapshotsResponse { snapshots, total }))
339}
340
341pub async fn get_latest_snapshot(
343 State(store): State<SharedStore>,
344 Path(entity_id): Path<String>,
345) -> Result<Json<serde_json::Value>> {
346 let snapshot_manager = store.snapshot_manager();
347
348 let snapshot = snapshot_manager
349 .get_latest_snapshot(&entity_id)
350 .ok_or_else(|| crate::error::AllSourceError::EntityNotFound(entity_id.clone()))?;
351
352 tracing::debug!("Retrieved latest snapshot for entity: {}", entity_id);
353
354 Ok(Json(serde_json::json!({
355 "snapshot_id": snapshot.id,
356 "entity_id": snapshot.entity_id,
357 "created_at": snapshot.created_at,
358 "as_of": snapshot.as_of,
359 "event_count": snapshot.event_count,
360 "size_bytes": snapshot.metadata.size_bytes,
361 "snapshot_type": snapshot.metadata.snapshot_type,
362 "state": snapshot.state
363 })))
364}
365
366pub async fn trigger_compaction(
368 State(store): State<SharedStore>,
369) -> Result<Json<CompactionResult>> {
370 let compaction_manager = store.compaction_manager().ok_or_else(|| {
371 crate::error::AllSourceError::InternalError(
372 "Compaction not enabled (no Parquet storage)".to_string(),
373 )
374 })?;
375
376 tracing::info!("📦 Manual compaction triggered via API");
377
378 let result = compaction_manager.compact_now()?;
379
380 Ok(Json(result))
381}
382
383pub async fn compaction_stats(State(store): State<SharedStore>) -> Result<Json<serde_json::Value>> {
385 let compaction_manager = store.compaction_manager().ok_or_else(|| {
386 crate::error::AllSourceError::InternalError(
387 "Compaction not enabled (no Parquet storage)".to_string(),
388 )
389 })?;
390
391 let stats = compaction_manager.stats();
392 let config = compaction_manager.config();
393
394 Ok(Json(serde_json::json!({
395 "stats": stats,
396 "config": {
397 "min_files_to_compact": config.min_files_to_compact,
398 "target_file_size": config.target_file_size,
399 "max_file_size": config.max_file_size,
400 "small_file_threshold": config.small_file_threshold,
401 "compaction_interval_seconds": config.compaction_interval_seconds,
402 "auto_compact": config.auto_compact,
403 "strategy": config.strategy
404 }
405 })))
406}
407
408pub async fn register_schema(
410 State(store): State<SharedStore>,
411 Json(req): Json<RegisterSchemaRequest>,
412) -> Result<Json<RegisterSchemaResponse>> {
413 let schema_registry = store.schema_registry();
414
415 let response =
416 schema_registry.register_schema(req.subject, req.schema, req.description, req.tags)?;
417
418 tracing::info!(
419 "📋 Schema registered: v{} for '{}'",
420 response.version,
421 response.subject
422 );
423
424 Ok(Json(response))
425}
426
427#[derive(Deserialize)]
429pub struct GetSchemaParams {
430 version: Option<u32>,
431}
432
433pub async fn get_schema(
434 State(store): State<SharedStore>,
435 Path(subject): Path<String>,
436 Query(params): Query<GetSchemaParams>,
437) -> Result<Json<serde_json::Value>> {
438 let schema_registry = store.schema_registry();
439
440 let schema = schema_registry.get_schema(&subject, params.version)?;
441
442 tracing::debug!("Retrieved schema v{} for '{}'", schema.version, subject);
443
444 Ok(Json(serde_json::json!({
445 "id": schema.id,
446 "subject": schema.subject,
447 "version": schema.version,
448 "schema": schema.schema,
449 "created_at": schema.created_at,
450 "description": schema.description,
451 "tags": schema.tags
452 })))
453}
454
455pub async fn list_schema_versions(
457 State(store): State<SharedStore>,
458 Path(subject): Path<String>,
459) -> Result<Json<serde_json::Value>> {
460 let schema_registry = store.schema_registry();
461
462 let versions = schema_registry.list_versions(&subject)?;
463
464 Ok(Json(serde_json::json!({
465 "subject": subject,
466 "versions": versions
467 })))
468}
469
470pub async fn list_subjects(State(store): State<SharedStore>) -> Json<serde_json::Value> {
472 let schema_registry = store.schema_registry();
473
474 let subjects = schema_registry.list_subjects();
475
476 Json(serde_json::json!({
477 "subjects": subjects,
478 "total": subjects.len()
479 }))
480}
481
482pub async fn validate_event_schema(
484 State(store): State<SharedStore>,
485 Json(req): Json<ValidateEventRequest>,
486) -> Result<Json<ValidateEventResponse>> {
487 let schema_registry = store.schema_registry();
488
489 let response = schema_registry.validate(&req.subject, req.version, &req.payload)?;
490
491 if response.valid {
492 tracing::debug!(
493 "✅ Event validated against schema '{}' v{}",
494 req.subject,
495 response.schema_version
496 );
497 } else {
498 tracing::warn!(
499 "❌ Event validation failed for '{}': {:?}",
500 req.subject,
501 response.errors
502 );
503 }
504
505 Ok(Json(response))
506}
507
508#[derive(Deserialize)]
510pub struct SetCompatibilityRequest {
511 compatibility: CompatibilityMode,
512}
513
514pub async fn set_compatibility_mode(
515 State(store): State<SharedStore>,
516 Path(subject): Path<String>,
517 Json(req): Json<SetCompatibilityRequest>,
518) -> Json<serde_json::Value> {
519 let schema_registry = store.schema_registry();
520
521 schema_registry.set_compatibility_mode(subject.clone(), req.compatibility);
522
523 tracing::info!(
524 "🔧 Set compatibility mode for '{}' to {:?}",
525 subject,
526 req.compatibility
527 );
528
529 Json(serde_json::json!({
530 "subject": subject,
531 "compatibility": req.compatibility
532 }))
533}
534
535pub async fn start_replay(
537 State(store): State<SharedStore>,
538 Json(req): Json<StartReplayRequest>,
539) -> Result<Json<StartReplayResponse>> {
540 let replay_manager = store.replay_manager();
541
542 let response = replay_manager.start_replay(store, req)?;
543
544 tracing::info!(
545 "🔄 Started replay {} with {} events",
546 response.replay_id,
547 response.total_events
548 );
549
550 Ok(Json(response))
551}
552
553pub async fn get_replay_progress(
555 State(store): State<SharedStore>,
556 Path(replay_id): Path<uuid::Uuid>,
557) -> Result<Json<ReplayProgress>> {
558 let replay_manager = store.replay_manager();
559
560 let progress = replay_manager.get_progress(replay_id)?;
561
562 Ok(Json(progress))
563}
564
565pub async fn list_replays(State(store): State<SharedStore>) -> Json<serde_json::Value> {
567 let replay_manager = store.replay_manager();
568
569 let replays = replay_manager.list_replays();
570
571 Json(serde_json::json!({
572 "replays": replays,
573 "total": replays.len()
574 }))
575}
576
577pub async fn cancel_replay(
579 State(store): State<SharedStore>,
580 Path(replay_id): Path<uuid::Uuid>,
581) -> Result<Json<serde_json::Value>> {
582 let replay_manager = store.replay_manager();
583
584 replay_manager.cancel_replay(replay_id)?;
585
586 tracing::info!("🛑 Cancelled replay {}", replay_id);
587
588 Ok(Json(serde_json::json!({
589 "replay_id": replay_id,
590 "status": "cancelled"
591 })))
592}
593
594pub async fn delete_replay(
596 State(store): State<SharedStore>,
597 Path(replay_id): Path<uuid::Uuid>,
598) -> Result<Json<serde_json::Value>> {
599 let replay_manager = store.replay_manager();
600
601 let deleted = replay_manager.delete_replay(replay_id)?;
602
603 if deleted {
604 tracing::info!("🗑️ Deleted replay {}", replay_id);
605 }
606
607 Ok(Json(serde_json::json!({
608 "replay_id": replay_id,
609 "deleted": deleted
610 })))
611}
612
613pub async fn register_pipeline(
615 State(store): State<SharedStore>,
616 Json(config): Json<PipelineConfig>,
617) -> Result<Json<serde_json::Value>> {
618 let pipeline_manager = store.pipeline_manager();
619
620 let pipeline_id = pipeline_manager.register(config.clone());
621
622 tracing::info!(
623 "🔀 Pipeline registered: {} (name: {})",
624 pipeline_id,
625 config.name
626 );
627
628 Ok(Json(serde_json::json!({
629 "pipeline_id": pipeline_id,
630 "name": config.name,
631 "enabled": config.enabled
632 })))
633}
634
635pub async fn list_pipelines(State(store): State<SharedStore>) -> Json<serde_json::Value> {
637 let pipeline_manager = store.pipeline_manager();
638
639 let pipelines = pipeline_manager.list();
640
641 tracing::debug!("Listed {} pipelines", pipelines.len());
642
643 Json(serde_json::json!({
644 "pipelines": pipelines,
645 "total": pipelines.len()
646 }))
647}
648
649pub async fn get_pipeline(
651 State(store): State<SharedStore>,
652 Path(pipeline_id): Path<uuid::Uuid>,
653) -> Result<Json<PipelineConfig>> {
654 let pipeline_manager = store.pipeline_manager();
655
656 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
657 crate::error::AllSourceError::ValidationError(format!(
658 "Pipeline not found: {}",
659 pipeline_id
660 ))
661 })?;
662
663 Ok(Json(pipeline.config().clone()))
664}
665
666pub async fn remove_pipeline(
668 State(store): State<SharedStore>,
669 Path(pipeline_id): Path<uuid::Uuid>,
670) -> Result<Json<serde_json::Value>> {
671 let pipeline_manager = store.pipeline_manager();
672
673 let removed = pipeline_manager.remove(pipeline_id);
674
675 if removed {
676 tracing::info!("🗑️ Removed pipeline {}", pipeline_id);
677 }
678
679 Ok(Json(serde_json::json!({
680 "pipeline_id": pipeline_id,
681 "removed": removed
682 })))
683}
684
685pub async fn all_pipeline_stats(State(store): State<SharedStore>) -> Json<serde_json::Value> {
687 let pipeline_manager = store.pipeline_manager();
688
689 let stats = pipeline_manager.all_stats();
690
691 Json(serde_json::json!({
692 "stats": stats,
693 "total": stats.len()
694 }))
695}
696
697pub async fn get_pipeline_stats(
699 State(store): State<SharedStore>,
700 Path(pipeline_id): Path<uuid::Uuid>,
701) -> Result<Json<PipelineStats>> {
702 let pipeline_manager = store.pipeline_manager();
703
704 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
705 crate::error::AllSourceError::ValidationError(format!(
706 "Pipeline not found: {}",
707 pipeline_id
708 ))
709 })?;
710
711 Ok(Json(pipeline.stats()))
712}
713
714pub async fn reset_pipeline(
716 State(store): State<SharedStore>,
717 Path(pipeline_id): Path<uuid::Uuid>,
718) -> Result<Json<serde_json::Value>> {
719 let pipeline_manager = store.pipeline_manager();
720
721 let pipeline = pipeline_manager.get(pipeline_id).ok_or_else(|| {
722 crate::error::AllSourceError::ValidationError(format!(
723 "Pipeline not found: {}",
724 pipeline_id
725 ))
726 })?;
727
728 pipeline.reset();
729
730 tracing::info!("🔄 Reset pipeline {}", pipeline_id);
731
732 Ok(Json(serde_json::json!({
733 "pipeline_id": pipeline_id,
734 "reset": true
735 })))
736}
737
738pub async fn list_projections(State(store): State<SharedStore>) -> Json<serde_json::Value> {
744 let projection_manager = store.projection_manager();
745
746 let projections: Vec<serde_json::Value> = projection_manager
747 .list_projections()
748 .iter()
749 .map(|(name, projection)| {
750 serde_json::json!({
751 "name": name,
752 "type": format!("{:?}", projection.name()),
753 })
754 })
755 .collect();
756
757 tracing::debug!("Listed {} projections", projections.len());
758
759 Json(serde_json::json!({
760 "projections": projections,
761 "total": projections.len()
762 }))
763}
764
765pub async fn get_projection(
767 State(store): State<SharedStore>,
768 Path(name): Path<String>,
769) -> Result<Json<serde_json::Value>> {
770 let projection_manager = store.projection_manager();
771
772 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
773 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
774 })?;
775
776 Ok(Json(serde_json::json!({
777 "name": projection.name(),
778 "found": true
779 })))
780}
781
782pub async fn get_projection_state(
787 State(store): State<SharedStore>,
788 Path((name, entity_id)): Path<(String, String)>,
789) -> Result<Json<serde_json::Value>> {
790 let projection_manager = store.projection_manager();
791
792 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
793 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
794 })?;
795
796 let state = projection.get_state(&entity_id);
797
798 tracing::debug!(
799 "Projection state retrieved: {} / {}",
800 name,
801 entity_id
802 );
803
804 Ok(Json(serde_json::json!({
805 "projection": name,
806 "entity_id": entity_id,
807 "state": state,
808 "found": state.is_some()
809 })))
810}
811
812#[derive(Debug, Deserialize)]
814pub struct SaveProjectionStateRequest {
815 pub state: serde_json::Value,
816}
817
818pub async fn save_projection_state(
823 State(store): State<SharedStore>,
824 Path((name, entity_id)): Path<(String, String)>,
825 Json(req): Json<SaveProjectionStateRequest>,
826) -> Result<Json<serde_json::Value>> {
827 let projection_cache = store.projection_state_cache();
828
829 projection_cache.insert(
831 format!("{}:{}", name, entity_id),
832 req.state.clone(),
833 );
834
835 tracing::info!(
836 "Projection state saved: {} / {}",
837 name,
838 entity_id
839 );
840
841 Ok(Json(serde_json::json!({
842 "projection": name,
843 "entity_id": entity_id,
844 "saved": true
845 })))
846}
847
848#[derive(Debug, Deserialize)]
852pub struct BulkGetStateRequest {
853 pub entity_ids: Vec<String>,
854}
855
856pub async fn bulk_get_projection_states(
857 State(store): State<SharedStore>,
858 Path(name): Path<String>,
859 Json(req): Json<BulkGetStateRequest>,
860) -> Result<Json<serde_json::Value>> {
861 let projection_manager = store.projection_manager();
862
863 let projection = projection_manager.get_projection(&name).ok_or_else(|| {
864 crate::error::AllSourceError::EntityNotFound(format!("Projection '{}' not found", name))
865 })?;
866
867 let states: Vec<serde_json::Value> = req
868 .entity_ids
869 .iter()
870 .map(|entity_id| {
871 let state = projection.get_state(entity_id);
872 serde_json::json!({
873 "entity_id": entity_id,
874 "state": state,
875 "found": state.is_some()
876 })
877 })
878 .collect();
879
880 tracing::debug!(
881 "Bulk projection state retrieved: {} entities from {}",
882 states.len(),
883 name
884 );
885
886 Ok(Json(serde_json::json!({
887 "projection": name,
888 "states": states,
889 "total": states.len()
890 })))
891}
892
#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::entities::Event;
    use crate::store::EventStore;

    /// Builds a fresh store wrapped in an `Arc`, the same shape the handlers
    /// receive as state.
    fn create_test_store() -> Arc<EventStore> {
        let store = EventStore::new();
        Arc::new(store)
    }

    /// Builds an event with a fixed payload (`name = "Test"`, `value = 42`)
    /// for the given entity and event type.
    fn create_test_event(entity_id: &str, event_type: &str) -> Event {
        let payload = serde_json::json!({
            "name": "Test",
            "value": 42
        });

        Event::from_strings(
            event_type.to_string(),
            entity_id.to_string(),
            "test-stream".to_string(),
            payload,
            None,
        )
        .unwrap()
    }

    /// A value inserted into the projection state cache can be read back.
    #[tokio::test]
    async fn test_projection_state_cache() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        let key = "entity_snapshots:user-123";
        cache.insert(
            key.to_string(),
            serde_json::json!({"name": "Test User", "age": 30}),
        );

        let state = cache.get(key);
        assert!(state.is_some());

        let state = state.unwrap();
        assert_eq!(state["name"], "Test User");
        assert_eq!(state["age"], 30);
    }

    /// A new store lists at least the `entity_snapshots` and
    /// `event_counters` projections.
    #[tokio::test]
    async fn test_projection_manager_list_projections() {
        let store = create_test_store();
        let projections = store.projection_manager().list_projections();

        assert!(projections.len() >= 2);

        let names: Vec<&str> = projections.iter().map(|(name, _)| name.as_str()).collect();
        for expected in &["entity_snapshots", "event_counters"] {
            assert!(names.contains(expected));
        }
    }

    /// Ingesting an event makes its payload visible through the
    /// `entity_snapshots` projection.
    #[tokio::test]
    async fn test_projection_state_after_event_ingestion() {
        let store = create_test_store();

        store
            .ingest(create_test_event("user-456", "user.created"))
            .unwrap();

        let manager = store.projection_manager();
        let snapshots = manager.get_projection("entity_snapshots").unwrap();

        let state = snapshots.get_state("user-456");
        assert!(state.is_some());

        let state = state.unwrap();
        assert_eq!(state["name"], "Test");
        assert_eq!(state["value"], 42);
    }

    /// The cache keeps separate entries for separate keys.
    #[tokio::test]
    async fn test_projection_state_cache_multiple_entities() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            cache.insert(key, serde_json::json!({"id": i, "status": "active"}));
        }

        assert_eq!(cache.len(), 10);

        for i in 0..10 {
            let key = format!("entity_snapshots:entity-{}", i);
            let state = cache.get(&key);
            assert!(state.is_some());
            assert_eq!(state.unwrap()["id"], i);
        }
    }

    /// Inserting under an existing key replaces the previous value.
    #[tokio::test]
    async fn test_projection_state_update() {
        let store = create_test_store();
        let cache = store.projection_state_cache();

        for balance in &[100, 150] {
            cache.insert(
                "entity_snapshots:user-789".to_string(),
                serde_json::json!({"balance": balance}),
            );
        }

        let state = cache.get("entity_snapshots:user-789").unwrap();
        assert_eq!(state["balance"], 150);
    }

    /// The `event_counters` projection counts ingested events per event type.
    #[tokio::test]
    async fn test_event_counter_projection() {
        let store = create_test_store();

        let ingested = [
            ("user-1", "user.created"),
            ("user-2", "user.created"),
            ("user-1", "user.updated"),
        ];
        for (entity_id, event_type) in ingested.iter() {
            store
                .ingest(create_test_event(entity_id, event_type))
                .unwrap();
        }

        let manager = store.projection_manager();
        let counters = manager.get_projection("event_counters").unwrap();

        let created_state = counters.get_state("user.created");
        assert!(created_state.is_some());
        assert_eq!(created_state.unwrap()["count"], 2);

        let updated_state = counters.get_state("user.updated");
        assert!(updated_state.is_some());
        assert_eq!(updated_state.unwrap()["count"], 1);
    }
}