menmos_client/
client.rs

1use std::path::{Path, PathBuf};
2use std::time::Duration;
3
4use apikit::payload::{ErrorResponse, MessageResponse};
5
6use bytes::Bytes;
7
8use futures::{Stream, TryStreamExt};
9
10use header::HeaderName;
11
12use interface::{BlobMeta, MetadataList, Query, QueryResponse, RoutingConfig};
13
14use hyper::{header, StatusCode};
15
16use protocol::directory::{auth::*, blobmeta::*, routing::*, storage::*};
17use protocol::storage::PutResponse;
18
19use reqwest::{Client as ReqwestClient, Request};
20
21use reqwest::Body;
22
23use serde::de::DeserializeOwned;
24
25use snafu::prelude::*;
26use tokio_util::codec::{BytesCodec, FramedRead};
27
28use crate::{ClientBuilder, Meta, Parameters};
29
/// Every failure the client can report.
///
/// Variants carrying a `source` wrap the underlying library error; the others
/// describe protocol-level problems detected by the client itself.
#[derive(Debug, Snafu)]
pub enum ClientError {
    /// The underlying `reqwest` client could not be constructed.
    #[snafu(display("failed to build reqwest client: {}", source))]
    ClientBuildError { source: reqwest::Error },

    /// Reading the bytes of a response body failed.
    #[snafu(display("failed to fetch response body: {}", source))]
    FetchBodyError { source: reqwest::Error },

    /// The path handed to a push/update operation does not exist.
    #[snafu(display("file [{:?}] does not exist", path))]
    FileDoesNotExist { path: PathBuf },

    /// The filesystem metadata of `path` could not be read.
    #[snafu(display("failed to load the metadata for '{:?}': {}", path, source))]
    FileMetadataError {
        source: std::io::Error,
        path: PathBuf,
    },

    /// The blob metadata could not be serialized to JSON.
    #[snafu(display("failed to serialize metadata [{:?}]: {}", meta, source))]
    MetaSerializationError {
        source: serde_json::Error,
        meta: Meta,
    },

    /// More than `limit` redirects were followed.
    // NOTE(review): not constructed anywhere in this file; presumably used elsewhere or kept for compatibility.
    #[snafu(display("the redirect limit of {} was exceeded", limit))]
    RedirectLimitExceeded { limit: u32 },

    /// A request could not be assembled by the request builder.
    #[snafu(display("failed to build request: {}", source))]
    RequestBuildError { source: reqwest::Error },

    /// Sending a request (or receiving its response head) failed.
    #[snafu(display("failed to execute request: {}", source))]
    RequestExecutionError { source: reqwest::Error },

    /// A response body was not the JSON document that was expected.
    #[snafu(display("failed to deserialize response: {}", source))]
    ResponseDeserializationError { source: serde_json::Error },

    /// The server answered with an error payload; `message` is its error text.
    #[snafu(display("server returned an error: {}", message))]
    ServerReturnedError { message: String },

    /// A temporary redirect (with a `Location` header) was expected but not received.
    #[snafu(display("did not get a redirect when expected"))]
    MissingRedirect,

    /// A redirect response did not carry an `x-request-id` header.
    #[snafu(display("did not receive a request id when expected"))]
    MissingRequestId,

    /// An operation was retried too many times.
    // NOTE(review): not constructed anywhere in this file.
    #[snafu(display("too many retries"))]
    TooManyRetries,

