1use std::str::FromStr;
14
15use async_trait::async_trait;
16use sqlx::{Any, Pool, Row};
17
18use kuatia_storage::error::StoreError;
19use kuatia_storage::events::{EventStore, LedgerEvent};
20use kuatia_storage::store::*;
21use kuatia_types::*;
22
/// SQL-backed store built on a driver-agnostic `sqlx` connection pool.
///
/// The storage traits implemented further down in this file all run their
/// queries through `pool`.
pub struct SqlStore {
    /// Connection pool every query and transaction in this file runs against.
    pool: Pool<Any>,
    /// ID generator; `append_event` draws event sequence numbers from it.
    autoid: kuatia_types::autoid::AutoId,
}
28
29impl SqlStore {
30 pub fn new(pool: Pool<Any>) -> Self {
32 Self {
33 pool,
34 autoid: kuatia_types::autoid::AutoId::new(),
35 }
36 }
37
38 pub async fn migrate(&self) -> Result<(), StoreError> {
42 let is_sqlite = sqlx::query("SELECT sqlite_version()")
44 .fetch_optional(&self.pool)
45 .await
46 .is_ok();
47
48 sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
49 .execute(&self.pool)
50 .await
51 .map_err(|e| StoreError::Internal(e.to_string()))?;
52
53 let migrations: &[(&str, &str)] = if is_sqlite {
54 &[("001_init", include_str!("migrations/sqlite/001_init.sql"))]
55 } else {
56 &[("001_init", include_str!("migrations/postgres/001_init.sql"))]
57 };
58
59 for (name, sql) in migrations {
60 let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
61 .bind(*name)
62 .fetch_optional(&self.pool)
63 .await
64 .map_err(|e| StoreError::Internal(e.to_string()))?;
65 if applied.is_some() {
66 continue;
67 }
68
69 for statement in sql.split(';') {
70 let trimmed = statement.trim();
71 if !trimmed.is_empty() {
72 sqlx::query(trimmed)
73 .execute(&self.pool)
74 .await
75 .map_err(|e| StoreError::Internal(e.to_string()))?;
76 }
77 }
78
79 sqlx::query("INSERT INTO _migrations (name) VALUES ($1)")
80 .bind(*name)
81 .execute(&self.pool)
82 .await
83 .map_err(|e| StoreError::Internal(e.to_string()))?;
84 }
85 Ok(())
86 }
87}
88
89fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
94 serde_json::to_string(policy)
95 .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
96}
97
98fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
99 serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
100}
101
102fn serialize_blob<T: serde::Serialize>(val: &T) -> Result<Vec<u8>, StoreError> {
103 serde_json::to_vec(val).map_err(|e| StoreError::Internal(format!("blob serialization: {e}")))
104}
105
106fn deserialize_blob<T: serde::de::DeserializeOwned>(bytes: &[u8]) -> Result<T, StoreError> {
107 serde_json::from_slice(bytes).map_err(|e| StoreError::Internal(format!("bad blob: {e}")))
108}
109
110fn status_to_i16(s: PostingStatus) -> i16 {
111 match s {
112 PostingStatus::Active => 0,
113 PostingStatus::PendingInactive => 1,
114 PostingStatus::Inactive => 2,
115 }
116}
117
118fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
119 match v {
120 0 => Ok(PostingStatus::Active),
121 1 => Ok(PostingStatus::PendingInactive),
122 2 => Ok(PostingStatus::Inactive),
123 _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
124 }
125}
126
127fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
128 let id: i64 = row
129 .try_get("id")
130 .map_err(|e| StoreError::Internal(e.to_string()))?;
131 let version: i64 = row
132 .try_get("version")
133 .map_err(|e| StoreError::Internal(e.to_string()))?;
134 let policy_str: String = row
135 .try_get("policy")
136 .map_err(|e| StoreError::Internal(e.to_string()))?;
137 let flags_bits: i32 = row
138 .try_get("flags")
139 .map_err(|e| StoreError::Internal(e.to_string()))?;
140 let book: i64 = row
141 .try_get("book")
142 .map_err(|e| StoreError::Internal(e.to_string()))?;
143 let user_data_bytes: Vec<u8> = row
144 .try_get("user_data")
145 .map_err(|e| StoreError::Internal(e.to_string()))?;
146 let metadata_bytes: Vec<u8> = row
147 .try_get("metadata")
148 .map_err(|e| StoreError::Internal(e.to_string()))?;
149
150 Ok(Account {
151 id: AccountId::new(id),
152 version: version as u64,
153 policy: deserialize_policy(&policy_str)?,
154 flags: AccountFlags::from_bits_truncate(flags_bits as u32),
155 book: BookId::new(book),
156 user_data: deserialize_blob(&user_data_bytes)?,
157 metadata: deserialize_blob(&metadata_bytes)?,
158 })
159}
160
161fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
162 let transfer_id: Vec<u8> = row
163 .try_get("transfer_id")
164 .map_err(|e| StoreError::Internal(e.to_string()))?;
165 let idx: i16 = row
166 .try_get("idx")
167 .map_err(|e| StoreError::Internal(e.to_string()))?;
168 let owner: i64 = row
169 .try_get("owner")
170 .map_err(|e| StoreError::Internal(e.to_string()))?;
171 let asset: i32 = row
172 .try_get("asset")
173 .map_err(|e| StoreError::Internal(e.to_string()))?;
174 let value: String = row
175 .try_get("value")
176 .map_err(|e| StoreError::Internal(e.to_string()))?;
177 let value = Cent::from_str(&value).map_err(|e| StoreError::Internal(e.to_string()))?;
178 let status: i16 = row
179 .try_get("status")
180 .map_err(|e| StoreError::Internal(e.to_string()))?;
181 let reservation: Option<i64> = row
182 .try_get("reservation")
183 .map_err(|e| StoreError::Internal(e.to_string()))?;
184
185 let mut tid = [0u8; 32];
186 tid.copy_from_slice(&transfer_id);
187
188 Ok(Posting {
189 id: PostingId {
190 transfer: EnvelopeId(tid),
191 index: idx as u16,
192 },
193 owner: AccountId::new(owner),
194 asset: AssetId::new(asset as u32),
195 value,
196 status: status_from_i16(status)?,
197 reservation: reservation.map(ReservationId::new),
198 })
199}
200
201#[async_trait]
206impl AccountStore for SqlStore {
207 async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
208 let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
209 .bind(id.0)
210 .fetch_optional(&self.pool)
211 .await
212 .map_err(|e| StoreError::Internal(e.to_string()))?
213 .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
214 row_to_account(&row)
215 }
216
217 async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
218 let mut result = Vec::with_capacity(ids.len());
219 for id in ids {
220 result.push(self.get_account(id).await?);
221 }
222 Ok(result)
223 }
224
225 async fn create_account(&self, account: Account) -> Result<(), StoreError> {
226 let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
227 .bind(account.id.0)
228 .fetch_optional(&self.pool)
229 .await
230 .map_err(|e| StoreError::Internal(e.to_string()))?;
231 if exists.is_some() {
232 return Err(StoreError::AlreadyExists(format!(
233 "account {:?}",
234 account.id
235 )));
236 }
237
238 sqlx::query(
239 "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
240 )
241 .bind(account.id.0)
242 .bind(account.version as i64)
243 .bind(serialize_policy(&account.policy)?)
244 .bind(account.flags.bits() as i32)
245 .bind(account.book.0)
246 .bind(serialize_blob(&account.user_data)?)
247 .bind(serialize_blob(&account.metadata)?)
248 .execute(&self.pool)
249 .await
250 .map_err(|e| StoreError::Internal(e.to_string()))?;
251 Ok(())
252 }
253
254 async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
255 let current =
256 sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
257 .bind(account.id.0)
258 .fetch_optional(&self.pool)
259 .await
260 .map_err(|e| StoreError::Internal(e.to_string()))?
261 .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
262
263 let current_version: i64 = current
264 .try_get("version")
265 .map_err(|e| StoreError::Internal(e.to_string()))?;
266 let expected = current_version
267 .checked_add(1)
268 .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
269
270 if account.version as i64 != expected {
271 return Err(StoreError::VersionConflict {
272 account: account.id,
273 expected: expected as u64,
274 actual: account.version,
275 });
276 }
277
278 sqlx::query(
279 "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
280 )
281 .bind(account.id.0)
282 .bind(account.version as i64)
283 .bind(serialize_policy(&account.policy)?)
284 .bind(account.flags.bits() as i32)
285 .bind(account.book.0)
286 .bind(serialize_blob(&account.user_data)?)
287 .bind(serialize_blob(&account.metadata)?)
288 .execute(&self.pool)
289 .await
290 .map_err(|e| StoreError::Internal(e.to_string()))?;
291 Ok(())
292 }
293
294 async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
295 let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
296 .bind(id.0)
297 .fetch_all(&self.pool)
298 .await
299 .map_err(|e| StoreError::Internal(e.to_string()))?;
300 if rows.is_empty() {
301 return Err(StoreError::NotFound(format!("account {id:?}")));
302 }
303 rows.iter().map(row_to_account).collect()
304 }
305
306 async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
307 let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
308 .fetch_all(&self.pool)
309 .await
310 .map_err(|e| StoreError::Internal(e.to_string()))?;
311 let mut accounts: Vec<Account> =
312 rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
313 accounts.dedup_by_key(|a| a.id);
314 Ok(accounts)
315 }
316}
317
318#[async_trait]
323impl PostingStore for SqlStore {
324 async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
325 let mut result = Vec::with_capacity(ids.len());
326 for id in ids {
327 let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
328 .bind(id.transfer.0.as_slice())
329 .bind(id.index as i16)
330 .fetch_optional(&self.pool)
331 .await
332 .map_err(|e| StoreError::Internal(e.to_string()))?
333 .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
334 result.push(row_to_posting(&row)?);
335 }
336 Ok(result)
337 }
338
339 async fn get_postings_by_account(
340 &self,
341 account: &AccountId,
342 asset: Option<&AssetId>,
343 status: Option<PostingStatus>,
344 ) -> Result<Vec<Posting>, StoreError> {
345 let rows = match (asset, status) {
346 (Some(a), Some(s)) => {
347 sqlx::query(
348 "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
349 )
350 .bind(account.0)
351 .bind(a.0 as i32)
352 .bind(status_to_i16(s))
353 .fetch_all(&self.pool)
354 .await
355 }
356 (Some(a), None) => {
357 sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
358 .bind(account.0)
359 .bind(a.0 as i32)
360 .fetch_all(&self.pool)
361 .await
362 }
363 (None, Some(s)) => {
364 sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
365 .bind(account.0)
366 .bind(status_to_i16(s))
367 .fetch_all(&self.pool)
368 .await
369 }
370 (None, None) => {
371 sqlx::query("SELECT * FROM postings WHERE owner = $1")
372 .bind(account.0)
373 .fetch_all(&self.pool)
374 .await
375 }
376 }
377 .map_err(|e| StoreError::Internal(e.to_string()))?;
378
379 rows.iter().map(row_to_posting).collect()
380 }
381
382 async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
383 let (where_clause, count_clause) = {
384 let mut w = String::from("WHERE owner = $1");
385 let mut idx = 2u32;
386 if query.asset.is_some() {
387 w.push_str(&format!(" AND asset = ${idx}"));
388 idx += 1;
389 }
390 if query.status.is_some() {
391 w.push_str(&format!(" AND status = ${idx}"));
392 }
393 let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
394 let limit = query.limit.unwrap_or(u32::MAX);
395 let offset = query.offset.unwrap_or(0);
396 w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
397 (format!("SELECT * FROM postings {w}"), c)
398 };
399
400 let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
402 if let Some(ref a) = query.asset {
403 count_q = count_q.bind(a.0 as i32);
404 }
405 if let Some(s) = query.status {
406 count_q = count_q.bind(status_to_i16(s));
407 }
408 let count_row = count_q
409 .fetch_one(&self.pool)
410 .await
411 .map_err(|e| StoreError::Internal(e.to_string()))?;
412 let total: i64 = count_row
413 .try_get("cnt")
414 .map_err(|e| StoreError::Internal(e.to_string()))?;
415
416 let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
418 if let Some(ref a) = query.asset {
419 data_q = data_q.bind(a.0 as i32);
420 }
421 if let Some(s) = query.status {
422 data_q = data_q.bind(status_to_i16(s));
423 }
424 let rows = data_q
425 .fetch_all(&self.pool)
426 .await
427 .map_err(|e| StoreError::Internal(e.to_string()))?;
428
429 let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
430 Ok(Page {
431 items,
432 total: total as u64,
433 })
434 }
435
436 async fn reserve_postings(
437 &self,
438 ids: &[PostingId],
439 reservation: ReservationId,
440 ) -> Result<u64, StoreError> {
441 let mut tx = self
445 .pool
446 .begin()
447 .await
448 .map_err(|e| StoreError::Internal(e.to_string()))?;
449 let mut reserved: u64 = 0;
450 for id in ids {
451 let res = sqlx::query(
452 "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
453 )
454 .bind(status_to_i16(PostingStatus::PendingInactive))
455 .bind(reservation.0)
456 .bind(id.transfer.0.as_slice())
457 .bind(id.index as i16)
458 .bind(status_to_i16(PostingStatus::Active))
459 .execute(&mut *tx)
460 .await
461 .map_err(|e| StoreError::Internal(e.to_string()))?;
462 reserved += res.rows_affected();
463 }
464
465 tx.commit()
466 .await
467 .map_err(|e| StoreError::Internal(e.to_string()))?;
468 Ok(reserved)
469 }
470
471 async fn release_postings(
472 &self,
473 ids: &[PostingId],
474 reservation: ReservationId,
475 ) -> Result<u64, StoreError> {
476 let mut tx = self
480 .pool
481 .begin()
482 .await
483 .map_err(|e| StoreError::Internal(e.to_string()))?;
484 let mut released: u64 = 0;
485 for id in ids {
486 let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
487 .bind(status_to_i16(PostingStatus::Active))
488 .bind(id.transfer.0.as_slice())
489 .bind(id.index as i16)
490 .bind(status_to_i16(PostingStatus::PendingInactive))
491 .bind(reservation.0)
492 .execute(&mut *tx)
493 .await
494 .map_err(|e| StoreError::Internal(e.to_string()))?;
495 released += res.rows_affected();
496 }
497
498 tx.commit()
499 .await
500 .map_err(|e| StoreError::Internal(e.to_string()))?;
501 Ok(released)
502 }
503
504 async fn deactivate_postings(
505 &self,
506 ids: &[PostingId],
507 reservation: Option<ReservationId>,
508 ) -> Result<u64, StoreError> {
509 let mut tx = self
510 .pool
511 .begin()
512 .await
513 .map_err(|e| StoreError::Internal(e.to_string()))?;
514 let mut changed: u64 = 0;
515 for id in ids {
516 let res = match reservation {
519 None => {
520 sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
521 .bind(status_to_i16(PostingStatus::Inactive))
522 .bind(id.transfer.0.as_slice())
523 .bind(id.index as i16)
524 .bind(status_to_i16(PostingStatus::Active))
525 .execute(&mut *tx)
526 .await
527 }
528 Some(rid) => {
529 sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
530 .bind(status_to_i16(PostingStatus::Inactive))
531 .bind(id.transfer.0.as_slice())
532 .bind(id.index as i16)
533 .bind(status_to_i16(PostingStatus::PendingInactive))
534 .bind(rid.0)
535 .execute(&mut *tx)
536 .await
537 }
538 }
539 .map_err(|e| StoreError::Internal(e.to_string()))?;
540 changed += res.rows_affected();
541 }
542 tx.commit()
543 .await
544 .map_err(|e| StoreError::Internal(e.to_string()))?;
545 Ok(changed)
546 }
547
548 async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
549 let mut tx = self
550 .pool
551 .begin()
552 .await
553 .map_err(|e| StoreError::Internal(e.to_string()))?;
554 let mut inserted: u64 = 0;
555 for posting in postings {
556 let res = sqlx::query(
557 "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
558 )
559 .bind(posting.id.transfer.0.as_slice())
560 .bind(posting.id.index as i16)
561 .bind(posting.owner.0)
562 .bind(posting.asset.0 as i32)
563 .bind(posting.value.to_string())
564 .bind(status_to_i16(posting.status))
565 .execute(&mut *tx)
566 .await
567 .map_err(|e| StoreError::Internal(e.to_string()))?;
568 inserted += res.rows_affected();
569 }
570 tx.commit()
571 .await
572 .map_err(|e| StoreError::Internal(e.to_string()))?;
573 Ok(inserted)
574 }
575}
576
577#[async_trait]
582impl TransferStore for SqlStore {
583 async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
584 let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
585 .bind(id.0.as_slice())
586 .fetch_optional(&self.pool)
587 .await
588 .map_err(|e| StoreError::Internal(e.to_string()))?;
589
590 match row {
591 None => Ok(None),
592 Some(row) => {
593 let transfer_bytes: Vec<u8> = row
594 .try_get("transfer")
595 .map_err(|e| StoreError::Internal(e.to_string()))?;
596 let receipt_bytes: Vec<u8> = row
597 .try_get("receipt")
598 .map_err(|e| StoreError::Internal(e.to_string()))?;
599 let created_at: i64 = row
600 .try_get("created_at")
601 .map_err(|e| StoreError::Internal(e.to_string()))?;
602 Ok(Some(EnvelopeRecord {
603 envelope: deserialize_blob(&transfer_bytes)?,
604 receipt: deserialize_blob(&receipt_bytes)?,
605 created_at,
606 }))
607 }
608 }
609 }
610
611 async fn store_transfer(
612 &self,
613 record: EnvelopeRecord,
614 involved: &[AccountId],
615 ) -> Result<u64, StoreError> {
616 let tid = record.receipt.transfer_id;
617 let transfer_bytes = serialize_blob(&record.envelope)?;
618 let receipt_bytes = serialize_blob(&record.receipt)?;
619
620 let mut tx = self
621 .pool
622 .begin()
623 .await
624 .map_err(|e| StoreError::Internal(e.to_string()))?;
625
626 let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
627 .bind(tid.0.as_slice())
628 .bind(&transfer_bytes)
629 .bind(&receipt_bytes)
630 .bind(record.created_at)
631 .bind(record.envelope.book().0)
632 .execute(&mut *tx)
633 .await
634 .map_err(|e| StoreError::Internal(e.to_string()))?;
635 let inserted = res.rows_affected();
636
637 for account in involved {
640 sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
641 .bind(tid.0.as_slice())
642 .bind(account.0)
643 .execute(&mut *tx)
644 .await
645 .map_err(|e| StoreError::Internal(e.to_string()))?;
646 }
647
648 tx.commit()
649 .await
650 .map_err(|e| StoreError::Internal(e.to_string()))?;
651 Ok(inserted)
652 }
653
654 async fn get_transfers_for_account(
655 &self,
656 account: &AccountId,
657 ) -> Result<Vec<EnvelopeRecord>, StoreError> {
658 let rows = sqlx::query(
659 "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
660 )
661 .bind(account.0)
662 .fetch_all(&self.pool)
663 .await
664 .map_err(|e| StoreError::Internal(e.to_string()))?;
665
666 let mut result = Vec::with_capacity(rows.len());
667 for row in &rows {
668 let transfer_bytes: Vec<u8> = row
669 .try_get("transfer")
670 .map_err(|e| StoreError::Internal(e.to_string()))?;
671 let receipt_bytes: Vec<u8> = row
672 .try_get("receipt")
673 .map_err(|e| StoreError::Internal(e.to_string()))?;
674 let created_at: i64 = row
675 .try_get("created_at")
676 .map_err(|e| StoreError::Internal(e.to_string()))?;
677 result.push(EnvelopeRecord {
678 envelope: deserialize_blob(&transfer_bytes)?,
679 receipt: deserialize_blob(&receipt_bytes)?,
680 created_at,
681 });
682 }
683 Ok(result)
684 }
685
686 async fn query_transfers(
687 &self,
688 query: &TransferQuery,
689 ) -> Result<Page<EnvelopeRecord>, StoreError> {
690 let base_records = if let Some(ref account) = query.account {
692 self.get_transfers_for_account(account).await?
693 } else {
694 let rows = sqlx::query(
695 "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
696 )
697 .fetch_all(&self.pool)
698 .await
699 .map_err(|e| StoreError::Internal(e.to_string()))?;
700
701 let mut records = Vec::with_capacity(rows.len());
702 for row in &rows {
703 let transfer_bytes: Vec<u8> = row
704 .try_get("transfer")
705 .map_err(|e| StoreError::Internal(e.to_string()))?;
706 let receipt_bytes: Vec<u8> = row
707 .try_get("receipt")
708 .map_err(|e| StoreError::Internal(e.to_string()))?;
709 let created_at: i64 = row
710 .try_get("created_at")
711 .map_err(|e| StoreError::Internal(e.to_string()))?;
712 records.push(EnvelopeRecord {
713 envelope: deserialize_blob(&transfer_bytes)?,
714 receipt: deserialize_blob(&receipt_bytes)?,
715 created_at,
716 });
717 }
718 records
719 };
720
721 let filtered: Vec<EnvelopeRecord> = base_records
723 .into_iter()
724 .filter(|r| {
725 if let Some(from) = query.from_ts
726 && r.created_at < from
727 {
728 return false;
729 }
730 if let Some(to) = query.to_ts
731 && r.created_at >= to
732 {
733 return false;
734 }
735 if let Some(book) = query.book
736 && r.envelope.book() != book
737 {
738 return false;
739 }
740 true
741 })
742 .collect();
743
744 let total = filtered.len() as u64;
745 let offset = query.offset.unwrap_or(0) as usize;
746 let limit = query.limit.unwrap_or(u32::MAX) as usize;
747 let items = filtered.into_iter().skip(offset).take(limit).collect();
748
749 Ok(Page { items, total })
750 }
751}
752
753#[async_trait]
758impl SagaStore for SqlStore {
759 async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
760 sqlx::query(
761 "INSERT INTO sagas (id, data) VALUES ($1, $2) \
762 ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
763 )
764 .bind(*id)
765 .bind(&data)
766 .execute(&self.pool)
767 .await
768 .map_err(|e| StoreError::Internal(e.to_string()))?;
769 Ok(())
770 }
771
772 async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
773 let rows = sqlx::query("SELECT id, data FROM sagas")
774 .fetch_all(&self.pool)
775 .await
776 .map_err(|e| StoreError::Internal(e.to_string()))?;
777 let mut result = Vec::with_capacity(rows.len());
778 for row in &rows {
779 let id: i64 = row
780 .try_get("id")
781 .map_err(|e| StoreError::Internal(e.to_string()))?;
782 let data: Vec<u8> = row
783 .try_get("data")
784 .map_err(|e| StoreError::Internal(e.to_string()))?;
785 result.push((id, data));
786 }
787 Ok(result)
788 }
789
790 async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
791 sqlx::query("DELETE FROM sagas WHERE id = $1")
792 .bind(*id)
793 .execute(&self.pool)
794 .await
795 .map_err(|e| StoreError::Internal(e.to_string()))?;
796 Ok(())
797 }
798}
799
800#[async_trait]
805impl EventStore for SqlStore {
806 async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
807 let kind_str =
808 serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
809 let data = serialize_blob(event)?;
810 let seq = self.autoid.next() as u64;
811
812 match kuatia_storage::events::event_dedup_key(&event.kind) {
815 Some(eid) => {
816 let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
817 .bind(seq as i64)
818 .bind(event.timestamp)
819 .bind(&kind_str)
820 .bind(&data)
821 .bind(eid.0.as_slice())
822 .execute(&self.pool)
823 .await
824 .map_err(|e| StoreError::Internal(e.to_string()))?;
825 if res.rows_affected() == 0 {
826 let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
827 .bind(eid.0.as_slice())
828 .fetch_one(&self.pool)
829 .await
830 .map_err(|e| StoreError::Internal(e.to_string()))?;
831 let existing: i64 = row
832 .try_get("seq")
833 .map_err(|e| StoreError::Internal(e.to_string()))?;
834 return Ok(existing as u64);
835 }
836 Ok(seq)
837 }
838 None => {
839 sqlx::query(
840 "INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)",
841 )
842 .bind(seq as i64)
843 .bind(event.timestamp)
844 .bind(&kind_str)
845 .bind(&data)
846 .execute(&self.pool)
847 .await
848 .map_err(|e| StoreError::Internal(e.to_string()))?;
849 Ok(seq)
850 }
851 }
852 }
853
854 async fn get_events_since(
855 &self,
856 after_seq: u64,
857 limit: u32,
858 ) -> Result<Vec<LedgerEvent>, StoreError> {
859 let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
860 .bind(after_seq as i64)
861 .bind(limit as i32)
862 .fetch_all(&self.pool)
863 .await
864 .map_err(|e| StoreError::Internal(e.to_string()))?;
865
866 let mut events = Vec::with_capacity(rows.len());
867 for row in &rows {
868 let seq: i64 = row
869 .try_get("seq")
870 .map_err(|e| StoreError::Internal(e.to_string()))?;
871 let data: Vec<u8> = row
872 .try_get("data")
873 .map_err(|e| StoreError::Internal(e.to_string()))?;
874 let mut event: LedgerEvent = deserialize_blob(&data)?;
875 event.seq = seq as u64;
876 events.push(event);
877 }
878 Ok(events)
879 }
880}
881
882#[async_trait]
887impl BookStore for SqlStore {
888 async fn create_book(&self, book: Book) -> Result<(), StoreError> {
889 let exists = sqlx::query("SELECT 1 FROM books WHERE id = $1 LIMIT 1")
890 .bind(book.id.0)
891 .fetch_optional(&self.pool)
892 .await
893 .map_err(|e| StoreError::Internal(e.to_string()))?;
894 if exists.is_some() {
895 return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
896 }
897
898 let data = serialize_blob(&book)?;
899 sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
900 .bind(book.id.0)
901 .bind(&book.name)
902 .bind(&data)
903 .execute(&self.pool)
904 .await
905 .map_err(|e| StoreError::Internal(e.to_string()))?;
906 Ok(())
907 }
908
909 async fn get_book(&self, id: &BookId) -> Result<Book, StoreError> {
910 let row = sqlx::query("SELECT data FROM books WHERE id = $1")
911 .bind(id.0)
912 .fetch_optional(&self.pool)
913 .await
914 .map_err(|e| StoreError::Internal(e.to_string()))?
915 .ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
916 let data: Vec<u8> = row
917 .try_get("data")
918 .map_err(|e| StoreError::Internal(e.to_string()))?;
919 deserialize_blob(&data)
920 }
921
922 async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
923 let rows = sqlx::query("SELECT data FROM books")
924 .fetch_all(&self.pool)
925 .await
926 .map_err(|e| StoreError::Internal(e.to_string()))?;
927 rows.iter()
928 .map(|row| {
929 let data: Vec<u8> = row
930 .try_get("data")
931 .map_err(|e| StoreError::Internal(e.to_string()))?;
932 deserialize_blob(&data)
933 })
934 .collect()
935 }
936}