Skip to main content

talea_client/
lib.rs

//! Typed client SDK for the talea ledger server, plus the `talea` CLI.
//!
//! [`TaleaClient`] implements the same [`LedgerApi`] trait the server's
//! in-process service does, so code written against the trait runs
//! unchanged against either. All operations are retry-safe by construction
//! (posts carry caller-supplied idempotency keys, registry writes are
//! idempotent on id, reads are reads), and the client retries
//! 503/408/transport failures automatically within a bounded [`RetryPolicy`].
9//!
10//! ```no_run
11//! use talea_client::{LedgerApi, Page, TaleaClient};
12//!
13//! # async fn demo() -> talea_client::ApiResult<()> {
14//! let client = TaleaClient::builder("http://127.0.0.1:8080")
15//!     .bearer_token("sekrit")
16//!     .build()?;
17//!
18//! let balance = client.balance("onramp", "cash", None).await?;
19//! println!("{} {}", balance.balance, balance.asset);
20//!
21//! let page = client
22//!     .account_history("onramp", "cash", Page { after_seq: None, limit: 100 })
23//!     .await?;
24//! # let _ = page;
25//! # Ok(())
26//! # }
27//! ```
28//!
29//! Subscriptions are unbroken streams: [`LedgerApi::subscribe`] reconnects
30//! with backoff and resumes from the last seen sequence, so consumers never
31//! re-implement cursor bookkeeping.
32
33pub mod cli;
34mod http;
35mod retry;
36mod sse;
37
38pub use retry::RetryPolicy;
39pub use talea_core::api::*;
40
41use std::time::Duration;
42
43use async_trait::async_trait;
44use chrono::{DateTime, Utc};
45use talea_core::types::Seq;
46
47/// HTTP client for a talea server, implementing [`LedgerApi`].
48///
49/// Cheap to share: hold it in an `Arc` or clone the underlying reqwest
50/// client via the builder. Construct with [`TaleaClient::builder`].
51pub struct TaleaClient {
52    http: http::Http,
53}
54
55/// Configures and builds a [`TaleaClient`]. Created by
56/// [`TaleaClient::builder`]; finish with [`ClientBuilder::build`].
57pub struct ClientBuilder {
58    base_url: String,
59    token: Option<String>,
60    timeout: Duration,
61    retry: RetryPolicy,
62    http_client: Option<reqwest::Client>,
63}
64
65impl TaleaClient {
66    /// Start building a client for the server at `base_url`
67    /// (e.g. `http://127.0.0.1:8080`). A path prefix is preserved, but the
68    /// base must not already include the `/v1` version segment.
69    pub fn builder(base_url: impl Into<String>) -> ClientBuilder {
70        ClientBuilder {
71            base_url: base_url.into(),
72            token: None,
73            timeout: Duration::from_secs(30),
74            retry: RetryPolicy::default(),
75            http_client: None,
76        }
77    }
78}
79
80impl ClientBuilder {
81    /// Send `Authorization: Bearer <token>` on every request (matches the
82    /// server's `TALEA_API_TOKEN`). Without it the client only works
83    /// against a server running in open dev mode.
84    pub fn bearer_token(mut self, token: impl Into<String>) -> Self {
85        self.token = Some(token.into());
86        self
87    }
88
89    /// Per-request timeout (default 30s). Not applied to SSE subscriptions.
90    pub fn timeout(mut self, timeout: Duration) -> Self {
91        self.timeout = timeout;
92        self
93    }
94
95    /// Retry policy for 503/408/transport failures (default: 3 attempts,
96    /// exponential 200ms..5s, honoring `Retry-After`). Safe for every
97    /// operation; use [`RetryPolicy::none`] to surface failures immediately.
98    pub fn retry(mut self, retry: RetryPolicy) -> Self {
99        self.retry = retry;
100        self
101    }
102
103    /// Bring your own reqwest client (proxies, TLS config, ...). Do NOT set
104    /// a global timeout on it — that would kill SSE; use `.timeout()` here.
105    pub fn with_http_client(mut self, client: reqwest::Client) -> Self {
106        self.http_client = Some(client);
107        self
108    }
109
110    /// Validate the base URL and construct the client. Fails with
111    /// [`ApiError::Transport`] on an unparseable URL; no network I/O happens
112    /// until the first request.
113    pub fn build(self) -> ApiResult<TaleaClient> {
114        let base = reqwest::Url::parse(&self.base_url).map_err(|e| ApiError::Transport {
115            message: format!("invalid base url: {e}"),
116        })?;
117        let client = match self.http_client {
118            Some(c) => c,
119            None => reqwest::Client::builder()
120                .connect_timeout(Duration::from_secs(10))
121                .build()
122                .map_err(|e| ApiError::Transport {
123                    message: format!("building http client: {e}"),
124                })?,
125        };
126        Ok(TaleaClient {
127            http: http::Http {
128                client,
129                base,
130                token: self.token,
131                timeout: self.timeout,
132                retry: self.retry,
133            },
134        })
135    }
136}
137
138#[async_trait]
139impl LedgerApi for TaleaClient {
140    async fn register_asset(&self, draft: AssetDraft) -> ApiResult<()> {
141        let url = self.http.url(&["assets"])?;
142        self.http
143            .execute_unit(|| self.http.client.post(url.clone()).json(&draft))
144            .await
145    }
146
147    async fn open_account(&self, draft: AccountDraft) -> ApiResult<()> {
148        let url = self.http.url(&["accounts"])?;
149        self.http
150            .execute_unit(|| self.http.client.post(url.clone()).json(&draft))
151            .await
152    }
153
154    async fn post(&self, draft: TransactionDraft) -> ApiResult<Posted> {
155        let url = self.http.url(&["transactions"])?;
156        self.http
157            .execute(|| self.http.client.post(url.clone()).json(&draft))
158            .await
159    }
160
161    async fn balance(
162        &self,
163        book: &str,
164        path: &str,
165        as_of: Option<DateTime<Utc>>,
166    ) -> ApiResult<BalanceView> {
167        let mut url = self
168            .http
169            .url(&["books", book, "accounts", path, "balance"])?;
170        if let Some(t) = as_of {
171            url.query_pairs_mut().append_pair("as_of", &t.to_rfc3339());
172        }
173        self.http
174            .execute(|| self.http.client.get(url.clone()))
175            .await
176    }
177
178    async fn account_history(
179        &self,
180        book: &str,
181        path: &str,
182        page: Page,
183    ) -> ApiResult<Paged<PostingView>> {
184        let mut url = self
185            .http
186            .url(&["books", book, "accounts", path, "history"])?;
187        {
188            let mut q = url.query_pairs_mut();
189            if let Some(after) = page.after_seq {
190                q.append_pair("after_seq", &after.to_string());
191            }
192            q.append_pair("limit", &page.limit.to_string());
193        }
194        self.http
195            .execute(|| self.http.client.get(url.clone()))
196            .await
197    }
198
199    async fn transaction(&self, tx_id: &str) -> ApiResult<TransactionView> {
200        let url = self.http.url(&["transactions", tx_id])?;
201        self.http
202            .execute(|| self.http.client.get(url.clone()))
203            .await
204    }
205
206    async fn trial_balance(
207        &self,
208        book: &str,
209        as_of: Option<DateTime<Utc>>,
210    ) -> ApiResult<TrialBalance> {
211        let mut url = self.http.url(&["books", book, "trial-balance"])?;
212        if let Some(t) = as_of {
213            url.query_pairs_mut().append_pair("as_of", &t.to_rfc3339());
214        }
215        self.http
216            .execute(|| self.http.client.get(url.clone()))
217            .await
218    }
219
220    /// Post a batch via one `POST /v1/transactions/batch` call.
221    ///
222    /// Empty input returns an empty `Vec` immediately without any HTTP call.
223    ///
224    /// **Whole-request failure** (401 / 415 / 400 / transport) yields the
225    /// same error in every slot.  Callers can detect this via all-slots-
226    /// identical.  Retrying the whole batch is always safe because idempotency
227    /// keys dedup per draft: any slot that already committed returns
228    /// `deduplicated: true` instead of double-posting.
229    ///
230    /// **Retry**: the whole request goes through the standard 503/408/transport
231    /// retry wrapper — safe for the same dedup reason.
232    async fn post_batch(&self, drafts: Vec<TransactionDraft>) -> Vec<ApiResult<Posted>> {
233        if drafts.is_empty() {
234            return Vec::new();
235        }
236        let n = drafts.len();
237        let url = match self.http.url(&["transactions", "batch"]) {
238            Ok(u) => u,
239            Err(e) => return std::iter::repeat_with(|| Err(e.clone())).take(n).collect(),
240        };
241        self.http
242            .execute_batch(|| self.http.client.post(url.clone()).json(&drafts), n)
243            .await
244    }
245
246    async fn subscribe(&self, book: &str, from: Seq) -> ApiResult<EventStream> {
247        sse::subscribe(&self.http, book, from)
248    }
249}