Skip to main content

azoth_sqlite/
txn.rs

1use azoth_core::{
2    error::{AzothError, Result},
3    traits::ProjectionTxn,
4    types::EventId,
5};
6use rusqlite::Connection;
7use std::sync::MutexGuard;
8
9// Projection transaction that works with Connection directly
10pub struct SimpleProjectionTxn<'a> {
11    conn: MutexGuard<'a, Connection>,
12    in_txn: bool,
13}
14
15impl<'a> SimpleProjectionTxn<'a> {
16    pub fn new(conn: MutexGuard<'a, Connection>) -> Result<Self> {
17        conn.execute("BEGIN IMMEDIATE TRANSACTION", [])
18            .map_err(|e| AzothError::Projection(e.to_string()))?;
19
20        Ok(Self { conn, in_txn: true })
21    }
22}
23
24impl<'a> ProjectionTxn for SimpleProjectionTxn<'a> {
25    fn apply_event(&mut self, _id: EventId, _bytes: &[u8]) -> Result<()> {
26        // Applications implement their own event application logic
27        Ok(())
28    }
29
30    fn commit(mut self: Box<Self>, new_cursor: EventId) -> Result<()> {
31        if self.in_txn {
32            // Update cursor
33            self.conn
34                .execute(
35                    "UPDATE projection_meta SET last_applied_event_id = ?1, updated_at = datetime('now') WHERE id = 0",
36                    [new_cursor as i64],
37                )
38                .map_err(|e| AzothError::Projection(e.to_string()))?;
39
40            self.conn
41                .execute("COMMIT", [])
42                .map_err(|e| AzothError::Projection(e.to_string()))?;
43
44            self.in_txn = false;
45        }
46        Ok(())
47    }
48
49    fn rollback(mut self: Box<Self>) {
50        if self.in_txn {
51            let _ = self.conn.execute("ROLLBACK", []);
52            self.in_txn = false;
53        }
54    }
55}
56
57impl<'a> Drop for SimpleProjectionTxn<'a> {
58    fn drop(&mut self) {
59        if self.in_txn {
60            let _ = self.conn.execute("ROLLBACK", []);
61        }
62    }
63}