// eventful_sql_server/lib.rs
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use bb8::Pool;
use bb8_tiberius::ConnectionManager;
use chrono::{DateTime, Utc};
use eventful::{JournalStore, Snapshot, SnapshotStore};
use serde::de::DeserializeOwned;
use serde::Serialize;
use std::fmt::Debug;
use tiberius::Row;

/// SQL Server-backed implementation of the eventful [`JournalStore`] trait.
///
/// Holds a shared bb8/tiberius connection pool; `Clone` is cheap because the
/// pool itself is internally reference-counted.
#[derive(Clone)]
pub struct SqlServerJournalStore {
    // Shared connection pool; checked out per query in the trait methods.
    pub pool: Pool<ConnectionManager>,
}
16
/// SQL Server-backed implementation of the eventful [`SnapshotStore`] trait.
///
/// Snapshots are stored as JSON in the `snapshot` table, keyed by name.
#[derive(Clone)]
pub struct SqlServerSnapshotStore {
    // Shared connection pool; checked out per query in the trait methods.
    pub pool: Pool<ConnectionManager>,
}
21
22#[async_trait]
23impl JournalStore for SqlServerJournalStore {
24 async fn get_initial_offset(&self) -> anyhow::Result<u64> {
25 const SQL: &str = "select max(offset) as max_offset from journal;";
26
27 tracing::info!("Getting initial sequence number");
28
29 let mut conn = self.pool.get().await?;
30 let rows: Vec<Row> = conn.query(SQL, &[]).await?.into_first_result().await?;
31
32 if rows.len() != 1 {
33 return Err(anyhow!("Expected 1 row, got {}", rows.len()));
34 }
35
36 let max_offset: i64 = rows[0].try_get(0).context("failed to read max_offset")?.unwrap_or(0);
37
38 tracing::info!("Got {} initial sequence number", max_offset);
39
40 Ok(max_offset as u64)
41 }
42
43 async fn get_events_from_journal(&self, offset: u64) -> anyhow::Result<Vec<(u64, Vec<u8>)>> {
45 const SQL: &str = "select offset,message from journal where journal.offset >= @p1 order by offset asc;";
46 let mut conn = self.pool.get().await?;
47 let offset = offset as i64;
48 let rows: Vec<Row> = conn.query(SQL, &[&offset]).await?.into_first_result().await?;
49 tracing::debug!("get_events_from_journal from offset {}, got {} rows", offset, rows.len());
50
51 let mut r = vec![];
52
53 for row in rows {
54 let offset: i64 = row.try_get(0).context("failed to read offset")?.expect("offset should not be null");
55 let data: &[u8] = row.try_get(1).context("failed to read data")?.expect("data should not be null");
56 r.push((offset as u64, data.to_vec()));
57 }
58
59 Ok(r)
60 }
61
62 async fn load_entity_events(&self, entity_type_name: &str, persistence_id: &str, offset: u64) -> anyhow::Result<Vec<Vec<u8>>> {
63 const SQL: &str = "select message from journal where entity_type = @P1 and persistence_id = @P2 and offset >= @P3 order by offset asc;";
64 let mut conn = self.pool.get().await?;
65 let sql_compatible_offset = offset as i64;
66 let rows: Vec<Row> = conn.query(SQL, &[&entity_type_name, &persistence_id, &sql_compatible_offset]).await?.into_first_result().await?;
67 tracing::debug!("load_entity {} ({}), got {} rows from journal", entity_type_name, persistence_id, rows.len());
68 if rows.is_empty() {
69 Ok(vec![])
70 } else {
71 let mut v = vec![];
72 for row in rows {
73 let message: &[u8] = row.try_get(0).context("failed to read message")?.unwrap();
74 let message = message.to_vec();
75 v.push(message);
76 }
77
78 Ok(v)
79 }
80 }
81
82 async fn persist_event_to_journal(&self, entity_type_name: &str, event_type: &str, event_date: &DateTime<Utc>, bytes: &[u8], persistence_id: &str, offset: u64) -> anyhow::Result<()> {
83 tracing::trace!("persist_event_to_journal: {:?} bytes", bytes.len());
84
85 const SQL: &str = "insert into journal(offset,entity_type,persistence_id,event_type,message,event_date) values (@p1,@p2,@p3,@p4,@p5,@p6);";
86
87 let mut conn = self.pool.get().await?;
88 let offset = offset as i64;
89 let results = conn.execute(SQL, &[&offset, &entity_type_name, &persistence_id, &event_type, &bytes, event_date]).await?;
90
91 tracing::trace!("persist_event_to_journal rows_affected: {:?}", results.rows_affected());
92
93 Ok(())
94 }
95}
96
97#[async_trait]
98impl SnapshotStore for SqlServerSnapshotStore {
99 async fn read_snapshot<'de, S>(&self, name: &str) -> anyhow::Result<Option<Snapshot<S>>>
100 where
101 S: DeserializeOwned,
102 {
103 const SQL: &str = "select offset, value from snapshot where name = @P1;";
104 let mut conn = self.pool.get().await?;
105 let rows: Vec<Row> = conn.query(SQL, &[&name]).await?.into_first_result().await?;
106 tracing::debug!("read_snapshot {}, got {} rows", name, rows.len());
107 if rows.is_empty() {
108 Ok(None)
109 } else {
110 let offset: i64 = rows[0].try_get(0).context("failed to read snapshot offset value from data row")?.unwrap();
111 let bytes: &[u8] = rows[0].try_get(1).context("failed to read snapshot value from data row")?.unwrap();
112 let json = serde_json::from_slice::<S>(bytes).context("failed to deserialise snapshot from json")?;
113 Ok(Some(Snapshot { offset: offset as u64, value: json }))
114 }
115 }
116
117 async fn write_snapshot<S>(&self, name: &str, offset: u64, value: &S) -> anyhow::Result<()>
118 where
119 S: Debug + Serialize + Sync,
120 {
121 tracing::trace!("write_snapshot at offset: {} for value: {:?}", offset, value);
122
123 const SQL: &str = "merge snapshot as s
124using (select @p1 name, @p2 offset, @p3 value) as p
125on s.name = p.name
126when matched then
127 update
128 set offset = p.offset, value = p.value
129when not matched then
130 insert (name, offset, value)
131 values (@p1, @p2, @p3);";
132
133 let mut conn = self.pool.get().await?;
134 let json = serde_json::to_string(value).context("failed to serialise snapshot to json")?;
135 let bytes = json.as_bytes();
136 let sql_compatible_offset = offset as i64;
137 let results = conn.execute(SQL, &[&name, &sql_compatible_offset, &bytes]).await?;
138
139 tracing::trace!("write_snapshot rows_affected: {:?}", results.rows_affected());
140
141 Ok(())
142 }
143}
144
145pub async fn build_connection(connection_string: &str) -> Result<Pool<ConnectionManager>, bb8_tiberius::Error> {
146 tracing::info!("CONN: {}", connection_string);
147 let mgr = ConnectionManager::build(connection_string)?;
148 Pool::builder().connection_timeout(std::time::Duration::from_millis(15000)).max_size(6).build(mgr).await
149}