use std::{io::SeekFrom, path::PathBuf};
use base64::Engine as _;
use futures::stream::{self, StreamExt, TryStreamExt};
use sha3::Digest;
use signer_core::SignerUser;
use tokio::io::{AsyncReadExt as _, AsyncSeekExt, AsyncWriteExt, BufReader};
use crate::{
HttpClient, HttpClientConfig, StorageBlobsCheckExistenceRequest,
StorageBlobsCheckExistenceResponse, StorageItemData, StorageItemResponse,
};
/// Client for uploading files to, and downloading them from, the daemon's
/// `/api/storage` HTTP endpoints.
pub struct SignerDaemonStorage {
/// Address passed to `HttpClientConfig::new` whenever an HTTP client is built.
addr: String,
/// User passed to `HttpClientConfig::new`; its `public.pub_key` is also used
/// as the owner segment of item URLs.
user: SignerUser,
}
/// Describes one chunk of a file being uploaded, so the chunk can be located
/// and re-read from disk later without keeping its bytes in memory.
#[derive(Debug, Clone)]
struct ChunkDescriptor {
/// URL-safe base64 encoding of the SHA3-256 digest of the chunk's bytes.
hash: String,
/// Byte offset of the chunk, counted from the start of the file.
offset: u64,
/// Number of bytes in the chunk.
length: u64,
}
impl SignerDaemonStorage {
    /// Size of the chunks a file is split into before hashing and upload.
    const CHUNK_SIZE: usize = 16 * 1024 * 1024;

    /// Maximum number of chunk uploads in flight at the same time.
    ///
    /// Each in-flight upload holds one chunk in memory, so peak memory use is
    /// roughly `MAX_CONCURRENT_UPLOADS * CHUNK_SIZE`.
    const MAX_CONCURRENT_UPLOADS: usize = 10;

    /// Creates a storage client that talks to the daemon at `addr` on behalf of `user`.
    pub fn new(addr: String, user: SignerUser) -> Self {
        Self { addr, user }
    }

    /// Uploads the file at `path` as a chunked storage item and returns its key.
    ///
    /// The file is hashed chunk by chunk first, so only chunk descriptors (not the
    /// data) are kept in memory. Chunks the server reports as missing are then
    /// re-read from disk and uploaded concurrently, and finally the item that ties
    /// the chunk hashes together is created.
    ///
    /// # Errors
    ///
    /// Returns an error if `path` has no UTF-8 file name, if the file cannot be
    /// read (including when it shrinks while the upload is in progress), or if a
    /// required request to the daemon fails.
    pub async fn upload(&self, path: PathBuf) -> crate::DaemonResult<StorageItemResponse> {
        // Validate the name up front: it is needed on every return path and a bad
        // path should not cost a full hashing pass (or a panic).
        let file_name = Self::file_name_of(&path)?;
        let chunk_descriptors = Self::describe_chunks(&path).await?;

        let blob_hashes: Vec<String> = chunk_descriptors.iter().map(|d| d.hash.clone()).collect();

        // The item hash is the SHA3-256 of the concatenated (base64) chunk hashes.
        let mut item_hasher = sha3::Sha3_256::new();
        for hash in &blob_hashes {
            item_hasher.update(hash.as_bytes());
        }
        let item_hash = base64::prelude::BASE64_URL_SAFE.encode(item_hasher.finalize());

        let client = HttpClient::new(HttpClientConfig::new(
            self.user.clone(),
            self.addr.clone(),
        ));

        // Best-effort probe: if the item is already stored there is nothing to do.
        // A failed probe is deliberately ignored; the regular upload path follows.
        let head_response = client
            .head(&format!(
                "/api/storage/item/{}/{}",
                &self.user.public.pub_key, &item_hash
            ))
            .await;
        if matches!(&head_response, Ok(response) if response.status().is_success()) {
            return Ok(StorageItemResponse {
                pubkey: self.user.public.pub_key.clone(),
                hash: item_hash,
                file_name,
            });
        }

        let item_data = StorageItemData {
            chunks: blob_hashes,
            file_name,
        };

        // Best-effort attempt to create the item before uploading any chunk.
        // NOTE(review): only a 451 response is treated as "item available" here;
        // this looks unusual for a fast path — confirm against the server contract.
        let create_response = client.post_raw("/api/storage/item", &item_data).await;
        if let Ok(response) = create_response {
            if response.status() == reqwest::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS {
                let item: StorageItemResponse = response.json().await?;
                return Ok(item);
            }
        }

        let existence: StorageBlobsCheckExistenceResponse = client
            .post(
                "/api/storage/blobs/check-existence",
                &StorageBlobsCheckExistenceRequest {
                    hashes: item_data.chunks.clone(),
                },
            )
            .await?;
        let mut missing_hashes: std::collections::HashSet<String> =
            existence.missing_hashes.into_iter().collect();

        // `remove` doubles as the membership test and as de-duplication: a chunk
        // whose content occurs several times in the file is uploaded only once.
        let descriptors_to_upload: Vec<ChunkDescriptor> = chunk_descriptors
            .into_iter()
            .filter(|d| missing_hashes.remove(&d.hash))
            .collect();

        // Every future owns its inputs, so nothing is borrowed across the stream.
        let uploads = descriptors_to_upload.into_iter().map(|desc| {
            Self::upload_chunk(self.user.clone(), self.addr.clone(), path.clone(), desc)
        });
        stream::iter(uploads)
            .buffer_unordered(Self::MAX_CONCURRENT_UPLOADS)
            .try_collect::<Vec<()>>()
            .await?;

        let item: StorageItemResponse = client.post("/api/storage/item", &item_data).await?;
        Ok(item)
    }

