use crate::event::ParseEventError;
use crate::event::UnsavedEvent;
use crate::expected_version::ExpectedVersionResult;
use crate::storage::StorageError;
use crate::stream::Stream;
use crate::Event;
use crate::EventStore;
use crate::EventStoreError;
use crate::ExpectedVersion;
use crate::Storage;
use std::str::FromStr;
use tracing::trace;
use uuid::Uuid;
pub struct Appender {
id: Uuid,
correlation_id: Uuid,
span: tracing::Span,
expected_version: ExpectedVersion,
events: Vec<UnsavedEvent>,
stream: String,
}
impl Default for Appender {
fn default() -> Self {
Self::with_correlation_id(Uuid::new_v4())
}
}
impl Appender {
#[tracing::instrument(name = "Appender")]
pub fn with_correlation_id(correlation_id: Uuid) -> Self {
let appender = Self {
id: Uuid::new_v4(),
correlation_id,
span: tracing::Span::current(),
expected_version: ExpectedVersion::AnyVersion,
events: Vec::new(),
stream: String::new(),
};
trace!(
parent: &appender.span,
"Created");
appender
}
pub fn events<E: Event>(mut self, events: &[&E]) -> Result<Self, EventStoreError> {
trace!(
parent: &self.span,
"Attemting to add {} event(s)", events.len());
let events: Vec<Result<UnsavedEvent, ParseEventError>> = events
.iter()
.map(|event| UnsavedEvent::try_from(*event))
.collect();
if events.iter().any(Result::is_err) {
trace!(
parent: &self.span,
"Failed to add {} event(s)", events.len(),);
return Err(EventStoreError::Any);
}
let mut events: Vec<UnsavedEvent> = events.into_iter().map(Result::unwrap).collect();
trace!(
parent: &self.span,
"Added {} event(s)", events.len());
self.events.append(&mut events);
Ok(self)
}
pub fn event<E: Event>(mut self, event: &E) -> Result<Self, EventStoreError> {
trace!(
parent: &self.span,
"Attemting to add 1 event");
self.events.push(UnsavedEvent::try_from(event)?);
trace!(
parent: &self.span,
"Added 1 event");
Ok(self)
}
pub fn to<S: ToString>(mut self, stream: &S) -> Result<Self, EventStoreError> {
self.stream = stream.to_string();
trace!(
parent: &self.span,
"Defined stream {} as target", self.stream,);
Ok(self)
}
#[must_use]
pub fn expected_version(mut self, version: ExpectedVersion) -> Self {
self.expected_version = version;
trace!(
parent: &self.span,
"Defined {:?}", self.expected_version,);
self
}
pub async fn execute<S: Storage>(
self,
event_store: Addr<EventStore<S>>,
) -> Result<Vec<Uuid>, EventStoreError> {
trace!(
parent: &self.span,
"Attempting to execute");
if !Stream::validates_stream_id(&self.stream) {
return Err(EventStoreError::Any);
}
let stream = event_store
.send(crate::connection::StreamInfo {
correlation_id: self.correlation_id,
stream_uuid: self.stream.to_string(),
})
.await?;
let expected_version_result = match stream {
Ok(ref stream) => ExpectedVersion::verify(stream, &self.expected_version),
Err(EventStoreError::Storage(StorageError::StreamDoesntExists)) => {
ExpectedVersion::verify(
&Stream::from_str(&self.stream).unwrap(),
&self.expected_version,
)
}
Err(_) => ExpectedVersionResult::WrongExpectedVersion,
};
let stream = match expected_version_result {
ExpectedVersionResult::NeedCreation => {
trace!(
parent: &self.span,
"Stream {} does not exists", self.stream);
event_store
.send(crate::connection::CreateStream {
correlation_id: self.correlation_id,
stream_uuid: self.stream.to_string(),
})
.await??
}
ExpectedVersionResult::Ok => {
trace!(
parent: &self.span,
"Fetched stream {} informations", self.stream,);
stream.unwrap()
}
_ => {
trace!(
parent: &self.span,
"Stream {} does not exists", self.stream);
return Err(EventStoreError::Any);
}
};
let events: Vec<UnsavedEvent> = self
.events
.into_iter()
.enumerate()
.map(|(index, mut event)| {
event.stream_version = stream.stream_version + (index + 1) as i64;
event.stream_uuid = stream.stream_uuid.clone();
event
})
.collect();
trace!(
parent: &self.span,
"Transformed {} event(s) into appendable events",
events.len(),
);
let res = event_store
.send(AppendToStreamRequest {
correlation_id: self.correlation_id,
stream: self.stream.to_string(),
expected_version: self.expected_version,
events,
})
.await?;
trace!(parent: &self.span, "{}", match res {
Ok(_) => "Successfully executed",
Err(_) => "Unsuccessfully executed",
});
res
}
}
use actix::prelude::*;
#[derive(Debug, Message)]
#[rtype(result = "Result<Vec<Uuid>, EventStoreError>")]
pub struct AppendToStreamRequest {
pub correlation_id: Uuid,
pub stream: String,
pub expected_version: ExpectedVersion,
pub events: Vec<UnsavedEvent>,
}
#[cfg(test)]
mod test {
use super::*;
use crate::storage::inmemory::InMemoryBackend;
use uuid::Uuid;
#[derive(serde::Serialize, serde::Deserialize)]
struct MyEvent {}
impl Event for MyEvent {
fn event_type(&self) -> &'static str {
"MyEvent"
}
fn all_event_types() -> Vec<&'static str> {
vec!["MyEvent"]
}
}
impl std::convert::TryFrom<crate::prelude::RecordedEvent> for MyEvent {
type Error = ();
fn try_from(e: crate::prelude::RecordedEvent) -> Result<Self, Self::Error> {
serde_json::from_value(e.data).map_err(|_| ())
}
}
#[test]
fn that_an_appender_can_be_configured() {
let mut appender = Appender::default();
appender = appender.to(&"stream_name").unwrap();
assert_eq!(appender.stream, "stream_name");
appender = appender.event(&MyEvent {}).unwrap();
assert_eq!(appender.events.len(), 1);
appender = appender.events(&[&MyEvent {}]).unwrap();
assert_eq!(appender.events.len(), 2);
appender = appender.expected_version(ExpectedVersion::AnyVersion);
assert_eq!(appender.expected_version, ExpectedVersion::AnyVersion);
}
#[actix::test]
async fn that_an_appender_can_be_executed() -> Result<(), EventStoreError> {
let es = EventStore::builder()
.storage(InMemoryBackend::default())
.build()
.await
.unwrap();
let addr = es.start();
let uuid = Uuid::new_v4();
let event = MyEvent {};
let res = Appender::default()
.to(&uuid)?
.event(&event)?
.expected_version(ExpectedVersion::StreamExists)
.execute(addr.clone())
.await;
assert_eq!(res, Err(EventStoreError::Any));
let res = Appender::default()
.to(&uuid)?
.event(&event)?
.execute(addr)
.await;
assert!(res.is_ok());
Ok(())
}
}