use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bb8::Pool;
use bb8_tiberius::ConnectionManager;
use chrono::{DateTime, Utc};
use eventful::{JournalStore, Snapshot, SnapshotStore};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use tiberius::Row;
/// Journal (event log) store backed by SQL Server.
///
/// Persists and reads events from the `journal` table via a shared
/// `bb8`/`tiberius` connection pool. Cloning is cheap: only the pool
/// handle is duplicated.
#[derive(Clone)]
pub struct SqlServerJournalStore {
// Shared connection pool to the SQL Server instance.
pub pool: Pool<ConnectionManager>,
}
/// Snapshot store backed by SQL Server.
///
/// Reads and upserts named snapshots in the `snapshot` table via a shared
/// `bb8`/`tiberius` connection pool. Cloning is cheap: only the pool
/// handle is duplicated.
#[derive(Clone)]
pub struct SqlServerSnapshotStore {
// Shared connection pool to the SQL Server instance.
pub pool: Pool<ConnectionManager>,
}
#[async_trait]
impl JournalStore for SqlServerJournalStore {
    /// Returns the highest offset currently stored in the journal,
    /// or 0 when the journal table is empty.
    async fn get_initial_offset(&self) -> anyhow::Result<u64> {
        const SQL: &str = "select max(offset) as max_offset from journal;";
        tracing::info!("Getting initial sequence number");
        let mut conn = self.pool.get().await?;
        let rows: Vec<Row> = conn.query(SQL, &[]).await?.into_first_result().await?;
        // `max(...)` without GROUP BY always yields exactly one row; anything
        // else indicates a protocol/driver problem.
        if rows.len() != 1 {
            return Err(anyhow!("Expected 1 row, got {}", rows.len()));
        }
        // `max()` over an empty table is NULL — treat that as offset 0.
        let max_offset: i64 = rows[0]
            .try_get(0)
            .context("failed to read max_offset")?
            .unwrap_or(0);
        tracing::info!("Got {} initial sequence number", max_offset);
        Ok(max_offset as u64)
    }