    /// Downloads the item identified by `key` and writes its content to `path`.
    ///
    /// The destination file is created (or truncated) only after the server has
    /// answered with a success status.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails, the server answers with an error
    /// status, or the file cannot be created or written. If the transfer fails
    /// midway, the partially written file is left in place.
    pub async fn download(
        &self,
        key: StorageItemResponse,
        path: PathBuf,
    ) -> crate::DaemonResult<()> {
        let client = HttpClient::new(HttpClientConfig::new(
            self.user.clone(),
            self.addr.clone(),
        ));
        let response = client
            .get_raw(&format!("/api/storage/item/{}/{}", &key.pubkey, &key.hash))
            .await?
            .error_for_status()?;

        let mut file = tokio::fs::File::create(path).await?;
        let mut body = response.bytes_stream();
        while let Some(chunk) = body.try_next().await? {
            file.write_all(&chunk).await?;
        }
        // `tokio::fs::File` completes writes in the background; without an explicit
        // flush the last write may still be pending (and its error lost) when the
        // file is dropped and this function returns.
        file.flush().await?;
        Ok(())
    }

    /// Returns the final component of `path` as an owned UTF-8 string.
    fn file_name_of(path: &std::path::Path) -> std::io::Result<String> {
        path.file_name()
            .and_then(|name| name.to_str())
            .map(str::to_owned)
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("path has no valid UTF-8 file name: {}", path.display()),
                )
            })
    }

    /// Reads the file at `path` once and returns hash, offset and length of every
    /// `CHUNK_SIZE` chunk (the last chunk may be shorter; an empty file has none).
    async fn describe_chunks(path: &std::path::Path) -> std::io::Result<Vec<ChunkDescriptor>> {
        let file = tokio::fs::File::open(path).await?;
        let mut reader = BufReader::new(file);
        // One buffer, reused for every chunk.
        let mut buffer = vec![0u8; Self::CHUNK_SIZE];
        let mut descriptors = Vec::new();
        let mut offset: u64 = 0;

        loop {
            // A single `read` may return fewer bytes than requested even before
            // end of file, so keep reading until the chunk is full or the file
            // ends. This keeps chunk boundaries (and therefore all hashes)
            // independent of how the OS happens to split reads.
            let mut filled = 0;
            while filled < buffer.len() {
                let n = reader.read(&mut buffer[filled..]).await?;
                if n == 0 {
                    break;
                }
                filled += n;
            }
            if filled == 0 {
                break;
            }

            let mut hasher = sha3::Sha3_256::new();
            hasher.update(&buffer[..filled]);
            let hash = base64::prelude::BASE64_URL_SAFE.encode(hasher.finalize());

            // `filled` is at most `CHUNK_SIZE`, so widening to `u64` is lossless.
            let length = filled as u64;
            descriptors.push(ChunkDescriptor {
                hash,
                offset,
                length,
            });
            offset += length;

            if filled < buffer.len() {
                // A partially filled buffer means end of file was reached.
                break;
            }
        }
        Ok(descriptors)
    }

    /// Re-reads the chunk described by `desc` from `path` and uploads it as a blob.
    async fn upload_chunk(
        user: SignerUser,
        addr: String,
        path: PathBuf,
        desc: ChunkDescriptor,
    ) -> crate::DaemonResult<()> {
        let mut file = tokio::fs::File::open(&path).await?;
        file.seek(SeekFrom::Start(desc.offset)).await?;

        // `length` was derived from a `usize` no larger than `CHUNK_SIZE`.
        // `read_exact` fails with `UnexpectedEof` if the file shrank since it was
        // hashed, instead of silently uploading a blob that is too short.
        let mut chunk_data = vec![0u8; desc.length as usize];
        file.read_exact(&mut chunk_data).await?;

        let client = HttpClient::new(HttpClientConfig::new(user, addr));
        let blob_part = reqwest::multipart::Part::bytes(chunk_data);
        let form_data = reqwest::multipart::Form::new().part("blob", blob_part);
        client
            .post_multipart("/api/storage/blob", form_data)
            .await?
            .error_for_status()?;
        Ok(())
    }
}
#[cfg(test)]
mod test {
    use std::path::PathBuf;

    /// Round-trips the workspace `Cargo.toml` through the storage API and checks
    /// that the downloaded copy matches the original.
    ///
    /// Requires a daemon listening on `http://localhost:8080`.
    #[tokio::test]
    async fn test_upload_and_download_cargo_toml() -> crate::DaemonResult<()> {
        use crate::{SignerDaemonStorage, SignerRemote};
        use signer_core::SignerUser;
        use tokio::fs;

        const DAEMON_ADDR: &str = "http://localhost:8080";
        const SCRATCH_DIR: &str = "./testdir";
        const ORIGINAL_FILE: &str = "../../Cargo.toml";
        const DOWNLOADED_FILE: &str = "./testdir/Cargo.toml.downloaded";

        // Create the user and make sure the daemon answers for it before
        // touching storage.
        let alice = SignerUser::generete("alice_cargo_toml_test")?;
        SignerRemote::new(DAEMON_ADDR).ping(&alice).await?;
        let storage = SignerDaemonStorage::new(DAEMON_ADDR.into(), alice);

        fs::create_dir_all(SCRATCH_DIR).await?;

        // Upload, then fetch the same item back into the scratch directory.
        let key = storage.upload(PathBuf::from(ORIGINAL_FILE)).await?;
        storage.download(key, PathBuf::from(DOWNLOADED_FILE)).await?;

        let expected = fs::read_to_string(PathBuf::from(ORIGINAL_FILE)).await?;
        let actual = fs::read_to_string(PathBuf::from(DOWNLOADED_FILE)).await?;
        assert_eq!(expected, actual);

        fs::remove_file(PathBuf::from(DOWNLOADED_FILE)).await?;
        Ok(())
    }
}