use anyhow::{Context, Result};
use async_trait::async_trait;
#[async_trait]
pub trait BlobTransport: Send + Sync {
async fn put(&self, url: &str, body: Vec<u8>) -> Result<()>;
async fn get(&self, url: &str) -> Result<Vec<u8>>;
}
#[derive(Debug, Clone)]
pub struct HttpBlobTransport {
http: reqwest::Client,
}
impl HttpBlobTransport {
#[must_use]
pub fn new(http: reqwest::Client) -> Self {
Self { http }
}
}
#[async_trait]
impl BlobTransport for HttpBlobTransport {
async fn put(&self, url: &str, body: Vec<u8>) -> Result<()> {
let resp = self
.http
.put(url)
.header("content-type", "application/octet-stream")
.body(body)
.send()
.await
.with_context(|| format!("PUT presigned {url}"))?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("blob PUT returned {status}: {text}");
}
Ok(())
}
async fn get(&self, url: &str) -> Result<Vec<u8>> {
let resp = self
.http
.get(url)
.send()
.await
.with_context(|| format!("GET presigned {url}"))?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("blob GET returned {status}: {text}");
}
Ok(resp.bytes().await?.to_vec())
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use std::collections::HashMap;
use std::sync::Mutex;
#[derive(Default)]
struct MemTransport {
objects: Mutex<HashMap<String, Vec<u8>>>,
}
#[async_trait]
impl BlobTransport for MemTransport {
async fn put(&self, url: &str, body: Vec<u8>) -> Result<()> {
self.objects.lock().unwrap().insert(url.to_owned(), body);
Ok(())
}
async fn get(&self, url: &str) -> Result<Vec<u8>> {
self.objects
.lock()
.unwrap()
.get(url)
.cloned()
.ok_or_else(|| anyhow::anyhow!("not found: {url}"))
}
}
#[tokio::test]
async fn injectable_transport_round_trips() {
let t = MemTransport::default();
t.put("memory://x", b"hello".to_vec()).await.unwrap();
assert_eq!(t.get("memory://x").await.unwrap(), b"hello");
assert!(t.get("memory://missing").await.is_err());
}
}