pub mod batch;
pub mod multi;
use crate::auth::{
auth_header_value, build_blossom_auth, build_blossom_auth_with_extra_tags, BlossomSigner,
};
use crate::protocol::{sha256_hex, BlobDescriptor, STREAM_CHUNK_SIZE};
use tracing::{info, instrument, warn};
pub struct BlossomClient {
http: reqwest::Client,
servers: Vec<String>,
signer: Box<dyn BlossomSigner>,
}
impl BlossomClient {
pub fn new(servers: Vec<String>, signer: impl BlossomSigner + 'static) -> Self {
Self::with_timeout(servers, signer, std::time::Duration::from_secs(30))
}
pub fn with_timeout(
servers: Vec<String>,
signer: impl BlossomSigner + 'static,
timeout: std::time::Duration,
) -> Self {
let http = reqwest::Client::builder()
.timeout(timeout)
.build()
.unwrap_or_else(|_| reqwest::Client::new());
Self {
http,
servers,
signer: Box::new(signer),
}
}
#[instrument(name = "blossom.client.upload", skip_all, fields(
blob.size = data.len(),
blob.sha256,
blob.content_type = content_type,
server.url,
))]
pub async fn upload(&self, data: &[u8], content_type: &str) -> Result<BlobDescriptor, String> {
let our_sha256 = sha256_hex(data);
tracing::Span::current().record("blob.sha256", our_sha256.as_str());
let auth_event =
build_blossom_auth(self.signer.as_ref(), "upload", Some(&our_sha256), None, "");
let auth_header = auth_header_value(&auth_event);
for server in &self.servers {
let url = format!("{}/upload", server.trim_end_matches('/'));
let result = self
.http
.put(&url)
.header("Authorization", &auth_header)
.header("Content-Type", content_type)
.body(data.to_vec())
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
let desc: BlobDescriptor = resp
.json()
.await
.map_err(|e| format!("parse upload response: {e}"))?;
if desc.sha256 != our_sha256 {
return Err(format!(
"SHA256 mismatch: server={}, ours={}",
desc.sha256, our_sha256
));
}
tracing::Span::current().record("server.url", server.as_str());
info!(
blob.sha256 = %desc.sha256,
blob.size = desc.size,
server.url = %server,
"blob uploaded"
);
return Ok(desc);
}
Ok(resp) => {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
warn!(
server.url = %server,
http.status_code = status.as_u16(),
error.message = %text,
"upload failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"upload request error, trying next server"
);
continue;
}
}
}
Err("all Blossom servers failed for upload".into())
}
#[instrument(name = "blossom.client.upload_lfs", skip_all, fields(
blob.size = data.len(),
blob.sha256,
lfs.path = path,
lfs.repo = repo,
server.url,
))]
pub async fn upload_lfs(
&self,
data: &[u8],
content_type: &str,
path: &str,
repo: &str,
base_sha256: Option<&str>,
is_manifest: bool,
) -> Result<BlobDescriptor, String> {
let our_sha256 = sha256_hex(data);
tracing::Span::current().record("blob.sha256", our_sha256.as_str());
let mut extra_tags = vec![
vec!["t".into(), "lfs".into()],
vec!["path".into(), path.into()],
vec!["repo".into(), repo.into()],
];
if let Some(base) = base_sha256 {
extra_tags.push(vec!["base".into(), base.into()]);
}
if is_manifest {
extra_tags.push(vec!["manifest".into()]);
}
let auth_event = build_blossom_auth_with_extra_tags(
self.signer.as_ref(),
"upload",
Some(&our_sha256),
None,
"",
&extra_tags,
);
let auth_header = auth_header_value(&auth_event);
for server in &self.servers {
let url = format!("{}/upload", server.trim_end_matches('/'));
let result = self
.http
.put(&url)
.header("Authorization", &auth_header)
.header("Content-Type", content_type)
.body(data.to_vec())
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
let desc: BlobDescriptor = resp
.json()
.await
.map_err(|e| format!("parse upload response: {e}"))?;
if desc.sha256 != our_sha256 {
return Err(format!(
"SHA256 mismatch: server={}, ours={}",
desc.sha256, our_sha256
));
}
tracing::Span::current().record("server.url", server.as_str());
info!(
blob.sha256 = %desc.sha256,
blob.size = desc.size,
lfs.path = %path,
server.url = %server,
"LFS blob uploaded"
);
return Ok(desc);
}
Ok(resp) => {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
warn!(
server.url = %server,
http.status_code = status.as_u16(),
error.message = %text,
"LFS upload failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"LFS upload request error, trying next server"
);
continue;
}
}
}
Err("all Blossom servers failed for LFS upload".into())
}
#[instrument(name = "blossom.client.download", skip_all, fields(
blob.sha256 = %sha256,
blob.size,
server.url,
))]
pub async fn download(&self, sha256: &str) -> Result<Vec<u8>, String> {
let auth_event = build_blossom_auth(self.signer.as_ref(), "get", None, None, "");
let auth_header = auth_header_value(&auth_event);
for server in &self.servers {
let url = format!("{}/{}", server.trim_end_matches('/'), sha256);
let result = self
.http
.get(&url)
.header("Authorization", &auth_header)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
let data = resp
.bytes()
.await
.map_err(|e| format!("download body: {e}"))?
.to_vec();
let actual_hash = sha256_hex(&data);
if actual_hash != sha256 {
return Err(format!(
"SHA256 mismatch on download: expected={}, actual={}",
sha256, actual_hash
));
}
tracing::Span::current().record("blob.size", data.len() as u64);
tracing::Span::current().record("server.url", server.as_str());
info!(
blob.sha256 = %sha256,
blob.size = data.len(),
server.url = %server,
"blob downloaded"
);
return Ok(data);
}
Ok(resp) => {
warn!(
server.url = %server,
http.status_code = resp.status().as_u16(),
"download failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"download request error, trying next server"
);
continue;
}
}
}
Err(format!("blob {} not found on any Blossom server", sha256))
}
#[instrument(name = "blossom.client.exists", skip_all, fields(blob.sha256 = %sha256))]
pub async fn exists(&self, sha256: &str) -> Result<bool, String> {
for server in &self.servers {
let url = format!("{}/{}", server.trim_end_matches('/'), sha256);
let result = self.http.head(&url).send().await;
match result {
Ok(resp) if resp.status().is_success() => return Ok(true),
Ok(resp) if resp.status().as_u16() == 404 => continue,
Ok(_) => continue,
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"exists check error, trying next server"
);
continue;
}
}
}
Ok(false)
}
#[instrument(name = "blossom.client.delete", skip_all, fields(blob.sha256 = %sha256))]
pub async fn delete(&self, sha256: &str) -> Result<bool, String> {
let auth_event = build_blossom_auth(self.signer.as_ref(), "delete", None, None, "");
let auth_header = auth_header_value(&auth_event);
for server in &self.servers {
let url = format!("{}/{}", server.trim_end_matches('/'), sha256);
let result = self
.http
.delete(&url)
.header("Authorization", &auth_header)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
info!(blob.sha256 = %sha256, server.url = %server, "blob deleted");
return Ok(true);
}
Ok(resp) if resp.status().as_u16() == 404 => return Ok(false),
Ok(resp) => {
warn!(
server.url = %server,
http.status_code = resp.status().as_u16(),
"delete failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"delete request error, trying next server"
);
continue;
}
}
}
Err("all Blossom servers failed for delete".into())
}
#[instrument(name = "blossom.client.list", skip_all, fields(list.pubkey = %pubkey))]
pub async fn list(&self, pubkey: &str) -> Result<Vec<BlobDescriptor>, String> {
for server in &self.servers {
let url = format!("{}/list/{}", server.trim_end_matches('/'), pubkey);
let result = self.http.get(&url).send().await;
match result {
Ok(resp) if resp.status().is_success() => {
let descs: Vec<BlobDescriptor> = resp
.json()
.await
.map_err(|e| format!("parse list response: {e}"))?;
info!(list.pubkey = %pubkey, server.url = %server, "list retrieved");
return Ok(descs);
}
Ok(resp) => {
warn!(
server.url = %server,
http.status_code = resp.status().as_u16(),
"list failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"list request error, trying next server"
);
continue;
}
}
}
Err("all Blossom servers failed for list".into())
}
#[instrument(name = "blossom.client.upload_file", skip_all, fields(
file.path = %path.display(),
blob.sha256,
blob.size,
))]
pub async fn upload_file(
&self,
path: &std::path::Path,
content_type: &str,
) -> Result<BlobDescriptor, String> {
let file_meta = tokio::fs::metadata(path)
.await
.map_err(|e| format!("stat file: {e}"))?;
let file_size = file_meta.len();
let our_sha256 = tokio::task::block_in_place(|| {
let mut f = std::fs::File::open(path).map_err(|e| format!("open file: {e}"))?;
let (hash, _) =
crate::protocol::sha256_stream(&mut f).map_err(|e| format!("hash file: {e}"))?;
Ok::<_, String>(hash)
})?;
tracing::Span::current().record("blob.sha256", our_sha256.as_str());
tracing::Span::current().record("blob.size", file_size);
let auth_event =
build_blossom_auth(self.signer.as_ref(), "upload", Some(&our_sha256), None, "");
let auth_header = auth_header_value(&auth_event);
for server in &self.servers {
let url = format!("{}/upload", server.trim_end_matches('/'));
let file = tokio::fs::File::open(path)
.await
.map_err(|e| format!("open file: {e}"))?;
let stream = tokio_util::io::ReaderStream::with_capacity(file, STREAM_CHUNK_SIZE);
let body = reqwest::Body::wrap_stream(stream);
let result = self
.http
.put(&url)
.header("Authorization", &auth_header)
.header("Content-Type", content_type)
.header("Content-Length", file_size)
.body(body)
.send()
.await;
match result {
Ok(resp) if resp.status().is_success() => {
let desc: BlobDescriptor = resp
.json()
.await
.map_err(|e| format!("parse upload response: {e}"))?;
if desc.sha256 != our_sha256 {
return Err(format!(
"SHA256 mismatch: server={}, ours={}",
desc.sha256, our_sha256
));
}
info!(
blob.sha256 = %desc.sha256,
blob.size = desc.size,
server.url = %server,
"file uploaded (streaming)"
);
return Ok(desc);
}
Ok(resp) => {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
warn!(
server.url = %server,
http.status_code = status.as_u16(),
error.message = %text,
"upload_file failed, trying next server"
);
continue;
}
Err(e) => {
warn!(
server.url = %server,
error.message = %e,
"upload_file request error, trying next server"
);
continue;
}
}
}
Err("all Blossom servers failed for upload_file".into())
}
}
impl crate::traits::BlobClient for BlossomClient {
type Address = ();
async fn upload(
&self,
_addr: &(),
data: &[u8],
content_type: &str,
) -> Result<BlobDescriptor, String> {
self.upload(data, content_type).await
}
async fn download(&self, _addr: &(), sha256: &str) -> Result<Vec<u8>, String> {
self.download(sha256).await
}
async fn exists(&self, _addr: &(), sha256: &str) -> Result<bool, String> {
self.exists(sha256).await
}
async fn delete(&self, _addr: &(), sha256: &str) -> Result<bool, String> {
self.delete(sha256).await
}
async fn list(&self, _addr: &(), pubkey: &str) -> Result<Vec<BlobDescriptor>, String> {
self.list(pubkey).await
}
async fn upload_file(
&self,
_addr: &(),
path: &std::path::Path,
content_type: &str,
) -> Result<BlobDescriptor, String> {
self.upload_file(path, content_type).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::auth::Signer;
#[test]
fn test_client_creation() {
let signer = Signer::generate();
let client = BlossomClient::new(vec!["https://blossom.example.com".into()], signer);
assert_eq!(client.servers.len(), 1);
}
}