Skip to main content

nitinol_sqlite_adaptor/
store.rs

1use std::collections::BTreeSet;
2use std::str::FromStr;
3use std::time::Duration;
4use async_trait::async_trait;
5use nitinol_core::identifier::EntityId;
6use nitinol_protocol::errors::ProtocolError;
7use nitinol_protocol::io::{Reader, Writer};
8use nitinol_protocol::Payload;
9use sqlx::{Pool, Sqlite, SqliteConnection};
10use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
11use tracing::Instrument;
12
13pub struct SqliteEventStore {
14    pool: Pool<Sqlite>
15}
16
17impl Clone for SqliteEventStore {
18    fn clone(&self) -> Self {
19        Self { pool: self.pool.clone() }
20    }
21}
22
23impl SqliteEventStore {
24    /// Nitinol sets up a Journal Database for storing Events.
25    ///
26    /// Note: Since run our own migration, we must avoid integrating databases.
27    pub async fn setup(url: impl AsRef<str>) -> Result<Self, ProtocolError> {
28        let opts = SqliteConnectOptions::from_str(url.as_ref())
29            .map_err(|e| ProtocolError::Setup(Box::new(e)))?
30            .create_if_missing(true);
31        
32        let pool = SqlitePoolOptions::new()
33            .acquire_timeout(
34                dotenvy::var("NITINOL_JOURNAL_ACQUIRE_TIMEOUT")
35                    .ok()
36                    .and_then(|timeout| timeout.parse::<u64>().ok())
37                    .map(Duration::from_millis)
38                    .unwrap_or(Duration::from_millis(5000))
39            )
40            .max_connections(
41                dotenvy::var("NITINOL_JOURNAL_MAX_CONNECTION")
42                    .ok()
43                    .and_then(|max| max.parse::<u32>().ok())
44                    .unwrap_or(8)
45            )
46            .connect_with(opts)
47            .instrument(tracing::trace_span!("connect-eventstore"))
48            .await
49            .map_err(|e| ProtocolError::Setup(Box::new(e)))?;
50        
51        sqlx::migrate!("./migrations")
52            .run(&pool)
53            .instrument(tracing::trace_span!("migrate-eventstore"))
54            .await
55            .map_err(|e| ProtocolError::Write(Box::new(e)))?;
56        
57        Ok(Self { pool })
58    }
59}
60
61#[async_trait]
62impl Writer for SqliteEventStore {
63    async fn write(&self, aggregate_id: EntityId, payload: Payload) -> Result<(), ProtocolError> {
64        let mut con = self.pool.begin()
65            .instrument(tracing::trace_span!("begin-xact-eventstore"))
66            .await
67            .map_err(|e| ProtocolError::Write(Box::new(e)))?;
68        Internal::write(aggregate_id.as_ref(), payload, &mut con)
69            .instrument(tracing::trace_span!("write-eventstore"))
70            .await
71            .map_err(|e| ProtocolError::Write(Box::new(e)))?;
72        
73        if let Err(e) = con.commit().await {
74            tracing::error!("on failure commit caused reason `{e}`");
75            return Err(ProtocolError::Write(Box::new(e)));
76        }
77        
78        Ok(())
79    }
80}
81
82#[async_trait]
83impl Reader for SqliteEventStore {
84    async fn read(&self, id: EntityId, seq: i64) -> Result<Payload, ProtocolError> {
85        let mut con = self.pool.acquire().await
86            .map_err(|e| ProtocolError::Read(Box::new(e)))?;
87        let payload = Internal::read(id.as_ref(), seq, &mut con)
88            .instrument(tracing::trace_span!("read-eventstore"))
89            .await
90            .map_err(|e| ProtocolError::Read(Box::new(e)))?;
91        Ok(payload)
92    }
93    
94    async fn read_to(&self, id: EntityId, from: i64, to: i64) -> Result<BTreeSet<Payload>, ProtocolError> {
95        let mut con = self.pool.acquire().await
96            .map_err(|e| ProtocolError::Read(Box::new(e)))?;
97        let payload = Internal::read_to(id.as_ref(), from, to, &mut con)
98            .instrument(tracing::trace_span!("read-eventstore"))
99            .await
100            .map_err(|e| ProtocolError::Read(Box::new(e)))?;
101        Ok(payload)
102    }
103}
104
105struct Internal;
106
107impl Internal {
108    pub async fn write(aggregate_id: &str, payload: Payload, con: &mut SqliteConnection) -> Result<(), sqlx::Error> {
109        // language=sqlite
110        sqlx::query(r#"
111            INSERT INTO journal(id, sequence_id, registry_key, bytes, created_at)
112            VALUES ($1, $2, $3, $4, $5)
113        "#)
114            .bind(aggregate_id)
115            .bind(payload.sequence_id)
116            .bind(&payload.registry_key)
117            .bind(&payload.bytes)
118            .bind(payload.created_at)
119            .execute(&mut *con)
120            .await?;
121        Ok(())
122    }
123    
124    pub async fn read(id: &str, seq: i64, con: &mut SqliteConnection) -> Result<Payload, sqlx::Error> {
125        // language=sqlite
126        let payload = sqlx::query_as::<_, Payload>(r#"
127            SELECT
128                id,
129                sequence_id,
130                registry_key,
131                bytes,
132                created_at
133            FROM journal
134            WHERE
135                id LIKE $1
136            AND sequence_id = $2
137        "#)
138            .bind(id)
139            .bind(seq)
140            .fetch_one(&mut *con)
141            .await?;
142        Ok(payload)
143    }
144    
145    async fn read_to(id: &str, from: i64, to: i64, con: &mut SqliteConnection) -> Result<BTreeSet<Payload>, sqlx::Error> {
146        // language=sqlite
147        let payload = sqlx::query_as::<_, Payload>(r#"
148            SELECT
149                id,
150                sequence_id,
151                registry_key,
152                bytes,
153                created_at
154            FROM journal
155            WHERE
156                id LIKE $1
157            AND sequence_id BETWEEN $2 AND $3
158        "#)
159            .bind(id)
160            .bind(from)
161            .bind(to)
162            .fetch_all(&mut *con)
163            .await?;
164        
165        let payload = payload.into_iter()
166            .collect::<BTreeSet<Payload>>();
167        
168        Ok(payload)
169    }
170}