use crate::chunk_store;
use crate::error::ActiveStorageError;
use crate::models;
use crate::resource_manager::ResourceManager;
use crate::s3_client;
use axum::{
headers::authorization::{Authorization, Basic},
TypedHeader,
};
use bytes::Bytes;
use tokio::sync::SemaphorePermit;
use tracing::Instrument;
#[derive(Debug)]
pub struct ChunkDownloaderS3 {
s3_client_map: s3_client::S3ClientMap,
}
impl ChunkDownloaderS3 {
pub fn new() -> Self {
Self {
s3_client_map: s3_client::S3ClientMap::new(),
}
}
}
impl Default for ChunkDownloaderS3 {
fn default() -> Self {
Self::new()
}
}
impl<'a> chunk_store::ChunkDownloader<'a> for ChunkDownloaderS3 {
#[tracing::instrument(level = "DEBUG", skip(auth, request_data))]
async fn is_authorised(
&self,
auth: &Option<TypedHeader<Authorization<Basic>>>,
request_data: &models::RequestData,
) -> Result<bool, ActiveStorageError> {
let credentials = if let Some(TypedHeader(auth)) = auth {
s3_client::S3Credentials::access_key(auth.username(), auth.password())
} else {
s3_client::S3Credentials::None
};
let (source, bucket, object) = s3_client::parse_s3_url(&request_data.url)?;
let s3_client = self
.s3_client_map
.get(&source, credentials)
.instrument(tracing::Span::current())
.await;
s3_client
.is_authorised(&bucket, &object)
.await
.map_err(ActiveStorageError::from)
}
#[tracing::instrument(level = "DEBUG", skip(auth, request_data, resource_manager))]
async fn download(
&self,
auth: &Option<TypedHeader<Authorization<Basic>>>,
request_data: &models::RequestData,
resource_manager: &ResourceManager,
mut mem_permits: Option<SemaphorePermit<'a>>,
) -> Result<Bytes, ActiveStorageError> {
let _conn_permits = resource_manager.connection_s3().await?;
let credentials = if let Some(TypedHeader(auth)) = auth {
s3_client::S3Credentials::access_key(auth.username(), auth.password())
} else {
s3_client::S3Credentials::None
};
let range = s3_client::get_range(request_data.offset, request_data.size);
let (source, bucket, object) = s3_client::parse_s3_url(&request_data.url)?;
let s3_client = self
.s3_client_map
.get(&source, credentials)
.instrument(tracing::Span::current())
.await;
s3_client
.download_object(&bucket, &object, range, resource_manager, &mut mem_permits)
.await
}
}