Skip to main content

kuatia_storage_sql/
lib.rs

1//! SQL-backed Store implementation for SQLite and PostgreSQL.
2//!
3//! Uses `sqlx::Any` for database-agnostic queries. Enable features
4//! `sqlite` or `postgres` to select the backend.
5//!
6//! ```text
7//! let pool = sqlx::any::Pool<Any>Options::new()
8//!     .connect("sqlite::memory:").await?;
9//! let store = SqlStore::new(pool);
10//! store.migrate().await?;
11//! ```
12
13use std::str::FromStr;
14
15use async_trait::async_trait;
16use sqlx::{Any, Pool, Row};
17
18use kuatia_storage::error::StoreError;
19use kuatia_storage::events::{EventStore, LedgerEvent};
20use kuatia_storage::store::*;
21use kuatia_types::*;
22
23/// SQL-backed [`Store`] implementation.
24pub struct SqlStore {
25    pool: Pool<Any>,
26    autoid: kuatia_types::autoid::AutoId,
27}
28
29impl SqlStore {
30    /// Create a new SQL store wrapping an existing connection pool.
31    pub fn new(pool: Pool<Any>) -> Self {
32        Self {
33            pool,
34            autoid: kuatia_types::autoid::AutoId::new(),
35        }
36    }
37
38    /// Run database migrations. Idempotent: a `_migrations` ledger records what
39    /// has been applied, so re-running is a no-op. Every column is a text type,
40    /// so the store holds no opaque binary and the DDL is identical for both
41    /// backends. Content-addressed ids and opaque saga bytes are stored as hex
42    /// `TEXT`, and JSON payloads as their `TEXT` serialization, keeping every
43    /// row legible for auditing.
44    pub async fn migrate(&self) -> Result<(), StoreError> {
45        sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
46            .execute(&self.pool)
47            .await
48            .map_err(|e| StoreError::Internal(e.to_string()))?;
49
50        let migrations: &[(&str, &str)] = &[("001_init", include_str!("migrations/001_init.sql"))];
51
52        for (name, sql) in migrations {
53            let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
54                .bind(*name)
55                .fetch_optional(&self.pool)
56                .await
57                .map_err(|e| StoreError::Internal(e.to_string()))?;
58            if applied.is_some() {
59                continue;
60            }
61
62            for statement in sql.split(';') {
63                let trimmed = statement.trim();
64                if !trimmed.is_empty() {
65                    sqlx::query(trimmed)
66                        .execute(&self.pool)
67                        .await
68                        .map_err(|e| StoreError::Internal(e.to_string()))?;
69                }
70            }
71
72            sqlx::query("INSERT INTO _migrations (name) VALUES ($1)")
73                .bind(*name)
74                .execute(&self.pool)
75                .await
76                .map_err(|e| StoreError::Internal(e.to_string()))?;
77        }
78        Ok(())
79    }
80}
81
82// ---------------------------------------------------------------------------
83// Serialization helpers
84// ---------------------------------------------------------------------------
85
86fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
87    serde_json::to_string(policy)
88        .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
89}
90
91fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
92    serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
93}
94
95/// Serialize a value to a JSON string. Payload columns store JSON as `TEXT` so
96/// the database is directly readable for auditing; the ledger never queries
97/// into the JSON, so no binary or indexed representation is needed.
98fn serialize_json<T: serde::Serialize>(val: &T) -> Result<String, StoreError> {
99    serde_json::to_string(val).map_err(|e| StoreError::Internal(format!("json serialization: {e}")))
100}
101
102fn deserialize_json<T: serde::de::DeserializeOwned>(s: &str) -> Result<T, StoreError> {
103    serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad json: {e}")))
104}
105
106/// Lower-case hex encoding. Binary identifiers (content-addressed hashes) and
107/// opaque saga bytes are stored as hex `TEXT` so a row is legible in any SQL
108/// client and matches the hex form used in logs and `Debug` output.
109fn to_hex(bytes: &[u8]) -> String {
110    let mut s = String::with_capacity(bytes.len() * 2);
111    for b in bytes {
112        s.push_str(&format!("{b:02x}"));
113    }
114    s
115}
116
117fn from_hex(s: &str) -> Result<Vec<u8>, StoreError> {
118    if s.len() % 2 != 0 {
119        return Err(StoreError::Internal(format!("odd-length hex: {s:?}")));
120    }
121    (0..s.len())
122        .step_by(2)
123        .map(|i| {
124            u8::from_str_radix(&s[i..i + 2], 16)
125                .map_err(|e| StoreError::Internal(format!("bad hex: {e}")))
126        })
127        .collect()
128}
129
130fn envelope_id_to_hex(id: &EnvelopeId) -> String {
131    to_hex(&id.0)
132}
133
134fn envelope_id_from_hex(s: &str) -> Result<EnvelopeId, StoreError> {
135    let bytes = from_hex(s)?;
136    let arr: [u8; 32] = bytes.as_slice().try_into().map_err(|_| {
137        StoreError::Internal(format!("expected 32-byte id, got {} bytes", bytes.len()))
138    })?;
139    Ok(EnvelopeId(arr))
140}
141
142fn status_to_i16(s: PostingStatus) -> i16 {
143    match s {
144        PostingStatus::Active => 0,
145        PostingStatus::PendingInactive => 1,
146        PostingStatus::Inactive => 2,
147    }
148}
149
150fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
151    match v {
152        0 => Ok(PostingStatus::Active),
153        1 => Ok(PostingStatus::PendingInactive),
154        2 => Ok(PostingStatus::Inactive),
155        _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
156    }
157}
158
159fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
160    let id: i64 = row
161        .try_get("id")
162        .map_err(|e| StoreError::Internal(e.to_string()))?;
163    let version: i64 = row
164        .try_get("version")
165        .map_err(|e| StoreError::Internal(e.to_string()))?;
166    let policy_str: String = row
167        .try_get("policy")
168        .map_err(|e| StoreError::Internal(e.to_string()))?;
169    let flags_bits: i32 = row
170        .try_get("flags")
171        .map_err(|e| StoreError::Internal(e.to_string()))?;
172    let book: i64 = row
173        .try_get("book")
174        .map_err(|e| StoreError::Internal(e.to_string()))?;
175    let user_data_json: String = row
176        .try_get("user_data")
177        .map_err(|e| StoreError::Internal(e.to_string()))?;
178    let metadata_json: String = row
179        .try_get("metadata")
180        .map_err(|e| StoreError::Internal(e.to_string()))?;
181
182    Ok(Account {
183        id: AccountId::new(id),
184        version: version as u64,
185        policy: deserialize_policy(&policy_str)?,
186        flags: AccountFlags::from_bits_truncate(flags_bits as u32),
187        book: BookId::new(book),
188        user_data: deserialize_json(&user_data_json)?,
189        metadata: deserialize_json(&metadata_json)?,
190    })
191}
192
193fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
194    let transfer_id: String = row
195        .try_get("transfer_id")
196        .map_err(|e| StoreError::Internal(e.to_string()))?;
197    let idx: i16 = row
198        .try_get("idx")
199        .map_err(|e| StoreError::Internal(e.to_string()))?;
200    let owner: i64 = row
201        .try_get("owner")
202        .map_err(|e| StoreError::Internal(e.to_string()))?;
203    let asset: i32 = row
204        .try_get("asset")
205        .map_err(|e| StoreError::Internal(e.to_string()))?;
206    let value: String = row
207        .try_get("value")
208        .map_err(|e| StoreError::Internal(e.to_string()))?;
209    let value = Cent::from_str(&value).map_err(|e| StoreError::Internal(e.to_string()))?;
210    let status: i16 = row
211        .try_get("status")
212        .map_err(|e| StoreError::Internal(e.to_string()))?;
213    let reservation: Option<i64> = row
214        .try_get("reservation")
215        .map_err(|e| StoreError::Internal(e.to_string()))?;
216
217    Ok(Posting {
218        id: PostingId {
219            transfer: envelope_id_from_hex(&transfer_id)?,
220            index: idx as u16,
221        },
222        owner: AccountId::new(owner),
223        asset: AssetId::new(asset as u32),
224        value,
225        status: status_from_i16(status)?,
226        reservation: reservation.map(ReservationId::new),
227    })
228}
229
230// ---------------------------------------------------------------------------
231// AccountStore
232// ---------------------------------------------------------------------------
233
234#[async_trait]
235impl AccountStore for SqlStore {
236    async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
237        let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
238            .bind(id.0)
239            .fetch_optional(&self.pool)
240            .await
241            .map_err(|e| StoreError::Internal(e.to_string()))?
242            .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
243        row_to_account(&row)
244    }
245
246    async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
247        let mut result = Vec::with_capacity(ids.len());
248        for id in ids {
249            result.push(self.get_account(id).await?);
250        }
251        Ok(result)
252    }
253
254    async fn create_account(&self, account: Account) -> Result<(), StoreError> {
255        let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
256            .bind(account.id.0)
257            .fetch_optional(&self.pool)
258            .await
259            .map_err(|e| StoreError::Internal(e.to_string()))?;
260        if exists.is_some() {
261            return Err(StoreError::AlreadyExists(format!(
262                "account {:?}",
263                account.id
264            )));
265        }
266
267        sqlx::query(
268            "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
269        )
270            .bind(account.id.0)
271            .bind(account.version as i64)
272            .bind(serialize_policy(&account.policy)?)
273            .bind(account.flags.bits() as i32)
274            .bind(account.book.0)
275            .bind(serialize_json(&account.user_data)?)
276            .bind(serialize_json(&account.metadata)?)
277            .execute(&self.pool)
278            .await
279            .map_err(|e| StoreError::Internal(e.to_string()))?;
280        Ok(())
281    }
282
283    async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
284        let current =
285            sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
286                .bind(account.id.0)
287                .fetch_optional(&self.pool)
288                .await
289                .map_err(|e| StoreError::Internal(e.to_string()))?
290                .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
291
292        let current_version: i64 = current
293            .try_get("version")
294            .map_err(|e| StoreError::Internal(e.to_string()))?;
295        let expected = current_version
296            .checked_add(1)
297            .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
298
299        if account.version as i64 != expected {
300            return Err(StoreError::VersionConflict {
301                account: account.id,
302                expected: expected as u64,
303                actual: account.version,
304            });
305        }
306
307        sqlx::query(
308            "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
309        )
310            .bind(account.id.0)
311            .bind(account.version as i64)
312            .bind(serialize_policy(&account.policy)?)
313            .bind(account.flags.bits() as i32)
314            .bind(account.book.0)
315            .bind(serialize_json(&account.user_data)?)
316            .bind(serialize_json(&account.metadata)?)
317            .execute(&self.pool)
318            .await
319            .map_err(|e| StoreError::Internal(e.to_string()))?;
320        Ok(())
321    }
322
323    async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
324        let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
325            .bind(id.0)
326            .fetch_all(&self.pool)
327            .await
328            .map_err(|e| StoreError::Internal(e.to_string()))?;
329        if rows.is_empty() {
330            return Err(StoreError::NotFound(format!("account {id:?}")));
331        }
332        rows.iter().map(row_to_account).collect()
333    }
334
335    async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
336        let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
337            .fetch_all(&self.pool)
338            .await
339            .map_err(|e| StoreError::Internal(e.to_string()))?;
340        let mut accounts: Vec<Account> =
341            rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
342        accounts.dedup_by_key(|a| a.id);
343        Ok(accounts)
344    }
345}
346
347// ---------------------------------------------------------------------------
348// PostingStore
349// ---------------------------------------------------------------------------
350
351#[async_trait]
352impl PostingStore for SqlStore {
353    async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
354        let mut result = Vec::with_capacity(ids.len());
355        for id in ids {
356            let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
357                .bind(envelope_id_to_hex(&id.transfer))
358                .bind(id.index as i16)
359                .fetch_optional(&self.pool)
360                .await
361                .map_err(|e| StoreError::Internal(e.to_string()))?
362                .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
363            result.push(row_to_posting(&row)?);
364        }
365        Ok(result)
366    }
367
368    async fn get_postings_by_account(
369        &self,
370        account: &AccountId,
371        asset: Option<&AssetId>,
372        status: Option<PostingStatus>,
373    ) -> Result<Vec<Posting>, StoreError> {
374        let rows = match (asset, status) {
375            (Some(a), Some(s)) => {
376                sqlx::query(
377                    "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
378                )
379                .bind(account.0)
380                .bind(a.0 as i32)
381                .bind(status_to_i16(s))
382                .fetch_all(&self.pool)
383                .await
384            }
385            (Some(a), None) => {
386                sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
387                    .bind(account.0)
388                    .bind(a.0 as i32)
389                    .fetch_all(&self.pool)
390                    .await
391            }
392            (None, Some(s)) => {
393                sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
394                    .bind(account.0)
395                    .bind(status_to_i16(s))
396                    .fetch_all(&self.pool)
397                    .await
398            }
399            (None, None) => {
400                sqlx::query("SELECT * FROM postings WHERE owner = $1")
401                    .bind(account.0)
402                    .fetch_all(&self.pool)
403                    .await
404            }
405        }
406        .map_err(|e| StoreError::Internal(e.to_string()))?;
407
408        rows.iter().map(row_to_posting).collect()
409    }
410
411    async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
412        let (where_clause, count_clause) = {
413            let mut w = String::from("WHERE owner = $1");
414            let mut idx = 2u32;
415            if query.asset.is_some() {
416                w.push_str(&format!(" AND asset = ${idx}"));
417                idx += 1;
418            }
419            if query.status.is_some() {
420                w.push_str(&format!(" AND status = ${idx}"));
421            }
422            let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
423            let limit = query.limit.unwrap_or(u32::MAX);
424            let offset = query.offset.unwrap_or(0);
425            w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
426            (format!("SELECT * FROM postings {w}"), c)
427        };
428
429        // Build count query
430        let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
431        if let Some(ref a) = query.asset {
432            count_q = count_q.bind(a.0 as i32);
433        }
434        if let Some(s) = query.status {
435            count_q = count_q.bind(status_to_i16(s));
436        }
437        let count_row = count_q
438            .fetch_one(&self.pool)
439            .await
440            .map_err(|e| StoreError::Internal(e.to_string()))?;
441        let total: i64 = count_row
442            .try_get("cnt")
443            .map_err(|e| StoreError::Internal(e.to_string()))?;
444
445        // Build data query
446        let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
447        if let Some(ref a) = query.asset {
448            data_q = data_q.bind(a.0 as i32);
449        }
450        if let Some(s) = query.status {
451            data_q = data_q.bind(status_to_i16(s));
452        }
453        let rows = data_q
454            .fetch_all(&self.pool)
455            .await
456            .map_err(|e| StoreError::Internal(e.to_string()))?;
457
458        let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
459        Ok(Page {
460            items,
461            total: total as u64,
462        })
463    }
464
465    async fn reserve_postings(
466        &self,
467        ids: &[PostingId],
468        reservation: ReservationId,
469    ) -> Result<u64, StoreError> {
470        // Dumb instruction: each id flips Active → PendingInactive (the status
471        // precondition is in the WHERE so it is atomic). Return the count of rows
472        // changed; the caller decides what a short count means.
473        let mut tx = self
474            .pool
475            .begin()
476            .await
477            .map_err(|e| StoreError::Internal(e.to_string()))?;
478        let mut reserved: u64 = 0;
479        for id in ids {
480            let res = sqlx::query(
481                "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
482            )
483            .bind(status_to_i16(PostingStatus::PendingInactive))
484            .bind(reservation.0)
485            .bind(envelope_id_to_hex(&id.transfer))
486            .bind(id.index as i16)
487            .bind(status_to_i16(PostingStatus::Active))
488            .execute(&mut *tx)
489            .await
490            .map_err(|e| StoreError::Internal(e.to_string()))?;
491            reserved += res.rows_affected();
492        }
493
494        tx.commit()
495            .await
496            .map_err(|e| StoreError::Internal(e.to_string()))?;
497        Ok(reserved)
498    }
499
500    async fn release_postings(
501        &self,
502        ids: &[PostingId],
503        reservation: ReservationId,
504    ) -> Result<u64, StoreError> {
505        // Dumb instruction: each id reserved by `reservation` flips
506        // PendingInactive → Active. Return the count released; an already-Active
507        // or differently-owned posting simply does not count.
508        let mut tx = self
509            .pool
510            .begin()
511            .await
512            .map_err(|e| StoreError::Internal(e.to_string()))?;
513        let mut released: u64 = 0;
514        for id in ids {
515            let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
516                .bind(status_to_i16(PostingStatus::Active))
517                .bind(envelope_id_to_hex(&id.transfer))
518                .bind(id.index as i16)
519                .bind(status_to_i16(PostingStatus::PendingInactive))
520                .bind(reservation.0)
521                .execute(&mut *tx)
522                .await
523                .map_err(|e| StoreError::Internal(e.to_string()))?;
524            released += res.rows_affected();
525        }
526
527        tx.commit()
528            .await
529            .map_err(|e| StoreError::Internal(e.to_string()))?;
530        Ok(released)
531    }
532
533    async fn deactivate_postings(
534        &self,
535        ids: &[PostingId],
536        reservation: Option<ReservationId>,
537    ) -> Result<u64, StoreError> {
538        let mut tx = self
539            .pool
540            .begin()
541            .await
542            .map_err(|e| StoreError::Internal(e.to_string()))?;
543        let mut changed: u64 = 0;
544        for id in ids {
545            // The precondition is the instruction; the count is the result. The
546            // caller decides what a short count means.
547            let res = match reservation {
548                None => {
549                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
550                        .bind(status_to_i16(PostingStatus::Inactive))
551                        .bind(envelope_id_to_hex(&id.transfer))
552                        .bind(id.index as i16)
553                        .bind(status_to_i16(PostingStatus::Active))
554                        .execute(&mut *tx)
555                        .await
556                }
557                Some(rid) => {
558                    sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
559                        .bind(status_to_i16(PostingStatus::Inactive))
560                        .bind(envelope_id_to_hex(&id.transfer))
561                        .bind(id.index as i16)
562                        .bind(status_to_i16(PostingStatus::PendingInactive))
563                        .bind(rid.0)
564                        .execute(&mut *tx)
565                        .await
566                }
567            }
568            .map_err(|e| StoreError::Internal(e.to_string()))?;
569            changed += res.rows_affected();
570        }
571        tx.commit()
572            .await
573            .map_err(|e| StoreError::Internal(e.to_string()))?;
574        Ok(changed)
575    }
576
577    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
578        let mut tx = self
579            .pool
580            .begin()
581            .await
582            .map_err(|e| StoreError::Internal(e.to_string()))?;
583        let mut inserted: u64 = 0;
584        for posting in postings {
585            let res = sqlx::query(
586                "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
587            )
588                .bind(envelope_id_to_hex(&posting.id.transfer))
589                .bind(posting.id.index as i16)
590                .bind(posting.owner.0)
591                .bind(posting.asset.0 as i32)
592                .bind(posting.value.to_string())
593                .bind(status_to_i16(posting.status))
594                .execute(&mut *tx)
595                .await
596                .map_err(|e| StoreError::Internal(e.to_string()))?;
597            inserted += res.rows_affected();
598        }
599        tx.commit()
600            .await
601            .map_err(|e| StoreError::Internal(e.to_string()))?;
602        Ok(inserted)
603    }
604}
605
606// ---------------------------------------------------------------------------
607// TransferStore
608// ---------------------------------------------------------------------------
609
610#[async_trait]
611impl TransferStore for SqlStore {
612    async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
613        let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
614            .bind(envelope_id_to_hex(id))
615            .fetch_optional(&self.pool)
616            .await
617            .map_err(|e| StoreError::Internal(e.to_string()))?;
618
619        match row {
620            None => Ok(None),
621            Some(row) => {
622                let transfer_json: String = row
623                    .try_get("transfer")
624                    .map_err(|e| StoreError::Internal(e.to_string()))?;
625                let receipt_json: String = row
626                    .try_get("receipt")
627                    .map_err(|e| StoreError::Internal(e.to_string()))?;
628                let created_at: i64 = row
629                    .try_get("created_at")
630                    .map_err(|e| StoreError::Internal(e.to_string()))?;
631                Ok(Some(EnvelopeRecord {
632                    envelope: deserialize_json(&transfer_json)?,
633                    receipt: deserialize_json(&receipt_json)?,
634                    created_at,
635                }))
636            }
637        }
638    }
639
640    async fn store_transfer(
641        &self,
642        record: EnvelopeRecord,
643        involved: &[AccountId],
644    ) -> Result<u64, StoreError> {
645        let tid = record.receipt.transfer_id;
646        let tid_hex = envelope_id_to_hex(&tid);
647        let transfer_json = serialize_json(&record.envelope)?;
648        let receipt_json = serialize_json(&record.receipt)?;
649
650        let mut tx = self
651            .pool
652            .begin()
653            .await
654            .map_err(|e| StoreError::Internal(e.to_string()))?;
655
656        let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
657            .bind(&tid_hex)
658            .bind(&transfer_json)
659            .bind(&receipt_json)
660            .bind(record.created_at)
661            .bind(record.envelope.book().0)
662            .execute(&mut *tx)
663            .await
664            .map_err(|e| StoreError::Internal(e.to_string()))?;
665        let inserted = res.rows_affected();
666
667        // Index every involved account (caller supplies the set; storage does no
668        // computation). Idempotent so a replay is harmless.
669        for account in involved {
670            sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
671                .bind(&tid_hex)
672                .bind(account.0)
673                .execute(&mut *tx)
674                .await
675                .map_err(|e| StoreError::Internal(e.to_string()))?;
676        }
677
678        tx.commit()
679            .await
680            .map_err(|e| StoreError::Internal(e.to_string()))?;
681        Ok(inserted)
682    }
683
684    async fn get_transfers_for_account(
685        &self,
686        account: &AccountId,
687    ) -> Result<Vec<EnvelopeRecord>, StoreError> {
688        let rows = sqlx::query(
689            "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
690        )
691            .bind(account.0)
692            .fetch_all(&self.pool)
693            .await
694            .map_err(|e| StoreError::Internal(e.to_string()))?;
695
696        let mut result = Vec::with_capacity(rows.len());
697        for row in &rows {
698            let transfer_json: String = row
699                .try_get("transfer")
700                .map_err(|e| StoreError::Internal(e.to_string()))?;
701            let receipt_json: String = row
702                .try_get("receipt")
703                .map_err(|e| StoreError::Internal(e.to_string()))?;
704            let created_at: i64 = row
705                .try_get("created_at")
706                .map_err(|e| StoreError::Internal(e.to_string()))?;
707            result.push(EnvelopeRecord {
708                envelope: deserialize_json(&transfer_json)?,
709                receipt: deserialize_json(&receipt_json)?,
710                created_at,
711            });
712        }
713        Ok(result)
714    }
715
716    async fn query_transfers(
717        &self,
718        query: &TransferQuery,
719    ) -> Result<Page<EnvelopeRecord>, StoreError> {
720        // Load base records, using the account join when available.
721        let base_records = if let Some(ref account) = query.account {
722            self.get_transfers_for_account(account).await?
723        } else {
724            let rows = sqlx::query(
725                "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
726            )
727            .fetch_all(&self.pool)
728            .await
729            .map_err(|e| StoreError::Internal(e.to_string()))?;
730
731            let mut records = Vec::with_capacity(rows.len());
732            for row in &rows {
733                let transfer_json: String = row
734                    .try_get("transfer")
735                    .map_err(|e| StoreError::Internal(e.to_string()))?;
736                let receipt_json: String = row
737                    .try_get("receipt")
738                    .map_err(|e| StoreError::Internal(e.to_string()))?;
739                let created_at: i64 = row
740                    .try_get("created_at")
741                    .map_err(|e| StoreError::Internal(e.to_string()))?;
742                records.push(EnvelopeRecord {
743                    envelope: deserialize_json(&transfer_json)?,
744                    receipt: deserialize_json(&receipt_json)?,
745                    created_at,
746                });
747            }
748            records
749        };
750
751        // Filter in memory for remaining conditions.
752        let filtered: Vec<EnvelopeRecord> = base_records
753            .into_iter()
754            .filter(|r| {
755                if let Some(from) = query.from_ts
756                    && r.created_at < from
757                {
758                    return false;
759                }
760                if let Some(to) = query.to_ts
761                    && r.created_at >= to
762                {
763                    return false;
764                }
765                if let Some(book) = query.book
766                    && r.envelope.book() != book
767                {
768                    return false;
769                }
770                true
771            })
772            .collect();
773
774        let total = filtered.len() as u64;
775        let offset = query.offset.unwrap_or(0) as usize;
776        let limit = query.limit.unwrap_or(u32::MAX) as usize;
777        let items = filtered.into_iter().skip(offset).take(limit).collect();
778
779        Ok(Page { items, total })
780    }
781}
782
783// ---------------------------------------------------------------------------
784// SagaStore
785// ---------------------------------------------------------------------------
786
787#[async_trait]
788impl SagaStore for SqlStore {
789    async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
790        sqlx::query(
791            "INSERT INTO sagas (id, data) VALUES ($1, $2) \
792             ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
793        )
794        .bind(*id)
795        .bind(to_hex(&data))
796        .execute(&self.pool)
797        .await
798        .map_err(|e| StoreError::Internal(e.to_string()))?;
799        Ok(())
800    }
801
802    async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
803        let rows = sqlx::query("SELECT id, data FROM sagas")
804            .fetch_all(&self.pool)
805            .await
806            .map_err(|e| StoreError::Internal(e.to_string()))?;
807        let mut result = Vec::with_capacity(rows.len());
808        for row in &rows {
809            let id: i64 = row
810                .try_get("id")
811                .map_err(|e| StoreError::Internal(e.to_string()))?;
812            let data_hex: String = row
813                .try_get("data")
814                .map_err(|e| StoreError::Internal(e.to_string()))?;
815            result.push((id, from_hex(&data_hex)?));
816        }
817        Ok(result)
818    }
819
820    async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
821        sqlx::query("DELETE FROM sagas WHERE id = $1")
822            .bind(*id)
823            .execute(&self.pool)
824            .await
825            .map_err(|e| StoreError::Internal(e.to_string()))?;
826        Ok(())
827    }
828}
829
830// ---------------------------------------------------------------------------
831// EventStore
832// ---------------------------------------------------------------------------
833
834#[async_trait]
835impl EventStore for SqlStore {
836    async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
837        let kind_str =
838            serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
839        let data = serialize_json(event)?;
840        let seq = self.autoid.next() as u64;
841
842        // Idempotent on the dedup key: a replayed transfer event conflicts on
843        // `dedup_key` and returns the existing seq instead of a duplicate row.
844        match kuatia_storage::events::event_dedup_key(&event.kind) {
845            Some(eid) => {
846                let dedup_hex = envelope_id_to_hex(&eid);
847                let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
848                    .bind(seq as i64)
849                    .bind(event.timestamp)
850                    .bind(&kind_str)
851                    .bind(&data)
852                    .bind(&dedup_hex)
853                    .execute(&self.pool)
854                    .await
855                    .map_err(|e| StoreError::Internal(e.to_string()))?;
856                if res.rows_affected() == 0 {
857                    let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
858                        .bind(&dedup_hex)
859                        .fetch_one(&self.pool)
860                        .await
861                        .map_err(|e| StoreError::Internal(e.to_string()))?;
862                    let existing: i64 = row
863                        .try_get("seq")
864                        .map_err(|e| StoreError::Internal(e.to_string()))?;
865                    return Ok(existing as u64);
866                }
867                Ok(seq)
868            }
869            None => {
870                sqlx::query(
871                    "INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)",
872                )
873                .bind(seq as i64)
874                .bind(event.timestamp)
875                .bind(&kind_str)
876                .bind(&data)
877                .execute(&self.pool)
878                .await
879                .map_err(|e| StoreError::Internal(e.to_string()))?;
880                Ok(seq)
881            }
882        }
883    }
884
885    async fn get_events_since(
886        &self,
887        after_seq: u64,
888        limit: u32,
889    ) -> Result<Vec<LedgerEvent>, StoreError> {
890        let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
891            .bind(after_seq as i64)
892            .bind(limit as i32)
893            .fetch_all(&self.pool)
894            .await
895            .map_err(|e| StoreError::Internal(e.to_string()))?;
896
897        let mut events = Vec::with_capacity(rows.len());
898        for row in &rows {
899            let seq: i64 = row
900                .try_get("seq")
901                .map_err(|e| StoreError::Internal(e.to_string()))?;
902            let data_json: String = row
903                .try_get("data")
904                .map_err(|e| StoreError::Internal(e.to_string()))?;
905            let mut event: LedgerEvent = deserialize_json(&data_json)?;
906            event.seq = seq as u64;
907            events.push(event);
908        }
909        Ok(events)
910    }
911}
912
913// ---------------------------------------------------------------------------
914// BookStore
915// ---------------------------------------------------------------------------
916
917#[async_trait]
918impl BookStore for SqlStore {
919    async fn create_book(&self, book: Book) -> Result<(), StoreError> {
920        let exists = sqlx::query("SELECT 1 FROM books WHERE id = $1 LIMIT 1")
921            .bind(book.id.0)
922            .fetch_optional(&self.pool)
923            .await
924            .map_err(|e| StoreError::Internal(e.to_string()))?;
925        if exists.is_some() {
926            return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
927        }
928
929        let data = serialize_json(&book)?;
930        sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
931            .bind(book.id.0)
932            .bind(&book.name)
933            .bind(&data)
934            .execute(&self.pool)
935            .await
936            .map_err(|e| StoreError::Internal(e.to_string()))?;
937        Ok(())
938    }
939
940    async fn get_book(&self, id: &BookId) -> Result<Book, StoreError> {
941        let row = sqlx::query("SELECT data FROM books WHERE id = $1")
942            .bind(id.0)
943            .fetch_optional(&self.pool)
944            .await
945            .map_err(|e| StoreError::Internal(e.to_string()))?
946            .ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
947        let data: String = row
948            .try_get("data")
949            .map_err(|e| StoreError::Internal(e.to_string()))?;
950        deserialize_json(&data)
951    }
952
953    async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
954        let rows = sqlx::query("SELECT data FROM books")
955            .fetch_all(&self.pool)
956            .await
957            .map_err(|e| StoreError::Internal(e.to_string()))?;
958        rows.iter()
959            .map(|row| {
960                let data: String = row
961                    .try_get("data")
962                    .map_err(|e| StoreError::Internal(e.to_string()))?;
963                deserialize_json(&data)
964            })
965            .collect()
966    }
967}