// eventful_sql_server/lib.rs

use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bb8::Pool;
use bb8_tiberius::ConnectionManager;
use chrono::{DateTime, Utc};
use eventful::{JournalStore, Snapshot, SnapshotStore};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::convert::TryFrom;
use std::fmt::Debug;
use tiberius::Row;
11
12#[derive(Clone)]
13pub struct SqlServerJournalStore {
14    pub pool: Pool<ConnectionManager>,
15}
16
17#[derive(Clone)]
18pub struct SqlServerSnapshotStore {
19    pub pool: Pool<ConnectionManager>,
20}
21
22#[async_trait]
23impl JournalStore for SqlServerJournalStore {
24    async fn get_initial_offset(&self) -> anyhow::Result<u64> {
25        const SQL: &str = "select max(offset) as max_offset from journal;";
26
27        tracing::info!("Getting initial sequence number");
28
29        let mut conn = self.pool.get().await?;
30        let rows: Vec<Row> = conn.query(SQL, &[]).await?.into_first_result().await?;
31
32        if rows.len() != 1 {
33            return Err(anyhow!("Expected 1 row, got {}", rows.len()));
34        }
35
36        let max_offset: i64 = rows[0].try_get(0).context("failed to read max_offset")?.unwrap_or(0);
37
38        tracing::info!("Got {} initial sequence number", max_offset);
39
40        Ok(max_offset as u64)
41    }
42
43    // TODO - this should stream events directly from the database rather than returning a vector of all events
44    async fn get_events_from_journal(&self, offset: u64) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
45        const SQL: &str = "select offset,message from journal where journal.offset >= @p1 order by offset asc;";
46        let mut conn = self.pool.get().await?;
47        let offset = offset as i64;
48        let rows: Vec<Row> = conn.query(SQL, &[&offset]).await?.into_first_result().await?;
49        tracing::debug!("get_events_from_journal from offset {}, got {} rows", offset, rows.len());
50
51        let mut r = vec![];
52
53        for row in rows {
54            let offset: i64 = row.try_get(0).context("failed to read offset")?.expect("offset should not be null");
55            let data: &[u8] = row.try_get(1).context("failed to read data")?.expect("data should not be null");
56            r.push((offset as u64, data.to_vec()));
57        }
58
59        Ok(r)
60    }
61
62    async fn load_entity_events(&self, entity_type_name: &str, persistence_id: &str, offset: u64) -> anyhow::Result<Vec<Vec<u8>>> {
63        const SQL: &str = "select message from journal where entity_type = @P1 and persistence_id = @P2 and offset >= @P3 order by offset asc;";
64        let mut conn = self.pool.get().await?;
65        let sql_compatible_offset = offset as i64;
66        let rows: Vec<Row> = conn.query(SQL, &[&entity_type_name, &persistence_id, &sql_compatible_offset]).await?.into_first_result().await?;
67        tracing::debug!("load_entity {} ({}), got {} rows from journal", entity_type_name, persistence_id, rows.len());
68        if rows.is_empty() {
69            Ok(vec![])
70        } else {
71            let mut v = vec![];
72            for row in rows {
73                let message: &[u8] = row.try_get(0).context("failed to read message")?.unwrap();
74                let message = message.to_vec();
75                v.push(message);
76            }
77
78            Ok(v)
79        }
80    }
81
82    async fn persist_event_to_journal(&self, entity_type_name: &str, event_type: &str, event_date: &DateTime<Utc>, bytes: &[u8], persistence_id: &str, offset: u64) -> anyhow::Result<()> {
83        tracing::trace!("persist_event_to_journal: {:?} bytes", bytes.len());
84
85        const SQL: &str = "insert into journal(offset,entity_type,persistence_id,event_type,message,event_date) values (@p1,@p2,@p3,@p4,@p5,@p6);";
86
87        let mut conn = self.pool.get().await?;
88        let offset = offset as i64;
89        let results = conn.execute(SQL, &[&offset, &entity_type_name, &persistence_id, &event_type, &bytes, event_date]).await?;
90
91        tracing::trace!("persist_event_to_journal rows_affected: {:?}", results.rows_affected());
92
93        Ok(())
94    }
95}
96
97#[async_trait]
98impl SnapshotStore for SqlServerSnapshotStore {
99    async fn read_snapshot<'de, S>(&self, name: &str) -> anyhow::Result<Option<Snapshot<S>>>
100    where
101        S: DeserializeOwned,
102    {
103        const SQL: &str = "select offset, value from snapshot where name = @P1;";
104        let mut conn = self.pool.get().await?;
105        let rows: Vec<Row> = conn.query(SQL, &[&name]).await?.into_first_result().await?;
106        tracing::debug!("read_snapshot {}, got {} rows", name, rows.len());
107        if rows.is_empty() {
108            Ok(None)
109        } else {
110            let offset: i64 = rows[0].try_get(0).context("failed to read snapshot offset value from data row")?.unwrap();
111            let bytes: &[u8] = rows[0].try_get(1).context("failed to read snapshot value from data row")?.unwrap();
112            let json = serde_json::from_slice::<S>(bytes).context("failed to deserialise snapshot from json")?;
113            Ok(Some(Snapshot { offset: offset as u64, value: json }))
114        }
115    }
116
117    async fn write_snapshot<S>(&self, name: &str, offset: u64, value: &S) -> anyhow::Result<()>
118    where
119        S: Debug + Serialize + Sync,
120    {
121        tracing::trace!("write_snapshot at offset: {} for value: {:?}", offset, value);
122
123        const SQL: &str = "merge snapshot as s
124using (select @p1 name, @p2 offset, @p3 value) as p
125on s.name = p.name
126when matched then
127    update
128    set offset = p.offset, value = p.value
129when not matched then
130    insert (name, offset, value)
131    values (@p1, @p2, @p3);";
132
133        let mut conn = self.pool.get().await?;
134        let json = serde_json::to_string(value).context("failed to serialise snapshot to json")?;
135        let bytes = json.as_bytes();
136        let sql_compatible_offset = offset as i64;
137        let results = conn.execute(SQL, &[&name, &sql_compatible_offset, &bytes]).await?;
138
139        tracing::trace!("write_snapshot rows_affected: {:?}", results.rows_affected());
140
141        Ok(())
142    }
143}
144
145pub async fn build_connection(connection_string: &str) -> Result<Pool<ConnectionManager>, bb8_tiberius::Error> {
146    tracing::info!("CONN: {}", connection_string);
147    let mgr = ConnectionManager::build(connection_string)?;
148    Pool::builder().connection_timeout(std::time::Duration::from_millis(15000)).max_size(6).build(mgr).await
149}