1use anyhow::{bail, Error, Result};
7use reqwest::Client;
8
9use crate::msgpack_codec::{
10 decode_error_body, decode_query_response, decode_tx_key, decode_tx_result_response,
11 encode_execute_request, encode_open_db_request, encode_query_request, encode_subscribe_request,
12 ExecuteRequest, OpenDbRequest, QueryRequest, SubscribeRequest,
13};
14use crate::node::{collect_tx_ops, Database, IntoQuery, IntoTxOp, QueryNode, SubmitNode};
15use crate::ops::QueryArg;
16use crate::query::QueryResult;
17use crate::subscription::Subscription;
18use crate::transaction::{TransactionResult, TxKey};
19use edn::query::ParsedQuery;
20
/// Value sent in the `Content-Type` header of every request built in this file.
const CONTENT_TYPE: &str = "application/vnd.triplox+msgpack";
26
27async fn check_response(resp: reqwest::Response) -> Result<bytes::Bytes> {
30 let status = resp.status();
31 if status.is_success() {
32 Ok(resp.bytes().await?)
33 } else {
34 let body = resp.bytes().await?;
35 if let Ok(error) = decode_error_body(&body) {
36 let mut msg = format!("Server error (code {}): {}", error.code, error.message);
37 if let Some(d) = error.detail {
38 msg.push_str(&format!(" — {}", d));
39 }
40 bail!("{}", msg);
41 }
42 bail!("HTTP error {}: {}", status, String::from_utf8_lossy(&body));
43 }
44}
45
46pub struct ClientNode {
51 client: Client,
52 base_url: String,
53}
54
55impl ClientNode {
56 pub async fn connect(url: &str) -> Result<Self> {
60 let client = Client::builder().http2_prior_knowledge().build()?;
61 Ok(ClientNode {
62 client,
63 base_url: url.trim_end_matches('/').to_string(),
64 })
65 }
66
67 pub async fn subscribe(
72 &self,
73 query: impl IntoQuery,
74 args: &[QueryArg],
75 ) -> Result<Subscription> {
76 let parsed = query.into_query()?;
77 let body = encode_subscribe_request(&SubscribeRequest {
78 tx_key: None,
79 query: parsed.to_string(),
80 args: args.to_vec(),
81 })?;
82 let resp = self
83 .client
84 .post(format!("{}/db/subscribe", self.base_url))
85 .header("Content-Type", CONTENT_TYPE)
86 .body(body)
87 .send()
88 .await?;
89
90 if !resp.status().is_success() {
91 let status = resp.status();
92 let data = resp.bytes().await?;
93 if let Ok(error) = decode_error_body(&data) {
94 bail!("Server error (code {}): {}", error.code, error.message);
95 }
96 bail!("HTTP error {}: {}", status, String::from_utf8_lossy(&data));
97 }
98
99 Subscription::connect(resp).await
100 }
101
102 async fn open_db(&self, tx_key: Option<TxKey>) -> Result<ClientDb> {
103 let (tx_id, system_time) = match tx_key {
104 None => (None, None),
105 Some(tx_key) => (Some(tx_key.tx_id), Some(tx_key.system_time)),
106 };
107
108 let body = encode_open_db_request(&OpenDbRequest { tx_id, system_time })?;
109 let resp = self
110 .client
111 .post(format!("{}/db/open", self.base_url))
112 .header("Content-Type", CONTENT_TYPE)
113 .body(body)
114 .send()
115 .await?;
116
117 let data = check_response(resp).await?;
118 let tx_key = decode_tx_key(&data)?;
119
120 Ok(ClientDb {
121 tx_key,
122 client: self.client.clone(),
123 base_url: self.base_url.clone(),
124 })
125 }
126}
127
128impl SubmitNode for ClientNode {
129 async fn submit_tx<O: IntoTxOp>(&self, ops: Vec<O>) -> Result<TxKey, Error> {
130 let ops = collect_tx_ops(ops)?;
131 let body = encode_execute_request(&ExecuteRequest { ops })?;
132 let resp = self
133 .client
134 .post(format!("{}/tx/submit", self.base_url))
135 .header("Content-Type", CONTENT_TYPE)
136 .body(body)
137 .send()
138 .await?;
139
140 let data = check_response(resp).await?;
141 let tx_key = decode_tx_key(&data)?;
142 Ok(tx_key)
143 }
144
145 async fn execute_tx<O: IntoTxOp>(&self, ops: Vec<O>) -> Result<TransactionResult, Error> {
146 let ops = collect_tx_ops(ops)?;
147 let body = encode_execute_request(&ExecuteRequest { ops })?;
148 let resp = self
149 .client
150 .post(format!("{}/tx/execute", self.base_url))
151 .header("Content-Type", CONTENT_TYPE)
152 .body(body)
153 .send()
154 .await?;
155
156 let data = check_response(resp).await?;
157 let tx_result = decode_tx_result_response(&data)?;
158 let tx_key = TxKey {
159 tx_id: tx_result.tx_id,
160 system_time: tx_result.system_time,
161 };
162
163 if tx_result.status == 0 {
164 Ok(TransactionResult::TxCommitted(tx_key))
165 } else {
166 let err_msg = tx_result
167 .error_message
168 .unwrap_or_else(|| "transaction aborted".to_string());
169 Ok(TransactionResult::TxAborted(
170 tx_key,
171 anyhow::anyhow!("{}", err_msg).into(),
172 ))
173 }
174 }
175}
176
177impl QueryNode for ClientNode {
178 type DB = ClientDb;
179
180 async fn db(&self) -> Result<ClientDb, Error> {
181 self.open_db(None).await
182 }
183
184 async fn db_as_of(&self, tx_key: TxKey) -> Result<ClientDb, Error> {
185 self.open_db(Some(tx_key)).await
186 }
187}
188
189pub struct ClientDb {
195 tx_key: TxKey,
196 client: Client,
197 base_url: String,
198}
199
200impl ClientDb {
201 pub fn tx_key(&self) -> TxKey {
203 self.tx_key
204 }
205}
206
207impl Database for ClientDb {
208 async fn query(&self, query: impl IntoQuery) -> Result<QueryResult, Error> {
209 let parsed = query.into_query()?;
210 self.query_with_args(&parsed, &[]).await
211 }
212
213 async fn query_with_args(
214 &self,
215 query: &ParsedQuery,
216 args: &[QueryArg],
217 ) -> Result<QueryResult, Error> {
218 let body = encode_query_request(&QueryRequest {
219 tx_key: self.tx_key,
220 query: query.to_string(),
221 args: args.to_vec(),
222 })?;
223 let resp = self
224 .client
225 .post(format!("{}/db/query", self.base_url))
226 .header("Content-Type", CONTENT_TYPE)
227 .body(body)
228 .send()
229 .await?;
230
231 let data = check_response(resp).await?;
232 let query_response = decode_query_response(&data)?;
233 Ok(query_response.rows)
234 }
235}