1use std::str::FromStr;
14
15use async_trait::async_trait;
16use sqlx::{Any, Pool, Row};
17
18use kuatia_storage::error::StoreError;
19use kuatia_storage::events::{EventStore, LedgerEvent};
20use kuatia_storage::store::*;
21use kuatia_types::*;
22
23pub struct SqlStore {
25 pool: Pool<Any>,
26 autoid: kuatia_types::autoid::AutoId,
27}
28
29impl SqlStore {
30 pub fn new(pool: Pool<Any>) -> Self {
32 Self {
33 pool,
34 autoid: kuatia_types::autoid::AutoId::new(),
35 }
36 }
37
38 pub async fn migrate(&self) -> Result<(), StoreError> {
45 sqlx::query("CREATE TABLE IF NOT EXISTS _migrations (name TEXT PRIMARY KEY)")
46 .execute(&self.pool)
47 .await
48 .map_err(|e| StoreError::Internal(e.to_string()))?;
49
50 let migrations: &[(&str, &str)] = &[("001_init", include_str!("migrations/001_init.sql"))];
51
52 for (name, sql) in migrations {
53 let applied = sqlx::query("SELECT 1 FROM _migrations WHERE name = $1")
54 .bind(*name)
55 .fetch_optional(&self.pool)
56 .await
57 .map_err(|e| StoreError::Internal(e.to_string()))?;
58 if applied.is_some() {
59 continue;
60 }
61
62 for statement in sql.split(';') {
63 let trimmed = statement.trim();
64 if !trimmed.is_empty() {
65 sqlx::query(trimmed)
66 .execute(&self.pool)
67 .await
68 .map_err(|e| StoreError::Internal(e.to_string()))?;
69 }
70 }
71
72 sqlx::query("INSERT INTO _migrations (name) VALUES ($1)")
73 .bind(*name)
74 .execute(&self.pool)
75 .await
76 .map_err(|e| StoreError::Internal(e.to_string()))?;
77 }
78 Ok(())
79 }
80}
81
82fn serialize_policy(policy: &AccountPolicy) -> Result<String, StoreError> {
87 serde_json::to_string(policy)
88 .map_err(|e| StoreError::Internal(format!("policy serialization: {e}")))
89}
90
91fn deserialize_policy(s: &str) -> Result<AccountPolicy, StoreError> {
92 serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad policy: {e}")))
93}
94
95fn serialize_json<T: serde::Serialize>(val: &T) -> Result<String, StoreError> {
99 serde_json::to_string(val).map_err(|e| StoreError::Internal(format!("json serialization: {e}")))
100}
101
102fn deserialize_json<T: serde::de::DeserializeOwned>(s: &str) -> Result<T, StoreError> {
103 serde_json::from_str(s).map_err(|e| StoreError::Internal(format!("bad json: {e}")))
104}
105
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// Digits are looked up in a table and pushed directly, so the only allocation is the
/// output string itself (no temporary `String` per byte).
fn to_hex(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(char::from(HEX_DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
116
117fn from_hex(s: &str) -> Result<Vec<u8>, StoreError> {
118 if s.len() % 2 != 0 {
119 return Err(StoreError::Internal(format!("odd-length hex: {s:?}")));
120 }
121 (0..s.len())
122 .step_by(2)
123 .map(|i| {
124 u8::from_str_radix(&s[i..i + 2], 16)
125 .map_err(|e| StoreError::Internal(format!("bad hex: {e}")))
126 })
127 .collect()
128}
129
130fn envelope_id_to_hex(id: &EnvelopeId) -> String {
131 to_hex(&id.0)
132}
133
134fn envelope_id_from_hex(s: &str) -> Result<EnvelopeId, StoreError> {
135 let bytes = from_hex(s)?;
136 let arr: [u8; 32] = bytes.as_slice().try_into().map_err(|_| {
137 StoreError::Internal(format!("expected 32-byte id, got {} bytes", bytes.len()))
138 })?;
139 Ok(EnvelopeId(arr))
140}
141
142fn status_to_i16(s: PostingStatus) -> i16 {
143 match s {
144 PostingStatus::Active => 0,
145 PostingStatus::PendingInactive => 1,
146 PostingStatus::Inactive => 2,
147 }
148}
149
150fn status_from_i16(v: i16) -> Result<PostingStatus, StoreError> {
151 match v {
152 0 => Ok(PostingStatus::Active),
153 1 => Ok(PostingStatus::PendingInactive),
154 2 => Ok(PostingStatus::Inactive),
155 _ => Err(StoreError::Internal(format!("bad posting status: {v}"))),
156 }
157}
158
159fn row_to_account(row: &sqlx::any::AnyRow) -> Result<Account, StoreError> {
160 let id: i64 = row
161 .try_get("id")
162 .map_err(|e| StoreError::Internal(e.to_string()))?;
163 let version: i64 = row
164 .try_get("version")
165 .map_err(|e| StoreError::Internal(e.to_string()))?;
166 let policy_str: String = row
167 .try_get("policy")
168 .map_err(|e| StoreError::Internal(e.to_string()))?;
169 let flags_bits: i32 = row
170 .try_get("flags")
171 .map_err(|e| StoreError::Internal(e.to_string()))?;
172 let book: i64 = row
173 .try_get("book")
174 .map_err(|e| StoreError::Internal(e.to_string()))?;
175 let user_data_json: String = row
176 .try_get("user_data")
177 .map_err(|e| StoreError::Internal(e.to_string()))?;
178 let metadata_json: String = row
179 .try_get("metadata")
180 .map_err(|e| StoreError::Internal(e.to_string()))?;
181
182 Ok(Account {
183 id: AccountId::new(id),
184 version: version as u64,
185 policy: deserialize_policy(&policy_str)?,
186 flags: AccountFlags::from_bits_truncate(flags_bits as u32),
187 book: BookId::new(book),
188 user_data: deserialize_json(&user_data_json)?,
189 metadata: deserialize_json(&metadata_json)?,
190 })
191}
192
193fn row_to_posting(row: &sqlx::any::AnyRow) -> Result<Posting, StoreError> {
194 let transfer_id: String = row
195 .try_get("transfer_id")
196 .map_err(|e| StoreError::Internal(e.to_string()))?;
197 let idx: i16 = row
198 .try_get("idx")
199 .map_err(|e| StoreError::Internal(e.to_string()))?;
200 let owner: i64 = row
201 .try_get("owner")
202 .map_err(|e| StoreError::Internal(e.to_string()))?;
203 let asset: i32 = row
204 .try_get("asset")
205 .map_err(|e| StoreError::Internal(e.to_string()))?;
206 let value: String = row
207 .try_get("value")
208 .map_err(|e| StoreError::Internal(e.to_string()))?;
209 let value = Cent::from_str(&value).map_err(|e| StoreError::Internal(e.to_string()))?;
210 let status: i16 = row
211 .try_get("status")
212 .map_err(|e| StoreError::Internal(e.to_string()))?;
213 let reservation: Option<i64> = row
214 .try_get("reservation")
215 .map_err(|e| StoreError::Internal(e.to_string()))?;
216
217 Ok(Posting {
218 id: PostingId {
219 transfer: envelope_id_from_hex(&transfer_id)?,
220 index: idx as u16,
221 },
222 owner: AccountId::new(owner),
223 asset: AssetId::new(asset as u32),
224 value,
225 status: status_from_i16(status)?,
226 reservation: reservation.map(ReservationId::new),
227 })
228}
229
230#[async_trait]
235impl AccountStore for SqlStore {
236 async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError> {
237 let row = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
238 .bind(id.0)
239 .fetch_optional(&self.pool)
240 .await
241 .map_err(|e| StoreError::Internal(e.to_string()))?
242 .ok_or_else(|| StoreError::NotFound(format!("account {id:?}")))?;
243 row_to_account(&row)
244 }
245
246 async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError> {
247 let mut result = Vec::with_capacity(ids.len());
248 for id in ids {
249 result.push(self.get_account(id).await?);
250 }
251 Ok(result)
252 }
253
254 async fn create_account(&self, account: Account) -> Result<(), StoreError> {
255 let exists = sqlx::query("SELECT 1 FROM accounts WHERE id = $1 LIMIT 1")
256 .bind(account.id.0)
257 .fetch_optional(&self.pool)
258 .await
259 .map_err(|e| StoreError::Internal(e.to_string()))?;
260 if exists.is_some() {
261 return Err(StoreError::AlreadyExists(format!(
262 "account {:?}",
263 account.id
264 )));
265 }
266
267 sqlx::query(
268 "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
269 )
270 .bind(account.id.0)
271 .bind(account.version as i64)
272 .bind(serialize_policy(&account.policy)?)
273 .bind(account.flags.bits() as i32)
274 .bind(account.book.0)
275 .bind(serialize_json(&account.user_data)?)
276 .bind(serialize_json(&account.metadata)?)
277 .execute(&self.pool)
278 .await
279 .map_err(|e| StoreError::Internal(e.to_string()))?;
280 Ok(())
281 }
282
283 async fn append_account_version(&self, account: Account) -> Result<(), StoreError> {
284 let current =
285 sqlx::query("SELECT version FROM accounts WHERE id = $1 ORDER BY version DESC LIMIT 1")
286 .bind(account.id.0)
287 .fetch_optional(&self.pool)
288 .await
289 .map_err(|e| StoreError::Internal(e.to_string()))?
290 .ok_or_else(|| StoreError::NotFound(format!("account {:?}", account.id)))?;
291
292 let current_version: i64 = current
293 .try_get("version")
294 .map_err(|e| StoreError::Internal(e.to_string()))?;
295 let expected = current_version
296 .checked_add(1)
297 .ok_or_else(|| StoreError::Internal("account version overflow".to_string()))?;
298
299 if account.version as i64 != expected {
300 return Err(StoreError::VersionConflict {
301 account: account.id,
302 expected: expected as u64,
303 actual: account.version,
304 });
305 }
306
307 sqlx::query(
308 "INSERT INTO accounts (id, version, policy, flags, book, user_data, metadata) VALUES ($1, $2, $3, $4, $5, $6, $7)"
309 )
310 .bind(account.id.0)
311 .bind(account.version as i64)
312 .bind(serialize_policy(&account.policy)?)
313 .bind(account.flags.bits() as i32)
314 .bind(account.book.0)
315 .bind(serialize_json(&account.user_data)?)
316 .bind(serialize_json(&account.metadata)?)
317 .execute(&self.pool)
318 .await
319 .map_err(|e| StoreError::Internal(e.to_string()))?;
320 Ok(())
321 }
322
323 async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError> {
324 let rows = sqlx::query("SELECT * FROM accounts WHERE id = $1 ORDER BY version ASC")
325 .bind(id.0)
326 .fetch_all(&self.pool)
327 .await
328 .map_err(|e| StoreError::Internal(e.to_string()))?;
329 if rows.is_empty() {
330 return Err(StoreError::NotFound(format!("account {id:?}")));
331 }
332 rows.iter().map(row_to_account).collect()
333 }
334
335 async fn list_accounts(&self) -> Result<Vec<Account>, StoreError> {
336 let rows = sqlx::query("SELECT * FROM accounts ORDER BY id, version DESC")
337 .fetch_all(&self.pool)
338 .await
339 .map_err(|e| StoreError::Internal(e.to_string()))?;
340 let mut accounts: Vec<Account> =
341 rows.iter().map(row_to_account).collect::<Result<_, _>>()?;
342 accounts.dedup_by_key(|a| a.id);
343 Ok(accounts)
344 }
345}
346
347#[async_trait]
352impl PostingStore for SqlStore {
353 async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError> {
354 let mut result = Vec::with_capacity(ids.len());
355 for id in ids {
356 let row = sqlx::query("SELECT * FROM postings WHERE transfer_id = $1 AND idx = $2")
357 .bind(envelope_id_to_hex(&id.transfer))
358 .bind(id.index as i16)
359 .fetch_optional(&self.pool)
360 .await
361 .map_err(|e| StoreError::Internal(e.to_string()))?
362 .ok_or_else(|| StoreError::NotFound(format!("posting {id:?}")))?;
363 result.push(row_to_posting(&row)?);
364 }
365 Ok(result)
366 }
367
368 async fn get_postings_by_account(
369 &self,
370 account: &AccountId,
371 asset: Option<&AssetId>,
372 status: Option<PostingStatus>,
373 ) -> Result<Vec<Posting>, StoreError> {
374 let rows = match (asset, status) {
375 (Some(a), Some(s)) => {
376 sqlx::query(
377 "SELECT * FROM postings WHERE owner = $1 AND asset = $2 AND status = $3",
378 )
379 .bind(account.0)
380 .bind(a.0 as i32)
381 .bind(status_to_i16(s))
382 .fetch_all(&self.pool)
383 .await
384 }
385 (Some(a), None) => {
386 sqlx::query("SELECT * FROM postings WHERE owner = $1 AND asset = $2")
387 .bind(account.0)
388 .bind(a.0 as i32)
389 .fetch_all(&self.pool)
390 .await
391 }
392 (None, Some(s)) => {
393 sqlx::query("SELECT * FROM postings WHERE owner = $1 AND status = $2")
394 .bind(account.0)
395 .bind(status_to_i16(s))
396 .fetch_all(&self.pool)
397 .await
398 }
399 (None, None) => {
400 sqlx::query("SELECT * FROM postings WHERE owner = $1")
401 .bind(account.0)
402 .fetch_all(&self.pool)
403 .await
404 }
405 }
406 .map_err(|e| StoreError::Internal(e.to_string()))?;
407
408 rows.iter().map(row_to_posting).collect()
409 }
410
411 async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
412 let (where_clause, count_clause) = {
413 let mut w = String::from("WHERE owner = $1");
414 let mut idx = 2u32;
415 if query.asset.is_some() {
416 w.push_str(&format!(" AND asset = ${idx}"));
417 idx += 1;
418 }
419 if query.status.is_some() {
420 w.push_str(&format!(" AND status = ${idx}"));
421 }
422 let c = format!("SELECT COUNT(*) as cnt FROM postings {w}");
423 let limit = query.limit.unwrap_or(u32::MAX);
424 let offset = query.offset.unwrap_or(0);
425 w.push_str(&format!(" LIMIT {limit} OFFSET {offset}"));
426 (format!("SELECT * FROM postings {w}"), c)
427 };
428
429 let mut count_q = sqlx::query(&count_clause).bind(query.account.0);
431 if let Some(ref a) = query.asset {
432 count_q = count_q.bind(a.0 as i32);
433 }
434 if let Some(s) = query.status {
435 count_q = count_q.bind(status_to_i16(s));
436 }
437 let count_row = count_q
438 .fetch_one(&self.pool)
439 .await
440 .map_err(|e| StoreError::Internal(e.to_string()))?;
441 let total: i64 = count_row
442 .try_get("cnt")
443 .map_err(|e| StoreError::Internal(e.to_string()))?;
444
445 let mut data_q = sqlx::query(&where_clause).bind(query.account.0);
447 if let Some(ref a) = query.asset {
448 data_q = data_q.bind(a.0 as i32);
449 }
450 if let Some(s) = query.status {
451 data_q = data_q.bind(status_to_i16(s));
452 }
453 let rows = data_q
454 .fetch_all(&self.pool)
455 .await
456 .map_err(|e| StoreError::Internal(e.to_string()))?;
457
458 let items: Vec<Posting> = rows.iter().map(row_to_posting).collect::<Result<_, _>>()?;
459 Ok(Page {
460 items,
461 total: total as u64,
462 })
463 }
464
465 async fn reserve_postings(
466 &self,
467 ids: &[PostingId],
468 reservation: ReservationId,
469 ) -> Result<u64, StoreError> {
470 let mut tx = self
474 .pool
475 .begin()
476 .await
477 .map_err(|e| StoreError::Internal(e.to_string()))?;
478 let mut reserved: u64 = 0;
479 for id in ids {
480 let res = sqlx::query(
481 "UPDATE postings SET status = $1, reservation = $2 WHERE transfer_id = $3 AND idx = $4 AND status = $5",
482 )
483 .bind(status_to_i16(PostingStatus::PendingInactive))
484 .bind(reservation.0)
485 .bind(envelope_id_to_hex(&id.transfer))
486 .bind(id.index as i16)
487 .bind(status_to_i16(PostingStatus::Active))
488 .execute(&mut *tx)
489 .await
490 .map_err(|e| StoreError::Internal(e.to_string()))?;
491 reserved += res.rows_affected();
492 }
493
494 tx.commit()
495 .await
496 .map_err(|e| StoreError::Internal(e.to_string()))?;
497 Ok(reserved)
498 }
499
500 async fn release_postings(
501 &self,
502 ids: &[PostingId],
503 reservation: ReservationId,
504 ) -> Result<u64, StoreError> {
505 let mut tx = self
509 .pool
510 .begin()
511 .await
512 .map_err(|e| StoreError::Internal(e.to_string()))?;
513 let mut released: u64 = 0;
514 for id in ids {
515 let res = sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
516 .bind(status_to_i16(PostingStatus::Active))
517 .bind(envelope_id_to_hex(&id.transfer))
518 .bind(id.index as i16)
519 .bind(status_to_i16(PostingStatus::PendingInactive))
520 .bind(reservation.0)
521 .execute(&mut *tx)
522 .await
523 .map_err(|e| StoreError::Internal(e.to_string()))?;
524 released += res.rows_affected();
525 }
526
527 tx.commit()
528 .await
529 .map_err(|e| StoreError::Internal(e.to_string()))?;
530 Ok(released)
531 }
532
533 async fn deactivate_postings(
534 &self,
535 ids: &[PostingId],
536 reservation: Option<ReservationId>,
537 ) -> Result<u64, StoreError> {
538 let mut tx = self
539 .pool
540 .begin()
541 .await
542 .map_err(|e| StoreError::Internal(e.to_string()))?;
543 let mut changed: u64 = 0;
544 for id in ids {
545 let res = match reservation {
548 None => {
549 sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4")
550 .bind(status_to_i16(PostingStatus::Inactive))
551 .bind(envelope_id_to_hex(&id.transfer))
552 .bind(id.index as i16)
553 .bind(status_to_i16(PostingStatus::Active))
554 .execute(&mut *tx)
555 .await
556 }
557 Some(rid) => {
558 sqlx::query("UPDATE postings SET status = $1, reservation = NULL WHERE transfer_id = $2 AND idx = $3 AND status = $4 AND reservation = $5")
559 .bind(status_to_i16(PostingStatus::Inactive))
560 .bind(envelope_id_to_hex(&id.transfer))
561 .bind(id.index as i16)
562 .bind(status_to_i16(PostingStatus::PendingInactive))
563 .bind(rid.0)
564 .execute(&mut *tx)
565 .await
566 }
567 }
568 .map_err(|e| StoreError::Internal(e.to_string()))?;
569 changed += res.rows_affected();
570 }
571 tx.commit()
572 .await
573 .map_err(|e| StoreError::Internal(e.to_string()))?;
574 Ok(changed)
575 }
576
577 async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError> {
578 let mut tx = self
579 .pool
580 .begin()
581 .await
582 .map_err(|e| StoreError::Internal(e.to_string()))?;
583 let mut inserted: u64 = 0;
584 for posting in postings {
585 let res = sqlx::query(
586 "INSERT INTO postings (transfer_id, idx, owner, asset, value, status) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (transfer_id, idx) DO NOTHING"
587 )
588 .bind(envelope_id_to_hex(&posting.id.transfer))
589 .bind(posting.id.index as i16)
590 .bind(posting.owner.0)
591 .bind(posting.asset.0 as i32)
592 .bind(posting.value.to_string())
593 .bind(status_to_i16(posting.status))
594 .execute(&mut *tx)
595 .await
596 .map_err(|e| StoreError::Internal(e.to_string()))?;
597 inserted += res.rows_affected();
598 }
599 tx.commit()
600 .await
601 .map_err(|e| StoreError::Internal(e.to_string()))?;
602 Ok(inserted)
603 }
604}
605
606#[async_trait]
611impl TransferStore for SqlStore {
612 async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError> {
613 let row = sqlx::query("SELECT transfer, receipt, created_at FROM transfers WHERE id = $1")
614 .bind(envelope_id_to_hex(id))
615 .fetch_optional(&self.pool)
616 .await
617 .map_err(|e| StoreError::Internal(e.to_string()))?;
618
619 match row {
620 None => Ok(None),
621 Some(row) => {
622 let transfer_json: String = row
623 .try_get("transfer")
624 .map_err(|e| StoreError::Internal(e.to_string()))?;
625 let receipt_json: String = row
626 .try_get("receipt")
627 .map_err(|e| StoreError::Internal(e.to_string()))?;
628 let created_at: i64 = row
629 .try_get("created_at")
630 .map_err(|e| StoreError::Internal(e.to_string()))?;
631 Ok(Some(EnvelopeRecord {
632 envelope: deserialize_json(&transfer_json)?,
633 receipt: deserialize_json(&receipt_json)?,
634 created_at,
635 }))
636 }
637 }
638 }
639
640 async fn store_transfer(
641 &self,
642 record: EnvelopeRecord,
643 involved: &[AccountId],
644 ) -> Result<u64, StoreError> {
645 let tid = record.receipt.transfer_id;
646 let tid_hex = envelope_id_to_hex(&tid);
647 let transfer_json = serialize_json(&record.envelope)?;
648 let receipt_json = serialize_json(&record.receipt)?;
649
650 let mut tx = self
651 .pool
652 .begin()
653 .await
654 .map_err(|e| StoreError::Internal(e.to_string()))?;
655
656 let res = sqlx::query("INSERT INTO transfers (id, transfer, receipt, created_at, book) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (id) DO NOTHING")
657 .bind(&tid_hex)
658 .bind(&transfer_json)
659 .bind(&receipt_json)
660 .bind(record.created_at)
661 .bind(record.envelope.book().0)
662 .execute(&mut *tx)
663 .await
664 .map_err(|e| StoreError::Internal(e.to_string()))?;
665 let inserted = res.rows_affected();
666
667 for account in involved {
670 sqlx::query("INSERT INTO transfer_accounts (transfer_id, account_id) VALUES ($1, $2) ON CONFLICT (transfer_id, account_id) DO NOTHING")
671 .bind(&tid_hex)
672 .bind(account.0)
673 .execute(&mut *tx)
674 .await
675 .map_err(|e| StoreError::Internal(e.to_string()))?;
676 }
677
678 tx.commit()
679 .await
680 .map_err(|e| StoreError::Internal(e.to_string()))?;
681 Ok(inserted)
682 }
683
684 async fn get_transfers_for_account(
685 &self,
686 account: &AccountId,
687 ) -> Result<Vec<EnvelopeRecord>, StoreError> {
688 let rows = sqlx::query(
689 "SELECT t.id, t.transfer, t.receipt, t.created_at FROM transfers t INNER JOIN transfer_accounts ta ON t.id = ta.transfer_id WHERE ta.account_id = $1 ORDER BY t.created_at"
690 )
691 .bind(account.0)
692 .fetch_all(&self.pool)
693 .await
694 .map_err(|e| StoreError::Internal(e.to_string()))?;
695
696 let mut result = Vec::with_capacity(rows.len());
697 for row in &rows {
698 let transfer_json: String = row
699 .try_get("transfer")
700 .map_err(|e| StoreError::Internal(e.to_string()))?;
701 let receipt_json: String = row
702 .try_get("receipt")
703 .map_err(|e| StoreError::Internal(e.to_string()))?;
704 let created_at: i64 = row
705 .try_get("created_at")
706 .map_err(|e| StoreError::Internal(e.to_string()))?;
707 result.push(EnvelopeRecord {
708 envelope: deserialize_json(&transfer_json)?,
709 receipt: deserialize_json(&receipt_json)?,
710 created_at,
711 });
712 }
713 Ok(result)
714 }
715
716 async fn query_transfers(
717 &self,
718 query: &TransferQuery,
719 ) -> Result<Page<EnvelopeRecord>, StoreError> {
720 let base_records = if let Some(ref account) = query.account {
722 self.get_transfers_for_account(account).await?
723 } else {
724 let rows = sqlx::query(
725 "SELECT transfer, receipt, created_at FROM transfers ORDER BY created_at",
726 )
727 .fetch_all(&self.pool)
728 .await
729 .map_err(|e| StoreError::Internal(e.to_string()))?;
730
731 let mut records = Vec::with_capacity(rows.len());
732 for row in &rows {
733 let transfer_json: String = row
734 .try_get("transfer")
735 .map_err(|e| StoreError::Internal(e.to_string()))?;
736 let receipt_json: String = row
737 .try_get("receipt")
738 .map_err(|e| StoreError::Internal(e.to_string()))?;
739 let created_at: i64 = row
740 .try_get("created_at")
741 .map_err(|e| StoreError::Internal(e.to_string()))?;
742 records.push(EnvelopeRecord {
743 envelope: deserialize_json(&transfer_json)?,
744 receipt: deserialize_json(&receipt_json)?,
745 created_at,
746 });
747 }
748 records
749 };
750
751 let filtered: Vec<EnvelopeRecord> = base_records
753 .into_iter()
754 .filter(|r| {
755 if let Some(from) = query.from_ts
756 && r.created_at < from
757 {
758 return false;
759 }
760 if let Some(to) = query.to_ts
761 && r.created_at >= to
762 {
763 return false;
764 }
765 if let Some(book) = query.book
766 && r.envelope.book() != book
767 {
768 return false;
769 }
770 true
771 })
772 .collect();
773
774 let total = filtered.len() as u64;
775 let offset = query.offset.unwrap_or(0) as usize;
776 let limit = query.limit.unwrap_or(u32::MAX) as usize;
777 let items = filtered.into_iter().skip(offset).take(limit).collect();
778
779 Ok(Page { items, total })
780 }
781}
782
783#[async_trait]
788impl SagaStore for SqlStore {
789 async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError> {
790 sqlx::query(
791 "INSERT INTO sagas (id, data) VALUES ($1, $2) \
792 ON CONFLICT (id) DO UPDATE SET data = EXCLUDED.data",
793 )
794 .bind(*id)
795 .bind(to_hex(&data))
796 .execute(&self.pool)
797 .await
798 .map_err(|e| StoreError::Internal(e.to_string()))?;
799 Ok(())
800 }
801
802 async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError> {
803 let rows = sqlx::query("SELECT id, data FROM sagas")
804 .fetch_all(&self.pool)
805 .await
806 .map_err(|e| StoreError::Internal(e.to_string()))?;
807 let mut result = Vec::with_capacity(rows.len());
808 for row in &rows {
809 let id: i64 = row
810 .try_get("id")
811 .map_err(|e| StoreError::Internal(e.to_string()))?;
812 let data_hex: String = row
813 .try_get("data")
814 .map_err(|e| StoreError::Internal(e.to_string()))?;
815 result.push((id, from_hex(&data_hex)?));
816 }
817 Ok(result)
818 }
819
820 async fn delete_saga(&self, id: &i64) -> Result<(), StoreError> {
821 sqlx::query("DELETE FROM sagas WHERE id = $1")
822 .bind(*id)
823 .execute(&self.pool)
824 .await
825 .map_err(|e| StoreError::Internal(e.to_string()))?;
826 Ok(())
827 }
828}
829
830#[async_trait]
835impl EventStore for SqlStore {
836 async fn append_event(&self, event: &LedgerEvent) -> Result<u64, StoreError> {
837 let kind_str =
838 serde_json::to_string(&event.kind).map_err(|e| StoreError::Internal(e.to_string()))?;
839 let data = serialize_json(event)?;
840 let seq = self.autoid.next() as u64;
841
842 match kuatia_storage::events::event_dedup_key(&event.kind) {
845 Some(eid) => {
846 let dedup_hex = envelope_id_to_hex(&eid);
847 let res = sqlx::query("INSERT INTO events (seq, timestamp, kind, data, dedup_key) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (dedup_key) DO NOTHING")
848 .bind(seq as i64)
849 .bind(event.timestamp)
850 .bind(&kind_str)
851 .bind(&data)
852 .bind(&dedup_hex)
853 .execute(&self.pool)
854 .await
855 .map_err(|e| StoreError::Internal(e.to_string()))?;
856 if res.rows_affected() == 0 {
857 let row = sqlx::query("SELECT seq FROM events WHERE dedup_key = $1")
858 .bind(&dedup_hex)
859 .fetch_one(&self.pool)
860 .await
861 .map_err(|e| StoreError::Internal(e.to_string()))?;
862 let existing: i64 = row
863 .try_get("seq")
864 .map_err(|e| StoreError::Internal(e.to_string()))?;
865 return Ok(existing as u64);
866 }
867 Ok(seq)
868 }
869 None => {
870 sqlx::query(
871 "INSERT INTO events (seq, timestamp, kind, data) VALUES ($1, $2, $3, $4)",
872 )
873 .bind(seq as i64)
874 .bind(event.timestamp)
875 .bind(&kind_str)
876 .bind(&data)
877 .execute(&self.pool)
878 .await
879 .map_err(|e| StoreError::Internal(e.to_string()))?;
880 Ok(seq)
881 }
882 }
883 }
884
885 async fn get_events_since(
886 &self,
887 after_seq: u64,
888 limit: u32,
889 ) -> Result<Vec<LedgerEvent>, StoreError> {
890 let rows = sqlx::query("SELECT seq, data FROM events WHERE seq > $1 ORDER BY seq LIMIT $2")
891 .bind(after_seq as i64)
892 .bind(limit as i32)
893 .fetch_all(&self.pool)
894 .await
895 .map_err(|e| StoreError::Internal(e.to_string()))?;
896
897 let mut events = Vec::with_capacity(rows.len());
898 for row in &rows {
899 let seq: i64 = row
900 .try_get("seq")
901 .map_err(|e| StoreError::Internal(e.to_string()))?;
902 let data_json: String = row
903 .try_get("data")
904 .map_err(|e| StoreError::Internal(e.to_string()))?;
905 let mut event: LedgerEvent = deserialize_json(&data_json)?;
906 event.seq = seq as u64;
907 events.push(event);
908 }
909 Ok(events)
910 }
911}
912
913#[async_trait]
918impl BookStore for SqlStore {
919 async fn create_book(&self, book: Book) -> Result<(), StoreError> {
920 let exists = sqlx::query("SELECT 1 FROM books WHERE id = $1 LIMIT 1")
921 .bind(book.id.0)
922 .fetch_optional(&self.pool)
923 .await
924 .map_err(|e| StoreError::Internal(e.to_string()))?;
925 if exists.is_some() {
926 return Err(StoreError::AlreadyExists(format!("book {:?}", book.id)));
927 }
928
929 let data = serialize_json(&book)?;
930 sqlx::query("INSERT INTO books (id, name, data) VALUES ($1, $2, $3)")
931 .bind(book.id.0)
932 .bind(&book.name)
933 .bind(&data)
934 .execute(&self.pool)
935 .await
936 .map_err(|e| StoreError::Internal(e.to_string()))?;
937 Ok(())
938 }
939
940 async fn get_book(&self, id: &BookId) -> Result<Book, StoreError> {
941 let row = sqlx::query("SELECT data FROM books WHERE id = $1")
942 .bind(id.0)
943 .fetch_optional(&self.pool)
944 .await
945 .map_err(|e| StoreError::Internal(e.to_string()))?
946 .ok_or_else(|| StoreError::NotFound(format!("book {id:?}")))?;
947 let data: String = row
948 .try_get("data")
949 .map_err(|e| StoreError::Internal(e.to_string()))?;
950 deserialize_json(&data)
951 }
952
953 async fn list_books(&self) -> Result<Vec<Book>, StoreError> {
954 let rows = sqlx::query("SELECT data FROM books")
955 .fetch_all(&self.pool)
956 .await
957 .map_err(|e| StoreError::Internal(e.to_string()))?;
958 rows.iter()
959 .map(|row| {
960 let data: String = row
961 .try_get("data")
962 .map_err(|e| StoreError::Internal(e.to_string()))?;
963 deserialize_json(&data)
964 })
965 .collect()
966 }
967}