1use crate::es::storage::{EventStoreStorage, EventStream};
2use crate::{
3 Aggregate, CqrsContext, CqrsError, EventEnvelope, EventStore, MaybeSend, MaybeSync, Snapshot,
4};
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tracing::{debug, error, info};
9
10#[derive(Debug, Clone)]
11pub struct EventStoreImpl<A, P>
12where
13 A: Aggregate + 'static,
14 P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
15{
16 _phantom: std::marker::PhantomData<(A, P)>,
17 persist: P,
18}
19
20impl<A, P> EventStoreImpl<A, P>
21where
22 A: Aggregate + 'static,
23 P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
24{
25 #[must_use]
26 pub fn new(persist: P) -> Arc<Self> {
27 Arc::new(Self {
28 _phantom: Default::default(),
29 persist,
30 })
31 }
32
33 async fn execute_within_session(
34 &self,
35 session: &mut P::Session,
36 events: Vec<A::Event>,
37 aggregate: &A,
38 metadata: HashMap<String, String>,
39 version: usize,
40 context: &CqrsContext,
41 ) -> Result<Vec<EventEnvelope<A>>, CqrsError> {
42 let latest_event = match self.persist.fetch_latest_event(aggregate, session).await {
43 Ok(event) => {
44 debug!(has_event = event.is_some(), "Fetched latest event");
45 event
46 }
47 Err(e) => {
48 error!(error = %e, "Failed to fetch latest event");
49 return Err(e);
50 }
51 };
52
53 let latest_version = latest_event.map(|e| e.version).unwrap_or(0);
54 debug!(latest_version = %latest_version, expected_version = %version, "Checking version");
55
56 if version != latest_version {
57 error!(latest_version = %latest_version, expected_version = %version, "Version conflict detected");
58 return Err(CqrsError::concurrency_error());
59 }
60
61 debug!("Creating event envelopes");
62 let envelopes = events
63 .iter()
64 .enumerate()
65 .map(|(i, e)| {
66 let event_id = context.next_uuid();
67 let event_version = version + i + 1;
68 debug!(event_id = %event_id, event_version = %event_version, "Creating event envelope");
69 EventEnvelope {
70 event_id,
71 aggregate_id: aggregate.aggregate_id(),
72 version: event_version,
73 payload: e.clone(),
74 metadata: metadata.clone(),
75 at: context.now(),
76 }
77 })
78 .collect::<Vec<_>>();
79
80 debug!(event_count = envelopes.len(), "Saving events");
81 if let Err(e) = self.persist.save_events(envelopes.clone(), session).await {
82 error!(error = %e, "Failed to save events");
83 return Err(e);
84 }
85 debug!("Events saved successfully");
86
87 let next_latest_version = version + envelopes.len();
88 debug!(next_version = %next_latest_version, "Saving snapshot");
89 if let Err(e) = self
90 .persist
91 .save_snapshot(aggregate, next_latest_version, session)
92 .await
93 {
94 error!(error = %e, "Failed to save snapshot");
95 return Err(e);
96 }
97 debug!("Snapshot saved successfully");
98
99 Ok(envelopes)
100 }
101}
102
103cqrs_async_trait! {
104impl<A, P> EventStore<A> for EventStoreImpl<A, P>
105where
106 A: Aggregate + 'static,
107 P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
108{
109 async fn load_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError> {
110 debug!("Loading snapshot for aggregate");
111 match self.persist.fetch_snapshot(aggregate_id).await {
112 Ok(Some(snapshot)) => {
113 info!(version = %snapshot.version, "Snapshot loaded successfully");
114 Ok(Some(snapshot))
115 }
116 Ok(None) => {
117 debug!("No snapshot found for aggregate");
118 Ok(None)
119 }
120 Err(e) => {
121 error!(error = %e, "Failed to load snapshot");
122 Err(e)
123 }
124 }
125 }
126
127 async fn load_events_from_version(
128 &self,
129 aggregate_id: &str,
130 version: usize,
131 ) -> Result<EventStream<A>, CqrsError> {
132 debug!("Loading events from version");
133 self.persist
134 .fetch_events_from_version(aggregate_id, version)
135 .await
136 }
137
138 async fn load_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError> {
139 debug!("Loading all events for aggregate");
140 self.persist.fetch_all_events(aggregate_id).await
141 }
142
143 async fn load_events_paged(
144 &self,
145 aggregate_id: &str,
146 page: usize,
147 page_size: usize,
148 ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError> {
149 debug!("Loading paged events for aggregate");
150 self.persist
151 .fetch_events_paged(aggregate_id, page, page_size)
152 .await
153 }
154
155 async fn commit(
156 &self,
157 events: Vec<A::Event>,
158 aggregate: &A,
159 metadata: HashMap<String, String>,
160 version: usize,
161 context: &CqrsContext,
162 ) -> Result<Vec<EventEnvelope<A>>, CqrsError> {
163 debug!("Starting commit process");
164
165 let mut session = match self.persist.start_session().await {
166 Ok(session) => {
167 debug!("Session started successfully");
168 session
169 }
170 Err(e) => {
171 error!(error = %e, "Failed to start session");
172 return Err(e);
173 }
174 };
175
176 let result = self
177 .execute_within_session(&mut session, events, aggregate, metadata, version, context)
178 .await;
179
180 match result {
181 Ok(events) => {
182 debug!("Closing session");
183 if let Err(e) = self.persist.close_session(session).await {
184 error!(error = %e, "Failed to close session");
185 return Err(e);
186 }
187 info!(event_count = events.len(), "Commit completed successfully");
188 Ok(events)
189 }
190 Err(e) => {
191 error!(error = %e, "Error during commit, aborting session");
192 let _ = self.persist.abort_session(session).await;
193 Err(e)
194 }
195 }
196 }
197}
198}