1pub use htsget_config::resolver::{IdResolver, ResolveResponse, StorageResolver};
5pub use htsget_config::types::{
6 Class, Format, Headers, HtsGetError, JsonResponse, Query, Response, Url,
7};
8
9#[cfg(feature = "experimental")]
10use crate::c4gh::storage::C4GHStorage;
11use crate::error::Result;
12use crate::error::StorageError;
13use crate::error::StorageError::InvalidKey;
14use crate::local::FileStorage;
15#[cfg(feature = "aws")]
16use crate::s3::S3Storage;
17use crate::types::{BytesPositionOptions, DataBlock, GetOptions, HeadOptions, RangeUrlOptions};
18#[cfg(feature = "url")]
19use crate::url::UrlStorage;
20use async_trait::async_trait;
21use base64::engine::general_purpose;
22use base64::Engine;
23use cfg_if::cfg_if;
24#[cfg(feature = "experimental")]
25use htsget_config::encryption_scheme::EncryptionScheme;
26use htsget_config::storage;
27#[cfg(feature = "experimental")]
28use htsget_config::storage::c4gh::C4GHKeys;
29use htsget_config::types::Scheme;
30use http::uri;
31use pin_project_lite::pin_project;
32use std::fmt;
33use std::fmt::{Debug, Formatter};
34use std::path::Path;
35use std::pin::Pin;
36use std::task::{Context, Poll};
37use tokio::io::{AsyncRead, ReadBuf};
38
39#[cfg(feature = "experimental")]
40pub mod c4gh;
41pub mod error;
42pub mod local;
43#[cfg(feature = "aws")]
44pub mod s3;
45pub mod types;
46#[cfg(feature = "url")]
47pub mod url;
48
49pin_project! {
50 pub struct Streamable {
52 #[pin]
53 inner: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
54 }
55}
56
57impl Streamable {
58 pub fn from_async_read(inner: impl AsyncRead + Send + Sync + Unpin + 'static) -> Self {
60 Self {
61 inner: Box::new(inner),
62 }
63 }
64}
65
66impl AsyncRead for Streamable {
67 fn poll_read(
68 self: Pin<&mut Self>,
69 cx: &mut Context<'_>,
70 buf: &mut ReadBuf<'_>,
71 ) -> Poll<std::io::Result<()>> {
72 self.project().inner.poll_read(cx, buf)
73 }
74}
75
76pub struct Storage {
78 inner: Box<dyn StorageTrait + Send + Sync + 'static>,
79}
80
81impl Storage {
82 pub fn into_inner(self) -> Box<dyn StorageTrait + Send + Sync> {
84 self.inner
85 }
86}
87
88impl Clone for Storage {
89 fn clone(&self) -> Self {
90 Self {
91 inner: self.inner.clone_box(),
92 }
93 }
94}
95
96impl Debug for Storage {
97 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
98 write!(f, "Storage")
99 }
100}
101
102#[async_trait]
103impl StorageMiddleware for Storage {
104 async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
105 self.inner.preprocess(_key, _options).await
106 }
107
108 async fn postprocess(
109 &self,
110 key: &str,
111 positions_options: BytesPositionOptions<'_>,
112 ) -> Result<Vec<DataBlock>> {
113 self.inner.postprocess(key, positions_options).await
114 }
115}
116
117#[async_trait]
118impl StorageTrait for Storage {
119 async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable> {
120 self.inner.get(key, options).await
121 }
122
123 async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url> {
124 self.inner.range_url(key, options).await
125 }
126
127 async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64> {
128 self.inner.head(key, options).await
129 }
130
131 fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
132 self.inner.data_url(data, class)
133 }
134}
135
136impl Storage {
137 #[cfg(feature = "experimental")]
138 pub async fn from_c4gh_keys(
140 keys: Option<&C4GHKeys>,
141 encryption_scheme: Option<EncryptionScheme>,
142 storage: Storage,
143 ) -> Result<Storage> {
144 match (keys, encryption_scheme) {
145 (Some(keys), Some(EncryptionScheme::C4GH)) => Ok(Storage::new(C4GHStorage::new_box(
146 keys
147 .clone()
148 .keys()
149 .await
150 .map_err(|err| StorageError::InternalError(err.to_string()))?,
151 storage.into_inner(),
152 ))),
153 (None, Some(EncryptionScheme::C4GH)) => Err(StorageError::UnsupportedFormat(
154 "C4GH keys have not been configured for this id".to_string(),
155 )),
156 _ => Ok(storage),
157 }
158 }
159
160 pub async fn from_file(file: &storage::file::File, _query: &Query) -> Result<Storage> {
162 let storage = Storage::new(FileStorage::new(file.local_path(), file.clone())?);
163
164 cfg_if! {
165 if #[cfg(feature = "experimental")] {
166 Self::from_c4gh_keys(file.keys(), _query.encryption_scheme(), storage).await
167 } else {
168 Ok(storage)
169 }
170 }
171 }
172
173 #[cfg(feature = "aws")]
175 pub async fn from_s3(s3: &storage::s3::S3, _query: &Query) -> Result<Storage> {
176 let storage = Storage::new(
177 S3Storage::new_with_default_config(
178 s3.bucket().to_string(),
179 s3.endpoint().map(str::to_string),
180 s3.path_style(),
181 )
182 .await,
183 );
184
185 cfg_if! {
186 if #[cfg(feature = "experimental")] {
187 Self::from_c4gh_keys(s3.keys(), _query.encryption_scheme(), storage).await
188 } else {
189 Ok(storage)
190 }
191 }
192 }
193
194 #[cfg(feature = "url")]
196 pub async fn from_url(url: &storage::url::Url, _query: &Query) -> Result<Storage> {
197 let storage = Storage::new(UrlStorage::new(
198 url.client_cloned(),
199 url.url().clone(),
200 url.response_url().clone(),
201 url.forward_headers(),
202 url.header_blacklist().to_vec(),
203 ));
204
205 cfg_if! {
206 if #[cfg(feature = "experimental")] {
207 Self::from_c4gh_keys(url.keys(), _query.encryption_scheme(), storage).await
208 } else {
209 Ok(storage)
210 }
211 }
212 }
213
214 pub fn new(inner: impl StorageTrait + Send + Sync + 'static) -> Self {
215 Self {
216 inner: Box::new(inner),
217 }
218 }
219}
220
221#[async_trait]
224pub trait StorageTrait: StorageMiddleware + StorageClone {
225 async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable>;
227
228 async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url>;
231
232 async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64>;
234
235 fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
237 Url::new(format!(
238 "data:;base64,{}",
239 general_purpose::STANDARD.encode(data)
240 ))
241 .set_class(class)
242 }
243}
244
245pub trait StorageClone {
248 fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync>;
249}
250
251impl<T> StorageClone for T
252where
253 T: StorageTrait + Send + Sync + Clone + 'static,
254{
255 fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync> {
256 Box::new(self.clone())
257 }
258}
259
260#[async_trait]
262pub trait StorageMiddleware {
263 async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
265 Ok(())
266 }
267
268 async fn postprocess(
270 &self,
271 _key: &str,
272 positions_options: BytesPositionOptions<'_>,
273 ) -> Result<Vec<DataBlock>> {
274 Ok(DataBlock::from_bytes_positions(
275 positions_options.merge_all().into_inner(),
276 ))
277 }
278}
279
280pub trait UrlFormatter {
282 fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String>;
284}
285
286impl UrlFormatter for storage::file::File {
287 fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String> {
288 let path = Path::new("/").join(key.as_ref());
289 uri::Builder::new()
290 .scheme(match self.scheme() {
291 Scheme::Http => uri::Scheme::HTTP,
292 Scheme::Https => uri::Scheme::HTTPS,
293 })
294 .authority(self.authority().to_string())
295 .path_and_query(
296 path
297 .to_str()
298 .ok_or_else(|| InvalidKey("constructing url".to_string()))?,
299 )
300 .build()
301 .map_err(|err| StorageError::InvalidUri(err.to_string()))
302 .map(|value| value.to_string())
303 }
304}
305
#[cfg(test)]
mod tests {
    use http::uri::Authority;

