1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use apikit::payload::{ErrorResponse, MessageResponse};
5
6use bytes::Bytes;
7
8use futures::{Stream, TryStreamExt};
9
10use header::HeaderName;
11
12use interface::{BlobMeta, MetadataList, Query, QueryResponse, RoutingConfig};
13
14use hyper::{header, StatusCode};
15
16use protocol::directory::{auth::*, blobmeta::*, routing::*, storage::*};
17use protocol::storage::PutResponse;
18
19use reqwest::{Client as ReqwestClient, Request};
20
21use reqwest::Body;
22
23use serde::de::DeserializeOwned;
24
25use snafu::prelude::*;
26use tokio_util::codec::{BytesCodec, FramedRead};
27
28use crate::{ClientBuilder, Meta, Parameters};
29
30#[derive(Debug, Snafu)]
31pub enum ClientError {
32 #[snafu(display("failed to build reqwest client: {}", source))]
33 ClientBuildError { source: reqwest::Error },
34
35 #[snafu(display("failed to fetch response body: {}", source))]
36 FetchBodyError { source: reqwest::Error },
37
38 #[snafu(display("file [{:?}] does not exist", path))]
39 FileDoesNotExist { path: PathBuf },
40
41 #[snafu(display("failed to load the metadata for '{:?}': {}", path, source))]
42 FileMetadataError {
43 source: std::io::Error,
44 path: PathBuf,
45 },
46
47 #[snafu(display("failed to serialize metadata [{:?}]: {}", meta, source))]
48 MetaSerializationError {
49 source: serde_json::Error,
50 meta: Meta,
51 },
52
53 #[snafu(display("the redirect limit of {} was exceeded", limit))]
54 RedirectLimitExceeded { limit: u32 },
55
56 #[snafu(display("failed to build request: {}", source))]
57 RequestBuildError { source: reqwest::Error },
58
59 #[snafu(display("failed to execute request: {}", source))]
60 RequestExecutionError { source: reqwest::Error },
61
62 #[snafu(display("failed to deserialize response: {}", source))]
63 ResponseDeserializationError { source: serde_json::Error },
64
65 #[snafu(display("server returned an error: {}", message))]
66 ServerReturnedError { message: String },
67
68 #[snafu(display("did not get a redirect when expected"))]
69 MissingRedirect,
70
71 #[snafu(display("did not receive a request id when expected"))]
72 MissingRequestId,
73
74 #[snafu(display("too many retries"))]
75 TooManyRetries,
76
77 #[snafu(display("{}", message))]
78 UnknownError { message: String },
79}
80
81fn encode_metadata(meta: Meta) -> Result<String> {
82 let serialized_meta = serde_json::to_vec(&meta).context(MetaSerializationSnafu { meta })?;
83 Ok(base64::encode(&serialized_meta))
84}
85
86async fn extract_body<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
87 let body_bytes = response.bytes().await.context(FetchBodySnafu)?;
88 tracing::debug!("body: {}", String::from_utf8_lossy(&body_bytes));
89 serde_json::from_slice(body_bytes.as_ref()).context(ResponseDeserializationSnafu)
90}
91
92async fn extract_error(response: reqwest::Response) -> ClientError {
93 match extract_body::<ErrorResponse>(response).await {
94 Ok(e) => ClientError::ServerReturnedError { message: e.error },
95 Err(e) => ClientError::UnknownError {
96 message: e.to_string(),
97 },
98 }
99}
100
101async fn extract<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
102 let status = response.status();
103 if status.is_success() {
104 extract_body(response).await
105 } else {
106 Err(extract_error(response).await)
107 }
108}
109
110struct RedirectResponse {
111 pub location: String,
112 pub request_id: String,
113}
114
115#[derive(Clone)]
117pub struct Client {
118 client: ReqwestClient,
119 host: String,
120 token: String,
121}
122
123type Result<T> = std::result::Result<T, ClientError>;
124
125impl Client {
126 pub async fn new<S: Into<String>, U: Into<String>, P: Into<String>>(
128 directory_host: S,
129 username: U,
130 password: P,
131 ) -> Result<Self> {
132 Client::new_with_params(Parameters {
133 host: directory_host.into(),
134 username: username.into(),
135 password: password.into(),
136 pool_idle_timeout: Duration::from_secs(5),
137 request_timeout: Duration::from_secs(60),
138 })
139 .await
140 }
141
142 pub fn builder() -> ClientBuilder {
144 ClientBuilder::default()
145 }
146
147 pub(crate) async fn new_with_params(params: Parameters) -> Result<Self> {
148 let client = ReqwestClient::builder()
149 .pool_idle_timeout(params.pool_idle_timeout)
150 .timeout(params.request_timeout)
151 .redirect(reqwest::redirect::Policy::none())
152 .build()
153 .context(ClientBuildSnafu)?;
154
155 let token =
156 Client::login(&client, ¶ms.host, ¶ms.username, ¶ms.password).await?;
157
158 Ok(Self {
159 host: params.host,
160 client,
161 token,
162 })
163 }
164
165 async fn prepare_push_request<P: AsRef<Path>>(
166 &self,
167 url: &str,
168 request_id: &str,
169 path: P,
170 encoded_meta: &str,
171 file_length: u64,
172 ) -> Result<reqwest::Request> {
173 let mut request_builder = self
174 .client
175 .post(url)
176 .bearer_auth(&self.token)
177 .header(header::HeaderName::from_static("x-blob-meta"), encoded_meta)
178 .header(header::HeaderName::from_static("x-request-id"), request_id);
179
180 if path.as_ref().is_file() {
181 let file = tokio::fs::File::open(path.as_ref()).await.unwrap();
182 let stream = FramedRead::new(file, BytesCodec::new());
183 request_builder = request_builder
184 .body(Body::wrap_stream(stream))
185 .header(HeaderName::from_static("x-blob-size"), file_length);
186 } else {
187 request_builder = request_builder.header(HeaderName::from_static("x-blob-size"), 0_u64);
188 }
189
190 request_builder.build().context(RequestBuildSnafu)
191 }
192
193 async fn request_with_redirect(&self, request: Request) -> Result<RedirectResponse> {
194 let response = self
195 .client
196 .execute(request)
197 .await
198 .context(RequestExecutionSnafu)?;
199
200 ensure!(
201 response.status() == StatusCode::TEMPORARY_REDIRECT,
202 MissingRedirectSnafu
203 );
204
205 let new_location = response
206 .headers()
207 .get(header::LOCATION)
208 .ok_or(ClientError::MissingRedirect)?;
209
210 let request_id = response
211 .headers()
212 .get("x-request-id")
213 .ok_or(ClientError::MissingRequestId)?;
214
215 let new_url = new_location
216 .to_str()
217 .expect("redirect location is not UTF-8");
218 tracing::debug!("redirect to {}", new_url);
219
220 Ok(RedirectResponse {
221 location: new_url.to_string(),
222 request_id: String::from_utf8(request_id.as_bytes().to_vec()).unwrap(), })
224 }
225
226 async fn login(
227 client: &ReqwestClient,
228 host: &str,
229 username: &str,
230 password: &str,
231 ) -> Result<String> {
232 let url = format!("{}/auth/login", host);
233
234 let response = client
235 .post(&url)
236 .json(&LoginRequest {
237 username: username.to_string(),
238 password: password.to_string(),
239 })
240 .send()
241 .await
242 .context(RequestExecutionSnafu)?;
243
244 let resp: LoginResponse = extract(response).await?;
245
246 Ok(resp.token)
247 }
248
249 pub async fn register(&self, username: &str, password: &str) -> Result<String> {
250 let url = format!("{}/auth/register", self.host);
251
252 let response = self
253 .client
254 .post(&url)
255 .bearer_auth(&self.token)
256 .json(&RegisterRequest {
257 username: username.to_string(),
258 password: password.to_string(),
259 })
260 .send()
261 .await
262 .context(RequestExecutionSnafu)?;
263
264 let resp: LoginResponse = extract(response).await?;
265 Ok(resp.token)
266 }
267
268 pub async fn create_empty(&self, meta: Meta) -> Result<String> {
272 let url = format!("{}/blob", self.host);
273 let meta_b64 = encode_metadata(meta)?;
274
275 let redirect_req = self
276 .client
277 .post(&url)
278 .bearer_auth(&self.token)
279 .header(HeaderName::from_static("x-blob-meta"), meta_b64.clone())
280 .header(HeaderName::from_static("x-blob-size"), 0_u64)
281 .build()
282 .context(RequestBuildSnafu)?;
283
284 let RedirectResponse {
285 location,
286 request_id,
287 } = self.request_with_redirect(redirect_req).await?;
288
289 let response = self
290 .client
291 .post(&location)
292 .bearer_auth(&self.token)
293 .header(HeaderName::from_static("x-blob-meta"), &meta_b64)
294 .header(HeaderName::from_static("x-blob-size"), 0_u64)
295 .header(HeaderName::from_static("x-request-id"), &request_id)
296 .send()
297 .await
298 .context(RequestExecutionSnafu)?;
299
300 let put_response: PutResponse = extract(response).await?;
301 Ok(put_response.id)
302 }
303
304 async fn push_internal<P: AsRef<Path>>(
305 &self,
306 path: P,
307 meta: Meta,
308 base_url: String,
309 ) -> Result<String> {
310 ensure!(
311 path.as_ref().exists(),
312 FileDoesNotExistSnafu {
313 path: PathBuf::from(path.as_ref())
314 }
315 );
316
317 let url = base_url;
318 let meta_b64 = encode_metadata(meta)?;
319
320 let file_length = path
321 .as_ref()
322 .metadata()
323 .context(FileMetadataSnafu {
324 path: PathBuf::from(path.as_ref()),
325 })?
326 .len();
327
328 let initial_redirect_request = self
329 .client
330 .post(&url)
331 .bearer_auth(&self.token)
332 .header(
333 header::HeaderName::from_static("x-blob-meta"),
334 meta_b64.clone(),
335 )
336 .header(HeaderName::from_static("x-blob-size"), file_length)
337 .build()
338 .context(RequestBuildSnafu)?;
339
340 let RedirectResponse {
341 location,
342 request_id,
343 } = self.request_with_redirect(initial_redirect_request).await?;
344
345 let request = self
346 .prepare_push_request(
347 &location,
348 &request_id,
349 path.as_ref(),
350 &meta_b64,
351 file_length,
352 )
353 .await?;
354
355 let response = self
356 .client
357 .execute(request)
358 .await
359 .context(RequestExecutionSnafu)?;
360
361 let put_response: PutResponse = extract(response).await?;
362 Ok(put_response.id)
363 }
364
365 pub async fn health(&self) -> Result<String> {
369 let url = format!("{}/health", self.host);
370
371 let response = self
372 .client
373 .get(&url)
374 .send()
375 .await
376 .context(RequestExecutionSnafu)?;
377
378 let status = response.status();
379
380 if status.is_success() {
381 let msg: MessageResponse = extract_body(response).await?;
382 Ok(msg.message)
383 } else {
384 Err(extract_error(response).await)
385 }
386 }
387
388 pub async fn list_storage_nodes(&self) -> Result<ListStorageNodesResponse> {
390 let url = format!("{}/node/storage", self.host);
391
392 let response = self
393 .client
394 .get(&url)
395 .bearer_auth(&self.token)
396 .send()
397 .await
398 .context(RequestExecutionSnafu)?;
399
400 extract_body(response).await
401 }
402
403 pub async fn push<P: AsRef<Path>>(&self, path: P, meta: Meta) -> Result<String> {
407 self.push_internal(path, meta, format!("{}/blob", self.host))
408 .await
409 }
410
411 pub async fn update_blob<P: AsRef<Path>>(
415 &self,
416 blob_id: &str,
417 path: P,
418 meta: Meta,
419 ) -> Result<String> {
420 self.push_internal(path, meta, format!("{}/blob/{}", self.host, blob_id))
421 .await
422 }
423
424 pub async fn list_meta(
429 &self,
430 tags: Option<Vec<String>>,
431 meta_keys: Option<Vec<String>>,
432 ) -> Result<MetadataList> {
433 let url = format!("{}/metadata", &self.host);
434
435 let response = self
436 .client
437 .get(&url)
438 .bearer_auth(&self.token)
439 .json(&ListMetadataRequest {
440 tags,
441 fields: meta_keys,
442 })
443 .send()
444 .await
445 .context(RequestExecutionSnafu)?;
446
447 extract(response).await
448 }
449
450 pub async fn update_meta(&self, blob_id: &str, meta: Meta) -> Result<()> {
452 let url = format!("{}/blob/{}/metadata", self.host, blob_id);
453
454 let request = self
455 .client
456 .put(&url)
457 .bearer_auth(&self.token)
458 .json(&meta)
459 .build()
460 .context(RequestBuildSnafu)?;
461
462 let RedirectResponse {
463 location,
464 request_id,
465 } = self.request_with_redirect(request).await?;
466
467 let response = self
468 .client
469 .put(&location)
470 .bearer_auth(&self.token)
471 .header(HeaderName::from_static("x-request-id"), request_id)
472 .json(&meta)
473 .send()
474 .await
475 .context(RequestExecutionSnafu)?;
476
477 if response.status().is_success() {
478 Ok(())
479 } else {
480 Err(extract_error(response).await)
481 }
482 }
483
484 pub async fn fsync(&self, blob_id: &str) -> Result<()> {
488 let url = format!("{}/blob/{}/fsync", self.host, blob_id);
489
490 let request = self
491 .client
492 .post(&url)
493 .bearer_auth(&self.token)
494 .build()
495 .context(RequestBuildSnafu)?;
496
497 let RedirectResponse {
498 location,
499 request_id,
500 } = self.request_with_redirect(request).await?;
501
502 let response = self
503 .client
504 .post(&location)
505 .bearer_auth(&self.token)
506 .header(HeaderName::from_static("x-request-id"), request_id)
507 .send()
508 .await
509 .context(RequestBuildSnafu)?;
510
511 if response.status().is_success() {
512 Ok(())
513 } else {
514 Err(extract_error(response).await)
515 }
516 }
517
518 pub async fn write(&self, blob_id: &str, offset: u64, buffer: Bytes) -> Result<()> {
520 let url = format!("{}/blob/{}", self.host, blob_id);
521
522 let request = self
523 .client
524 .put(&url)
525 .bearer_auth(&self.token)
526 .header(
527 header::RANGE,
528 &format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
529 )
530 .build()
531 .context(RequestBuildSnafu)?;
532
533 let RedirectResponse {
534 location,
535 request_id,
536 } = self.request_with_redirect(request).await?;
537
538 let response = self
539 .client
540 .put(&location)
541 .bearer_auth(&self.token)
542 .header(
543 header::RANGE,
544 &format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
545 )
546 .header(HeaderName::from_static("x-request-id"), request_id)
547 .body(buffer.clone())
548 .send()
549 .await
550 .context(RequestExecutionSnafu)?;
551
552 let status = response.status();
553 if status.is_success() {
554 Ok(())
557 } else {
558 Err(extract_error(response).await)
560 }
561 }
562
563 pub async fn get_meta(&self, blob_id: &str) -> Result<Option<BlobMeta>> {
565 let url = format!("{}/blob/{}/metadata", self.host, blob_id);
566
567 let response = self
568 .client
569 .get(&url)
570 .bearer_auth(&self.token)
571 .send()
572 .await
573 .context(RequestExecutionSnafu)?;
574
575 let resp: GetMetaResponse = extract(response).await?;
576 Ok(resp.meta)
577 }
578
579 pub async fn get_file(&self, blob_id: &str) -> Result<impl Stream<Item = Result<Bytes>>> {
581 let url = format!("{}/blob/{}", self.host, blob_id);
582
583 let redirect_request = self
584 .client
585 .get(&url)
586 .bearer_auth(&self.token)
587 .build()
588 .context(RequestBuildSnafu)?;
589
590 let RedirectResponse {
591 location,
592 request_id,
593 } = self.request_with_redirect(redirect_request).await?;
594
595 let response = self
596 .client
597 .get(&location)
598 .bearer_auth(&self.token)
599 .header(HeaderName::from_static("x-request-id"), request_id)
600 .send()
601 .await
602 .context(RequestExecutionSnafu)?;
603
604 if response.status().is_success() {
605 Ok(response
606 .bytes_stream()
607 .map_err(|e| ClientError::UnknownError {
608 message: e.to_string(),
609 }))
610 } else {
611 Err(extract_error(response).await)
612 }
613 }
614
615 pub async fn read_range(&self, blob_id: &str, range: (u64, u64)) -> Result<Vec<u8>> {
623 let url = format!("{}/blob/{}", self.host, blob_id);
624
625 let request = self
626 .client
627 .get(&url)
628 .bearer_auth(&self.token)
629 .header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
630 .build()
631 .context(RequestBuildSnafu)?;
632
633 let RedirectResponse {
634 location,
635 request_id,
636 } = self.request_with_redirect(request).await?;
637
638 let response = self
639 .client
640 .get(&location)
641 .header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
642 .header(HeaderName::from_static("x-request-id"), request_id)
643 .bearer_auth(&self.token)
644 .send()
645 .await
646 .context(RequestExecutionSnafu)?;
647
648 let status = response.status();
649 if status.is_success() {
650 let resp_bytes = response.bytes().await.context(FetchBodySnafu)?;
651 Ok(resp_bytes.to_vec())
652 } else {
653 Err(extract_error(response).await)
654 }
655 }
656
657 pub async fn query(&self, query: Query) -> Result<QueryResponse> {
659 let url = format!("{}/query", self.host);
660
661 let response = self
662 .client
663 .post(&url)
664 .bearer_auth(&self.token)
665 .json(&query)
666 .send()
667 .await
668 .context(RequestExecutionSnafu)?;
669 extract(response).await
670 }
671
672 pub async fn delete(&self, blob_id: String) -> Result<()> {
674 let url = format!("{}/blob/{}", self.host, blob_id);
675
676 let request = self
677 .client
678 .delete(&url)
679 .bearer_auth(&self.token)
680 .build()
681 .context(RequestBuildSnafu)?;
682
683 let RedirectResponse {
684 location,
685 request_id,
686 } = self.request_with_redirect(request).await?;
687
688 let response = self
689 .client
690 .delete(&location)
691 .bearer_auth(&self.token)
692 .header(HeaderName::from_static("x-request-id"), request_id)
693 .send()
694 .await
695 .context(RequestExecutionSnafu)?;
696
697 let status = response.status();
698 if status.is_success() {
699 Ok(())
701 } else {
702 Err(extract_error(response).await)
704 }
705 }
706
707 pub async fn get_routing_config(&self) -> Result<Option<RoutingConfig>> {
708 let url = format!("{}/routing", self.host);
709
710 let response = self
711 .client
712 .get(&url)
713 .bearer_auth(&self.token)
714 .send()
715 .await
716 .context(RequestExecutionSnafu)?;
717
718 let response: GetRoutingConfigResponse = extract(response).await?;
719
720 Ok(response.routing_config)
721 }
722
723 pub async fn set_routing_config(&self, routing_config: &RoutingConfig) -> Result<()> {
724 let url = format!("{}/routing", self.host);
725
726 let response = self
727 .client
728 .put(&url)
729 .bearer_auth(&self.token)
730 .json(&SetRoutingConfigRequest {
731 routing_config: routing_config.clone(),
732 })
733 .send()
734 .await
735 .context(RequestExecutionSnafu)?;
736
737 if response.status().is_success() {
738 Ok(())
739 } else {
740 Err(extract_error(response).await)
741 }
742 }
743
744 pub async fn delete_routing_config(&self) -> Result<()> {
745 let url = format!("{}/routing", self.host);
746
747 let response = self
748 .client
749 .delete(&url)
750 .bearer_auth(&self.token)
751 .send()
752 .await
753 .context(RequestExecutionSnafu)?;
754
755 if response.status().is_success() {
756 Ok(())
757 } else {
758 Err(extract_error(response).await)
759 }
760 }
761}