    /// Loads every `(offset, message)` pair with `offset >= offset`,
    /// ordered ascending by offset.
    async fn get_events_from_journal(&self, offset: u64) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
        const SQL: &str = "select offset,message from journal where journal.offset >= @p1 order by offset asc;";
        let mut conn = self.pool.get().await?;
        // SQL Server has no unsigned integer type; offsets are stored as bigint.
        let offset = offset as i64;
        let rows: Vec<Row> = conn.query(SQL, &[&offset]).await?.into_first_result().await?;
        tracing::debug!("get_events_from_journal from offset {}, got {} rows", offset, rows.len());
        let mut r = Vec::with_capacity(rows.len());
        for row in rows {
            // These columns are expected NOT NULL; surface a NULL as an error
            // instead of panicking the task (previously `expect`).
            let offset: i64 = row
                .try_get(0)
                .context("failed to read offset")?
                .ok_or_else(|| anyhow!("journal.offset was unexpectedly null"))?;
            let data: &[u8] = row
                .try_get(1)
                .context("failed to read data")?
                .ok_or_else(|| anyhow!("journal.message was unexpectedly null"))?;
            r.push((offset as u64, data.to_vec()));
        }
        Ok(r)
    }

    /// Loads the serialized events for one entity (`entity_type_name` +
    /// `persistence_id`) starting at `offset`, ordered ascending.
    async fn load_entity_events(&self, entity_type_name: &str, persistence_id: &str, offset: u64) -> anyhow::Result<Vec<Vec<u8>>> {
        const SQL: &str = "select message from journal where entity_type = @P1 and persistence_id = @P2 and offset >= @P3 order by offset asc;";
        let mut conn = self.pool.get().await?;
        let sql_compatible_offset = offset as i64;
        let rows: Vec<Row> = conn.query(SQL, &[&entity_type_name, &persistence_id, &sql_compatible_offset]).await?.into_first_result().await?;
        tracing::debug!("load_entity {} ({}), got {} rows from journal", entity_type_name, persistence_id, rows.len());
        // An empty result set naturally yields an empty Vec — no special case needed.
        let mut v = Vec::with_capacity(rows.len());
        for row in rows {
            // Surface a NULL message as an error rather than panicking
            // (previously `unwrap`).
            let message: &[u8] = row
                .try_get(0)
                .context("failed to read message")?
                .ok_or_else(|| anyhow!("journal.message was unexpectedly null"))?;
            v.push(message.to_vec());
        }
        Ok(v)
    }

    /// Appends one serialized event to the journal at the given offset.
    ///
    /// # Errors
    /// Fails if a pooled connection cannot be obtained or the insert is
    /// rejected (e.g. a duplicate offset under a unique constraint).
    async fn persist_event_to_journal(&self, entity_type_name: &str, event_type: &str, event_date: &DateTime<Utc>, bytes: &[u8], persistence_id: &str, offset: u64) -> anyhow::Result<()> {
        tracing::trace!("persist_event_to_journal: {:?} bytes", bytes.len());
        const SQL: &str = "insert into journal(offset,entity_type,persistence_id,event_type,message,event_date) values (@p1,@p2,@p3,@p4,@p5,@p6);";
        let mut conn = self.pool.get().await?;
        let offset = offset as i64;
        let results = conn.execute(SQL, &[&offset, &entity_type_name, &persistence_id, &event_type, &bytes, event_date]).await?;
        tracing::trace!("persist_event_to_journal rows_affected: {:?}", results.rows_affected());
        Ok(())
    }
}
#[async_trait]
impl SnapshotStore for SqlServerSnapshotStore {
async fn read_snapshot<'de, S>(&self, name: &str) -> anyhow::Result<Option<Snapshot<S>>>
where
S: DeserializeOwned,
{
const SQL: &str = "select offset, value from snapshot where name = @P1;";
let mut conn = self.pool.get().await?;
let rows: Vec<Row> = conn.query(SQL, &[&name]).await?.into_first_result().await?;
tracing::debug!("read_snapshot {}, got {} rows", name, rows.len());
if rows.is_empty() {
Ok(None)
} else {
let offset: i64 = rows[0].try_get(0).context("failed to read snapshot offset value from data row")?.unwrap();
let bytes: &[u8] = rows[0].try_get(1).context("failed to read snapshot value from data row")?.unwrap();
let json = serde_json::from_slice::<S>(bytes).context("failed to deserialise snapshot from json")?;
Ok(Some(Snapshot { offset: offset as u64, value: json }))
}
}
async fn write_snapshot<S>(&self, name: &str, offset: u64, value: &S) -> anyhow::Result<()>
where
S: Debug + Serialize + Sync,
{
tracing::trace!("write_snapshot at offset: {} for value: {:?}", offset, value);
const SQL: &str = "merge snapshot as s
using (select @p1 name, @p2 offset, @p3 value) as p
on s.name = p.name
when matched then
update
set offset = p.offset, value = p.value
when not matched then
insert (name, offset, value)
values (@p1, @p2, @p3);";
let mut conn = self.pool.get().await?;
let json = serde_json::to_string(value).context("failed to serialise snapshot to json")?;
let bytes = json.as_bytes();
let sql_compatible_offset = offset as i64;
let results = conn.execute(SQL, &[&name, &sql_compatible_offset, &bytes]).await?;
tracing::trace!("write_snapshot rows_affected: {:?}", results.rows_affected());
Ok(())
}
}
/// Builds a `bb8` connection pool for SQL Server from an ADO-style
/// connection string (15 s connection timeout, at most 6 pooled connections).
///
/// # Errors
/// Returns a `bb8_tiberius::Error` if the connection string cannot be parsed
/// or the pool cannot be constructed.
pub async fn build_connection(connection_string: &str) -> Result<Pool<ConnectionManager>, bb8_tiberius::Error> {
    // SECURITY: never log the raw connection string — it typically embeds
    // credentials (previously it was emitted at info level).
    tracing::info!("Building SQL Server connection pool");
    let mgr = ConnectionManager::build(connection_string)?;
    Pool::builder()
        .connection_timeout(std::time::Duration::from_secs(15))
        .max_size(6)
        .build(mgr)
        .await
}