nitinol_sqlite_adaptor/
store.rs1use std::collections::BTreeSet;
2use std::str::FromStr;
3use std::time::Duration;
4use async_trait::async_trait;
5use nitinol_core::identifier::EntityId;
6use nitinol_protocol::errors::ProtocolError;
7use nitinol_protocol::io::{Reader, Writer};
8use nitinol_protocol::Payload;
9use sqlx::{Pool, Sqlite, SqliteConnection};
10use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
11use tracing::Instrument;
12
13pub struct SqliteEventStore {
14 pool: Pool<Sqlite>
15}
16
17impl Clone for SqliteEventStore {
18 fn clone(&self) -> Self {
19 Self { pool: self.pool.clone() }
20 }
21}
22
23impl SqliteEventStore {
24 pub async fn setup(url: impl AsRef<str>) -> Result<Self, ProtocolError> {
28 let opts = SqliteConnectOptions::from_str(url.as_ref())
29 .map_err(|e| ProtocolError::Setup(Box::new(e)))?
30 .create_if_missing(true);
31
32 let pool = SqlitePoolOptions::new()
33 .acquire_timeout(
34 dotenvy::var("NITINOL_JOURNAL_ACQUIRE_TIMEOUT")
35 .ok()
36 .and_then(|timeout| timeout.parse::<u64>().ok())
37 .map(Duration::from_millis)
38 .unwrap_or(Duration::from_millis(5000))
39 )
40 .max_connections(
41 dotenvy::var("NITINOL_JOURNAL_MAX_CONNECTION")
42 .ok()
43 .and_then(|max| max.parse::<u32>().ok())
44 .unwrap_or(8)
45 )
46 .connect_with(opts)
47 .instrument(tracing::trace_span!("connect-eventstore"))
48 .await
49 .map_err(|e| ProtocolError::Setup(Box::new(e)))?;
50
51 sqlx::migrate!("./migrations")
52 .run(&pool)
53 .instrument(tracing::trace_span!("migrate-eventstore"))
54 .await
55 .map_err(|e| ProtocolError::Write(Box::new(e)))?;
56
57 Ok(Self { pool })
58 }
59}
60
61#[async_trait]
62impl Writer for SqliteEventStore {
63 async fn write(&self, aggregate_id: EntityId, payload: Payload) -> Result<(), ProtocolError> {
64 let mut con = self.pool.begin()
65 .instrument(tracing::trace_span!("begin-xact-eventstore"))
66 .await
67 .map_err(|e| ProtocolError::Write(Box::new(e)))?;
68 Internal::write(aggregate_id.as_ref(), payload, &mut con)
69 .instrument(tracing::trace_span!("write-eventstore"))
70 .await
71 .map_err(|e| ProtocolError::Write(Box::new(e)))?;
72
73 if let Err(e) = con.commit().await {
74 tracing::error!("on failure commit caused reason `{e}`");
75 return Err(ProtocolError::Write(Box::new(e)));
76 }
77
78 Ok(())
79 }
80}
81
82#[async_trait]
83impl Reader for SqliteEventStore {
84 async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError> {
85 let mut con = self.pool.acquire().await
86 .map_err(|e| ProtocolError::Read(Box::new(e)))?;
87 let payload = Internal::read(id.as_ref(), seq, &mut con)
88 .instrument(tracing::trace_span!("read-eventstore"))
89 .await
90 .map_err(|e| ProtocolError::Read(Box::new(e)))?;
91 Ok(payload)
92 }
93
94 async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
95 let mut con = self.pool.acquire().await
96 .map_err(|e| ProtocolError::Read(Box::new(e)))?;
97 let payload = Internal::read_to(id.as_ref(), from, to, &mut con)
98 .instrument(tracing::trace_span!("read-eventstore"))
99 .await
100 .map_err(|e| ProtocolError::Read(Box::new(e)))?;
101 Ok(payload)
102 }
103}
104
105struct Internal;
106
107impl Internal {
108 pub async fn write(aggregate_id: &str, payload: Payload, con: &mut SqliteConnection) -> Result<(), sqlx::Error> {
109 sqlx::query(r#"
111 INSERT INTO journal(id, sequence_id, registry_key, bytes, created_at)
112 VALUES ($1, $2, $3, $4, $5)
113 "#)
114 .bind(aggregate_id)
115 .bind(payload.sequence_id)
116 .bind(&payload.registry_key)
117 .bind(&payload.bytes)
118 .bind(payload.created_at)
119 .execute(&mut *con)
120 .await?;
121 Ok(())
122 }
123
124 pub async fn read(id: &str, seq: i64, con: &mut SqliteConnection) -> Result<Payload, sqlx::Error> {
125 let payload = sqlx::query_as::<_, Payload>(r#"
127 SELECT
128 id,
129 sequence_id,
130 registry_key,
131 bytes,
132 created_at
133 FROM journal
134 WHERE
135 id LIKE $1
136 AND sequence_id = $2
137 "#)
138 .bind(id)
139 .bind(seq)
140 .fetch_one(&mut *con)
141 .await?;
142 Ok(payload)
143 }
144
145 async fn read_to(id: &str, from: i64, to: i64, con: &mut SqliteConnection) -> Result<BTreeSet<Payload>, sqlx::Error> {
146 let payload = sqlx::query_as::<_, Payload>(r#"
148 SELECT
149 id,
150 sequence_id,
151 registry_key,
152 bytes,
153 created_at
154 FROM journal
155 WHERE
156 id LIKE $1
157 AND sequence_id BETWEEN $2 AND $3
158 "#)
159 .bind(id)
160 .bind(from)
161 .bind(to)
162 .fetch_all(&mut *con)
163 .await?;
164
165 let payload = payload.into_iter()
166 .collect::<BTreeSet<Payload>>();
167
168 Ok(payload)
169 }
170}