htsget_storage/
lib.rs

//! Module providing the abstractions needed to read files from a storage backend.
//!
3
4pub use htsget_config::resolver::{IdResolver, ResolveResponse, StorageResolver};
5pub use htsget_config::types::{
6  Class, Format, Headers, HtsGetError, JsonResponse, Query, Response, Url,
7};
8
9#[cfg(feature = "experimental")]
10use crate::c4gh::storage::C4GHStorage;
11use crate::error::Result;
12use crate::error::StorageError;
13use crate::error::StorageError::InvalidKey;
14use crate::local::FileStorage;
15#[cfg(feature = "aws")]
16use crate::s3::S3Storage;
17use crate::types::{BytesPositionOptions, DataBlock, GetOptions, HeadOptions, RangeUrlOptions};
18#[cfg(feature = "url")]
19use crate::url::UrlStorage;
20use async_trait::async_trait;
21use base64::engine::general_purpose;
22use base64::Engine;
23use cfg_if::cfg_if;
24#[cfg(feature = "experimental")]
25use htsget_config::encryption_scheme::EncryptionScheme;
26use htsget_config::storage;
27#[cfg(feature = "experimental")]
28use htsget_config::storage::c4gh::C4GHKeys;
29use htsget_config::types::Scheme;
30use http::uri;
31use pin_project_lite::pin_project;
32use std::fmt;
33use std::fmt::{Debug, Formatter};
34use std::path::Path;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37use tokio::io::{AsyncRead, ReadBuf};
38
39#[cfg(feature = "experimental")]
40pub mod c4gh;
41pub mod error;
42pub mod local;
43#[cfg(feature = "aws")]
44pub mod s3;
45pub mod types;
46#[cfg(feature = "url")]
47pub mod url;
48
49pin_project! {
50  /// A Streamable type represents any AsyncRead data used by `StorageTrait`.
51  pub struct Streamable {
52    #[pin]
53    inner: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
54  }
55}
56
57impl Streamable {
58  /// Create a new Streamable from an AsyncRead.
59  pub fn from_async_read(inner: impl AsyncRead + Send + Sync + Unpin + 'static) -> Self {
60    Self {
61      inner: Box::new(inner),
62    }
63  }
64}
65
66impl AsyncRead for Streamable {
67  fn poll_read(
68    self: Pin<&mut Self>,
69    cx: &mut Context<'_>,
70    buf: &mut ReadBuf<'_>,
71  ) -> Poll<std::io::Result<()>> {
72    self.project().inner.poll_read(cx, buf)
73  }
74}
75
76/// The top-level storage type is created from any `StorageTrait`.
77pub struct Storage {
78  inner: Box<dyn StorageTrait + Send + Sync + 'static>,
79}
80
81impl Storage {
82  /// Get the inner value.
83  pub fn into_inner(self) -> Box<dyn StorageTrait + Send + Sync> {
84    self.inner
85  }
86}
87
88impl Clone for Storage {
89  fn clone(&self) -> Self {
90    Self {
91      inner: self.inner.clone_box(),
92    }
93  }
94}
95
96impl Debug for Storage {
97  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
98    write!(f, "Storage")
99  }
100}
101
102#[async_trait]
103impl StorageMiddleware for Storage {
104  async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
105    self.inner.preprocess(_key, _options).await
106  }
107
108  async fn postprocess(
109    &self,
110    key: &str,
111    positions_options: BytesPositionOptions<'_>,
112  ) -> Result<Vec<DataBlock>> {
113    self.inner.postprocess(key, positions_options).await
114  }
115}
116
117#[async_trait]
118impl StorageTrait for Storage {
119  async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable> {
120    self.inner.get(key, options).await
121  }
122
123  async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url> {
124    self.inner.range_url(key, options).await
125  }
126
127  async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64> {
128    self.inner.head(key, options).await
129  }
130
131  fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
132    self.inner.data_url(data, class)
133  }
134}
135
136impl Storage {
137  #[cfg(feature = "experimental")]
138  /// Wrap an existing storage with C4GH storage
139  pub async fn from_c4gh_keys(
140    keys: Option<&C4GHKeys>,
141    encryption_scheme: Option<EncryptionScheme>,
142    storage: Storage,
143  ) -> Result<Storage> {
144    match (keys, encryption_scheme) {
145      (Some(keys), Some(EncryptionScheme::C4GH)) => Ok(Storage::new(C4GHStorage::new_box(
146        keys
147          .clone()
148          .keys()
149          .await
150          .map_err(|err| StorageError::InternalError(err.to_string()))?,
151        storage.into_inner(),
152      ))),
153      (None, Some(EncryptionScheme::C4GH)) => Err(StorageError::UnsupportedFormat(
154        "C4GH keys have not been configured for this id".to_string(),
155      )),
156      _ => Ok(storage),
157    }
158  }
159
160  /// Create from local storage config.
161  pub async fn from_file(file: &storage::file::File, _query: &Query) -> Result<Storage> {
162    let storage = Storage::new(FileStorage::new(file.local_path(), file.clone())?);
163
164    cfg_if! {
165      if #[cfg(feature = "experimental")] {
166        Self::from_c4gh_keys(file.keys(), _query.encryption_scheme(), storage).await
167      } else {
168        Ok(storage)
169      }
170    }
171  }
172
173  /// Create from s3 config.
174  #[cfg(feature = "aws")]
175  pub async fn from_s3(s3: &storage::s3::S3, _query: &Query) -> Result<Storage> {
176    let storage = Storage::new(
177      S3Storage::new_with_default_config(
178        s3.bucket().to_string(),
179        s3.endpoint().map(str::to_string),
180        s3.path_style(),
181      )
182      .await,
183    );
184
185    cfg_if! {
186      if #[cfg(feature = "experimental")] {
187        Self::from_c4gh_keys(s3.keys(), _query.encryption_scheme(), storage).await
188      } else {
189        Ok(storage)
190      }
191    }
192  }
193
194  /// Create from url config.
195  #[cfg(feature = "url")]
196  pub async fn from_url(url: &storage::url::Url, _query: &Query) -> Result<Storage> {
197    let storage = Storage::new(UrlStorage::new(
198      url.client_cloned(),
199      url.url().clone(),
200      url.response_url().clone(),
201      url.forward_headers(),
202      url.header_blacklist().to_vec(),
203    ));
204
205    cfg_if! {
206      if #[cfg(feature = "experimental")] {
207        Self::from_c4gh_keys(url.keys(), _query.encryption_scheme(), storage).await
208      } else {
209        Ok(storage)
210      }
211    }
212  }
213
214  pub fn new(inner: impl StorageTrait + Send + Sync + 'static) -> Self {
215    Self {
216      inner: Box::new(inner),
217    }
218  }
219}
220
221/// A Storage represents some kind of object based storage (either locally or in the cloud)
222/// that can be used to retrieve files for alignments, variants or its respective indexes.
223#[async_trait]
224pub trait StorageTrait: StorageMiddleware + StorageClone {
225  /// Get the object using the key.
226  async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable>;
227
228  /// Get the url of the object represented by the key using a bytes range. It is not required for
229  /// this function to check for the existent of the key, so this should be ensured beforehand.
230  async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url>;
231
232  /// Get the size of the object represented by the key.
233  async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64>;
234
235  /// Get the url of the object using an inline data uri.
236  fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
237    Url::new(format!(
238      "data:;base64,{}",
239      general_purpose::STANDARD.encode(data)
240    ))
241    .set_class(class)
242  }
243}
244
245/// Allow the `StorageTrait` to be cloned. This allows cloning a dynamic trait inside a Box.
246/// See https://crates.io/crates/dyn-clone for a similar pattern.
247pub trait StorageClone {
248  fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync>;
249}
250
251impl<T> StorageClone for T
252where
253  T: StorageTrait + Send + Sync + Clone + 'static,
254{
255  fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync> {
256    Box::new(self.clone())
257  }
258}
259
260/// A middleware trait which related to transforming or processing data returned from `StorageTrait`.
261#[async_trait]
262pub trait StorageMiddleware {
263  /// Preprocess any required state before it is requested by `StorageTrait`.
264  async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
265    Ok(())
266  }
267
268  /// Postprocess data blocks before they are returned to the client.
269  async fn postprocess(
270    &self,
271    _key: &str,
272    positions_options: BytesPositionOptions<'_>,
273  ) -> Result<Vec<DataBlock>> {
274    Ok(DataBlock::from_bytes_positions(
275      positions_options.merge_all().into_inner(),
276    ))
277  }
278}
279
280/// Formats a url for use with storage.
281pub trait UrlFormatter {
282  /// Returns the url with the path.
283  fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String>;
284}
285
286impl UrlFormatter for storage::file::File {
287  fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String> {
288    let path = Path::new("/").join(key.as_ref());
289    uri::Builder::new()
290      .scheme(match self.scheme() {
291        Scheme::Http => uri::Scheme::HTTP,
292        Scheme::Https => uri::Scheme::HTTPS,
293      })
294      .authority(self.authority().to_string())
295      .path_and_query(
296        path
297          .to_str()
298          .ok_or_else(|| InvalidKey("constructing url".to_string()))?,
299      )
300      .build()
301      .map_err(|err| StorageError::InvalidUri(err.to_string()))
302      .map(|value| value.to_string())
303  }
304}
305
#[cfg(test)]
mod tests {
  use http::uri::Authority;

