Skip to main content

cqrs_rust_lib/es/
impl.rs

1use crate::es::storage::{EventStoreStorage, EventStream};
2use crate::{
3    Aggregate, CqrsContext, CqrsError, EventEnvelope, EventStore, MaybeSend, MaybeSync, Snapshot,
4};
5use std::collections::HashMap;
6use std::fmt::Debug;
7use std::sync::Arc;
8use tracing::{debug, error, info};
9
10#[derive(Debug, Clone)]
11pub struct EventStoreImpl<A, P>
12where
13    A: Aggregate + 'static,
14    P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
15{
16    _phantom: std::marker::PhantomData<(A, P)>,
17    persist: P,
18}
19
20impl<A, P> EventStoreImpl<A, P>
21where
22    A: Aggregate + 'static,
23    P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
24{
25    #[must_use]
26    pub fn new(persist: P) -> Arc<Self> {
27        Arc::new(Self {
28            _phantom: Default::default(),
29            persist,
30        })
31    }
32
33    async fn execute_within_session(
34        &self,
35        session: &mut P::Session,
36        events: Vec<A::Event>,
37        aggregate: &A,
38        metadata: HashMap<String, String>,
39        version: usize,
40        context: &CqrsContext,
41    ) -> Result<Vec<EventEnvelope<A>>, CqrsError> {
42        let latest_event = match self.persist.fetch_latest_event(aggregate, session).await {
43            Ok(event) => {
44                debug!(has_event = event.is_some(), "Fetched latest event");
45                event
46            }
47            Err(e) => {
48                error!(error = %e, "Failed to fetch latest event");
49                return Err(e);
50            }
51        };
52
53        let latest_version = latest_event.map(|e| e.version).unwrap_or(0);
54        debug!(latest_version = %latest_version, expected_version = %version, "Checking version");
55
56        if version != latest_version {
57            error!(latest_version = %latest_version, expected_version = %version, "Version conflict detected");
58            return Err(CqrsError::concurrency_error());
59        }
60
61        debug!("Creating event envelopes");
62        let envelopes = events
63            .iter()
64            .enumerate()
65            .map(|(i, e)| {
66                let event_id = context.next_uuid();
67                let event_version = version + i + 1;
68                debug!(event_id = %event_id, event_version = %event_version, "Creating event envelope");
69                EventEnvelope {
70                    event_id,
71                    aggregate_id: aggregate.aggregate_id(),
72                    version: event_version,
73                    payload: e.clone(),
74                    metadata: metadata.clone(),
75                    at: context.now(),
76                }
77            })
78            .collect::<Vec<_>>();
79
80        debug!(event_count = envelopes.len(), "Saving events");
81        if let Err(e) = self.persist.save_events(envelopes.clone(), session).await {
82            error!(error = %e, "Failed to save events");
83            return Err(e);
84        }
85        debug!("Events saved successfully");
86
87        let next_latest_version = version + envelopes.len();
88        debug!(next_version = %next_latest_version, "Saving snapshot");
89        if let Err(e) = self
90            .persist
91            .save_snapshot(aggregate, next_latest_version, session)
92            .await
93        {
94            error!(error = %e, "Failed to save snapshot");
95            return Err(e);
96        }
97        debug!("Snapshot saved successfully");
98
99        Ok(envelopes)
100    }
101}
102
103cqrs_async_trait! {
104impl<A, P> EventStore<A> for EventStoreImpl<A, P>
105where
106    A: Aggregate + 'static,
107    P: EventStoreStorage<A> + MaybeSend + MaybeSync + Clone + Debug + 'static,
108{
109    async fn load_snapshot(&self, aggregate_id: &str) -> Result<Option<Snapshot<A>>, CqrsError> {
110        debug!("Loading snapshot for aggregate");
111        match self.persist.fetch_snapshot(aggregate_id).await {
112            Ok(Some(snapshot)) => {
113                info!(version = %snapshot.version, "Snapshot loaded successfully");
114                Ok(Some(snapshot))
115            }
116            Ok(None) => {
117                debug!("No snapshot found for aggregate");
118                Ok(None)
119            }
120            Err(e) => {
121                error!(error = %e, "Failed to load snapshot");
122                Err(e)
123            }
124        }
125    }
126
127    async fn load_events_from_version(
128        &self,
129        aggregate_id: &str,
130        version: usize,
131    ) -> Result<EventStream<A>, CqrsError> {
132        debug!("Loading events from version");
133        self.persist
134            .fetch_events_from_version(aggregate_id, version)
135            .await
136    }
137
138    async fn load_events(&self, aggregate_id: &str) -> Result<EventStream<A>, CqrsError> {
139        debug!("Loading all events for aggregate");
140        self.persist.fetch_all_events(aggregate_id).await
141    }
142
143    async fn load_events_paged(
144        &self,
145        aggregate_id: &str,
146        page: usize,
147        page_size: usize,
148    ) -> Result<(Vec<EventEnvelope<A>>, i64), CqrsError> {
149        debug!("Loading paged events for aggregate");
150        self.persist
151            .fetch_events_paged(aggregate_id, page, page_size)
152            .await
153    }
154
155    async fn commit(
156        &self,
157        events: Vec<A::Event>,
158        aggregate: &A,
159        metadata: HashMap<String, String>,
160        version: usize,
161        context: &CqrsContext,
162    ) -> Result<Vec<EventEnvelope<A>>, CqrsError> {
163        debug!("Starting commit process");
164
165        let mut session = match self.persist.start_session().await {
166            Ok(session) => {
167                debug!("Session started successfully");
168                session
169            }
170            Err(e) => {
171                error!(error = %e, "Failed to start session");
172                return Err(e);
173            }
174        };
175
176        let result = self
177            .execute_within_session(&mut session, events, aggregate, metadata, version, context)
178            .await;
179
180        match result {
181            Ok(events) => {
182                debug!("Closing session");
183                if let Err(e) = self.persist.close_session(session).await {
184                    error!(error = %e, "Failed to close session");
185                    return Err(e);
186                }
187                info!(event_count = events.len(), "Commit completed successfully");
188                Ok(events)
189            }
190            Err(e) => {
191                error!(error = %e, "Error during commit, aborting session");
192                let _ = self.persist.abort_session(session).await;
193                Err(e)
194            }
195        }
196    }
197}
198}