//! signer-daemon 0.3.2
//!
//! Signer daemon package.
use std::{io::SeekFrom, path::PathBuf};

use base64::Engine as _;
use futures::stream::{self, StreamExt, TryStreamExt};
use sha3::Digest;
use signer_core::SignerUser;
use tokio::io::{AsyncReadExt as _, AsyncSeekExt, AsyncWriteExt, BufReader};

use crate::{
    HttpClient, HttpClientConfig, StorageBlobsCheckExistenceRequest,
    StorageBlobsCheckExistenceResponse, StorageItemData, StorageItemResponse,
};

/// Client for the daemon's storage API.
///
/// Items are addressed as `/api/storage/item/{pub_key}/{hash}`, so the user's public key
/// namespaces everything this client stores. Every request goes through an HTTP client built
/// from `user` and `addr`.
pub struct SignerDaemonStorage {
    /// Identity handed to each HTTP client; its public key is part of item URLs.
    user: SignerUser,
    /// Base URL of the storage server, e.g. `http://localhost:8080`.
    addr: String,
}

/// Content hash and byte range of one chunk of a file being uploaded.
///
/// Only the hash and location are kept, so a chunk's bytes can be re-read from disk when it
/// actually needs uploading instead of holding the whole file in memory.
#[derive(Clone, Debug)]
struct ChunkDescriptor {
    /// URL-safe base64 encoding of the chunk's SHA3-256 digest.
    hash: String,
    /// Offset of the chunk's first byte within the file.
    offset: u64,
    /// Number of bytes in the chunk.
    length: u64,
}

impl SignerDaemonStorage {
    /// Size of every chunk except possibly the last one (16 MiB).
    ///
    /// Chunk boundaries determine the blob hashes and therefore the item hash, so they must
    /// depend only on the file contents, never on how many bytes one `read` call returns.
    /// NOTE(review): previously chunks were effectively limited by the per-read size of
    /// `tokio::fs::File`, so items uploaded before this change hash differently and the
    /// server must accept blobs of this size — confirm server limits.
    const CHUNK_SIZE: usize = 16 * 1024 * 1024;

    /// Maximum number of blob uploads in flight at the same time.
    const UPLOAD_CONCURRENCY: usize = 10;

    /// Creates a storage client that sends requests to `addr` as `user`.
    pub fn new(addr: String, user: SignerUser) -> Self {
        Self { addr, user }
    }

