use crate::{Error, Result};
use serde::Serialize;
use serde_json::Value;
use sqlx::PgPool;
use uuid::Uuid;
#[derive(Clone, Copy, Debug)]
pub enum Expected {
Any,
NoStream,
Exact(i32),
}
#[derive(Clone, Debug)]
pub struct Event {
pub typ: String,
pub body: Value,
}
impl Event {
pub fn new<T: Serialize>(typ: impl Into<String>, body: &T) -> Self {
Self {
typ: typ.into(),
body: serde_json::to_value(body).expect("failed to serialize event body"),
}
}
}
#[derive(Default, Clone, Debug)]
pub struct AppendOptions {
pub headers: Option<Value>,
pub causation_id: Option<Uuid>,
pub correlation_id: Option<Uuid>,
}
#[derive(Clone)]
pub struct Events {
pub(crate) pool: PgPool,
pub(crate) use_advisory_lock: bool,
}
#[derive(Clone, Debug)]
pub struct EventEnvelope {
pub global_seq: i64,
pub stream_id: Uuid,
pub stream_seq: i32,
pub typ: String,
pub body: Value,
pub headers: Value,
pub causation_id: Option<Uuid>,
pub correlation_id: Option<Uuid>,
pub created_at: chrono::DateTime<chrono::Utc>,
}
impl Events {
pub fn with_advisory_locks(mut self) -> Self {
self.use_advisory_lock = true;
self
}
pub async fn append_stream(
&self,
stream_id: Uuid,
expected: Expected,
events: Vec<Event>,
) -> Result<()> {
self.append_with(stream_id, expected, events, &AppendOptions::default())
.await
}
pub async fn append_stream_with_headers(
&self,
stream_id: Uuid,
expected: Expected,
events: Vec<Event>,
headers: &Value,
causation_id: Option<Uuid>,
correlation_id: Option<Uuid>,
) -> Result<()> {
let opts = AppendOptions {
headers: Some(headers.clone()),
causation_id,
correlation_id,
};
self.append_with(stream_id, expected, events, &opts).await
}
pub async fn read_stream(&self, stream_id: Uuid) -> Result<Vec<(i32, Event)>> {
let rows: Vec<(i32, String, Value)> = sqlx::query_as(
"select stream_seq, event_type, body from events where stream_id = $1 order by stream_seq asc",
)
.bind(stream_id)
.fetch_all(&self.pool)
.await?;
Ok(rows
.into_iter()
.map(|(seq, typ, body)| (seq, Event { typ, body }))
.collect())
}
pub async fn read_stream_envelopes(&self, stream_id: Uuid) -> Result<Vec<EventEnvelope>> {
#[allow(clippy::type_complexity)]
let rows: Vec<(
i64,
Uuid,
i32,
String,
Value,
Value,
Option<Uuid>,
Option<Uuid>,
chrono::DateTime<chrono::Utc>,
)> = sqlx::query_as(
r#"select global_seq, stream_id, stream_seq, event_type, body, headers, causation_id, correlation_id, created_at
from events where stream_id = $1 order by stream_seq asc"#,
)
.bind(stream_id)
.fetch_all(&self.pool)
.await?;
Ok(rows
.into_iter()
.map(
|(
global_seq,
stream_id,
stream_seq,
typ,
body,
headers,
causation_id,
correlation_id,
created_at,
)| EventEnvelope {
global_seq,
stream_id,
stream_seq,
typ,
body,
headers,
causation_id,
correlation_id,
created_at,
},
)
.collect())
}
pub async fn append_with(
&self,
stream_id: Uuid,
expected: Expected,
events: Vec<Event>,
opts: &AppendOptions,
) -> Result<()> {
if events.is_empty() {
return Ok(());
}
let mut tx = self.pool.begin().await?;
if self.use_advisory_lock {
let key = stream_id.to_string();
sqlx::query("select pg_advisory_xact_lock(hashtext($1)::bigint)")
.bind(&key)
.execute(&mut *tx)
.await?;
}
let current: i32 = sqlx::query_scalar::<_, Option<i32>>(
"select max(stream_seq) from events where stream_id = $1",
)
.bind(stream_id)
.fetch_one(&mut *tx)
.await?
.unwrap_or(0);
match expected {
Expected::Any => {}
Expected::NoStream if current != 0 => return Err(Error::VersionConflict),
Expected::Exact(value) if value != current => return Err(Error::VersionConflict),
_ => {}
}
let mut seq = current;
let headers = opts
.headers
.clone()
.unwrap_or_else(|| Value::Object(serde_json::Map::new()));
for event in events {
seq += 1;
let res = sqlx::query(
r#"insert into events (stream_id, stream_seq, event_type, body, headers, causation_id, correlation_id)
values ($1, $2, $3, $4, $5, $6, $7)"#,
)
.bind(stream_id)
.bind(seq)
.bind(&event.typ)
.bind(&event.body)
.bind(&headers)
.bind(opts.causation_id)
.bind(opts.correlation_id)
.execute(&mut *tx)
.await;
if let Err(e) = res {
if let sqlx::Error::Database(db_err) = &e {
let msg = db_err.message().to_lowercase();
if msg.contains("events_idemp_key_uq")
|| msg.contains("unique") && msg.contains("idempotency_key")
{
return Err(Error::IdempotencyConflict);
}
}
return Err(e.into());
}
}
tx.commit().await?;
Ok(())
}
pub fn builder(&self, stream_id: Uuid) -> AppendBuilder {
AppendBuilder {
events: Vec::new(),
opts: AppendOptions::default(),
expected: Expected::Any,
stream_id,
inner: self.clone(),
}
}
}
pub struct AppendBuilder {
inner: Events,
stream_id: Uuid,
expected: Expected,
opts: AppendOptions,
events: Vec<Event>,
}
impl AppendBuilder {
pub fn expected(mut self, expected: Expected) -> Self {
self.expected = expected;
self
}
pub fn headers(mut self, headers: serde_json::Value) -> Self {
self.opts.headers = Some(headers);
self
}
pub fn idempotency_key(mut self, key: impl Into<String>) -> Self {
let mut h = self
.opts
.headers
.take()
.unwrap_or_else(|| Value::Object(serde_json::Map::new()));
if let Value::Object(ref mut m) = h {
m.insert("idempotency_key".to_string(), Value::String(key.into()));
}
self.opts.headers = Some(h);
self
}
pub fn causation_id(mut self, id: Uuid) -> Self {
self.opts.causation_id = Some(id);
self
}
pub fn correlation_id(mut self, id: Uuid) -> Self {
self.opts.correlation_id = Some(id);
self
}
pub fn push(mut self, event: Event) -> Self {
self.events.push(event);
self
}
pub fn extend<I: IntoIterator<Item = Event>>(mut self, iter: I) -> Self {
self.events.extend(iter);
self
}
pub async fn send(self) -> Result<()> {
self.inner
.append_with(self.stream_id, self.expected, self.events, &self.opts)
.await
}
}