    /// Catch-all for failures that fit no other variant; `message` is free-form.
    #[snafu(display("{}", message))]
    UnknownError { message: String },
}
80
81fn encode_metadata(meta: Meta) -> Result<String> {
82    let serialized_meta = serde_json::to_vec(&meta).context(MetaSerializationSnafu { meta })?;
83    Ok(base64::encode(&serialized_meta))
84}
85
86async fn extract_body<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
87    let body_bytes = response.bytes().await.context(FetchBodySnafu)?;
88    tracing::debug!("body: {}", String::from_utf8_lossy(&body_bytes));
89    serde_json::from_slice(body_bytes.as_ref()).context(ResponseDeserializationSnafu)
90}
91
92async fn extract_error(response: reqwest::Response) -> ClientError {
93    match extract_body::<ErrorResponse>(response).await {
94        Ok(e) => ClientError::ServerReturnedError { message: e.error },
95        Err(e) => ClientError::UnknownError {
96            message: e.to_string(),
97        },
98    }
99}
100
101async fn extract<T: DeserializeOwned>(response: reqwest::Response) -> Result<T> {
102    let status = response.status();
103    if status.is_success() {
104        extract_body(response).await
105    } else {
106        Err(extract_error(response).await)
107    }
108}
109
/// The pieces of a redirect response needed to issue the follow-up request.
struct RedirectResponse {
    /// Value of the `Location` header: the URL to send the follow-up request to.
    pub location: String,
    /// Value of the `x-request-id` header, echoed back on the follow-up request.
    pub request_id: String,
}
114
/// The client, used for interacting with a Menmos cluster.
#[derive(Clone)]
pub struct Client {
    /// HTTP client used for every request; built with automatic redirects disabled.
    client: ReqwestClient,
    /// Base URL that endpoint paths (e.g. `/blob`, `/query`) are appended to.
    host: String,
    /// Token obtained at login, sent as a bearer token on authenticated requests.
    token: String,
}
122
/// Shorthand for results whose error type is [`ClientError`].
type Result<T> = std::result::Result<T, ClientError>;
124
125impl Client {
126    /// Create a new client from explicit credentials with default settings.
127    pub async fn new<S: Into<String>, U: Into<String>, P: Into<String>>(
128        directory_host: S,
129        username: U,
130        password: P,
131    ) -> Result<Self> {
132        Client::new_with_params(Parameters {
133            host: directory_host.into(),
134            username: username.into(),
135            password: password.into(),
136            pool_idle_timeout: Duration::from_secs(5),
137            request_timeout: Duration::from_secs(60),
138        })
139        .await
140    }
141
142    /// Get a client builder to get better control on how the client is configured.
143    pub fn builder() -> ClientBuilder {
144        ClientBuilder::default()
145    }
146
147    pub(crate) async fn new_with_params(params: Parameters) -> Result<Self> {
148        let client = ReqwestClient::builder()
149            .pool_idle_timeout(params.pool_idle_timeout)
150            .timeout(params.request_timeout)
151            .redirect(reqwest::redirect::Policy::none())
152            .build()
153            .context(ClientBuildSnafu)?;
154
155        let token =
156            Client::login(&client, &params.host, &params.username, &params.password).await?;
157
158        Ok(Self {
159            host: params.host,
160            client,
161            token,
162        })
163    }
164
165    async fn prepare_push_request<P: AsRef<Path>>(
166        &self,
167        url: &str,
168        request_id: &str,
169        path: P,
170        encoded_meta: &str,
171        file_length: u64,
172    ) -> Result<reqwest::Request> {
173        let mut request_builder = self
174            .client
175            .post(url)
176            .bearer_auth(&self.token)
177            .header(header::HeaderName::from_static("x-blob-meta"), encoded_meta)
178            .header(header::HeaderName::from_static("x-request-id"), request_id);
179
180        if path.as_ref().is_file() {
181            let file = tokio::fs::File::open(path.as_ref()).await.unwrap();
182            let stream = FramedRead::new(file, BytesCodec::new());
183            request_builder = request_builder
184                .body(Body::wrap_stream(stream))
185                .header(HeaderName::from_static("x-blob-size"), file_length);
186        } else {
187            request_builder = request_builder.header(HeaderName::from_static("x-blob-size"), 0_u64);
188        }
189
190        request_builder.build().context(RequestBuildSnafu)
191    }
192
193    async fn request_with_redirect(&self, request: Request) -> Result<RedirectResponse> {
194        let response = self
195            .client
196            .execute(request)
197            .await
198            .context(RequestExecutionSnafu)?;
199
200        ensure!(
201            response.status() == StatusCode::TEMPORARY_REDIRECT,
202            MissingRedirectSnafu
203        );
204
205        let new_location = response
206            .headers()
207            .get(header::LOCATION)
208            .ok_or(ClientError::MissingRedirect)?;
209
210        let request_id = response
211            .headers()
212            .get("x-request-id")
213            .ok_or(ClientError::MissingRequestId)?;
214
215        let new_url = new_location
216            .to_str()
217            .expect("redirect location is not UTF-8");
218        tracing::debug!("redirect to {}", new_url);
219
220        Ok(RedirectResponse {
221            location: new_url.to_string(),
222            request_id: String::from_utf8(request_id.as_bytes().to_vec()).unwrap(), // We know request id is ASCII, so it is also unicode.
223        })
224    }
225
226    async fn login(
227        client: &ReqwestClient,
228        host: &str,
229        username: &str,
230        password: &str,
231    ) -> Result<String> {
232        let url = format!("{}/auth/login", host);
233
234        let response = client
235            .post(&url)
236            .json(&LoginRequest {
237                username: username.to_string(),
238                password: password.to_string(),
239            })
240            .send()
241            .await
242            .context(RequestExecutionSnafu)?;
243
244        let resp: LoginResponse = extract(response).await?;
245
246        Ok(resp.token)
247    }
248
249    pub async fn register(&self, username: &str, password: &str) -> Result<String> {
250        let url = format!("{}/auth/register", self.host);
251
252        let response = self
253            .client
254            .post(&url)
255            .bearer_auth(&self.token)
256            .json(&RegisterRequest {
257                username: username.to_string(),
258                password: password.to_string(),
259            })
260            .send()
261            .await
262            .context(RequestExecutionSnafu)?;
263
264        let resp: LoginResponse = extract(response).await?;
265        Ok(resp.token)
266    }
267
268    /// Create an empty file on the cluster with the provided meta.
269    ///
270    /// Returns the created file's ID.
271    pub async fn create_empty(&self, meta: Meta) -> Result<String> {
272        let url = format!("{}/blob", self.host);
273        let meta_b64 = encode_metadata(meta)?;
274
275        let redirect_req = self
276            .client
277            .post(&url)
278            .bearer_auth(&self.token)
279            .header(HeaderName::from_static("x-blob-meta"), meta_b64.clone())
280            .header(HeaderName::from_static("x-blob-size"), 0_u64)
281            .build()
282            .context(RequestBuildSnafu)?;
283
284        let RedirectResponse {
285            location,
286            request_id,
287        } = self.request_with_redirect(redirect_req).await?;
288
289        let response = self
290            .client
291            .post(&location)
292            .bearer_auth(&self.token)
293            .header(HeaderName::from_static("x-blob-meta"), &meta_b64)
294            .header(HeaderName::from_static("x-blob-size"), 0_u64)
295            .header(HeaderName::from_static("x-request-id"), &request_id)
296            .send()
297            .await
298            .context(RequestExecutionSnafu)?;
299
300        let put_response: PutResponse = extract(response).await?;
301        Ok(put_response.id)
302    }
303
304    async fn push_internal<P: AsRef<Path>>(
305        &self,
306        path: P,
307        meta: Meta,
308        base_url: String,
309    ) -> Result<String> {
310        ensure!(
311            path.as_ref().exists(),
312            FileDoesNotExistSnafu {
313                path: PathBuf::from(path.as_ref())
314            }
315        );
316
317        let url = base_url;
318        let meta_b64 = encode_metadata(meta)?;
319
320        let file_length = path
321            .as_ref()
322            .metadata()
323            .context(FileMetadataSnafu {
324                path: PathBuf::from(path.as_ref()),
325            })?
326            .len();
327
328        let initial_redirect_request = self
329            .client
330            .post(&url)
331            .bearer_auth(&self.token)
332            .header(
333                header::HeaderName::from_static("x-blob-meta"),
334                meta_b64.clone(),
335            )
336            .header(HeaderName::from_static("x-blob-size"), file_length)
337            .build()
338            .context(RequestBuildSnafu)?;
339
340        let RedirectResponse {
341            location,
342            request_id,
343        } = self.request_with_redirect(initial_redirect_request).await?;
344
345        let request = self
346            .prepare_push_request(
347                &location,
348                &request_id,
349                path.as_ref(),
350                &meta_b64,
351                file_length,
352            )
353            .await?;
354
355        let response = self
356            .client
357            .execute(request)
358            .await
359            .context(RequestExecutionSnafu)?;
360
361        let put_response: PutResponse = extract(response).await?;
362        Ok(put_response.id)
363    }
364
365    /// Send a health check request to the cluster.
366    ///
367    /// Returns the cluster health status as a string.
368    pub async fn health(&self) -> Result<String> {
369        let url = format!("{}/health", self.host);
370
371        let response = self
372            .client
373            .get(&url)
374            .send()
375            .await
376            .context(RequestExecutionSnafu)?;
377
378        let status = response.status();
379
380        if status.is_success() {
381            let msg: MessageResponse = extract_body(response).await?;
382            Ok(msg.message)
383        } else {
384            Err(extract_error(response).await)
385        }
386    }
387
388    /// List all storage nodes currently authenticated with the cluster.
389    pub async fn list_storage_nodes(&self) -> Result<ListStorageNodesResponse> {
390        let url = format!("{}/node/storage", self.host);
391
392        let response = self
393            .client
394            .get(&url)
395            .bearer_auth(&self.token)
396            .send()
397            .await
398            .context(RequestExecutionSnafu)?;
399
400        extract_body(response).await
401    }
402
403    /// Pushes a file with the specified meta to the cluster.
404    ///
405    /// Returns the ID of the created file.
406    pub async fn push<P: AsRef<Path>>(&self, path: P, meta: Meta) -> Result<String> {
407        self.push_internal(path, meta, format!("{}/blob", self.host))
408            .await
409    }
410
411    /// Update a blob's content.
412    ///
413    /// Returns the ID of the updated file. Should always be equal to `blob_id`.
414    pub async fn update_blob<P: AsRef<Path>>(
415        &self,
416        blob_id: &str,
417        path: P,
418        meta: Meta,
419    ) -> Result<String> {
420        self.push_internal(path, meta, format!("{}/blob/{}", self.host, blob_id))
421            .await
422    }
423
424    /// Lists all metadata values in the cluster.
425    ///
426    /// `tags` is an optional whitelist of tags to compute. When absent, all tags are included.
427    /// `meta_keys` is an optional whitelist of keys to compute. When absent, all key/value pairs are included (this can be very expensive).
428    pub async fn list_meta(
429        &self,
430        tags: Option<Vec<String>>,
431        meta_keys: Option<Vec<String>>,
432    ) -> Result<MetadataList> {
433        let url = format!("{}/metadata", &self.host);
434
435        let response = self
436            .client
437            .get(&url)
438            .bearer_auth(&self.token)
439            .json(&ListMetadataRequest {
440                tags,
441                fields: meta_keys,
442            })
443            .send()
444            .await
445            .context(RequestExecutionSnafu)?;
446
447        extract(response).await
448    }
449
450    /// Update a blob's metadata without touching the contents of the file.
451    pub async fn update_meta(&self, blob_id: &str, meta: Meta) -> Result<()> {
452        let url = format!("{}/blob/{}/metadata", self.host, blob_id);
453
454        let request = self
455            .client
456            .put(&url)
457            .bearer_auth(&self.token)
458            .json(&meta)
459            .build()
460            .context(RequestBuildSnafu)?;
461
462        let RedirectResponse {
463            location,
464            request_id,
465        } = self.request_with_redirect(request).await?;
466
467        let response = self
468            .client
469            .put(&location)
470            .bearer_auth(&self.token)
471            .header(HeaderName::from_static("x-request-id"), request_id)
472            .json(&meta)
473            .send()
474            .await
475            .context(RequestExecutionSnafu)?;
476
477        if response.status().is_success() {
478            Ok(())
479        } else {
480            Err(extract_error(response).await)
481        }
482    }
483
484    /// Force synchronization of a blob to its backing storage.
485    ///
486    /// This is an advanced feature, and should only be called by people who know what they are doing.
487    pub async fn fsync(&self, blob_id: &str) -> Result<()> {
488        let url = format!("{}/blob/{}/fsync", self.host, blob_id);
489
490        let request = self
491            .client
492            .post(&url)
493            .bearer_auth(&self.token)
494            .build()
495            .context(RequestBuildSnafu)?;
496
497        let RedirectResponse {
498            location,
499            request_id,
500        } = self.request_with_redirect(request).await?;
501
502        let response = self
503            .client
504            .post(&location)
505            .bearer_auth(&self.token)
506            .header(HeaderName::from_static("x-request-id"), request_id)
507            .send()
508            .await
509            .context(RequestBuildSnafu)?;
510
511        if response.status().is_success() {
512            Ok(())
513        } else {
514            Err(extract_error(response).await)
515        }
516    }
517
518    /// Write a byte buffer to a specified offset in a blob.
519    pub async fn write(&self, blob_id: &str, offset: u64, buffer: Bytes) -> Result<()> {
520        let url = format!("{}/blob/{}", self.host, blob_id);
521
522        let request = self
523            .client
524            .put(&url)
525            .bearer_auth(&self.token)
526            .header(
527                header::RANGE,
528                &format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
529            )
530            .build()
531            .context(RequestBuildSnafu)?;
532
533        let RedirectResponse {
534            location,
535            request_id,
536        } = self.request_with_redirect(request).await?;
537
538        let response = self
539            .client
540            .put(&location)
541            .bearer_auth(&self.token)
542            .header(
543                header::RANGE,
544                &format!("bytes={}-{}", offset, offset + (buffer.len() - 1) as u64),
545            )
546            .header(HeaderName::from_static("x-request-id"), request_id)
547            .body(buffer.clone())
548            .send()
549            .await
550            .context(RequestExecutionSnafu)?;
551
552        let status = response.status();
553        if status.is_success() {
554            // Our upload got through.
555            // Deserialize the body to get the content ID.
556            Ok(())
557        } else {
558            // An error occurred.
559            Err(extract_error(response).await)
560        }
561    }
562
563    /// Get a blob's metadata.
564    pub async fn get_meta(&self, blob_id: &str) -> Result<Option<BlobMeta>> {
565        let url = format!("{}/blob/{}/metadata", self.host, blob_id);
566
567        let response = self
568            .client
569            .get(&url)
570            .bearer_auth(&self.token)
571            .send()
572            .await
573            .context(RequestExecutionSnafu)?;
574
575        let resp: GetMetaResponse = extract(response).await?;
576        Ok(resp.meta)
577    }
578
579    /// Get a blob's body as a stream of bytes.
580    pub async fn get_file(&self, blob_id: &str) -> Result<impl Stream<Item = Result<Bytes>>> {
581        let url = format!("{}/blob/{}", self.host, blob_id);
582
583        let redirect_request = self
584            .client
585            .get(&url)
586            .bearer_auth(&self.token)
587            .build()
588            .context(RequestBuildSnafu)?;
589
590        let RedirectResponse {
591            location,
592            request_id,
593        } = self.request_with_redirect(redirect_request).await?;
594
595        let response = self
596            .client
597            .get(&location)
598            .bearer_auth(&self.token)
599            .header(HeaderName::from_static("x-request-id"), request_id)
600            .send()
601            .await
602            .context(RequestExecutionSnafu)?;
603
604        if response.status().is_success() {
605            Ok(response
606                .bytes_stream()
607                .map_err(|e| ClientError::UnknownError {
608                    message: e.to_string(),
609                }))
610        } else {
611            Err(extract_error(response).await)
612        }
613    }
614
615    // TODO: This API might be improved by using a bytes buffer instead of a raw vec.
616    // TODO: Use a rust range instead of a tuple
617    // TODO: Return a stream of Bytes buffers.
618    // Note: range is end-inclusive here. TODO: Clarify when ranges are inclusive vs. exclusive.
619    /// Read a subset of a blob.
620    ///
621    /// The `range` argument is end-inclusive.
622    pub async fn read_range(&self, blob_id: &str, range: (u64, u64)) -> Result<Vec<u8>> {
623        let url = format!("{}/blob/{}", self.host, blob_id);
624
625        let request = self
626            .client
627            .get(&url)
628            .bearer_auth(&self.token)
629            .header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
630            .build()
631            .context(RequestBuildSnafu)?;
632
633        let RedirectResponse {
634            location,
635            request_id,
636        } = self.request_with_redirect(request).await?;
637
638        let response = self
639            .client
640            .get(&location)
641            .header(header::RANGE, &format!("bytes={}-{}", range.0, range.1))
642            .header(HeaderName::from_static("x-request-id"), request_id)
643            .bearer_auth(&self.token)
644            .send()
645            .await
646            .context(RequestExecutionSnafu)?;
647
648        let status = response.status();
649        if status.is_success() {
650            let resp_bytes = response.bytes().await.context(FetchBodySnafu)?;
651            Ok(resp_bytes.to_vec())
652        } else {
653            Err(extract_error(response).await)
654        }
655    }
656
657    /// Send a query to the cluster.
658    pub async fn query(&self, query: Query) -> Result<QueryResponse> {
659        let url = format!("{}/query", self.host);
660
661        let response = self
662            .client
663            .post(&url)
664            .bearer_auth(&self.token)
665            .json(&query)
666            .send()
667            .await
668            .context(RequestExecutionSnafu)?;
669        extract(response).await
670    }
671
672    /// Delete a blob from the cluster.
673    pub async fn delete(&self, blob_id: String) -> Result<()> {
674        let url = format!("{}/blob/{}", self.host, blob_id);
675
676        let request = self
677            .client
678            .delete(&url)
679            .bearer_auth(&self.token)
680            .build()
681            .context(RequestBuildSnafu)?;
682
683        let RedirectResponse {
684            location,
685            request_id,
686        } = self.request_with_redirect(request).await?;
687
688        let response = self
689            .client
690            .delete(&location)
691            .bearer_auth(&self.token)
692            .header(HeaderName::from_static("x-request-id"), request_id)
693            .send()
694            .await
695            .context(RequestExecutionSnafu)?;
696
697        let status = response.status();
698        if status.is_success() {
699            // Our delete got through.
700            Ok(())
701        } else {
702            // An error occurred.
703            Err(extract_error(response).await)
704        }
705    }
706
707    pub async fn get_routing_config(&self) -> Result<Option<RoutingConfig>> {
708        let url = format!("{}/routing", self.host);
709
710        let response = self
711            .client
712            .get(&url)
713            .bearer_auth(&self.token)
714            .send()
715            .await
716            .context(RequestExecutionSnafu)?;
717
718        let response: GetRoutingConfigResponse = extract(response).await?;
719
720        Ok(response.routing_config)
721    }
722
723    pub async fn set_routing_config(&self, routing_config: &RoutingConfig) -> Result<()> {
724        let url = format!("{}/routing", self.host);
725
726        let response = self
727            .client
728            .put(&url)
729            .bearer_auth(&self.token)
730            .json(&SetRoutingConfigRequest {
731                routing_config: routing_config.clone(),
732            })
733            .send()
734            .await
735            .context(RequestExecutionSnafu)?;
736
737        if response.status().is_success() {
738            Ok(())
739        } else {
740            Err(extract_error(response).await)
741        }
742    }
743
744    pub async fn delete_routing_config(&self) -> Result<()> {
745        let url = format!("{}/routing", self.host);
746
747        let response = self
748            .client
749            .delete(&url)
750            .bearer_auth(&self.token)
751            .send()
752            .await
753            .context(RequestExecutionSnafu)?;
754
755        if response.status().is_success() {
756            Ok(())
757        } else {
758            Err(extract_error(response).await)
759        }
760    }
761}