    /// Uploads the file at `path` and returns the key of the stored item.
    ///
    /// The file is split into 16 MiB chunks. Each chunk is identified by the URL-safe base64
    /// SHA3-256 of its bytes, and the item hash is the SHA3-256 of the chunk hashes
    /// concatenated in order. If the item already exists for this user it is returned
    /// without uploading anything. Otherwise the server is asked which chunks it lacks. Only
    /// those chunks are uploaded (each distinct chunk once), and then the item is created.
    ///
    /// # Errors
    ///
    /// Returns an error if `path` has no UTF-8 file name, if the file cannot be read, or if
    /// the blob-existence check, a blob upload, or the final item creation fails.
    pub async fn upload(&self, path: PathBuf) -> crate::DaemonResult<StorageItemResponse> {
        let file_name = Self::file_name_of(&path)?;

        // 1. Hash the file chunk by chunk, keeping only descriptors (not data) in memory.
        let chunk_descriptors = Self::describe_chunks(&path).await?;
        let blob_hashes: Vec<String> = chunk_descriptors.iter().map(|d| d.hash.clone()).collect();

        // 2. Derive the item hash from the ordered chunk hashes.
        let item_hash = Self::item_hash(&blob_hashes);

        let config = HttpClientConfig::new(self.user.clone(), self.addr.clone());
        let client = HttpClient::new(config);

        // 3. Best-effort check whether this user already stores the item. A failed HEAD
        //    request is deliberately ignored and the full upload path is taken instead.
        let item_path = format!(
            "/api/storage/item/{}/{}",
            &self.user.public.pub_key, &item_hash
        );
        if let Ok(response) = client.head(&item_path).await {
            if response.status().is_success() {
                return Ok(StorageItemResponse {
                    pubkey: self.user.public.pub_key.clone(),
                    hash: item_hash,
                    file_name,
                });
            }
        }

        let item_data = StorageItemData {
            chunks: blob_hashes.clone(),
            file_name,
        };

        // Try to create the item up front. A 451 (Unavailable For Legal Reasons) response is
        // treated as "item already exists", and its JSON body is returned as the item.
        // NOTE(review): an earlier comment here said 421 while the code checks 451 — confirm
        // which status the server sends. Any other outcome (transport error, success, other
        // status) falls through to the blob upload below.
        if let Ok(response) = client.post_raw("/api/storage/item", &item_data).await {
            if response.status() == reqwest::StatusCode::UNAVAILABLE_FOR_LEGAL_REASONS {
                let item: StorageItemResponse = response.json().await?;
                return Ok(item);
            }
        }

        // 4. Ask the server which blobs it does not have yet.
        let existence: StorageBlobsCheckExistenceResponse = client
            .post(
                "/api/storage/blobs/check-existence",
                &StorageBlobsCheckExistenceRequest {
                    hashes: blob_hashes,
                },
            )
            .await?;
        let mut missing_hashes: std::collections::HashSet<String> =
            existence.missing_hashes.into_iter().collect();

        // 5. Upload the missing blobs, re-reading each chunk from the file.
        let uploads = chunk_descriptors
            .into_iter()
            // `remove` is true only for the first chunk with a given hash, so content that
            // repeats within the file is uploaded once instead of several times concurrently.
            .filter(|desc| missing_hashes.remove(&desc.hash))
            .map(|desc| self.upload_blob(&path, desc));

        stream::iter(uploads)
            .buffer_unordered(Self::UPLOAD_CONCURRENCY)
            .try_collect::<Vec<()>>()
            .await?;

        // 6. Create the item now that all of its blobs are stored.
        let item: StorageItemResponse = client.post("/api/storage/item", &item_data).await?;

        Ok(item)
    }

    /// Downloads the item identified by `key` and writes its contents to `path`, creating the
    /// file or truncating an existing one.
    ///
    /// # Errors
    ///
    /// Returns an error if the request fails or returns a non-success status, or if the file
    /// cannot be created, written, or flushed.
    pub async fn download(
        &self,
        key: StorageItemResponse,
        path: PathBuf,
    ) -> crate::DaemonResult<()> {
        let config = HttpClientConfig::new(self.user.clone(), self.addr.clone());
        let client = HttpClient::new(config);

        let response = client
            .get_raw(&format!("/api/storage/item/{}/{}", &key.pubkey, &key.hash))
            .await?
            .error_for_status()?;

        let mut file = tokio::fs::File::create(path).await?;
        let mut body = response.bytes_stream();
        while let Some(chunk) = body.try_next().await? {
            file.write_all(&chunk).await?;
        }
        // `tokio::fs::File` completes writes on a background thread; flushing waits for the
        // last write to land and reports its error instead of silently dropping it.
        file.flush().await?;

        Ok(())
    }

    /// Returns the final component of `path` as an owned UTF-8 string.
    ///
    /// Fails with `InvalidInput` (instead of panicking) when `path` is a root, ends in `..`,
    /// or has a file name that is not valid UTF-8.
    fn file_name_of(path: &std::path::Path) -> crate::DaemonResult<String> {
        let name = path
            .file_name()
            .and_then(|name| name.to_str())
            .ok_or_else(|| {
                std::io::Error::new(
                    std::io::ErrorKind::InvalidInput,
                    format!("{} has no UTF-8 file name", path.display()),
                )
            })?;
        Ok(name.to_owned())
    }

