use std::{io::SeekFrom, path::PathBuf};
use base64::Engine as _;
use futures::stream::{self, StreamExt, TryStreamExt};
use sha3::Digest;
use signer_core::SignerUser;
use signer_hub_kit::models::{
StorageBlobsCheckExistenceRequest, StorageItemData, StorageItemResponse,
};
use tokio::io::{AsyncReadExt as _, AsyncSeekExt, AsyncWriteExt, BufReader};
use crate::OpenapiConfiguration;
pub struct SignerDaemonStorage {
addr: String,
user: SignerUser,
}
#[derive(Debug, Clone)]
struct ChunkDescriptor {
hash: String,
offset: u64,
length: u64,
}
impl SignerDaemonStorage {
pub fn new(addr: String, user: SignerUser) -> Self {
Self { addr, user }
}
pub async fn upload(
&self,
path: PathBuf,
) -> anyhow::Result<StorageItemResponse> {
let file = tokio::fs::File::open(&path).await?;
let mut reader = BufReader::new(file);
let mut chunk_descriptors = Vec::new();
let mut current_offset = 0;
loop {
let mut buffer = vec![0; 16 * 1024 * 1024];
let n = reader.read(&mut buffer).await?;
if n == 0 {
break;
}
buffer.truncate(n);
let mut hasher = sha3::Sha3_256::new();
hasher.update(&buffer);
let hash =
base64::prelude::BASE64_URL_SAFE.encode(hasher.finalize().to_vec());
chunk_descriptors.push(ChunkDescriptor {
hash,
offset: current_offset,
length: n as u64,
});
current_offset += n as u64;
}
let blob_hashes: Vec<String> =
chunk_descriptors.iter().map(|d| d.hash.clone()).collect();
let mut item_hasher = sha3::Sha3_256::new();
for hash in &blob_hashes {
item_hasher.update(hash.as_bytes());
}
let item_hash =
base64::prelude::BASE64_URL_SAFE.encode(item_hasher.finalize().to_vec());
let conf =
OpenapiConfiguration::new(self.user.clone(), self.addr.clone()).build();
match signer_hub_kit::apis::default_api::api_signer_storage_item_pubkey_hash_head(
&conf,
&self.user.public.pub_key,
&item_hash,
)
.await
{
Ok(_) => {
return Ok(StorageItemResponse {
pubkey: self.user.public.pub_key.clone(),
hash: item_hash,
file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
});
}
Err(e) => match e {
signer_hub_kit::apis::Error::ResponseError(response) => {
match response.status {
reqwest::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => {
let item = signer_hub_kit::apis::default_api::api_signer_storage_item_post(
&conf,
StorageItemData {
chunks: blob_hashes,
file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
},
)
.await?;
return Ok(item);
}
reqwest::StatusCode::NOT_FOUND => { }
_ => return Err(anyhow::anyhow!("Unknown response error: {:?}", response)),
}
}
_ => return Err(anyhow::anyhow!("Unknown error: {:?}", e)),
},
};
let check_existence_response =
signer_hub_kit::apis::default_api::api_signer_storage_blobs_check_existence_post(
&conf,
StorageBlobsCheckExistenceRequest {
hashes: blob_hashes.clone(),
},
)
.await?;
let missing_hashes: std::collections::HashSet<String> =
check_existence_response
.missing_hashes
.into_iter()
.collect();
let descriptors_to_upload: Vec<ChunkDescriptor> = chunk_descriptors
.into_iter()
.filter(|d| missing_hashes.contains(&d.hash))
.collect();
let upload_futs = descriptors_to_upload.into_iter().map(|desc| {
let user = self.user.clone();
let addr = self.addr.clone();
let path = path.clone();
async move {
let mut file = tokio::fs::File::open(&path).await?;
file.seek(SeekFrom::Start(desc.offset)).await?;
let mut chunk_reader = file.take(desc.length);
let mut chunk_data = Vec::with_capacity(desc.length as usize);
chunk_reader.read_to_end(&mut chunk_data).await?;
let conf = OpenapiConfiguration::new(user, addr.clone()).build();
let api_key = conf.api_key.unwrap();
let file_data = reqwest::multipart::Part::bytes(chunk_data);
let form_data = reqwest::multipart::Form::new().part("blob", file_data);
let client = reqwest::Client::new();
client
.post(format!("{}/api/signer/storage/blob", &addr))
.header(
"Authorization",
&format!("{} {}", api_key.prefix.unwrap(), api_key.key),
)
.multipart(form_data)
.send()
.await?
.error_for_status()?;
let result: anyhow::Result<()> = Ok(());
result
}
});
stream::iter(upload_futs)
.buffer_unordered(10)
.try_collect::<Vec<()>>()
.await?;
let item = signer_hub_kit::apis::default_api::api_signer_storage_item_post(
&conf,
StorageItemData {
chunks: blob_hashes,
file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
},
)
.await?;
Ok(item)
}
pub async fn download(
&self,
key: StorageItemResponse,
path: PathBuf,
) -> anyhow::Result<()> {
let r = signer_hub_kit::apis::default_api::api_signer_storage_item_pubkey_hash_get(
&OpenapiConfiguration::new(self.user.clone(), self.addr.clone()).build(),
&key.pubkey,
&key.hash,
)
.await?.error_for_status()?;
let mut f = tokio::fs::File::create(path).await?;
let mut response_stream = r.bytes_stream();
while let Some(chunk) = response_stream.try_next().await? {
f.write_all(&chunk).await?;
}
Ok(())
}
}
#[cfg(test)]
mod test {
use std::path::PathBuf;
#[tokio::test]
async fn test_remote_storage() -> anyhow::Result<()> {
use crate::{SignerDaemonStorage, SignerRemote};
use signer_core::SignerUser;
let user = SignerUser::generete("alice")?;
let remote = SignerRemote::new("http://localhost:8080");
remote.ping(&user).await?;
let storage =
SignerDaemonStorage::new("http://localhost:8080".into(), user);
let storage_key = storage.upload("./testdir/large-obj.exe".into()).await?;
storage
.download(storage_key, PathBuf::from("./testdir/large-obj-2.exe"))
.await?;
Ok(())
}
#[tokio::test]
async fn test_upload_and_download_cargo_toml() -> anyhow::Result<()> {
use crate::{SignerDaemonStorage, SignerRemote};
use signer_core::SignerUser;
use tokio::fs;
let user = SignerUser::generete("alice_cargo_toml_test")?;
let remote = SignerRemote::new("http://localhost:8080");
remote.ping(&user).await?;
let storage =
SignerDaemonStorage::new("http://localhost:8080".into(), user);
let source_path = PathBuf::from("./Cargo.toml");
let download_path = PathBuf::from("./testdir/Cargo.toml.downloaded");
fs::create_dir_all("./testdir").await?;
let storage_key = storage.upload(source_path.clone()).await?;
storage.download(storage_key, download_path.clone()).await?;
let source_content = fs::read_to_string(source_path).await?;
let downloaded_content = fs::read_to_string(&download_path).await?;
assert_eq!(source_content, downloaded_content);
fs::remove_file(download_path).await?;
Ok(())
}
}