htsget_storage/
lib.rs

//! Module providing the abstractions needed to read files from a storage backend.
2//!
3
4pub use htsget_config::resolver::{IdResolver, ResolveResponse, StorageResolver};
5pub use htsget_config::types::{
6  Class, Format, Headers, HtsGetError, JsonResponse, Query, Response, Url,
7};
8
9#[cfg(feature = "experimental")]
10use crate::c4gh::storage::C4GHStorage;
11use crate::error::Result;
12use crate::error::StorageError;
13use crate::local::FileStorage;
14#[cfg(feature = "aws")]
15use crate::s3::S3Storage;
16use crate::types::{BytesPositionOptions, DataBlock, GetOptions, HeadOptions, RangeUrlOptions};
17#[cfg(feature = "url")]
18use crate::url::UrlStorage;
19use async_trait::async_trait;
20use base64::Engine;
21use base64::engine::general_purpose;
22use cfg_if::cfg_if;
23#[cfg(feature = "experimental")]
24use htsget_config::encryption_scheme::EncryptionScheme;
25use htsget_config::storage;
26#[cfg(feature = "experimental")]
27use htsget_config::storage::c4gh::C4GHKeys;
28use pin_project_lite::pin_project;
29use std::fmt;
30use std::fmt::{Debug, Formatter};
31use std::pin::Pin;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncRead, ReadBuf};
34
35#[cfg(feature = "experimental")]
36pub mod c4gh;
37pub mod error;
38pub mod local;
39#[cfg(feature = "aws")]
40pub mod s3;
41pub mod types;
42#[cfg(feature = "url")]
43pub mod url;
44
45pin_project! {
46  /// A Streamable type represents any AsyncRead data used by `StorageTrait`.
47  pub struct Streamable {
48    #[pin]
49    inner: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
50  }
51}
52
53impl Streamable {
54  /// Create a new Streamable from an AsyncRead.
55  pub fn from_async_read(inner: impl AsyncRead + Send + Sync + Unpin + 'static) -> Self {
56    Self {
57      inner: Box::new(inner),
58    }
59  }
60}
61
62impl AsyncRead for Streamable {
63  fn poll_read(
64    self: Pin<&mut Self>,
65    cx: &mut Context<'_>,
66    buf: &mut ReadBuf<'_>,
67  ) -> Poll<std::io::Result<()>> {
68    self.project().inner.poll_read(cx, buf)
69  }
70}
71
72/// The top-level storage type is created from any `StorageTrait`.
73pub struct Storage {
74  inner: Box<dyn StorageTrait + Send + Sync + 'static>,
75}
76
77impl Storage {
78  /// Get the inner value.
79  pub fn into_inner(self) -> Box<dyn StorageTrait + Send + Sync> {
80    self.inner
81  }
82}
83
84impl Clone for Storage {
85  fn clone(&self) -> Self {
86    Self {
87      inner: self.inner.clone_box(),
88    }
89  }
90}
91
92impl Debug for Storage {
93  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
94    write!(f, "Storage")
95  }
96}
97
98#[async_trait]
99impl StorageMiddleware for Storage {
100  async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
101    self.inner.preprocess(_key, _options).await
102  }
103
104  async fn postprocess(
105    &self,
106    key: &str,
107    positions_options: BytesPositionOptions<'_>,
108  ) -> Result<Vec<DataBlock>> {
109    self.inner.postprocess(key, positions_options).await
110  }
111}
112
113#[async_trait]
114impl StorageTrait for Storage {
115  async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable> {
116    self.inner.get(key, options).await
117  }
118
119  async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url> {
120    self.inner.range_url(key, options).await
121  }
122
123  async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64> {
124    self.inner.head(key, options).await
125  }
126
127  fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
128    self.inner.data_url(data, class)
129  }
130}
131
132impl Storage {
133  #[cfg(feature = "experimental")]
134  /// Wrap an existing storage with C4GH storage
135  pub async fn from_c4gh_keys(
136    keys: Option<&C4GHKeys>,
137    encryption_scheme: Option<EncryptionScheme>,
138    storage: Storage,
139  ) -> Result<Storage> {
140    match (keys, encryption_scheme) {
141      (Some(keys), Some(EncryptionScheme::C4GH)) => Ok(Storage::new(C4GHStorage::new_box(
142        keys
143          .clone()
144          .keys()
145          .await
146          .map_err(|err| StorageError::InternalError(err.to_string()))?,
147        storage.into_inner(),
148      ))),
149      (None, Some(EncryptionScheme::C4GH)) => Err(StorageError::UnsupportedFormat(
150        "C4GH keys have not been configured for this id".to_string(),
151      )),
152      _ => Ok(storage),
153    }
154  }
155
156  /// Create from local storage config.
157  pub async fn from_file(file: &storage::file::File, _query: &Query) -> Result<Storage> {
158    let storage = Storage::new(FileStorage::new(
159      file.local_path(),
160      file.clone(),
161      file.ticket_headers().to_vec(),
162    )?);
163
164    cfg_if! {
165      if #[cfg(feature = "experimental")] {
166        Self::from_c4gh_keys(file.keys(), _query.encryption_scheme(), storage).await
167      } else {
168        Ok(storage)
169      }
170    }
171  }
172
173  /// Create from s3 config.
174  #[cfg(feature = "aws")]
175  pub async fn from_s3(s3: &storage::s3::S3, _query: &Query) -> Result<Storage> {
176    let storage = Storage::new(
177      S3Storage::new_with_default_config(
178        s3.bucket().to_string(),
179        s3.endpoint().map(str::to_string),
180        s3.path_style(),
181      )
182      .await,
183    );
184
185    cfg_if! {
186      if #[cfg(feature = "experimental")] {
187        Self::from_c4gh_keys(s3.keys(), _query.encryption_scheme(), storage).await
188      } else {
189        Ok(storage)
190      }
191    }
192  }
193
194  /// Create from url config.
195  #[cfg(feature = "url")]
196  pub async fn from_url(url: &storage::url::Url, _query: &Query) -> Result<Storage> {
197    let storage = Storage::new(UrlStorage::new(
198      url.client_cloned(),
199      url.url().clone(),
200      url.response_url().clone(),
201      url.forward_headers(),
202      url.header_blacklist().to_vec(),
203    ));
204
205    cfg_if! {
206      if #[cfg(feature = "experimental")] {
207        Self::from_c4gh_keys(url.keys(), _query.encryption_scheme(), storage).await
208      } else {
209        Ok(storage)
210      }
211    }
212  }
213
214  pub fn new(inner: impl StorageTrait + Send + Sync + 'static) -> Self {
215    Self {
216      inner: Box::new(inner),
217    }
218  }
219}
220
221/// A Storage represents some kind of object based storage (either locally or in the cloud)
222/// that can be used to retrieve files for alignments, variants or its respective indexes.
223#[async_trait]
224pub trait StorageTrait: StorageMiddleware + StorageClone {
225  /// Get the object using the key.
226  async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable>;
227
228  /// Get the url of the object represented by the key using a bytes range. It is not required for
229  /// this function to check for the existent of the key, so this should be ensured beforehand.
230  async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url>;
231
232  /// Get the size of the object represented by the key.
233  async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64>;
234
235  /// Get the url of the object using an inline data uri.
236  fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
237    Url::new(format!(
238      "data:;base64,{}",
239      general_purpose::STANDARD.encode(data)
240    ))
241    .set_class(class)
242  }
243}
244
245/// Allow the `StorageTrait` to be cloned. This allows cloning a dynamic trait inside a Box.
246/// See https://crates.io/crates/dyn-clone for a similar pattern.
247pub trait StorageClone {
248  fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync>;
249}
250
251impl<T> StorageClone for T
252where
253  T: StorageTrait + Send + Sync + Clone + 'static,
254{
255  fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync> {
256    Box::new(self.clone())
257  }
258}
259
260/// A middleware trait which related to transforming or processing data returned from `StorageTrait`.
261#[async_trait]
262pub trait StorageMiddleware {
263  /// Preprocess any required state before it is requested by `StorageTrait`.
264  async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
265    Ok(())
266  }
267
268  /// Postprocess data blocks before they are returned to the client.
269  async fn postprocess(
270    &self,
271    _key: &str,
272    positions_options: BytesPositionOptions<'_>,
273  ) -> Result<Vec<DataBlock>> {
274    Ok(DataBlock::from_bytes_positions(
275      positions_options.merge_all().into_inner(),
276    ))
277  }
278}
279
280/// Formats a url for use with storage.
281pub trait UrlFormatter {
282  /// Returns the url with the path.
283  fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String>;
284}
285
286impl UrlFormatter for storage::file::File {
287  fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String> {
288    let mut url = if let Some(origin) = self.ticket_origin() {
289      origin.to_string()
290    } else {
291      format!("{}://{}", self.scheme(), self.authority())
292    };
293    if !url.ends_with('/') {
294      url = format!("{url}/");
295    }
296
297    let url = ::url::Url::parse(&url).map_err(|err| StorageError::InvalidUri(err.to_string()))?;
298    url
299      .join(key.as_ref())
300      .map_err(|err| StorageError::InvalidUri(err.to_string()))
301      .map(|url| url.to_string())
302  }
303}
304
#[cfg(test)]
mod tests {
  use crate::local::FileStorage;
  use htsget_config::types::Scheme;
  use htsget_test::util::default_dir_data;
  use http::uri::Authority;

