kuatia_storage/store.rs
1//! Storage abstraction separating the pure decision logic from IO.
2//!
3//! The [`Store`] trait composes focused sub-traits, each a dumb instruction
4//! follower: writes apply one update and report an affected-row count (or an I/O
5//! error). The saga, not the store, interprets counts and owns idempotency and
6//! compensation.
7//! - [`AccountStore`] — account CRUD and versioning
8//! - [`PostingStore`] — posting reads and lifecycle transitions
9//! - [`TransferStore`] — transfer persistence and queries
10//! - [`SagaStore`] — saga state for crash recovery
11//! - [`EventStore`] — the ledger event log
12//! - [`BookStore`] — book persistence
13
14use async_trait::async_trait;
15use kuatia_types::{
16 Account, AccountId, AssetId, Book, BookId, Envelope, EnvelopeId, Posting, PostingId,
17 PostingStatus, Receipt, ReservationId,
18};
19
20use crate::error::StoreError;
21use crate::events::EventStore;
22
23/// Pairs a committed transfer with its receipt.
24#[derive(Debug, Clone)]
25pub struct EnvelopeRecord {
26 /// The envelope that was committed.
27 pub envelope: Envelope,
28 /// The receipt proving commitment.
29 pub receipt: Receipt,
30 /// Unix milliseconds when this record was created.
31 pub created_at: i64,
32}
33
34/// Pagination and filtering parameters for posting queries.
35#[derive(Debug, Clone)]
36pub struct PostingQuery {
37 /// Filter to postings owned by this account.
38 pub account: AccountId,
39 /// Filter by asset.
40 pub asset: Option<AssetId>,
41 /// Filter by posting status.
42 pub status: Option<PostingStatus>,
43 /// Max results to return.
44 pub limit: Option<u32>,
45 /// Number of results to skip.
46 pub offset: Option<u32>,
47}
48
49/// Pagination and filtering parameters for transfer queries.
50#[derive(Debug, Clone, Default)]
51pub struct TransferQuery {
52 /// Filter to transfers involving this account.
53 pub account: Option<AccountId>,
54 /// Inclusive lower bound (unix millis).
55 pub from_ts: Option<i64>,
56 /// Exclusive upper bound (unix millis).
57 pub to_ts: Option<i64>,
58 /// Filter by book.
59 pub book: Option<BookId>,
60 /// Max results to return.
61 pub limit: Option<u32>,
62 /// Number of results to skip.
63 pub offset: Option<u32>,
64}
65
66/// A page of results with total count for pagination.
67#[derive(Debug, Clone)]
68pub struct Page<T> {
69 /// The items in this page.
70 pub items: Vec<T>,
71 /// Total number of matching items (before pagination).
72 pub total: u64,
73}
74
75// ---------------------------------------------------------------------------
76// Sub-traits
77// ---------------------------------------------------------------------------
78
79/// Account persistence: create, version, query.
80#[async_trait]
81pub trait AccountStore: Send + Sync {
82 /// Fetch a single account by id.
83 async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError>;
84 /// Fetch multiple accounts by id.
85 async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError>;
86 /// Persist a new account (version 1).
87 async fn create_account(&self, account: Account) -> Result<(), StoreError>;
88 /// Append a new version to an existing account.
89 async fn append_account_version(&self, account: Account) -> Result<(), StoreError>;
90 /// Return the full version history for an account.
91 async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError>;
92 /// List all accounts (latest version of each).
93 async fn list_accounts(&self) -> Result<Vec<Account>, StoreError>;
94}
95
96/// Posting persistence: reads and lifecycle transitions.
97#[async_trait]
98pub trait PostingStore: Send + Sync {
99 /// Fetch postings by their ids.
100 async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError>;
101 /// Return postings owned by an account, optionally filtered by asset and/or status.
102 async fn get_postings_by_account(
103 &self,
104 account: &AccountId,
105 asset: Option<&AssetId>,
106 status: Option<PostingStatus>,
107 ) -> Result<Vec<Posting>, StoreError>;
108 /// Reserve postings: `Active → PendingInactive`, stamping `reservation` as
109 /// the owner token. A dumb instruction — each id flips only if still `Active`;
110 /// returns the **number of rows reserved** (0 ≤ n ≤ ids.len()). It does not
111 /// error on a short count; the caller (saga) interprets it.
112 async fn reserve_postings(
113 &self,
114 ids: &[PostingId],
115 reservation: ReservationId,
116 ) -> Result<u64, StoreError>;
117 /// Release postings: `PendingInactive` owned by `reservation` → `Active`,
118 /// clearing the owner. A dumb instruction — only postings reserved by this
119 /// `reservation` flip; returns the **number of rows released**. Releasing an
120 /// `Active` (already released) or differently-owned posting simply does not
121 /// count. The caller interprets the result.
122 async fn release_postings(
123 &self,
124 ids: &[PostingId],
125 reservation: ReservationId,
126 ) -> Result<u64, StoreError>;
127
128 /// Deactivate postings: flip to `Inactive`. A dumb instruction — it applies
129 /// the conditional update and returns the **number of rows changed**; it does
130 /// not decide whether that count is correct. The caller (saga) interprets it.
131 /// - `reservation == None` (raw): only postings still `Active` flip.
132 /// - `reservation == Some(rid)`: only postings `PendingInactive` owned by
133 /// `rid` flip.
134 /// Returns the count of postings actually transitioned (0 ≤ n ≤ ids.len()).
135 async fn deactivate_postings(
136 &self,
137 ids: &[PostingId],
138 reservation: Option<ReservationId>,
139 ) -> Result<u64, StoreError>;
140
141 /// Insert postings if absent (idempotent). A dumb instruction — inserts each
142 /// posting unless one with the same id already exists, and returns the
143 /// **number of rows inserted** (already-present postings contribute 0). The
144 /// caller decides what a short count means.
145 async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError>;
146
147 /// Query postings with filtering and pagination.
148 async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
149 let all = self
150 .get_postings_by_account(&query.account, query.asset.as_ref(), query.status)
151 .await?;
152 let total = all.len() as u64;
153 let offset = query.offset.unwrap_or(0) as usize;
154 let limit = query.limit.unwrap_or(u32::MAX) as usize;
155 let items = all.into_iter().skip(offset).take(limit).collect();
156 Ok(Page { items, total })
157 }
158}
159
160/// Transfer persistence: store and query committed transfers.
161#[async_trait]
162pub trait TransferStore: Send + Sync {
163 /// Fetch a transfer record by its content-addressed id.
164 async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
165 /// Persist a transfer record if absent (idempotent) and index it under every
166 /// account in `involved` (both created and consumed owners — the caller
167 /// supplies the set so storage computes nothing). A dumb instruction:
168 /// returns **1** if the transfer row was newly inserted, **0** if it already
169 /// existed. The caller decides what `0` means.
170 async fn store_transfer(
171 &self,
172 record: EnvelopeRecord,
173 involved: &[AccountId],
174 ) -> Result<u64, StoreError>;
175 /// Return all transfers involving the given account.
176 async fn get_transfers_for_account(
177 &self,
178 account: &AccountId,
179 ) -> Result<Vec<EnvelopeRecord>, StoreError>;
180
181 /// Query transfers with filtering and pagination.
182 async fn query_transfers(
183 &self,
184 query: &TransferQuery,
185 ) -> Result<Page<EnvelopeRecord>, StoreError> {
186 // Default in-memory implementation
187 let all = if let Some(ref account) = query.account {
188 self.get_transfers_for_account(account).await?
189 } else {
190 return Err(StoreError::Internal(
191 "query_transfers requires account filter in default implementation".into(),
192 ));
193 };
194
195 let filtered: Vec<EnvelopeRecord> = all
196 .into_iter()
197 .filter(|r| {
198 if let Some(from) = query.from_ts
199 && r.created_at < from
200 {
201 return false;
202 }
203 if let Some(to) = query.to_ts
204 && r.created_at >= to
205 {
206 return false;
207 }
208 if let Some(book) = query.book
209 && r.envelope.book() != book
210 {
211 return false;
212 }
213 true
214 })
215 .collect();
216
217 let total = filtered.len() as u64;
218 let offset = query.offset.unwrap_or(0) as usize;
219 let limit = query.limit.unwrap_or(u32::MAX) as usize;
220 let items = filtered.into_iter().skip(offset).take(limit).collect();
221
222 Ok(Page { items, total })
223 }
224}
225
226/// Saga state persistence for crash recovery.
227#[async_trait]
228pub trait SagaStore: Send + Sync {
229 /// Persist a saga execution state.
230 async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError>;
231 /// Load all pending (incomplete) saga states.
232 async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError>;
233 /// Delete a completed saga state.
234 async fn delete_saga(&self, id: &i64) -> Result<(), StoreError>;
235}
236
237/// Book persistence.
238#[async_trait]
239pub trait BookStore: Send + Sync {
240 /// Create a new book.
241 async fn create_book(&self, book: Book) -> Result<(), StoreError>;
242 /// Fetch a book by id.
243 async fn get_book(&self, id: &BookId) -> Result<Book, StoreError>;
244 /// List all books.
245 async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
246}
247
248// ---------------------------------------------------------------------------
249// Composite trait
250// ---------------------------------------------------------------------------
251
252/// Async storage abstraction composing all sub-traits.
253pub trait Store:
254 AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
255{
256}
257
258impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
259 for T
260{
261}