    /// Splits the file at `path` into consecutive `CHUNK_SIZE`-byte chunks (the last may be
    /// shorter) and returns a descriptor for each, in file order. An empty file yields none.
    async fn describe_chunks(
        path: &std::path::Path,
    ) -> crate::DaemonResult<Vec<ChunkDescriptor>> {
        let file = tokio::fs::File::open(path).await?;
        let mut reader = BufReader::new(file);

        let mut descriptors = Vec::new();
        let mut offset = 0u64;
        // One buffer reused for every chunk instead of a fresh allocation per iteration.
        let mut buffer = Vec::with_capacity(Self::CHUNK_SIZE);

        loop {
            buffer.clear();
            // A single `read` may return fewer bytes than requested before EOF, so keep
            // reading until the chunk is full or the file ends.
            let mut limited = (&mut reader).take(Self::CHUNK_SIZE as u64);
            let read = limited.read_to_end(&mut buffer).await?;
            if read == 0 {
                break;
            }

            let mut hasher = sha3::Sha3_256::new();
            hasher.update(&buffer);
            let hash = base64::prelude::BASE64_URL_SAFE.encode(hasher.finalize());

            let length = read as u64;
            descriptors.push(ChunkDescriptor {
                hash,
                offset,
                length,
            });
            offset += length;
        }

        Ok(descriptors)
    }

    /// Returns the URL-safe base64 SHA3-256 of the blob hashes concatenated in order.
    fn item_hash(blob_hashes: &[String]) -> String {
        let mut hasher = sha3::Sha3_256::new();
        for hash in blob_hashes {
            hasher.update(hash.as_bytes());
        }
        base64::prelude::BASE64_URL_SAFE.encode(hasher.finalize())
    }

    /// Reads the chunk described by `desc` back from the file at `path` and uploads it as the
    /// `blob` part of a multipart request, failing on a non-success HTTP status.
    async fn upload_blob(
        &self,
        path: &std::path::Path,
        desc: ChunkDescriptor,
    ) -> crate::DaemonResult<()> {
        let mut file = tokio::fs::File::open(path).await?;
        file.seek(SeekFrom::Start(desc.offset)).await?;
        let mut chunk_reader = file.take(desc.length);
        let mut chunk_data = Vec::with_capacity(desc.length as usize);
        chunk_reader.read_to_end(&mut chunk_data).await?;

        let config = HttpClientConfig::new(self.user.clone(), self.addr.clone());
        let client = HttpClient::new(config);

        let form_data = reqwest::multipart::Form::new()
            .part("blob", reqwest::multipart::Part::bytes(chunk_data));
        client
            .post_multipart("/api/storage/blob", form_data)
            .await?
            .error_for_status()?;

        Ok(())
    }
}

#[cfg(test)]
mod test {
    use std::path::PathBuf;

    use signer_core::SignerUser;
    use tokio::fs;

    use crate::{SignerDaemonStorage, SignerRemote};

    /// Address of the storage server the round-trip test talks to.
    const SERVER_ADDR: &str = "http://localhost:8080";

    /// Uploads the workspace `Cargo.toml`, downloads it again, and checks that both copies
    /// contain identical text.
    ///
    /// Needs a server listening on `SERVER_ADDR`; the test pings it before uploading.
    #[tokio::test]
    async fn test_upload_and_download_cargo_toml() -> crate::DaemonResult<()> {
        let alice = SignerUser::generete("alice_cargo_toml_test")?;

        let remote = SignerRemote::new(SERVER_ADDR);
        remote.ping(&alice).await?;

        let storage = SignerDaemonStorage::new(SERVER_ADDR.into(), alice);

        let original = PathBuf::from("../../Cargo.toml");
        let round_tripped = PathBuf::from("./testdir/Cargo.toml.downloaded");

        // Make sure the download directory exists.
        fs::create_dir_all("./testdir").await?;

        // Upload the file, then fetch the same item back.
        let key = storage.upload(original.clone()).await?;
        storage.download(key, round_tripped.clone()).await?;

        // Both copies must have the same contents.
        let expected = fs::read_to_string(original).await?;
        let actual = fs::read_to_string(&round_tripped).await?;
        assert_eq!(expected, actual);

        // Remove the downloaded copy.
        fs::remove_file(round_tripped).await?;

        Ok(())
    }
}