Skip to main content

talea_client/
lib.rs

1//! Typed client SDK for the talea ledger server, plus the `talea` CLI.
2//!
3//! [`TaleaClient`] implements the same [`LedgerApi`] trait the server's
4//! in-process service does, so code written against the trait runs
5//! unchanged against either. All operations are retry-safe by construction
6//! (posts carry caller-supplied idempotency keys, registry writes are
7//! idempotent on id, reads are reads), and the client retries 503/408/transport
8//! failures automatically within a bounded [`RetryPolicy`].
9//!
10//! ```no_run
11//! use talea_client::{LedgerApi, Page, TaleaClient};
12//!
13//! # async fn demo() -> talea_client::ApiResult<()> {
14//! let client = TaleaClient::builder("http://127.0.0.1:8080")
15//!     .bearer_token("sekrit")
16//!     .build()?;
17//!
18//! let balance = client.balance("onramp", "cash", None).await?;
19//! println!("{} {}", balance.balance, balance.asset);
20//!
21//! let page = client
22//!     .account_history("onramp", "cash", Page { after_seq: None, limit: 100 })
23//!     .await?;
24//! # let _ = page;
25//! # Ok(())
26//! # }
27//! ```
28//!
29//! Subscriptions are unbroken streams: [`LedgerApi::subscribe`] reconnects
30//! with backoff and resumes from the last seen sequence, so consumers never
31//! re-implement cursor bookkeeping.
32
33#[cfg(feature = "cli")]
34pub mod cli;
35mod http;
36mod retry;
37mod sse;
38
39pub use retry::RetryPolicy;
40pub use talea_core::api::*;
41
42use std::time::Duration;
43
44use async_trait::async_trait;
45use chrono::{DateTime, Utc};
46use talea_core::types::Seq;
47
48/// HTTP client for a talea server, implementing [`LedgerApi`].
49///
50/// Cheap to share: hold it in an `Arc` or clone the underlying reqwest
51/// client via the builder. Construct with [`TaleaClient::builder`].
52pub struct TaleaClient {
53    http: http::Http,
54}
55
56/// Configures and builds a [`TaleaClient`]. Created by
57/// [`TaleaClient::builder`]; finish with [`ClientBuilder::build`].
58pub struct ClientBuilder {
59    base_url: String,
60    token: Option<String>,
61    timeout: Duration,
62    retry: RetryPolicy,
63    http_client: Option<reqwest::Client>,
64}
65
66impl TaleaClient {
67    /// Start building a client for the server at `base_url`
68    /// (e.g. `http://127.0.0.1:8080`). A path prefix is preserved, but the
69    /// base must not already include the `/v1` version segment.
70    pub fn builder(base_url: impl Into<String>) -> ClientBuilder {
71        ClientBuilder {
72            base_url: base_url.into(),
73            token: None,
74            timeout: Duration::from_secs(30),
75            retry: RetryPolicy::default(),
76            http_client: None,
77        }
78    }
79}
80
81impl ClientBuilder {
82    /// Send `Authorization: Bearer <token>` on every request (matches the
83    /// server's `TALEA_API_TOKEN`). Without it the client only works
84    /// against a server running in open dev mode.
85    pub fn bearer_token(mut self, token: impl Into<String>) -> Self {
86        self.token = Some(token.into());
87        self
88    }
89
90    /// Per-request timeout (default 30s). Not applied to SSE subscriptions.
91    pub fn timeout(mut self, timeout: Duration) -> Self {
92        self.timeout = timeout;
93        self
94    }
95
96    /// Retry policy for 503/408/transport failures (default: 3 attempts,
97    /// exponential 200ms..5s, honoring `Retry-After`). Safe for every
98    /// operation; use [`RetryPolicy::none`] to surface failures immediately.
99    pub fn retry(mut self, retry: RetryPolicy) -> Self {
100        self.retry = retry;
101        self
102    }
103
104    /// Bring your own reqwest client (proxies, TLS config, ...). Do NOT set
105    /// a global timeout on it — that would kill SSE; use `.timeout()` here.
106    pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
107        self.http_client = Some(client);
108        self
109    }
110
111    /// Validate the base URL and construct the client. Fails with
112    /// [`ApiError::Transport`] on an unparseable URL; no network I/O happens
113    /// until the first request.
114    pub fn build(self) -> ApiResult<TaleaClient> {
115        let base = reqwest::Url::parse(&self.base_url).map_err(|e| ApiError::Transport {
116            message: format!("invalid base url: {e}"),
117        })?;
118        let client = match self.http_client {
119            Some(c) => c,
120            None => reqwest::Client::builder()
121                .connect_timeout(Duration::from_secs(10))
122                .build()
123                .map_err(|e| ApiError::Transport {
124                    message: format!("building http client: {e}"),
125                })?,
126        };
127        Ok(TaleaClient {
128            http: http::Http {
129                client,
130                base,
131                token: self.token,
132                timeout: self.timeout,
133                retry: self.retry,
134            },
135        })
136    }
137}
138
139#[async_trait]
140impl LedgerApi for TaleaClient {
141    async fn register_asset(&self, draft: AssetDraft) -> ApiResult<()> {
142        let url = self.http.url(&["assets"])?;
143        self.http
144            .execute_unit(|| self.http.client.post(url.clone()).json(&draft))
145            .await
146    }
147
148    async fn open_account(&self, draft: AccountDraft) -> ApiResult<()> {
149        let url = self.http.url(&["accounts"])?;
150        self.http
151            .execute_unit(|| self.http.client.post(url.clone()).json(&draft))
152            .await
153    }
154
155    async fn post(&self, draft: TransactionDraft) -> ApiResult<Posted> {
156        let url = self.http.url(&["transactions"])?;
157        self.http
158            .execute(|| self.http.client.post(url.clone()).json(&draft))
159            .await
160    }
161
162    async fn balance(
163        &self,
164        book: &str,
165        path: &str,
166        as_of: Option<DateTime<Utc>>,
167    ) -> ApiResult<BalanceView> {
168        let mut url = self
169            .http
170            .url(&["books", book, "accounts", path, "balance"])?;
171        if let Some(t) = as_of {
172            url.query_pairs_mut().append_pair("as_of", &t.to_rfc3339());
173        }
174        self.http
175            .execute(|| self.http.client.get(url.clone()))
176            .await
177    }
178
179    async fn account_history(
180        &self,
181        book: &str,
182        path: &str,
183        page: Page,
184    ) -> ApiResult<Paged<PostingView>> {
185        let mut url = self
186            .http
187            .url(&["books", book, "accounts", path, "history"])?;
188        {
189            let mut q = url.query_pairs_mut();
190            if let Some(after) = page.after_seq {
191                q.append_pair("after_seq", &after.to_string());
192            }
193            q.append_pair("limit", &page.limit.to_string());
194        }
195        self.http
196            .execute(|| self.http.client.get(url.clone()))
197            .await
198    }
199
200    async fn transaction(&self, tx_id: &str) -> ApiResult<TransactionView> {
201        let url = self.http.url(&["transactions", tx_id])?;
202        self.http
203            .execute(|| self.http.client.get(url.clone()))
204            .await
205    }
206
207    async fn trial_balance(
208        &self,
209        book: &str,
210        as_of: Option<DateTime<Utc>>,
211    ) -> ApiResult<TrialBalance> {
212        let mut url = self.http.url(&["books", book, "trial-balance"])?;
213        if let Some(t) = as_of {
214            url.query_pairs_mut().append_pair("as_of", &t.to_rfc3339());
215        }
216        self.http
217            .execute(|| self.http.client.get(url.clone()))
218            .await
219    }
220
221    /// Post a batch via one `POST /v1/transactions/batch` call.
222    ///
223    /// Empty input returns an empty `Vec` immediately without any HTTP call.
224    ///
225    /// **Whole-request failure** (401 / 415 / 400 / transport) yields the
226    /// same error in every slot.  Callers can detect this via all-slots-
227    /// identical.  Retrying the whole batch is always safe because idempotency
228    /// keys dedup per draft: any slot that already committed returns
229    /// `deduplicated: true` instead of double-posting.
230    ///
231    /// **Retry**: the whole request goes through the standard 503/408/transport
232    /// retry wrapper — safe for the same dedup reason.
233    async fn post_batch(&self, drafts: Vec<TransactionDraft>) -> Vec<ApiResult<Posted>> {
234        if drafts.is_empty() {
235            return Vec::new();
236        }
237        let n = drafts.len();
238        let url = match self.http.url(&["transactions", "batch"]) {
239            Ok(u) => u,
240            Err(e) => return std::iter::repeat_with(|| Err(e.clone())).take(n).collect(),
241        };
242        self.http
243            .execute_batch(|| self.http.client.post(url.clone()).json(&drafts), n)
244            .await
245    }
246
247    async fn subscribe(&self, book: &str, from: Seq) -> ApiResult<EventStream> {
248        sse::subscribe(&self.http, book, from)
249    }
250}