1pub use htsget_config::resolver::{IdResolver, ResolveResponse, StorageResolver};
5pub use htsget_config::types::{
6 Class, Format, Headers, HtsGetError, JsonResponse, Query, Response, Url,
7};
8
9#[cfg(feature = "experimental")]
10use crate::c4gh::storage::C4GHStorage;
11use crate::error::Result;
12use crate::error::StorageError;
13use crate::local::FileStorage;
14#[cfg(feature = "aws")]
15use crate::s3::S3Storage;
16use crate::types::{BytesPositionOptions, DataBlock, GetOptions, HeadOptions, RangeUrlOptions};
17#[cfg(feature = "url")]
18use crate::url::UrlStorage;
19use async_trait::async_trait;
20use base64::Engine;
21use base64::engine::general_purpose;
22use cfg_if::cfg_if;
23#[cfg(feature = "experimental")]
24use htsget_config::encryption_scheme::EncryptionScheme;
25use htsget_config::storage;
26#[cfg(feature = "experimental")]
27use htsget_config::storage::c4gh::C4GHKeys;
28use pin_project_lite::pin_project;
29use std::fmt;
30use std::fmt::{Debug, Formatter};
31use std::pin::Pin;
32use std::task::{Context, Poll};
33use tokio::io::{AsyncRead, ReadBuf};
34
35#[cfg(feature = "experimental")]
36pub mod c4gh;
37pub mod error;
38pub mod local;
39#[cfg(feature = "aws")]
40pub mod s3;
41pub mod types;
42#[cfg(feature = "url")]
43pub mod url;
44
45pin_project! {
46 pub struct Streamable {
48 #[pin]
49 inner: Box<dyn AsyncRead + Send + Sync + Unpin + 'static>,
50 }
51}
52
53impl Streamable {
54 pub fn from_async_read(inner: impl AsyncRead + Send + Sync + Unpin + 'static) -> Self {
56 Self {
57 inner: Box::new(inner),
58 }
59 }
60}
61
62impl AsyncRead for Streamable {
63 fn poll_read(
64 self: Pin<&mut Self>,
65 cx: &mut Context<'_>,
66 buf: &mut ReadBuf<'_>,
67 ) -> Poll<std::io::Result<()>> {
68 self.project().inner.poll_read(cx, buf)
69 }
70}
71
72pub struct Storage {
74 inner: Box<dyn StorageTrait + Send + Sync + 'static>,
75}
76
77impl Storage {
78 pub fn into_inner(self) -> Box<dyn StorageTrait + Send + Sync> {
80 self.inner
81 }
82}
83
84impl Clone for Storage {
85 fn clone(&self) -> Self {
86 Self {
87 inner: self.inner.clone_box(),
88 }
89 }
90}
91
92impl Debug for Storage {
93 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
94 write!(f, "Storage")
95 }
96}
97
98#[async_trait]
99impl StorageMiddleware for Storage {
100 async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
101 self.inner.preprocess(_key, _options).await
102 }
103
104 async fn postprocess(
105 &self,
106 key: &str,
107 positions_options: BytesPositionOptions<'_>,
108 ) -> Result<Vec<DataBlock>> {
109 self.inner.postprocess(key, positions_options).await
110 }
111}
112
113#[async_trait]
114impl StorageTrait for Storage {
115 async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable> {
116 self.inner.get(key, options).await
117 }
118
119 async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url> {
120 self.inner.range_url(key, options).await
121 }
122
123 async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64> {
124 self.inner.head(key, options).await
125 }
126
127 fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
128 self.inner.data_url(data, class)
129 }
130}
131
132impl Storage {
133 #[cfg(feature = "experimental")]
134 pub async fn from_c4gh_keys(
136 keys: Option<&C4GHKeys>,
137 encryption_scheme: Option<EncryptionScheme>,
138 storage: Storage,
139 ) -> Result<Storage> {
140 match (keys, encryption_scheme) {
141 (Some(keys), Some(EncryptionScheme::C4GH)) => Ok(Storage::new(C4GHStorage::new_box(
142 keys
143 .clone()
144 .keys()
145 .await
146 .map_err(|err| StorageError::InternalError(err.to_string()))?,
147 storage.into_inner(),
148 ))),
149 (None, Some(EncryptionScheme::C4GH)) => Err(StorageError::UnsupportedFormat(
150 "C4GH keys have not been configured for this id".to_string(),
151 )),
152 _ => Ok(storage),
153 }
154 }
155
156 pub async fn from_file(file: &storage::file::File, _query: &Query) -> Result<Storage> {
158 let storage = Storage::new(FileStorage::new(
159 file.local_path(),
160 file.clone(),
161 file.ticket_headers().to_vec(),
162 )?);
163
164 cfg_if! {
165 if #[cfg(feature = "experimental")] {
166 Self::from_c4gh_keys(file.keys(), _query.encryption_scheme(), storage).await
167 } else {
168 Ok(storage)
169 }
170 }
171 }
172
173 #[cfg(feature = "aws")]
175 pub async fn from_s3(s3: &storage::s3::S3, _query: &Query) -> Result<Storage> {
176 let storage = Storage::new(
177 S3Storage::new_with_default_config(
178 s3.bucket().to_string(),
179 s3.endpoint().map(str::to_string),
180 s3.path_style(),
181 )
182 .await,
183 );
184
185 cfg_if! {
186 if #[cfg(feature = "experimental")] {
187 Self::from_c4gh_keys(s3.keys(), _query.encryption_scheme(), storage).await
188 } else {
189 Ok(storage)
190 }
191 }
192 }
193
194 #[cfg(feature = "url")]
196 pub async fn from_url(url: &storage::url::Url, _query: &Query) -> Result<Storage> {
197 let storage = Storage::new(UrlStorage::new(
198 url.client_cloned(),
199 url.url().clone(),
200 url.response_url().clone(),
201 url.forward_headers(),
202 url.header_blacklist().to_vec(),
203 ));
204
205 cfg_if! {
206 if #[cfg(feature = "experimental")] {
207 Self::from_c4gh_keys(url.keys(), _query.encryption_scheme(), storage).await
208 } else {
209 Ok(storage)
210 }
211 }
212 }
213
214 pub fn new(inner: impl StorageTrait + Send + Sync + 'static) -> Self {
215 Self {
216 inner: Box::new(inner),
217 }
218 }
219}
220
221#[async_trait]
224pub trait StorageTrait: StorageMiddleware + StorageClone {
225 async fn get(&self, key: &str, options: GetOptions<'_>) -> Result<Streamable>;
227
228 async fn range_url(&self, key: &str, options: RangeUrlOptions<'_>) -> Result<Url>;
231
232 async fn head(&self, key: &str, options: HeadOptions<'_>) -> Result<u64>;
234
235 fn data_url(&self, data: Vec<u8>, class: Option<Class>) -> Url {
237 Url::new(format!(
238 "data:;base64,{}",
239 general_purpose::STANDARD.encode(data)
240 ))
241 .set_class(class)
242 }
243}
244
245pub trait StorageClone {
248 fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync>;
249}
250
251impl<T> StorageClone for T
252where
253 T: StorageTrait + Send + Sync + Clone + 'static,
254{
255 fn clone_box(&self) -> Box<dyn StorageTrait + Send + Sync> {
256 Box::new(self.clone())
257 }
258}
259
260#[async_trait]
262pub trait StorageMiddleware {
263 async fn preprocess(&mut self, _key: &str, _options: GetOptions<'_>) -> Result<()> {
265 Ok(())
266 }
267
268 async fn postprocess(
270 &self,
271 _key: &str,
272 positions_options: BytesPositionOptions<'_>,
273 ) -> Result<Vec<DataBlock>> {
274 Ok(DataBlock::from_bytes_positions(
275 positions_options.merge_all().into_inner(),
276 ))
277 }
278}
279
280pub trait UrlFormatter {
282 fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String>;
284}
285
286impl UrlFormatter for storage::file::File {
287 fn format_url<K: AsRef<str>>(&self, key: K) -> Result<String> {
288 let mut url = if let Some(origin) = self.ticket_origin() {
289 origin.to_string()
290 } else {
291 format!("{}://{}", self.scheme(), self.authority())
292 };
293 if !url.ends_with('/') {
294 url = format!("{url}/");
295 }
296
297 let url = ::url::Url::parse(&url).map_err(|err| StorageError::InvalidUri(err.to_string()))?;
298 url
299 .join(key.as_ref())
300 .map_err(|err| StorageError::InvalidUri(err.to_string()))
301 .map(|url| url.to_string())
302 }
303}
304
#[cfg(test)]
mod tests {
  use crate::local::FileStorage;
  use htsget_config::types::Scheme;
  use htsget_test::util::default_dir_data;
  use http::uri::Authority;

  use super::*;

  /// Build the file storage config shared by the formatter tests, varying only
  /// the scheme.
  fn formatter_with_scheme(scheme: Scheme) -> storage::file::File {
    storage::file::File::new(
      scheme,
      Authority::from_static("127.0.0.1:8080"),
      "data".to_string(),
    )
  }

  /// Assert that `formatter` places the key `path` under the expected
  /// scheme and authority.
  fn test_formatter_authority(formatter: storage::file::File, scheme: &str) {
    let expected = format!("{scheme}://127.0.0.1:8080/path");
    let actual = formatter.format_url("path").unwrap();

    assert_eq!(actual, expected);
  }

  /// The default `data_url` implementation must round-trip its payload.
  #[test]
  fn data_url() {
    let payload = b"Hello World!";
    let file_storage = FileStorage::<storage::file::File>::new(
      default_dir_data(),
      storage::file::File::default(),
      vec![],
    )
    .unwrap();

    let encoded = file_storage.data_url(payload.to_vec(), Some(Class::Header));
    let parsed = data_url::DataUrl::process(&encoded.url).unwrap();
    let (decoded, _) = parsed.decode_to_vec().unwrap();

    assert_eq!(decoded, payload);
  }

  #[test]
  fn http_formatter_authority() {
    test_formatter_authority(formatter_with_scheme(Scheme::Http), "http");
  }

  #[test]
  fn https_formatter_authority() {
    test_formatter_authority(formatter_with_scheme(Scheme::Https), "https");
  }

  /// Covers the three outcomes of `Storage::from_c4gh_keys`: wrapped, passed
  /// through unchanged, and rejected because keys are missing.
  #[cfg(feature = "experimental")]
  #[tokio::test]
  async fn from_c4gh_keys() {
    let key_task = tokio::spawn(async { Ok(C4GHKeys::from_key_pair(vec![], vec![])) });
    let base = Storage::new(
      FileStorage::new(default_dir_data(), storage::file::File::default(), vec![]).unwrap(),
    );

    // Keys present and C4GH requested.
    let with_keys = Storage::from_c4gh_keys(
      Some(&C4GHKeys::from_join_handle(key_task)),
      Some(EncryptionScheme::C4GH),
      base.clone(),
    )
    .await;
    assert!(with_keys.is_ok());

    // Nothing requested.
    let passthrough = Storage::from_c4gh_keys(None, None, base.clone()).await;
    assert!(passthrough.is_ok());

    // C4GH requested without keys.
    let missing_keys = Storage::from_c4gh_keys(None, Some(EncryptionScheme::C4GH), base).await;
    assert!(matches!(
      missing_keys,
      Err(StorageError::UnsupportedFormat(_))
    ));
  }
}