Skip to main content

kuatia_storage/
store.rs

1//! Storage abstraction separating the pure decision logic from IO.
2//!
3//! The [`Store`] trait composes focused sub-traits, each a dumb instruction
4//! follower: writes apply one update and report an affected-row count (or an I/O
5//! error). The saga, not the store, interprets counts and owns idempotency and
6//! compensation.
7//! - [`AccountStore`] — account CRUD and versioning
8//! - [`PostingStore`] — posting reads and lifecycle transitions
9//! - [`TransferStore`] — transfer persistence and queries
10//! - [`SagaStore`] — saga state for crash recovery
11//! - [`EventStore`] — the ledger event log
12//! - [`BookStore`] — book persistence
13
14use async_trait::async_trait;
15use kuatia_types::{
16    Account, AccountId, AssetId, Book, BookId, Envelope, EnvelopeId, Posting, PostingId,
17    PostingStatus, Receipt, ReservationId,
18};
19
20use crate::error::StoreError;
21use crate::events::EventStore;
22
23/// Pairs a committed transfer with its receipt.
24#[derive(Debug, Clone)]
25pub struct EnvelopeRecord {
26    /// The envelope that was committed.
27    pub envelope: Envelope,
28    /// The receipt proving commitment.
29    pub receipt: Receipt,
30    /// Unix milliseconds when this record was created.
31    pub created_at: i64,
32}
33
34/// Pagination and filtering parameters for posting queries.
35#[derive(Debug, Clone)]
36pub struct PostingQuery {
37    /// Filter to postings owned by this account.
38    pub account: AccountId,
39    /// Filter by asset.
40    pub asset: Option<AssetId>,
41    /// Filter by posting status.
42    pub status: Option<PostingStatus>,
43    /// Max results to return.
44    pub limit: Option<u32>,
45    /// Number of results to skip.
46    pub offset: Option<u32>,
47}
48
49/// Pagination and filtering parameters for transfer queries.
50#[derive(Debug, Clone, Default)]
51pub struct TransferQuery {
52    /// Filter to transfers involving this account.
53    pub account: Option<AccountId>,
54    /// Inclusive lower bound (unix millis).
55    pub from_ts: Option<i64>,
56    /// Exclusive upper bound (unix millis).
57    pub to_ts: Option<i64>,
58    /// Filter by book.
59    pub book: Option<BookId>,
60    /// Max results to return.
61    pub limit: Option<u32>,
62    /// Number of results to skip.
63    pub offset: Option<u32>,
64}
65
/// One page of query results, plus the size of the full result set.
#[derive(Clone, Debug)]
pub struct Page<T> {
    /// Items that fall inside this page.
    pub items: Vec<T>,
    /// Count of all matching items, measured before pagination was applied.
    pub total: u64,
}
74
75// ---------------------------------------------------------------------------
76// Sub-traits
77// ---------------------------------------------------------------------------
78
79/// Account persistence: create, version, query.
80#[async_trait]
81pub trait AccountStore: Send + Sync {
82    /// Fetch a single account by id.
83    async fn get_account(&self, id: &AccountId) -> Result<Account, StoreError>;
84    /// Fetch multiple accounts by id.
85    async fn get_accounts(&self, ids: &[AccountId]) -> Result<Vec<Account>, StoreError>;
86    /// Persist a new account (version 1).
87    async fn create_account(&self, account: Account) -> Result<(), StoreError>;
88    /// Append a new version to an existing account.
89    async fn append_account_version(&self, account: Account) -> Result<(), StoreError>;
90    /// Return the full version history for an account.
91    async fn get_account_history(&self, id: &AccountId) -> Result<Vec<Account>, StoreError>;
92    /// List all accounts (latest version of each).
93    async fn list_accounts(&self) -> Result<Vec<Account>, StoreError>;
94}
95
96/// Posting persistence: reads and lifecycle transitions.
97#[async_trait]
98pub trait PostingStore: Send + Sync {
99    /// Fetch postings by their ids.
100    async fn get_postings(&self, ids: &[PostingId]) -> Result<Vec<Posting>, StoreError>;
101    /// Return postings owned by an account, optionally filtered by asset and/or status.
102    async fn get_postings_by_account(
103        &self,
104        account: &AccountId,
105        asset: Option<&AssetId>,
106        status: Option<PostingStatus>,
107    ) -> Result<Vec<Posting>, StoreError>;
108    /// Reserve postings: `Active → PendingInactive`, stamping `reservation` as
109    /// the owner token. A dumb instruction — each id flips only if still `Active`;
110    /// returns the **number of rows reserved** (0 ≤ n ≤ ids.len()). It does not
111    /// error on a short count; the caller (saga) interprets it.
112    async fn reserve_postings(
113        &self,
114        ids: &[PostingId],
115        reservation: ReservationId,
116    ) -> Result<u64, StoreError>;
117    /// Release postings: `PendingInactive` owned by `reservation` → `Active`,
118    /// clearing the owner. A dumb instruction — only postings reserved by this
119    /// `reservation` flip; returns the **number of rows released**. Releasing an
120    /// `Active` (already released) or differently-owned posting simply does not
121    /// count. The caller interprets the result.
122    async fn release_postings(
123        &self,
124        ids: &[PostingId],
125        reservation: ReservationId,
126    ) -> Result<u64, StoreError>;
127
128    /// Deactivate postings: flip to `Inactive`. A dumb instruction — it applies
129    /// the conditional update and returns the **number of rows changed**; it does
130    /// not decide whether that count is correct. The caller (saga) interprets it.
131    /// - `reservation == None` (raw): only postings still `Active` flip.
132    /// - `reservation == Some(rid)`: only postings `PendingInactive` owned by
133    ///   `rid` flip.
134    /// Returns the count of postings actually transitioned (0 ≤ n ≤ ids.len()).
135    async fn deactivate_postings(
136        &self,
137        ids: &[PostingId],
138        reservation: Option<ReservationId>,
139    ) -> Result<u64, StoreError>;
140
141    /// Insert postings if absent (idempotent). A dumb instruction — inserts each
142    /// posting unless one with the same id already exists, and returns the
143    /// **number of rows inserted** (already-present postings contribute 0). The
144    /// caller decides what a short count means.
145    async fn insert_postings(&self, postings: &[Posting]) -> Result<u64, StoreError>;
146
147    /// Query postings with filtering and pagination.
148    async fn query_postings(&self, query: &PostingQuery) -> Result<Page<Posting>, StoreError> {
149        let all = self
150            .get_postings_by_account(&query.account, query.asset.as_ref(), query.status)
151            .await?;
152        let total = all.len() as u64;
153        let offset = query.offset.unwrap_or(0) as usize;
154        let limit = query.limit.unwrap_or(u32::MAX) as usize;
155        let items = all.into_iter().skip(offset).take(limit).collect();
156        Ok(Page { items, total })
157    }
158}
159
160/// Transfer persistence: store and query committed transfers.
161#[async_trait]
162pub trait TransferStore: Send + Sync {
163    /// Fetch a transfer record by its content-addressed id.
164    async fn get_transfer(&self, id: &EnvelopeId) -> Result<Option<EnvelopeRecord>, StoreError>;
165    /// Persist a transfer record if absent (idempotent) and index it under every
166    /// account in `involved` (both created and consumed owners — the caller
167    /// supplies the set so storage computes nothing). A dumb instruction:
168    /// returns **1** if the transfer row was newly inserted, **0** if it already
169    /// existed. The caller decides what `0` means.
170    async fn store_transfer(
171        &self,
172        record: EnvelopeRecord,
173        involved: &[AccountId],
174    ) -> Result<u64, StoreError>;
175    /// Return all transfers involving the given account.
176    async fn get_transfers_for_account(
177        &self,
178        account: &AccountId,
179    ) -> Result<Vec<EnvelopeRecord>, StoreError>;
180
181    /// Query transfers with filtering and pagination.
182    async fn query_transfers(
183        &self,
184        query: &TransferQuery,
185    ) -> Result<Page<EnvelopeRecord>, StoreError> {
186        // Default in-memory implementation
187        let all = if let Some(ref account) = query.account {
188            self.get_transfers_for_account(account).await?
189        } else {
190            return Err(StoreError::Internal(
191                "query_transfers requires account filter in default implementation".into(),
192            ));
193        };
194
195        let filtered: Vec<EnvelopeRecord> = all
196            .into_iter()
197            .filter(|r| {
198                if let Some(from) = query.from_ts
199                    && r.created_at < from
200                {
201                    return false;
202                }
203                if let Some(to) = query.to_ts
204                    && r.created_at >= to
205                {
206                    return false;
207                }
208                if let Some(book) = query.book
209                    && r.envelope.book() != book
210                {
211                    return false;
212                }
213                true
214            })
215            .collect();
216
217        let total = filtered.len() as u64;
218        let offset = query.offset.unwrap_or(0) as usize;
219        let limit = query.limit.unwrap_or(u32::MAX) as usize;
220        let items = filtered.into_iter().skip(offset).take(limit).collect();
221
222        Ok(Page { items, total })
223    }
224}
225
226/// Saga state persistence for crash recovery.
227#[async_trait]
228pub trait SagaStore: Send + Sync {
229    /// Persist a saga execution state.
230    async fn save_saga(&self, id: &i64, data: Vec<u8>) -> Result<(), StoreError>;
231    /// Load all pending (incomplete) saga states.
232    async fn list_pending_sagas(&self) -> Result<Vec<(i64, Vec<u8>)>, StoreError>;
233    /// Delete a completed saga state.
234    async fn delete_saga(&self, id: &i64) -> Result<(), StoreError>;
235}
236
237/// Book persistence.
238#[async_trait]
239pub trait BookStore: Send + Sync {
240    /// Create a new book.
241    async fn create_book(&self, book: Book) -> Result<(), StoreError>;
242    /// Fetch a book by id.
243    async fn get_book(&self, id: &BookId) -> Result<Book, StoreError>;
244    /// List all books.
245    async fn list_books(&self) -> Result<Vec<Book>, StoreError>;
246}
247
248// ---------------------------------------------------------------------------
249// Composite trait
250// ---------------------------------------------------------------------------
251
252/// Async storage abstraction composing all sub-traits.
253pub trait Store:
254    AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore
255{
256}
257
258impl<T: AccountStore + PostingStore + TransferStore + SagaStore + EventStore + BookStore> Store
259    for T
260{
261}