Skip to main content

kuatia_storage_sql/
lib.rs

1//! SQL-backed Store implementation for SQLite and PostgreSQL.
2//!
3//! Uses `sqlx::Any` for database-agnostic queries. Enable features
4//! `sqlite` or `postgres` to select the backend.
5//!
6//! ```text
7//! let pool = sqlx::any::Pool<Any>Options::new()
8//!     .connect("sqlite::memory:").await?;
9//! let store = SqlStore::new(pool);
10//! store.migrate().await?;
11//! ```
12
13use std::str::FromStr;
14
15use async_trait::async_trait;
16use sqlx::{Any, Pool, Row};
17
18use kuatia_storage::error::StoreError;
19use kuatia_storage::events::{EventStore, LedgerEvent};
20use kuatia_storage::store::*;
21use kuatia_types::*;
22
23/// SQL-backed [`Store`] implementation.
24pub struct SqlStore {
25    pool: Pool<Any>,
26    autoid: kuatia_types::autoid::AutoId,
27}
28
29impl SqlStore {
30    /// Create a new SQL store wrapping an existing connection pool.
31    pub fn new(pool: Pool<Any>) -> Self {
32        Self {
33            pool,
34            autoid: kuatia_types::autoid::AutoId::new(),
35        }
36    }
37
38    /// Run database migrations. Idempotent: a `_migrations` ledger records what
39    /// has been applied, so re-running is a no-op. The DDL is selected per
40    /// backend (SQLite uses `BLOB`, PostgreSQL uses `BYTEA`).
41    pub async fn migrate(&self) -> Result<(), StoreError> {
42        // Detect the backend. `sqlite_version()` exists only on SQLite.
43        let is_sqlite = sqlx::query("SELECT sqlite_version()")
44            .fetch_optional(&self.pool)
45            .await
46            .is_ok();
47
48        sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
49            .execute(&self.pool)
50            .await
51            .map_err(|e| StoreError::Internal(e.to_string()))?;
52
53        let migrations: &[(&str, &str)] = if is_sqlite {
54            &[("001_init", include_str!("migrations/sqlite/001_init.sql"))]
55        } else {
56            &[("001_init", include_str!("migrations/postgres/001_init.sql"))]
57        };
58
59        for (name, sql) in migrations {
60            let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
61                .bind(*name)
62                .fetch_optional(&self.pool)
63                .await
64                .map_err(|e| StoreError::Internal(e.to_string()))?;
65            if applied.is_some() {
66                continue;
67            }
68
69            for statement in sql.split(';') {
70                let trimmed = statement.trim();
71                if !trimmed.is_empty() {
72                    sqlx::query(trimmed)
73                        .execute(&self.pool)
74                        .await
75                        .map_err(|e| StoreError::Internal(e.to_string()))?;
76                }
77            }
78
79            sqlx::query("INSERT INTO _migrations (name) VALUES ($1)")
80                .bind(*name)
81                .execute(&self.pool)
82                .await
83                .map_err(|e| StoreError::Internal(e.to_string()))?;
84        }
85        Ok(())
86    }
87}
88
89// ---------------------------------------------------------------------------
90// Serialization helpers
91// ---------------------------------------------------------------------------
92
93fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
94    serde_json::to_string(policy)
95        .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
96}
97
98fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
99    serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
100}
101
102fn serialize_blob<T: serde::Serialize>(val: &T) -> Result<Vec<u8>, StoreError> {
103    serde_json::to_vec(val).map_err(|e| StoreError::Internal(format!("blob serialization: {e}")))
104}
105
106fn deserialize_blob<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, StoreError> {
107    serde_json::from_slice(bytes).map_err(|e| StoreError::Internal(format!("bad blob: {e}")))
108}
109
110fn status_to_i16(s: PostingStatus) -> i16 {
111    match s {
112        PostingStatus::Active => 0,
113        PostingStatus::PendingInactive => 1,
114        PostingStatus::Inactive => 2,
115    }
116}
117
118fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
119    match v {
120        0 => Ok(PostingStatus::Active),
121        1 => Ok(PostingStatus::PendingInactive),
122        2 => Ok(PostingStatus::Inactive),
123        _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
124    }
125}
126
127fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
128    let id: i64 = row
129        .try_get("id")
130        .map_err(|e| StoreError::Internal(e.to_string()))?;
131    let version: i64 = row
132        .try_get("version")
133        .map_err(|e| StoreError::Internal(e.to_string()))?;
134    let policy_str: String = row
135        .try_get("policy")
136        .map_err(|e| StoreError::Internal(e.to_string()))?;
137    let flags_bits: i32 = row
138        .try_get("flags")
139        .map_err(|e| StoreError::Internal(e.to_string()))?;
140    let book: i64 = row
141        .try_get("book")
142        .map_err(|e| StoreError::Internal(e.to_string()))?;
143    let user_data_bytes: Vec<u8> = row
144        .try_get("user_data")
145        .map_err(|e| StoreError::Internal(e.to_string()))?;
146    let metadata_bytes: Vec<u8> = row
147        .try_get("metadata")
148        .map_err(|e| StoreError::Internal(e.to_string()))?;
149
150    Ok(Account {
151        id: AccountId::new(id),
152        version: version as u64,
153        policy: deserialize_policy(&policy_str)?,
154        flags: AccountFlags::from_bits_truncate(flags_bits as u32),
155        book: BookId::new(book),
156        user_data: deserialize_blob(&user_data_bytes)?,
157        metadata: deserialize_blob(&metadata_bytes)?,
158    })
159}
160
161fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
162    let transfer_id: Vec<u8> = row
163        .try_get("transfer_id")
164        .map_err(|e| StoreError::Internal(e.to_string()))?;
165    let idx: i16 = row
166        .try_get("idx")
167        .map_err(|e| StoreError::Internal(e.to_string()))?;
168    let owner: i64 = row
169        .try_get("owner")
170        .map_err(|e| StoreError::Internal(e.to_string()))?;
171    let asset: i32 = row
172        .try_get("asset")
173        .map_err(|e| StoreError::Internal(e.to_string()))?;
174    let value: String = row
175        .try_get("value")
176        .map_err(|e| StoreError::Internal(e.to_string()))?;
177    let value = Cent::from_str(&value).map_err(|e| StoreError::Internal(e.to_string()))?;
178    let status: i16 = row
179        .try_get("status")
180        .map_err(|e| StoreError::Internal(e.to_string()))?;
181    let reservation: Option<i64> = row
182        .try_get("reservation")
183        .map_err(|e| StoreError::Internal(e.to_string()))?;
184
185    let mut tid = [0u8; 32];
186    tid.copy_from_slice(&transfer_id);
187
188    Ok(Posting {
189        id: PostingId {
190            transfer: EnvelopeId(tid),
191            index: idx as u16,
192        },
193        owner: AccountId::new(owner),
194        asset: AssetId::new(asset as u32),
195        value,
196        status: status_from_i16(status)?,
197        reservation: reservation.map(ReservationId::new),
198    })
199}
200
201// ---------------------------------------------------------------------------
202// AccountStore
203// ---------------------------------------------------------------------------
204
205#[async_trait]
206impl AccountStore for SqlStore {
207    async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
208        let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
209            .bind(id.0)
210            .fetch_optional(&self.pool)
211            .await
212            .map_err(|e| StoreError::Internal(e.to_string()))?
213            .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
214        row_to_account(&row)
215    }
216
217    async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
218        let mut result = Vec::with_capacity(ids.len());
219        for id in ids {
220            result.push(self.get_account(id).await?);
221        }
222        Ok(result)
223    }
224
225    async fn create_account(&self, account: Account) -> Result<(), StoreError> {
226        let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
227            .bind(account.id.0)
228            .fetch_optional(&self.pool)
229            .await
230            .map_err(|e| StoreError::Internal(e.to_string()))?;
231        if exists.is_some() {
232            return Err(StoreError::AlreadyExists(format!(
233                "account {:?}",
234                account.id
235            )));
236        }
237
238        sqlx::query(
239            "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
240        )
241            .bind(account.id.0)
242            .bind(account.version as i64)
243            .bind(serialize_policy(&account.policy)?)
244            .bind(account.flags.bits() as i32)
245            .bind(account.book.0)
246            .bind(serialize_blob(&account.user_data)?)
247            .bind(serialize_blob(&account.metadata)?)
248            .execute(&self.pool)
249            .await
250            .map_err(|e| StoreError::Internal(e.to_string()))?;
251        Ok(())
252    }
253
254    async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
255        let current =
256            sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
257                .bind(account.id.0)
258                .fetch_optional(&self.pool)
259                .await
260                .map_err(|e| StoreError::Internal(e.to_string()))?
261                .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
262
263        let current_version: i64 = current
264            .try_get("version")
265            .map_err(|e| StoreError::Internal(e.to_string()))?;
266        let expected = current_version
267            .checked_add(1)
268            .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
269
270        if account.version as i64 != expected {
271            return Err(StoreError::VersionConflict {
272                account: account.id,
273                expected: expected as u64,
274                actual: account.version,
275            });
276        }
277
278        sqlx::query(
279            "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
280        )
281            .bind(account.id.0)
282            .bind(account.version as i64)
283            .bind(serialize_policy(&account.policy)?)
284            .bind(account.flags.bits() as i32)
285            .bind(account.book.0)
286            .bind(serialize_blob(&account.user_data)?)
287            .bind(serialize_blob(&account.metadata)?)
288            .execute(&self.pool)
289            .await
290            .map_err(|e| StoreError::Internal(e.to_string()))?;
291        Ok(())
292    }
293
294    async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
295        let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
296            .bind(id.0)
297            .fetch_all(&self.pool)
298            .await
299            .map_err(|e| StoreError::Internal(e.to_string()))?;
300        if rows.is_empty() {
301            return Err(StoreError::NotFound(format!("account {id:?}")));
302        }
303        rows.iter().map(row_to_account).collect()
304    }
305
306    async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
307        let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
308            .fetch_all(&self.pool)
309            .await
310            .map_err(|e| StoreError::Internal(e.to_string()))?;
311        let mut accounts: Vec<Account> =
312            rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
313        accounts.dedup_by_key(|a| a.id);
314        Ok(accounts)
315    }
316}
317
318// ---------------------------------------------------------------------------
319// PostingStore
320// ---------------------------------------------------------------------------
321
322#[async_trait]
323impl PostingStore for SqlStore {
324    async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
325        let mut result = Vec::with_capacity(ids.len());
326        for id in ids {
327            let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
328                .bind(id.transfer.0.as_slice())
329                .bind(id.index as i16)
330                .fetch_optional(&self.pool)
331                .await
332                .map_err(|e| StoreError::Internal(e.to_string()))?
333                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
334            result.push(row_to_posting(&row)?);
335        }
336        Ok(result)
337    }
338
339    async fn get_postings_by_account(
340        &self,
341        account: &AccountId,
342        asset: Option<&AssetId>,
343        status: Option<PostingStatus>,
344    ) -> Result<Vec<Posting>, StoreError> {
345        let rows = match (asset, status) {
346            (Some(a), Some(s)) => {
347                sqlx::query(
348                    "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
349                )
350                .bind(account.0)
351                .bind(a.0 as i32)
352                .bind(status_to_i16(s))
353                .fetch_all(&self.pool)
354                .await
355            }
356            (Some(a), None) => {
357                sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
358                    .bind(account.0)
359                    .bind(a.0 as i32)
360                    .fetch_all(&self.pool)
361                    .await
362            }
363            (None, Some(s)) => {
364                sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
365                    .bind(account.0)
366                    .bind(status_to_i16(s))
367                    .fetch_all(&self.pool)
368                    .await
369            }
370            (None, None) => {
371                sqlx::query("SELECT * FROM postings WHERE owner = $1")
372                    .bind(account.0)
373                    .fetch_all(&self.pool)
374                    .await
375            }
376        }
377        .map_err(|e| StoreError::Internal(e.to_string()))?;
378
379        rows.iter().map(row_to_posting).collect()
380    }
381
382    async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
383        let (where_clause, count_clause) = {
384            let mut w = String::from("WHERE owner = $1");
385            let mut idx = 2u32;
386            if query.asset.is_some() {
387                w.push_str(&format!(" AND asset = ${idx}"));
388                idx += 1;
389            }
390            if query.status.is_some() {
391                w.push_str(&format!(" AND status = ${idx}"));
392            }
393            let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
394            let limit = query.limit.unwrap_or(u32::MAX);
395            let offset = query.offset.unwrap_or(0);
396            w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
397            (format!("SELECT * FROM postings {w}"), c)
398        };
399
400        // Build count query
401        let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
402        if let Some(ref a) = query.asset {
403            count_q = count_q.bind(a.0 as i32);
404        }
405        if let Some(s) = query.status {
406            count_q = count_q.bind(status_to_i16(s));
407        }
408        let count_row = count_q
409            .fetch_one(&self.pool)
410            .await
411            .map_err(|e| StoreError::Internal(e.to_string()))?;
412        let total: i64 = count_row
413            .try_get("cnt")
414            .map_err(|e| StoreError::Internal(e.to_string()))?;
415
416        // Build data query
417        let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
418        if let Some(ref a) = query.asset {
419            data_q = data_q.bind(a.0 as i32);
420        }
421        if let Some(s) = query.status {
422            data_q = data_q.bind(status_to_i16(s));
423        }
424        let rows = data_q
425            .fetch_all(&self.pool)
426            .await
427            .map_err(|e| StoreError::Internal(e.to_string()))?;
428
429        let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
430        Ok(Page {
431            items,
432            total: total as u64,
433        })
434    }
435
436    async fn reserve_postings(
437        &self,
438        ids: &[PostingId],
439        reservation: ReservationId,
440    ) -> Result<u64, StoreError> {
441        // Dumb instruction: each id flips Active → PendingInactive (the status
442        // precondition is in the WHERE so it is atomic). Return the count of rows
443        // changed; the caller decides what a short count means.
444        let mut tx = self
445            .pool
446            .begin()
447            .await
448            .map_err(|e| StoreError::Internal(e.to_string()))?;
449        let mut reserved: u64 = 0;
450        for id in ids {
451            let res = sqlx::query(
452                "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
453            )
454            .bind(status_to_i16(PostingStatus::PendingInactive))
455            .bind(reservation.0)
456            .bind(id.transfer.0.as_slice())
457            .bind(id.index as i16)
458            .bind(status_to_i16(PostingStatus::Active))
459            .execute(&mut *tx)
460            .await
461            .map_err(|e| StoreError::Internal(e.to_string()))?;
462            reserved += res.rows_affected();
463        }
464
465        tx.commit()
466            .await
467            .map_err(|e| StoreError::Internal(e.to_string()))?;
468        Ok(reserved)
469    }
470
471    async fn release_postings(
472        &self,
473        ids: &[PostingId],
474        reservation: ReservationId,
475    ) -> Result<u64, StoreError> {
476        // Dumb instruction: each id reserved by `reservation` flips
477        // PendingInactive → Active. Return the count released; an already-Active
478        // or differently-owned posting simply does not count.
479        let mut tx = self
480            .pool
481            .begin()
482            .await
483            .map_err(|e| StoreError::Internal(e.to_string()))?;
484        let mut released: u64 = 0;
485        for id in ids {
486            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
487                .bind(status_to_i16(PostingStatus::Active))
488                .bind(id.transfer.0.as_slice())
489                .bind(id.index as i16)
490                .bind(status_to_i16(PostingStatus::PendingInactive))
491                .bind(reservation.0)
492                .execute(&mut *tx)
493                .await
494                .map_err(|e| StoreError::Internal(e.to_string()))?;
495            released += res.rows_affected();
496        }
497
498        tx.commit()
499            .await
500            .map_err(|e| StoreError::Internal(e.to_string()))?;
501        Ok(released)
502    }
503
504    async fn deactivate_postings(
505        &self,
506        ids: &[PostingId],
507        reservation: Option<ReservationId>,
508    ) -> Result<u64, StoreError> {
509        let mut tx = self
510            .pool
511            .begin()
512            .await
513            .map_err(|e| StoreError::Internal(e.to_string()))?;
514        let mut changed: u64 = 0;
515        for id in ids {
516            // The precondition is the instruction; the count is the result. The
517            // caller decides what a short count means.
518            let res = match reservation {
519                None => {
520                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
521                        .bind(status_to_i16(PostingStatus::Inactive))
522                        .bind(id.transfer.0.as_slice())
523                        .bind(id.index as i16)
524                        .bind(status_to_i16(PostingStatus::Active))
525                        .execute(&mut *tx)
526                        .await
527                }
528                Some(rid) => {
529                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
530                        .bind(status_to_i16(PostingStatus::Inactive))
531                        .bind(id.transfer.0.as_slice())
532                        .bind(id.index as i16)
533                        .bind(status_to_i16(PostingStatus::PendingInactive))
534                        .bind(rid.0)
535                        .execute(&mut *tx)
536                        .await
537                }
538            }
539            .map_err(|e| StoreError::Internal(e.to_string()))?;
540            changed += res.rows_affected();
541        }
542        tx.commit()
543            .await
544            .map_err(|e| StoreError::Internal(e.to_string()))?;
545        Ok(changed)
546    }
547
548    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
549        let mut tx = self
550            .pool
551            .begin()
552            .await
553            .map_err(|e| StoreError::Internal(e.to_string()))?;
554        let mut inserted: u64 = 0;
555        for posting in postings {
556            let res = sqlx::query(
557                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
558            )
559                .bind(posting.id.transfer.0.as_slice())
560                .bind(posting.id.index as i16)
561                .bind(posting.owner.0)
562                .bind(posting.asset.0 as i32)
563                .bind(posting.value.to_string())
564                .bind(status_to_i16(posting.status))
565                .execute(&mut *tx)
566                .await
567                .map_err(|e| StoreError::Internal(e.to_string()))?;
568            inserted += res.rows_affected();
569        }
570        tx.commit()
571            .await
572            .map_err(|e| StoreError::Internal(e.to_string()))?;
573        Ok(inserted)
574    }
575}
576
577// ---------------------------------------------------------------------------
578// TransferStore
579// ---------------------------------------------------------------------------
580
581#[async_trait]
582impl TransferStore for SqlStore {
583    async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
584        let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
585            .bind(id.0.as_slice())
586            .fetch_optional(&self.pool)
587            .await
588            .map_err(|e| StoreError::Internal(e.to_string()))?;
589
590        match row {
591            None => Ok(None),
592            Some(row) => {
593                let transfer_bytes: Vec<u8> = row
594                    .try_get("transfer")
595                    .map_err(|e| StoreError::Internal(e.to_string()))?;
596                let receipt_bytes: Vec<u8> = row
597                    .try_get("receipt")
598                    .map_err(|e| StoreError::Internal(e.to_string()))?;
599                let created_at: i64 = row
600                    .try_get("created_at")
601                    .map_err(|e| StoreError::Internal(e.to_string()))?;
602                Ok(Some(EnvelopeRecord {
603                    envelope: deserialize_blob(&transfer_bytes)?,
604                    receipt: deserialize_blob(&receipt_bytes)?,
605                    created_at,
606                }))
607            }
608        }
609    }
610
611    async fn store_transfer(
612        &self,
613        record: EnvelopeRecord,
614        involved: &[AccountId],
615    ) -> Result<u64, StoreError> {
616        let tid = record.receipt.transfer_id;
617        let transfer_bytes = serialize_blob(&record.envelope)?;
618        let receipt_bytes = serialize_blob(&record.receipt)?;
619
620        let mut tx = self
621            .pool
622            .begin()
623            .await
624            .map_err(|e| StoreError::Internal(e.to_string()))?;
625
626        let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
627            .bind(tid.0.as_slice())
628            .bind(&transfer_bytes)
629            .bind(&receipt_bytes)
630            .bind(record.created_at)
631            .bind(record.envelope.book().0)
632            .execute(&mut *tx)
633            .await
634            .map_err(|e| StoreError::Internal(e.to_string()))?;
635        let inserted = res.rows_affected();
636
637        // Index every involved account (caller supplies the set; storage does no
638        // computation). Idempotent so a replay is harmless.
639        for account in involved {
640            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
641                .bind(tid.0.as_slice())
642                .bind(account.0)
643                .execute(&mut *tx)
644                .await
645                .map_err(|e| StoreError::Internal(e.to_string()))?;
646        }
647
648        tx.commit()
649            .await
650            .map_err(|e| StoreError::Internal(e.to_string()))?;
651        Ok(inserted)
652    }
653
654    async fn get_transfers_for_account(
655        &self,
656        account: &AccountId,
657    ) -> Result<Vec<EnvelopeRecord>, StoreError> {
658        let rows = sqlx::query(
659            "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
660        )
661            .bind(account.0)
662            .fetch_all(&self.pool)
663            .await
664            .map_err(|e| StoreError::Internal(e.to_string()))?;
665
666        let mut result = Vec::with_capacity(rows.len());
667        for row in &rows {
668            let transfer_bytes: Vec<u8> = row
669                .try_get("transfer")
670                .map_err(|e| StoreError::Internal(e.to_string()))?;
671            let receipt_bytes: Vec<u8> = row
672                .try_get("receipt")
673                .map_err(|e| StoreError::Internal(e.to_string()))?;
674            let created_at: i64 = row
675                .try_get("created_at")
676                .map_err(|e| StoreError::Internal(e.to_string()))?;
677            result.push(EnvelopeRecord {
678                envelope: deserialize_blob(&transfer_bytes)?,
679                receipt: deserialize_blob(&receipt_bytes)?,
680                created_at,
681            });
682        }
683        Ok(result)
684    }
685
686    async fn query_transfers(
687        &self,
688        query: &TransferQuery,
689    ) -> Result<Page<EnvelopeRecord>, StoreError> {
690        // Load base records, using the account join when available.
691        let base_records = if let Some(ref account) = query.account {
692            self.get_transfers_for_account(account).await?
693        } else {
694            let rows = sqlx::query(
695                "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
696            )
697            .fetch_all(&self.pool)
698            .await
699            .map_err(|e| StoreError::Internal(e.to_string()))?;
700
701            let mut records = Vec::with_capacity(rows.len());
702            for row in &rows {
703                let transfer_bytes: Vec<u8> = row
704                    .try_get("transfer")
705                    .map_err(|e| StoreError::Internal(e.to_string()))?;
706                let receipt_bytes: Vec<u8> = row
707                    .try_get("receipt")
708                    .map_err(|e| StoreError::Internal(e.to_string()))?;
709                let created_at: i64 = row
710                    .try_get("created_at")
711                    .map_err(|e| StoreError::Internal(e.to_string()))?;
712                records.push(EnvelopeRecord {
713                    envelope: deserialize_blob(&transfer_bytes)?,
714                    receipt: deserialize_blob(&receipt_bytes)?,
715                    created_at,
716                });
717            }
718            records
719        };
720
721        // Filter in memory for remaining conditions.
722        let filtered: Vec<EnvelopeRecord> = base_records
723            .into_iter()
724            .filter(|r| {
725                if let Some(from) = query.from_ts
726                    && r.created_at < from
727                {
728                    return false;
729                }
730                if let Some(to) = query.to_ts
731                    && r.created_at >= to
732                {
733                    return false;
734                }
735                if let Some(book) = query.book
736                    && r.envelope.book() != book
737                {
738                    return false;
739                }
740                true
741            })
742            .collect();
743
744        let total = filtered.len() as u64;
745        let offset = query.offset.unwrap_or(0) as usize;
746        let limit = query.limit.unwrap_or(u32::MAX) as usize;
747        let items = filtered.into_iter().skip(offset).take(limit).collect();
748
749        Ok(Page { items, total })
750    }
751}
752
753// ---------------------------------------------------------------------------
754// SagaStore
755// ---------------------------------------------------------------------------
756
757#[async_trait]
758impl SagaStore for SqlStore {
759    async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
760        sqlx::query(
761            "INSERT INTO sagas (id, data) VALUES ($1, $2) \
762             ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
763        )
764        .bind(*id)
765        .bind(&data)
766        .execute(&self.pool)
767        .await
768        .map_err(|e| StoreError::Internal(e.to_string()))?;
769        Ok(())
770    }
771
772    async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
773        let rows = sqlx::query("SELECT id, data FROM sagas")
774            .fetch_all(&self.pool)
775            .await
776            .map_err(|e| StoreError::Internal(e.to_string()))?;
777        let mut result = Vec::with_capacity(rows.len());
778        for row in &rows {
779            let id: i64 = row
780                .try_get("id")
781                .map_err(|e| StoreError::Internal(e.to_string()))?;
782            let data: Vec<u8> = row
783                .try_get("data")
784                .map_err(|e| StoreError::Internal(e.to_string()))?;
785            result.push((id, data));
786        }
787        Ok(result)
788    }
789
790    async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
791        sqlx::query("DELETE FROM sagas WHERE id = $1")
792            .bind(*id)
793            .execute(&self.pool)
794            .await
795            .map_err(|e| StoreError::Internal(e.to_string()))?;
796        Ok(())
797    }
798}
799
800// ---------------------------------------------------------------------------
801// EventStore
802// ---------------------------------------------------------------------------
803
804#[async_trait]
805impl EventStore for SqlStore {
806    async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
807        let kind_str =
808            serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
809        let data = serialize_blob(event)?;
810        let seq = self.autoid.next() as u64;
811
812        // Idempotent on the dedup key: a replayed transfer event conflicts on
813        // `dedup_key` and returns the existing seq instead of a duplicate row.
814        match kuatia_storage::events::event_dedup_key(&event.kind) {
815            Some(eid) => {
816                let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
817                    .bind(seq as i64)
818                    .bind(event.timestamp)
819                    .bind(&kind_str)
820                    .bind(&data)
821                    .bind(eid.0.as_slice())
822                    .execute(&self.pool)
823                    .await
824                    .map_err(|e| StoreError::Internal(e.to_string()))?;
825                if res.rows_affected() == 0 {
826                    let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
827                        .bind(eid.0.as_slice())
828                        .fetch_one(&self.pool)
829                        .await
830                        .map_err(|e| StoreError::Internal(e.to_string()))?;
831                    let existing: i64 = row
832                        .try_get("seq")
833                        .map_err(|e| StoreError::Internal(e.to_string()))?;
834                    return Ok(existing as u64);
835                }
836                Ok(seq)
837            }
838            None => {
839                sqlx::query(
840                    "INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)",
841                )
842                .bind(seq as i64)
843                .bind(event.timestamp)
844                .bind(&kind_str)
845                .bind(&data)
846                .execute(&self.pool)
847                .await
848                .map_err(|e| StoreError::Internal(e.to_string()))?;
849                Ok(seq)
850            }
851        }
852    }
853
854    async fn get_events_since(
855        &self,
856        after_seq: u64,
857        limit: u32,
858    ) -> Result<Vec<LedgerEvent>, StoreError> {
859        let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
860            .bind(after_seq as i64)
861            .bind(limit as i32)
862            .fetch_all(&self.pool)
863            .await
864            .map_err(|e| StoreError::Internal(e.to_string()))?;
865
866        let mut events = Vec::with_capacity(rows.len());
867        for row in &rows {
868            let seq: i64 = row
869                .try_get("seq")
870                .map_err(|e| StoreError::Internal(e.to_string()))?;
871            let data: Vec<u8> = row
872                .try_get("data")
873                .map_err(|e| StoreError::Internal(e.to_string()))?;
874            let mut event: LedgerEvent = deserialize_blob(&data)?;
875            event.seq = seq as u64;
876            events.push(event);
877        }
878        Ok(events)
879    }
880}
881
882// ---------------------------------------------------------------------------
883// BookStore
884// ---------------------------------------------------------------------------
885
886#[async_trait]
887impl BookStore for SqlStore {
888    async fn create_book(&self, book: Book) -> Result<(), StoreError> {
889        let exists = sqlx::query("SELECT 1 FROM books WHERE id = $1 LIMIT 1")
890            .bind(book.id.0)
891            .fetch_optional(&self.pool)
892            .await
893            .map_err(|e| StoreError::Internal(e.to_string()))?;
894        if exists.is_some() {
895            return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
896        }
897
898        let data = serialize_blob(&book)?;
899        sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
900            .bind(book.id.0)
901            .bind(&book.name)
902            .bind(&data)
903            .execute(&self.pool)
904            .await
905            .map_err(|e| StoreError::Internal(e.to_string()))?;
906        Ok(())
907    }
908
909    async fn get_book(&self, id: &BookId) -> Result<Book, StoreError> {
910        let row = sqlx::query("SELECT data FROM books WHERE id = $1")
911            .bind(id.0)
912            .fetch_optional(&self.pool)
913            .await
914            .map_err(|e| StoreError::Internal(e.to_string()))?
915            .ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
916        let data: Vec<u8> = row
917            .try_get("data")
918            .map_err(|e| StoreError::Internal(e.to_string()))?;
919        deserialize_blob(&data)
920    }
921
922    async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
923        let rows = sqlx::query("SELECT data FROM books")
924            .fetch_all(&self.pool)
925            .await
926            .map_err(|e| StoreError::Internal(e.to_string()))?;
927        rows.iter()
928            .map(|row| {
929                let data: Vec<u8> = row
930                    .try_get("data")
931                    .map_err(|e| StoreError::Internal(e.to_string()))?;
932                deserialize_blob(&data)
933            })
934            .collect()
935    }
936}