    use crate::local::FileStorage;
    use htsget_test::util::default_dir_data;

    use super::*;

    /// The default `data_url` implementation must round-trip the payload through base64.
    #[test]
    fn data_url() {
        let backend = FileStorage::<storage::file::File>::new(
            default_dir_data(),
            storage::file::File::default(),
        )
        .unwrap();
        let encoded = backend.data_url(b"Hello World!".to_vec(), Some(Class::Header));

        let parsed = data_url::DataUrl::process(&encoded.url).unwrap();
        let (decoded, _) = parsed.decode_to_vec().unwrap();
        assert_eq!(decoded, b"Hello World!");
    }

    /// An `http` file storage formats URLs with the `http` scheme.
    #[test]
    fn http_formatter_authority() {
        test_formatter_authority(file_with_scheme(Scheme::Http), "http");
    }

    /// An `https` file storage formats URLs with the `https` scheme.
    #[test]
    fn https_formatter_authority() {
        test_formatter_authority(file_with_scheme(Scheme::Https), "https");
    }

    /// Covers the three outcomes of `Storage::from_c4gh_keys`.
    #[cfg(feature = "experimental")]
    #[tokio::test]
    async fn from_c4gh_keys() {
        let handle = tokio::spawn(async { Ok(C4GHKeys::from_key_pair(vec![], vec![])) });
        let base = Storage::new(
            FileStorage::new(default_dir_data(), storage::file::File::default()).unwrap(),
        );

        // Keys and the C4GH scheme: the storage gets wrapped.
        let c4gh_keys = C4GHKeys::from_join_handle(handle);
        let wrapped =
            Storage::from_c4gh_keys(Some(&c4gh_keys), Some(EncryptionScheme::C4GH), base.clone())
                .await;
        assert!(wrapped.is_ok());

        // Neither keys nor scheme: the storage passes through.
        let untouched = Storage::from_c4gh_keys(None, None, base.clone()).await;
        assert!(untouched.is_ok());

        // Scheme without keys: rejected.
        let rejected = Storage::from_c4gh_keys(None, Some(EncryptionScheme::C4GH), base).await;
        assert!(matches!(rejected, Err(StorageError::UnsupportedFormat(_))));
    }

    /// File storage fixture shared by the formatter tests.
    fn file_with_scheme(scheme: Scheme) -> storage::file::File {
        storage::file::File::new(
            scheme,
            Authority::from_static("127.0.0.1:8080"),
            "data".to_string(),
        )
    }

    /// Check that `formatter` renders the key `path` under the expected scheme.
    fn test_formatter_authority(formatter: storage::file::File, scheme: &str) {
        let expected = format!("{}://127.0.0.1:8080/path", scheme);
        assert_eq!(formatter.format_url("path").unwrap(), expected);
    }
}