1#![warn(
2 clippy::all,
3 clippy::pedantic,
4 clippy::nursery,
5 )]
7#![allow(clippy::module_name_repetitions)]
8#![allow(clippy::toplevel_ref_arg)]
9#![allow(clippy::similar_names)]
10#![allow(dead_code)]
11mod connection;
84mod error;
85mod event;
86mod expected_version;
87mod read_version;
88mod storage;
89mod stream;
90mod subscriptions;
91
92use actix::prelude::*;
93use connection::{Append, Connection, CreateStream, Read, StreamInfo};
94use error::EventStoreError;
95pub use event::Event;
96use event::{ParseEventError, RecordedEvent};
97use expected_version::ExpectedVersion;
98use tracing::{debug, info, instrument, trace, warn};
99
100use read_version::ReadVersion;
101use std::borrow::Cow;
102use storage::{appender::Appender, reader::Reader, Storage};
103use tracing_futures::Instrument;
104use uuid::Uuid;
105
106#[derive(Clone)]
108pub struct EventStore<S: Storage> {
109 connection: Addr<Connection<S>>,
110}
111
112impl<S: Storage> std::default::Default for EventStore<S> {
113 fn default() -> Self {
114 unimplemented!()
115 }
116}
117
118impl<S> ::actix::Supervised for EventStore<S> where S: 'static + Storage + std::marker::Unpin {}
119impl<S> ::actix::Actor for EventStore<S>
120where
121 S: 'static + Storage + std::marker::Unpin,
122{
123 type Context = ::actix::Context<Self>;
124}
125
126impl<S: Storage> Handler<storage::reader::ReadStreamRequest> for EventStore<S> {
127 type Result = actix::ResponseActFuture<Self, Result<Vec<RecordedEvent>, EventStoreError>>;
128
129 #[tracing::instrument(name = "EventStore::ReadStreamRequest", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
130 fn handle(
131 &mut self,
132 e: storage::reader::ReadStreamRequest,
133 _ctx: &mut Self::Context,
134 ) -> Self::Result {
135 let stream: String = e.stream.to_string();
136
137 info!("Attempting to read {} stream event(s)", stream);
138
139 let connection = self.connection.clone();
140
141 let fut = async move {
142 match connection
143 .send(Read {
144 correlation_id: e.correlation_id,
145 #[cfg(feature = "verbose")]
146 stream: stream.clone(),
147 #[cfg(not(feature = "verbose"))]
148 stream,
149 version: e.version,
150 limit: e.limit,
151 })
152 .await?
153 {
154 Ok(events) => {
155 #[cfg(feature = "verbose")]
156 info!("Read {} event(s) to {}", events.len(), stream);
157 Ok(events)
158 }
159 Err(e) => {
160 #[cfg(feature = "verbose")]
161 info!("Failed to read event(s) from {}", stream);
162 Err(e)
163 }
164 }
165 }
166 .instrument(tracing::Span::current())
167 .into_actor(self);
168
169 Box::pin(fut)
170 }
171}
172
173impl<S: Storage> Handler<StreamInfo> for EventStore<S> {
174 type Result = actix::ResponseActFuture<
175 Self,
176 Result<Cow<'static, crate::stream::Stream>, EventStoreError>,
177 >;
178
179 #[tracing::instrument(name = "EventStore::StreamInfo", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
180 fn handle(&mut self, e: StreamInfo, _ctx: &mut Self::Context) -> Self::Result {
181 debug!("Asking for stream {} infos", e.stream_uuid);
182
183 let connection = self.connection.clone();
184 let fut = async move { connection.send(e).await? }.into_actor(self);
185
186 Box::pin(fut)
187 }
188}
189
190impl<S: Storage> Handler<CreateStream> for EventStore<S> {
191 type Result = actix::ResponseActFuture<
192 Self,
193 Result<Cow<'static, crate::stream::Stream>, EventStoreError>,
194 >;
195
196 #[tracing::instrument(name = "EventStore::CreateStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
197 fn handle(&mut self, e: CreateStream, _ctx: &mut Self::Context) -> Self::Result {
198 debug!("Creating stream {}", e.stream_uuid);
199
200 let connection = self.connection.clone();
201 let fut = async move { connection.send(e).await? }.into_actor(self);
202
203 Box::pin(fut)
204 }
205}
206
207impl<S: Storage> Handler<storage::appender::AppendToStreamRequest> for EventStore<S> {
208 type Result = actix::ResponseActFuture<Self, Result<Vec<Uuid>, EventStoreError>>;
209
210 #[tracing::instrument(name = "EventStore::AppendToStream", skip(self, e, _ctx), fields(correlation_id = %e.correlation_id))]
211 fn handle(
212 &mut self,
213 e: storage::appender::AppendToStreamRequest,
214 _ctx: &mut Self::Context,
215 ) -> Self::Result {
216 let stream: String = e.stream.to_string();
217
218 #[cfg(feature = "verbose")]
219 let events_number = e.events.len();
220 info!(
221 "Attempting to append {} event(s) to {} with ExpectedVersion::{:?}",
222 e.events.len(),
223 e.stream,
224 e.expected_version
225 );
226
227 let connection = self.connection.clone();
228 let fut = async move {
229 match connection
230 .send(Append {
231 correlation_id: e.correlation_id,
232 #[cfg(feature = "verbose")]
233 stream: stream.clone(),
234 #[cfg(not(feature = "verbose"))]
235 stream,
236 expected_version: e.expected_version,
237 events: e.events,
238 })
239 .await?
240 {
241 Ok(events) => {
242 #[cfg(feature = "verbose")]
243 info!("Appended {} event(s) to {}", events.len(), stream);
244 Ok(events)
245 }
246 Err(e) => {
247 #[cfg(feature = "verbose")]
248 info!("Failed to append {} event(s) to {}", events_number, stream);
249 Err(e)
250 }
251 }
252 }
253 .instrument(tracing::Span::current())
254 .into_actor(self);
255
256 Box::pin(fut)
257 }
258}
259
260impl<S> EventStore<S>
261where
262 S: 'static + Storage + std::marker::Unpin,
263{
264 #[must_use]
265 pub fn builder() -> EventStoreBuilder<S> {
266 EventStoreBuilder { storage: None }
267 }
268}
269
270#[must_use]
272pub fn append() -> Appender {
273 Appender::default()
274}
275
276#[must_use]
278pub fn read() -> Reader {
279 Reader::default()
280}
281
282#[derive(Debug)]
284pub struct EventStoreBuilder<S: Storage> {
285 storage: Option<S>,
286}
287
288impl<S> EventStoreBuilder<S>
289where
290 S: Storage,
291{
292 pub fn storage(mut self, storage: S) -> Self {
294 self.storage = Some(storage);
295
296 self
297 }
298
299 #[instrument(level = "trace", name = "EventStoreBuilder::build", skip(self))]
306 pub async fn build(self) -> Result<EventStore<S>, ()> {
307 if self.storage.is_none() {
308 return Err(());
309 }
310
311 trace!("Creating EventStore with {} storage", S::storage_name());
312 Ok(EventStore {
313 connection: Connection::make(self.storage.unwrap()).start(),
314 })
315 }
316}
317
318pub mod prelude {
319 pub use crate::connection::StreamInfo;
320 pub use crate::error::EventStoreError;
321 pub use crate::event::{Event, RecordedEvent, RecordedEvents, UnsavedEvent};
322 pub use crate::expected_version::ExpectedVersion;
323 pub use crate::read_version::ReadVersion;
324 pub use crate::storage::{
325 appender::Appender, inmemory::InMemoryBackend, postgres::PostgresBackend, reader::Reader,
326 Storage, StorageError,
327 };
328 pub use crate::stream::Stream;
329 pub use crate::EventStore;
330}