  use crate::local::FileStorage;
  use htsget_test::util::default_dir_data;

  use super::*;

  /// File storage config served at `127.0.0.1:8080` from `data`, using `scheme`.
  fn local_file_config(scheme: Scheme) -> storage::file::File {
    storage::file::File::new(
      scheme,
      Authority::from_static("127.0.0.1:8080"),
      "data".to_string(),
    )
  }

  #[test]
  fn data_url() {
    let file_storage =
      FileStorage::<storage::file::File>::new(default_dir_data(), storage::file::File::default())
        .unwrap();
    let inline_url = file_storage.data_url(b"Hello World!".to_vec(), Some(Class::Header));

    let parsed = data_url::DataUrl::process(&inline_url.url).unwrap();
    let (decoded, _) = parsed.decode_to_vec().unwrap();
    assert_eq!(decoded, b"Hello World!");
  }

  #[test]
  fn http_formatter_authority() {
    test_formatter_authority(local_file_config(Scheme::Http), "http");
  }

  #[test]
  fn https_formatter_authority() {
    test_formatter_authority(local_file_config(Scheme::Https), "https");
  }

  #[cfg(feature = "experimental")]
  #[tokio::test]
  async fn from_c4gh_keys() {
    let key_task = tokio::spawn(async { Ok(C4GHKeys::from_key_pair(vec![], vec![])) });
    let base =
      Storage::new(FileStorage::new(default_dir_data(), storage::file::File::default()).unwrap());
    let configured_keys = C4GHKeys::from_join_handle(key_task);

    // Keys present and C4GH requested: storage is wrapped.
    let wrapped = Storage::from_c4gh_keys(
      Some(&configured_keys),
      Some(EncryptionScheme::C4GH),
      base.clone(),
    )
    .await;
    assert!(wrapped.is_ok());

    // No encryption requested: storage passes through.
    let passthrough = Storage::from_c4gh_keys(None, None, base.clone()).await;
    assert!(passthrough.is_ok());

    // C4GH requested without keys: rejected.
    let missing_keys = Storage::from_c4gh_keys(None, Some(EncryptionScheme::C4GH), base).await;
    assert!(matches!(
      missing_keys,
      Err(StorageError::UnsupportedFormat(_))
    ));
  }

  /// Assert that `formatter` maps the key `path` to `<scheme>://127.0.0.1:8080/path`.
  fn test_formatter_authority(formatter: storage::file::File, scheme: &str) {
    let expected = format!("{}://127.0.0.1:8080/path", scheme);
    let actual = formatter.format_url("path").unwrap();
    assert_eq!(actual, expected)
  }
}