  use super::*;

  /// File config served from `127.0.0.1:8080` under the `data` directory with `scheme`.
  fn local_file_config(scheme: Scheme) -> storage::file::File {
    storage::file::File::new(
      scheme,
      Authority::from_static("127.0.0.1:8080"),
      "data".to_string(),
    )
  }

  /// Assert that `formatter` turns the key `path` into `{scheme}://127.0.0.1:8080/path`.
  fn test_formatter_authority(formatter: storage::file::File, scheme: &str) {
    let expected = format!("{scheme}://127.0.0.1:8080/path");
    assert_eq!(formatter.format_url("path").unwrap(), expected)
  }

  #[test]
  fn data_url() {
    let file_storage = FileStorage::<storage::file::File>::new(
      default_dir_data(),
      storage::file::File::default(),
      vec![],
    )
    .unwrap();
    let encoded = file_storage.data_url(b"Hello World!".to_vec(), Some(Class::Header));

    let parsed = data_url::DataUrl::process(&encoded.url).unwrap();
    let (decoded, _) = parsed.decode_to_vec().unwrap();
    assert_eq!(decoded, b"Hello World!");
  }

  #[test]
  fn http_formatter_authority() {
    test_formatter_authority(local_file_config(Scheme::Http), "http");
  }

  #[test]
  fn https_formatter_authority() {
    test_formatter_authority(local_file_config(Scheme::Https), "https");
  }

  #[cfg(feature = "experimental")]
  #[tokio::test]
  async fn from_c4gh_keys() {
    let handle = tokio::spawn(async { Ok(C4GHKeys::from_key_pair(vec![], vec![])) });
    let base = Storage::new(
      FileStorage::new(default_dir_data(), storage::file::File::default(), vec![]).unwrap(),
    );

    let wrapped = Storage::from_c4gh_keys(
      Some(&C4GHKeys::from_join_handle(handle)),
      Some(EncryptionScheme::C4GH),
      base.clone(),
    )
    .await;
    assert!(wrapped.is_ok());

    let passthrough = Storage::from_c4gh_keys(None, None, base.clone()).await;
    assert!(passthrough.is_ok());

    let missing_keys = Storage::from_c4gh_keys(None, Some(EncryptionScheme::C4GH), base).await;
    assert!(matches!(missing_keys, Err(StorageError::UnsupportedFormat(_))));
  }
}