signer-daemon 0.3.1

Signer daemon package.
Documentation
use std::{io::SeekFrom, path::PathBuf};

use base64::Engine as _;
use futures::stream::{self, StreamExt, TryStreamExt};
use sha3::Digest;
use signer_core::SignerUser;
use signer_hub_kit::models::{
  StorageBlobsCheckExistenceRequest, StorageItemData, StorageItemResponse,
};
use tokio::io::{AsyncReadExt as _, AsyncSeekExt, AsyncWriteExt, BufReader};

use crate::OpenapiConfiguration;

/// Client for the hub's storage endpoints: uploads local files as hashed
/// chunks ("blobs") and downloads stored items back to disk.
pub struct SignerDaemonStorage {
  /// Base address of the hub. Passed to `OpenapiConfiguration::new` and
  /// prefixed to `/api/signer/storage/blob` for direct blob uploads.
  addr: String,
  /// User on whose behalf requests are made; its public key identifies the
  /// owner when checking whether an item already exists.
  user: SignerUser,
}

/// Location and content hash of one chunk of a file being uploaded.
///
/// Only the descriptor is kept in memory; the chunk bytes are re-read from
/// the file at `offset` when the chunk actually has to be uploaded.
#[derive(Debug, Clone)]
struct ChunkDescriptor {
  /// URL-safe base64 encoding of the SHA3-256 digest of the chunk bytes.
  hash: String,
  /// Byte offset of the chunk from the start of the file.
  offset: u64,
  /// Number of bytes in the chunk.
  length: u64,
}

impl SignerDaemonStorage {
  pub fn new(addr: String, user: SignerUser) -> Self {
    Self { addr, user }
  }

  pub async fn upload(
    &self,
    path: PathBuf,
  ) -> anyhow::Result<StorageItemResponse> {
    let file = tokio::fs::File::open(&path).await?;
    let mut reader = BufReader::new(file);

    // 1. Read file in chunks and compute hashes, but only store descriptors
    let mut chunk_descriptors = Vec::new();
    let mut current_offset = 0;

    loop {
      let mut buffer = vec![0; 16 * 1024 * 1024];
      let n = reader.read(&mut buffer).await?;
      if n == 0 {
        break;
      }
      buffer.truncate(n);

      let mut hasher = sha3::Sha3_256::new();
      hasher.update(&buffer);
      let hash =
        base64::prelude::BASE64_URL_SAFE.encode(hasher.finalize().to_vec());

      chunk_descriptors.push(ChunkDescriptor {
        hash,
        offset: current_offset,
        length: n as u64,
      });

      current_offset += n as u64;
    }

    let blob_hashes: Vec<String> =
      chunk_descriptors.iter().map(|d| d.hash.clone()).collect();

    // 2. Compute item hash
    let mut item_hasher = sha3::Sha3_256::new();
    for hash in &blob_hashes {
      item_hasher.update(hash.as_bytes());
    }
    let item_hash =
      base64::prelude::BASE64_URL_SAFE.encode(item_hasher.finalize().to_vec());

    let conf =
      OpenapiConfiguration::new(self.user.clone(), self.addr.clone()).build();

    // 3. Check if item exists for the user
    match signer_hub_kit::apis::default_api::api_signer_storage_item_pubkey_hash_head(
      &conf,
      &self.user.public.pub_key,
      &item_hash,
    )
    .await
    {
      Ok(_) => {
        return Ok(StorageItemResponse {
          pubkey: self.user.public.pub_key.clone(),
          hash: item_hash,
          file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
        });
      }
      Err(e) => match e {
        signer_hub_kit::apis::Error::ResponseError(response) => {
          match response.status {
            reqwest::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS => {
              let item = signer_hub_kit::apis::default_api::api_signer_storage_item_post(
                &conf,
                StorageItemData {
                  chunks: blob_hashes,
                  file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
                },
              )
              .await?;
              return Ok(item);
            }
            reqwest::StatusCode::NOT_FOUND => { /* Continue */ }
            _ => return Err(anyhow::anyhow!("Unknown response error: {:?}", response)),
          }
        }
        _ => return Err(anyhow::anyhow!("Unknown error: {:?}", e)),
      },
    };

    // 4. Check for missing blobs
    let check_existence_response =
      signer_hub_kit::apis::default_api::api_signer_storage_blobs_check_existence_post(
        &conf,
        StorageBlobsCheckExistenceRequest {
          hashes: blob_hashes.clone(),
        },
      )
      .await?;

    let missing_hashes: std::collections::HashSet<String> =
      check_existence_response
        .missing_hashes
        .into_iter()
        .collect();

    // 5. Upload missing blobs by streaming from the file
    let descriptors_to_upload: Vec<ChunkDescriptor> = chunk_descriptors
      .into_iter()
      .filter(|d| missing_hashes.contains(&d.hash))
      .collect();

    let upload_futs = descriptors_to_upload.into_iter().map(|desc| {
      let user = self.user.clone();
      let addr = self.addr.clone();
      let path = path.clone();
      async move {
        let mut file = tokio::fs::File::open(&path).await?;
        file.seek(SeekFrom::Start(desc.offset)).await?;
        let mut chunk_reader = file.take(desc.length);
        let mut chunk_data = Vec::with_capacity(desc.length as usize);
        chunk_reader.read_to_end(&mut chunk_data).await?;

        let conf = OpenapiConfiguration::new(user, addr.clone()).build();
        let api_key = conf.api_key.unwrap();
        let file_data = reqwest::multipart::Part::bytes(chunk_data);
        let form_data = reqwest::multipart::Form::new().part("blob", file_data);

        let client = reqwest::Client::new();
        client
          .post(format!("{}/api/signer/storage/blob", &addr))
          .header(
            "Authorization",
            &format!("{} {}", api_key.prefix.unwrap(), api_key.key),
          )
          .multipart(form_data)
          .send()
          .await?
          .error_for_status()?;

        let result: anyhow::Result<()> = Ok(());
        result
      }
    });

    stream::iter(upload_futs)
      .buffer_unordered(10)
      .try_collect::<Vec<()>>()
      .await?;

    // 6. Create the item on the server
    let item = signer_hub_kit::apis::default_api::api_signer_storage_item_post(
      &conf,
      StorageItemData {
        chunks: blob_hashes,
        file_name: path.file_name().unwrap().to_str().unwrap().to_string(),
      },
    )
    .await?;

    Ok(item)
  }

