Skip to main content

triplox_client/
client.rs

1//! HTTP/2 client library for connecting to a Triplox server.
2//!
3//! `ClientNode` mirrors the `Node` API and `ClientDb` mirrors the `DB` API,
4//! both operating over HTTP/2 via the binary protocol encoding.
5
6use anyhow::{bail, Error, Result};
7use reqwest::Client;
8
9use crate::msgpack_codec::{
10    decode_error_body, decode_query_response, decode_tx_key, decode_tx_result_response,
11    encode_execute_request, encode_open_db_request, encode_query_request, encode_subscribe_request,
12    ExecuteRequest, OpenDbRequest, QueryRequest, SubscribeRequest,
13};
14use crate::node::{collect_tx_ops, Database, IntoQuery, IntoTxOp, QueryNode, SubmitNode};
15use crate::ops::QueryArg;
16use crate::query::QueryResult;
17use crate::subscription::Subscription;
18use crate::transaction::{TransactionResult, TxKey};
19use edn::query::ParsedQuery;
20
21// ---------------------------------------------------------------------------
22// Helpers
23// ---------------------------------------------------------------------------
24
25const CONTENT_TYPE: &str = "application/vnd.triplox+msgpack";
26
27/// Check an HTTP response for errors. If the status is not success,
28/// attempt to decode a binary ErrorResponse from the body.
29async fn check_response(resp: reqwest::Response) -> Result<bytes::Bytes> {
30    let status = resp.status();
31    if status.is_success() {
32        Ok(resp.bytes().await?)
33    } else {
34        let body = resp.bytes().await?;
35        if let Ok(error) = decode_error_body(&body) {
36            let mut msg = format!("Server error (code {}): {}", error.code, error.message);
37            if let Some(d) = error.detail {
38                msg.push_str(&format!(" — {}", d));
39            }
40            bail!("{}", msg);
41        }
42        bail!("HTTP error {}: {}", status, String::from_utf8_lossy(&body));
43    }
44}
45
46// ---------------------------------------------------------------------------
47// ClientNode
48// ---------------------------------------------------------------------------
49
50pub struct ClientNode {
51    client: Client,
52    base_url: String,
53}
54
55impl ClientNode {
56    /// Connect to a Triplox HTTP server.
57    ///
58    /// `url` should be the base URL, e.g. `http://127.0.0.1:5490`.
59    pub async fn connect(url: &str) -> Result<Self> {
60        let client = Client::builder().http2_prior_knowledge().build()?;
61        Ok(ClientNode {
62            client,
63            base_url: url.trim_end_matches('/').to_string(),
64        })
65    }
66
67    /// Register an incremental query and stream its result deltas.
68    ///
69    /// Subscribes at the latest indexed basis. The returned [`Subscription`] is a
70    /// `Stream` of [`Delta`](crate::Delta)s; dropping it unsubscribes.
71    pub async fn subscribe(
72        &self,
73        query: impl IntoQuery,
74        args: &[QueryArg],
75    ) -> Result<Subscription> {
76        let parsed = query.into_query()?;
77        let body = encode_subscribe_request(&SubscribeRequest {
78            tx_key: None,
79            query: parsed.to_string(),
80            args: args.to_vec(),
81        })?;
82        let resp = self
83            .client
84            .post(format!("{}/db/subscribe", self.base_url))
85            .header("Content-Type", CONTENT_TYPE)
86            .body(body)
87            .send()
88            .await?;
89
90        if !resp.status().is_success() {
91            let status = resp.status();
92            let data = resp.bytes().await?;
93            if let Ok(error) = decode_error_body(&data) {
94                bail!("Server error (code {}): {}", error.code, error.message);
95            }
96            bail!("HTTP error {}: {}", status, String::from_utf8_lossy(&data));
97        }
98
99        Subscription::connect(resp).await
100    }
101
102    async fn open_db(&self, tx_key: Option<TxKey>) -> Result<ClientDb> {
103        let (tx_id, system_time) = match tx_key {
104            None => (None, None),
105            Some(tx_key) => (Some(tx_key.tx_id), Some(tx_key.system_time)),
106        };
107
108        let body = encode_open_db_request(&OpenDbRequest { tx_id, system_time })?;
109        let resp = self
110            .client
111            .post(format!("{}/db/open", self.base_url))
112            .header("Content-Type", CONTENT_TYPE)
113            .body(body)
114            .send()
115            .await?;
116
117        let data = check_response(resp).await?;
118        let tx_key = decode_tx_key(&data)?;
119
120        Ok(ClientDb {
121            tx_key,
122            client: self.client.clone(),
123            base_url: self.base_url.clone(),
124        })
125    }
126}
127
128impl SubmitNode for ClientNode {
129    async fn submit_tx<O: IntoTxOp>(&self, ops: Vec<O>) -> Result<TxKey, Error> {
130        let ops = collect_tx_ops(ops)?;
131        let body = encode_execute_request(&ExecuteRequest { ops })?;
132        let resp = self
133            .client
134            .post(format!("{}/tx/submit", self.base_url))
135            .header("Content-Type", CONTENT_TYPE)
136            .body(body)
137            .send()
138            .await?;
139
140        let data = check_response(resp).await?;
141        let tx_key = decode_tx_key(&data)?;
142        Ok(tx_key)
143    }
144
145    async fn execute_tx<O: IntoTxOp>(&self, ops: Vec<O>) -> Result<TransactionResult, Error> {
146        let ops = collect_tx_ops(ops)?;
147        let body = encode_execute_request(&ExecuteRequest { ops })?;
148        let resp = self
149            .client
150            .post(format!("{}/tx/execute", self.base_url))
151            .header("Content-Type", CONTENT_TYPE)
152            .body(body)
153            .send()
154            .await?;
155
156        let data = check_response(resp).await?;
157        let tx_result = decode_tx_result_response(&data)?;
158        let tx_key = TxKey {
159            tx_id: tx_result.tx_id,
160            system_time: tx_result.system_time,
161        };
162
163        if tx_result.status == 0 {
164            Ok(TransactionResult::TxCommitted(tx_key))
165        } else {
166            let err_msg = tx_result
167                .error_message
168                .unwrap_or_else(|| "transaction aborted".to_string());
169            Ok(TransactionResult::TxAborted(
170                tx_key,
171                anyhow::anyhow!("{}", err_msg).into(),
172            ))
173        }
174    }
175}
176
177impl QueryNode for ClientNode {
178    type DB = ClientDb;
179
180    async fn db(&self) -> Result<ClientDb, Error> {
181        self.open_db(None).await
182    }
183
184    async fn db_as_of(&self, tx_key: TxKey) -> Result<ClientDb, Error> {
185        self.open_db(Some(tx_key)).await
186    }
187}
188
189// ---------------------------------------------------------------------------
190// ClientDb
191// ---------------------------------------------------------------------------
192
193/// A remote DB read basis. Mirrors the `DB` API.
194pub struct ClientDb {
195    tx_key: TxKey,
196    client: Client,
197    base_url: String,
198}
199
200impl ClientDb {
201    /// The transaction key this DB value is pinned to.
202    pub fn tx_key(&self) -> TxKey {
203        self.tx_key
204    }
205}
206
207impl Database for ClientDb {
208    async fn query(&self, query: impl IntoQuery) -> Result<QueryResult, Error> {
209        let parsed = query.into_query()?;
210        self.query_with_args(&parsed, &[]).await
211    }
212
213    async fn query_with_args(
214        &self,
215        query: &ParsedQuery,
216        args: &[QueryArg],
217    ) -> Result<QueryResult, Error> {
218        let body = encode_query_request(&QueryRequest {
219            tx_key: self.tx_key,
220            query: query.to_string(),
221            args: args.to_vec(),
222        })?;
223        let resp = self
224            .client
225            .post(format!("{}/db/query", self.base_url))
226            .header("Content-Type", CONTENT_TYPE)
227            .body(body)
228            .send()
229            .await?;
230
231        let data = check_response(resp).await?;
232        let query_response = decode_query_response(&data)?;
233        Ok(query_response.rows)
234    }
235}