1use std::str::FromStr;
2use std::time::Duration;
3
4use futures::stream::StreamExt;
5use http::{HeaderValue, Method};
6use reqwest::{IntoUrl, Response, Url};
7use reqwest_eventsource::{Event, EventSource};
8use serde::{Deserialize, Serialize};
9use tokio::select;
10use tokio::task::JoinHandle;
11use tokio_util::sync::CancellationToken;
12use tracing::error;
13use uuid::Uuid;
14
15use crate::request::RequestBuilder;
16
/// Grace period for the proof to arrive once the proxied request has returned
/// a successful response; after it elapses the proof wait fails with a timeout.
const PROOF_TIMEOUT: Duration = Duration::from_secs(1);
19
/// Settings for a [`VerityClient`].
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone` so that a
/// configuration can be logged and compared (for example in tests).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerityClientConfig {
    /// Base URL of the prover. Endpoint paths such as `/proxy`,
    /// `/proof/{request_id}` and `/notaryinfo` are appended to it.
    pub prover_url: String,
}
24
25#[derive(Clone)]
26pub struct VerityClient {
27 pub(crate) inner: reqwest::Client,
28 pub(crate) config: VerityClientConfig,
29}
30
31pub struct VerityResponse {
32 pub subject: Response,
33 pub proof: String,
34 pub notary_pub_key: String,
35}
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
38#[serde(rename_all = "camelCase")]
39pub struct NotaryInformation {
40 pub version: String,
41 pub public_key: String,
42 pub git_commit_hash: String,
43 pub git_commit_timestamp: String,
44}
45
46impl VerityClient {
47 pub fn new(config: VerityClientConfig) -> Self {
49 return Self {
50 inner: reqwest::Client::new(),
51 config,
52 };
53 }
54
55 pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
61 self.request(Method::GET, url)
62 }
63
64 pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
70 self.request(Method::POST, url)
71 }
72
73 pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
82 RequestBuilder {
83 client: self.clone(),
84 inner: self.inner.request(method, url),
85 }
86 }
87
88 pub async fn execute(&mut self, request: reqwest::Request) -> anyhow::Result<VerityResponse> {
101 self.execute_request(request).await
102 }
103
104 pub async fn execute_request(
110 &mut self,
111 mut req: reqwest::Request,
112 ) -> anyhow::Result<VerityResponse> {
113 let proxy_url = &String::from(req.url().as_str());
114 let headers = req.headers_mut();
115
116 let request_id = Uuid::new_v4();
117 headers.append(
118 "T-REQUEST-ID",
119 HeaderValue::from_str(&format!("{}", request_id))?,
120 );
121
122 headers.append("T-PROXY-URL", HeaderValue::from_str(proxy_url)?);
123
124 *req.url_mut() = Url::from_str(&format!("{}/proxy", self.config.prover_url))?;
125
126 let req = reqwest::RequestBuilder::from_parts(self.inner.clone(), req);
127
128 let request_cancellation_token = CancellationToken::new();
129 let timeout_cancellation_token = CancellationToken::new();
130
131 let proof_awaiter = self.await_proof(
132 request_id.to_string(),
133 request_cancellation_token.clone(),
134 timeout_cancellation_token.clone(),
135 )?;
136
137 let (response, proof_msg) = tokio::try_join!(
139 self.send_request(req, request_cancellation_token, timeout_cancellation_token),
140 proof_awaiter
141 )
142 .map_err(|e| anyhow::anyhow!("Failed to prove the request: {}", e))?;
143
144 let subject = response?;
145 let (notary_pub_key, proof) = proof_msg?;
146
147 Ok(VerityResponse {
148 subject,
149 proof,
150 notary_pub_key,
151 })
152 }
153
154 fn send_request(
158 &self,
159 request: reqwest::RequestBuilder,
160 request_cancellation_token: CancellationToken,
161 timeout_cancellation_token: CancellationToken,
162 ) -> JoinHandle<anyhow::Result<reqwest::Response>> {
163 tokio::spawn(async move {
164 let result = request.send().await;
165 let response = result.map_err(|e| {
166 error!("{}", e);
167 e
168 })?;
169
170 if response.status().is_success() {
171 tokio::spawn(async move {
172 tokio::time::sleep(PROOF_TIMEOUT).await;
173 timeout_cancellation_token.cancel();
174 });
175 } else {
176 request_cancellation_token.cancel();
177 error!(
178 "Request is not success: {} {}",
179 response.status().as_str(),
180 response.status().canonical_reason().unwrap_or_default()
181 );
182 return Ok(response);
183 }
184
185 Ok(response)
186 })
187 }
188
189 fn await_proof(
197 &self,
198 request_id: String,
199 request_cancellation_token: CancellationToken,
200 timeout_cancellation_token: CancellationToken,
201 ) -> anyhow::Result<JoinHandle<anyhow::Result<(String, String)>>> {
202 let url = Url::from_str(&format!("{}/proof/{}", self.config.prover_url, request_id))?;
203 let mut event_source = EventSource::get(url);
204
205 let awaiter = tokio::task::spawn(async move {
206 while let Some(event) = event_source.next().await {
207 match event {
208 Ok(Event::Open) => {}
209 Ok(Event::Message(message)) => {
210 let parts: Vec<&str> = message.data.splitn(2, "|").collect();
211 if parts.len() != 2 {
212 anyhow::bail!("Invalid proof response");
213 }
214
215 return Ok((parts[0].to_string(), parts[1].to_string()));
216 }
217 Err(err) => {
218 error!("{}", err);
219 Err(err)?;
220 }
221 }
222 }
223
224 Ok((String::from(""), String::from("")))
225 });
226
227 let join_handle = tokio::spawn(async move {
228 select! {
230 proof = awaiter => {
231 proof.unwrap()
232 }
233 () = timeout_cancellation_token.cancelled() => {
234 anyhow::bail!("Timeout reached while waiting for a proof")
235 }
236 () = request_cancellation_token.cancelled() => {
237 Ok((String::new(), String::new()))
238 }
239 }
240 });
241
242 Ok(join_handle)
243 }
244
245 pub async fn get_notary_info(&self) -> anyhow::Result<NotaryInformation> {
247 let notary_info_url = format!("{}/notaryinfo", self.config.prover_url);
248 let notary_information = reqwest::get(notary_info_url)
249 .await?
250 .json::<NotaryInformation>()
251 .await?;
252
253 Ok(notary_information)
254 }
255}