  pub async fn download(
    &self,
    key: StorageItemResponse,
    path: PathBuf,
  ) -> anyhow::Result<()> {
    let r = signer_hub_kit::apis::default_api::api_signer_storage_item_pubkey_hash_get(
      &OpenapiConfiguration::new(self.user.clone(), self.addr.clone()).build(),
      &key.pubkey,
      &key.hash,
    )
    .await?.error_for_status()?;

    let mut f = tokio::fs::File::create(path).await?;
    let mut response_stream = r.bytes_stream();
    while let Some(chunk) = response_stream.try_next().await? {
      f.write_all(&chunk).await?;
    }

    Ok(())
  }
}

#[cfg(test)]
mod test {
  use std::path::PathBuf;

  use signer_core::SignerUser;
  use tokio::fs;

  use crate::{SignerDaemonStorage, SignerRemote};

  /// Address of the hub instance these integration tests expect to be running.
  const HUB_ADDR: &str = "http://localhost:8080";

  /// Generates a user called `name`, pings the hub with it and returns a
  /// storage client acting as that user.
  async fn storage_for(
    name: &'static str,
  ) -> anyhow::Result<SignerDaemonStorage> {
    let user = SignerUser::generete(name)?;

    let remote = SignerRemote::new(HUB_ADDR);
    remote.ping(&user).await?;

    Ok(SignerDaemonStorage::new(HUB_ADDR.into(), user))
  }

  /// Round-trips a large local file through the hub (upload, then download).
  #[tokio::test]
  async fn test_remote_storage() -> anyhow::Result<()> {
    let storage = storage_for("alice").await?;

    let uploaded = storage
      .upload(PathBuf::from("./testdir/large-obj.exe"))
      .await?;
    storage
      .download(uploaded, "./testdir/large-obj-2.exe".into())
      .await?;

    Ok(())
  }

  /// Uploads `Cargo.toml`, downloads it again and checks the content matches.
  #[tokio::test]
  async fn test_upload_and_download_cargo_toml() -> anyhow::Result<()> {
    let storage = storage_for("alice_cargo_toml_test").await?;

    let original = PathBuf::from("./Cargo.toml");
    let round_tripped = PathBuf::from("./testdir/Cargo.toml.downloaded");

    // Make sure the download directory exists
    fs::create_dir_all("./testdir").await?;

    // Upload the file, then download it under a different name
    let uploaded = storage.upload(original.clone()).await?;
    storage.download(uploaded, round_tripped.clone()).await?;

    // Compare the file contents
    let expected = fs::read_to_string(original).await?;
    let actual = fs::read_to_string(&round_tripped).await?;
    assert_eq!(expected, actual);

    // Clean up the downloaded file
    fs::remove_file(round_tripped).await?;

    Ok(())
  }
}