event_store/
lib.rs

1#![warn(
2    clippy::all,
3    clippy::pedantic,
4    clippy::nursery,
5    // clippy::cargo
6)]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::toplevel_ref_arg)]
9#![allow(clippy::similar_names)]
10#![allow(dead_code)]
11//! # EventStore
12//!
13//! The `EventStore` will allow you to deal with every aspects of the event sourcing part of Chekov.
14//!
15//!
16//! ## Appending an event
17//!
18//!
19//! Events are appended by using the fluent API exposed at the root level of the event_store crate:
20//!
21//! ```rust
22//! use event_store::prelude::*;
23//! # use uuid::Uuid;
24//! # use std::convert::TryFrom;
25//! # use actix::Actor;
26//! #
27//! # #[derive(serde::Deserialize, serde::Serialize)]
28//! # struct MyEvent{
29//! #     account_id: Uuid
30//! # }
31//! # impl Event for MyEvent {
32//! #     fn event_type(&self) -> &'static str { "MyEvent" }
33//! #     fn all_event_types() -> Vec<&'static str> { vec!["MyEvent"] }
34//! # }
35//! # impl TryFrom<RecordedEvent> for MyEvent {
36//! #     type Error = ();
37//! #      fn try_from(e: RecordedEvent) -> Result<Self, ()> {
38//! #        serde_json::from_value(e.data).map_err(|_| ())
39//! #      }
40//! # }
41//! # #[actix::main]
42//! # async fn reading() -> Result<(), Box<dyn std::error::Error>>{
43//! # let mut event_store = EventStore::builder().storage(InMemoryBackend::default()).build().await.unwrap().start();
44//!
45//! let stream_uuid = Uuid::new_v4().to_string();
46//! let my_event = MyEvent { account_id: Uuid::new_v4() };
47//!
48//! event_store::append()
49//!   .event(&my_event)?
50//!   .to(&stream_uuid)?
51//!   .execute(event_store)
52//!   .await;
53//! # Ok(())
54//! # }
55//! ```
56//!
57//! ## Reading from stream
58//!
59//!
60//! A `Stream` can be read with the fluent API exposed at the root level of the event_store crate:
61//!
62//! ```rust
63//! use event_store::prelude::*;
64//! # use uuid::Uuid;
65//! # use actix::Actor;
66//! #
67//! # #[actix::main]
68//! # async fn reading() -> Result<(), Box<dyn std::error::Error>>{
69//! # let mut event_store = EventStore::builder().storage(InMemoryBackend::default()).build().await.unwrap().start();
70//!
71//! let stream_uuid = Uuid::new_v4().to_string();
72//!
73//! event_store::read()
74//!   .stream(&stream_uuid)?
75//!   .from(ReadVersion::Origin)
76//!   .limit(10)
77//!   .execute(event_store)
78//!   .await;
79//! # Ok(())
80//! # }
81//! ```
82
83mod connection;
84mod error;
85mod event;
86mod expected_version;
87mod read_version;
88mod storage;
89mod stream;
90mod subscriptions;
91
92use actix::prelude::*;
93use connection::{Append, Connection, CreateStream, Read, StreamInfo};
94use error::EventStoreError;
95pub use event::Event;
96use event::{ParseEventError, RecordedEvent};
97use expected_version::ExpectedVersion;
98use tracing::{debug, info, instrument, trace, warn};
99
100use read_version::ReadVersion;
101use std::borrow::Cow;
102use storage::{appender::Appender, reader::Reader, Storage};
103use tracing_futures::Instrument;
104use uuid::Uuid;
105
106/// An `EventStore` that hold a storage connection
107#[derive(Clone)]
108pub struct EventStore<S: Storage> {
109    connection: Addr<Connection<S>>,
110}
111
112impl<S: Storage> std::default::Default for EventStore<S> {
113    fn default() -> Self {
114        unimplemented!()
115    }
116}
117
118impl<S> ::actix::Supervised for EventStore<S> where S: 'static + Storage + std::marker::Unpin {}
119impl<S> ::actix::Actor for EventStore<S>
120where
121    S: 'static + Storage + std::marker::Unpin,
122{
123    type Context = ::actix::Context<Self>;
124}
125
126impl<S: Storage> Handler<storage::reader::ReadStreamRequest> for EventStore<S> {
127    type Result = actix::ResponseActFuture<Self, Result<Vec<RecordedEvent>, EventStoreError>>;
128
129    #[tracing::instrument(name = "EventStore::ReadStreamRequest", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
130    fn handle(
131        &mut self,
132        e: storage::reader::ReadStreamRequest,
133        _ctx: &mut Self::Context,
134    ) -> Self::Result {
135        let stream: String = e.stream.to_string();
136
137        info!("Attempting to read {} stream event(s)", stream);
138
139        let connection = self.connection.clone();
140
141        let fut = async move {
142            match connection
143                .send(Read {
144                    correlation_id: e.correlation_id,
145                    #[cfg(feature = "verbose")]
146                    stream: stream.clone(),
147                    #[cfg(not(feature = "verbose"))]
148                    stream,
149                    version: e.version,
150                    limit: e.limit,
151                })
152                .await?
153            {
154                Ok(events) => {
155                    #[cfg(feature = "verbose")]
156                    info!("Read {} event(s) to {}", events.len(), stream);
157                    Ok(events)
158                }
159                Err(e) => {
160                    #[cfg(feature = "verbose")]
161                    info!("Failed to read event(s) from {}", stream);
162                    Err(e)
163                }
164            }
165        }
166        .instrument(tracing::Span::current())
167        .into_actor(self);
168
169        Box::pin(fut)
170    }
171}
172
173impl<S: Storage> Handler<StreamInfo> for EventStore<S> {
174    type Result = actix::ResponseActFuture<
175        Self,
176        Result<Cow<'static, crate::stream::Stream>, EventStoreError>,
177    >;
178
179    #[tracing::instrument(name = "EventStore::StreamInfo", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
180    fn handle(&mut self, e: StreamInfo, _ctx: &mut Self::Context) -> Self::Result {
181        debug!("Asking for stream {} infos", e.stream_uuid);
182
183        let connection = self.connection.clone();
184        let fut = async move { connection.send(e).await? }.into_actor(self);
185
186        Box::pin(fut)
187    }
188}
189
190impl<S: Storage> Handler<CreateStream> for EventStore<S> {
191    type Result = actix::ResponseActFuture<
192        Self,
193        Result<Cow<'static, crate::stream::Stream>, EventStoreError>,
194    >;
195
196    #[tracing::instrument(name = "EventStore::CreateStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
197    fn handle(&mut self, e: CreateStream, _ctx: &mut Self::Context) -> Self::Result {
198        debug!("Creating stream {}", e.stream_uuid);
199
200        let connection = self.connection.clone();
201        let fut = async move { connection.send(e).await? }.into_actor(self);
202
203        Box::pin(fut)
204    }
205}
206
207impl<S: Storage> Handler<storage::appender::AppendToStreamRequest> for EventStore<S> {
208    type Result = actix::ResponseActFuture<Self, Result<Vec<Uuid>, EventStoreError>>;
209
210    #[tracing::instrument(name = "EventStore::AppendToStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
211    fn handle(
212        &mut self,
213        e: storage::appender::AppendToStreamRequest,
214        _ctx: &mut Self::Context,
215    ) -> Self::Result {
216        let stream: String = e.stream.to_string();
217
218        #[cfg(feature = "verbose")]
219        let events_number = e.events.len();
220        info!(
221            "Attempting to append {} event(s) to {} with ExpectedVersion::{:?}",
222            e.events.len(),
223            e.stream,
224            e.expected_version
225        );
226
227        let connection = self.connection.clone();
228        let fut = async move {
229            match connection
230                .send(Append {
231                    correlation_id: e.correlation_id,
232                    #[cfg(feature = "verbose")]
233                    stream: stream.clone(),
234                    #[cfg(not(feature = "verbose"))]
235                    stream,
236                    expected_version: e.expected_version,
237                    events: e.events,
238                })
239                .await?
240            {
241                Ok(events) => {
242                    #[cfg(feature = "verbose")]
243                    info!("Appended {} event(s) to {}", events.len(), stream);
244                    Ok(events)
245                }
246                Err(e) => {
247                    #[cfg(feature = "verbose")]
248                    info!("Failed to append {} event(s) to {}", events_number, stream);
249                    Err(e)
250                }
251            }
252        }
253        .instrument(tracing::Span::current())
254        .into_actor(self);
255
256        Box::pin(fut)
257    }
258}
259
260impl<S> EventStore<S>
261where
262    S: 'static + Storage + std::marker::Unpin,
263{
264    #[must_use]
265    pub fn builder() -> EventStoreBuilder<S> {
266        EventStoreBuilder { storage: None }
267    }
268}
269
270/// Create an `Appender` to append events
271#[must_use]
272pub fn append() -> Appender {
273    Appender::default()
274}
275
276/// Create a `Reader` to read a stream
277#[must_use]
278pub fn read() -> Reader {
279    Reader::default()
280}
281
282/// Builder use to simplify the `EventStore` creation
283#[derive(Debug)]
284pub struct EventStoreBuilder<S: Storage> {
285    storage: Option<S>,
286}
287
288impl<S> EventStoreBuilder<S>
289where
290    S: Storage,
291{
292    /// Define which storage will be used by this building `EventStore`
293    pub fn storage(mut self, storage: S) -> Self {
294        self.storage = Some(storage);
295
296        self
297    }
298
299    /// Try to build the previously configured `EventStore`
300    ///
301    /// # Errors
302    ///
303    /// For now this method can fail only if you haven't define a `Storage`
304    // #[instrument(level = "trace", name = "my_name", skip(self))]
305    #[instrument(level = "trace", name = "EventStoreBuilder::build", skip(self))]
306    pub async fn build(self) -> Result<EventStore<S>, ()> {
307        if self.storage.is_none() {
308            return Err(());
309        }
310
311        trace!("Creating EventStore with {} storage", S::storage_name());
312        Ok(EventStore {
313            connection: Connection::make(self.storage.unwrap()).start(),
314        })
315    }
316}
317
318pub mod prelude {
319    pub use crate::connection::StreamInfo;
320    pub use crate::error::EventStoreError;
321    pub use crate::event::{Event, RecordedEvent, RecordedEvents, UnsavedEvent};
322    pub use crate::expected_version::ExpectedVersion;
323    pub use crate::read_version::ReadVersion;
324    pub use crate::storage::{
325        appender::Appender, inmemory::InMemoryBackend, postgres::PostgresBackend, reader::Reader,
326        Storage, StorageError,
327    };
328    pub use crate::stream::Stream;
329    pub use crate::EventStore;
330}