use async_trait::async_trait;
use crate::error::ToolError;
use super::item::SpoolItem;
/// Listing entry for one spooled object: its key and stored size.
///
/// Produced by [`SpoolBackend::list`]. The key is backend-relative: the backends in
/// this file strip their own storage decoration (the `.json` file extension or the
/// configured object prefix) before reporting it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SpoolMeta {
    /// Backend-relative key, usable with [`SpoolBackend::get`] and [`SpoolBackend::delete`].
    pub key: String,
    /// Stored size in bytes as reported by the backend (some backends fall back to `0`
    /// when the size cannot be determined).
    pub size: u64,
}
/// Storage abstraction for spooled items.
///
/// Implementors provide the four primitive operations (`put`, `list`, `get`, `delete`);
/// the aggregate helpers (`total_bytes`, `len`, `is_empty`) are derived from `list` and
/// therefore cost one full listing per call unless an implementation overrides them.
#[async_trait]
pub trait SpoolBackend: Send + Sync {
    /// Short static identifier of the backend implementation.
    fn kind(&self) -> &'static str;

    /// Stores `item` in the backend.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError>;

    /// Lists the metadata of the stored items.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError>;

    /// Loads and decodes the item stored under `key`.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError>;

    /// Removes the item stored under `key`.
    async fn delete(&self, key: &str) -> Result<(), ToolError>;

    /// Sum of the sizes reported by [`SpoolBackend::list`].
    async fn total_bytes(&self) -> Result<u64, ToolError> {
        let metas = self.list().await?;
        Ok(metas.iter().map(|meta| meta.size).sum())
    }

    /// Number of entries reported by [`SpoolBackend::list`].
    async fn len(&self) -> Result<usize, ToolError> {
        let metas = self.list().await?;
        Ok(metas.len())
    }

    /// `true` when [`SpoolBackend::len`] reports zero entries.
    async fn is_empty(&self) -> Result<bool, ToolError> {
        let count = self.len().await?;
        Ok(count == 0)
    }
}
/// Spool backend that stores every item as a `<key>.json` file in a single directory.
#[derive(Debug, Clone)]
pub struct LocalDiskBackend {
/// Directory the spool files are read from and written to.
dir: std::path::PathBuf,
}
impl LocalDiskBackend {
    /// Opens a spool rooted at `dir`, creating the directory and any missing parents first.
    ///
    /// # Errors
    /// Returns [`ToolError::Io`] when the directory cannot be created.
    pub async fn open(dir: impl Into<std::path::PathBuf>) -> Result<Self, ToolError> {
        let dir: std::path::PathBuf = dir.into();
        match tokio::fs::create_dir_all(&dir).await {
            Ok(()) => Ok(Self { dir }),
            Err(e) => Err(ToolError::Io(format!(
                "spool dir '{}' create failed: {e}",
                dir.display()
            ))),
        }
    }

    /// Maps a spool key to its file path, `<dir>/<key>.json`.
    fn path_for(&self, key: &str) -> std::path::PathBuf {
        let mut file_name = String::with_capacity(key.len() + ".json".len());
        file_name.push_str(key);
        file_name.push_str(".json");
        self.dir.join(file_name)
    }
}
#[async_trait]
impl SpoolBackend for LocalDiskBackend {
    /// Identifies this backend as `"local_disk"`.
    fn kind(&self) -> &'static str {
        "local_disk"
    }

    /// Writes `item` to `<dir>/<object_key>.json`, replacing any existing file for that key.
    ///
    /// The bytes are first written to a uniquely named `*.tmp` sibling and then renamed
    /// into place, so a crash or a failed write never leaves a truncated `.json` file
    /// behind for `list`/`get` to pick up. Staging files do not end in `.json`, so `list`
    /// ignores them; a staging file orphaned by a crash is not cleaned up automatically.
    ///
    /// # Errors
    /// Returns [`ToolError::Io`] when writing the staging file or renaming it fails.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
        // Keeps staging names of concurrent `put` calls in this process distinct, so one
        // call can never rename a file another call is still writing.
        static STAGING_SEQ: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        let path = self.path_for(&item.object_key());
        let seq = STAGING_SEQ.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        // Stage next to the destination so the rename never crosses filesystems.
        let mut staging = path.clone().into_os_string();
        staging.push(format!(".{}.{seq}.tmp", std::process::id()));
        let staging = std::path::PathBuf::from(staging);

        let staged = match tokio::fs::write(&staging, item.to_bytes()).await {
            Ok(()) => tokio::fs::rename(&staging, &path).await,
            Err(e) => Err(e),
        };
        if let Err(e) = staged {
            // Best-effort cleanup; the original failure is the error worth reporting.
            let _ = tokio::fs::remove_file(&staging).await;
            return Err(ToolError::Io(format!(
                "spool write '{}' failed: {e}",
                path.display()
            )));
        }
        Ok(())
    }

    /// Lists every `*.json` entry in the spool directory, sorted by key.
    ///
    /// A missing directory is reported as an empty spool rather than an error.
    ///
    /// # Errors
    /// Returns [`ToolError::Io`] when the directory cannot be read or iterated.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
        let mut rd = match tokio::fs::read_dir(&self.dir).await {
            Ok(rd) => rd,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
            Err(e) => return Err(ToolError::Io(format!("spool list failed: {e}"))),
        };
        let mut metas = Vec::new();
        while let Some(entry) = rd
            .next_entry()
            .await
            .map_err(|e| ToolError::Io(format!("spool list entry failed: {e}")))?
        {
            let name = entry.file_name().to_string_lossy().into_owned();
            // Anything without the `.json` suffix is not a spool item.
            let Some(key) = name.strip_suffix(".json") else {
                continue;
            };
            // Best effort: an entry whose metadata cannot be read is still listed, with size 0.
            let size = entry.metadata().await.map(|m| m.len()).unwrap_or(0);
            metas.push(SpoolMeta {
                key: key.to_string(),
                size,
            });
        }
        metas.sort_by(|a, b| a.key.cmp(&b.key));
        Ok(metas)
    }

    /// Reads `<dir>/<key>.json` and decodes it with [`SpoolItem::from_bytes`].
    ///
    /// # Errors
    /// Returns [`ToolError::Io`] when the file cannot be read, or the decode error otherwise.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
        let path = self.path_for(key);
        let bytes = tokio::fs::read(&path).await.map_err(|e| {
            ToolError::Io(format!("spool read '{}' failed: {e}", path.display()))
        })?;
        SpoolItem::from_bytes(&bytes)
    }

    /// Removes `<dir>/<key>.json`; a file that is already gone counts as success.
    ///
    /// # Errors
    /// Returns [`ToolError::Io`] for any removal failure other than "not found".
    async fn delete(&self, key: &str) -> Result<(), ToolError> {
        let path = self.path_for(key);
        match tokio::fs::remove_file(&path).await {
            Ok(()) => Ok(()),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()),
            Err(e) => Err(ToolError::Io(format!(
                "spool delete '{}' failed: {e}",
                path.display()
            ))),
        }
    }
}
/// Spool backend that stores items in a NATS JetStream object store bucket.
#[derive(Clone)]
pub struct NatsObjectBackend {
/// Handle to the bucket's object store.
store: async_nats::jetstream::object_store::ObjectStore,
/// Bucket name, used in error and log messages.
bucket: String,
}
impl NatsObjectBackend {
    /// Opens the object store `bucket`, creating it when the lookup fails.
    ///
    /// Any lookup error triggers the create attempt, and the lookup error itself is
    /// discarded; only a failure of the create call is reported.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] when the bucket cannot be created.
    pub async fn open(
        js: &async_nats::jetstream::Context,
        bucket: &str,
    ) -> Result<Self, ToolError> {
        let store = match js.get_object_store(bucket).await {
            Ok(existing) => existing,
            Err(_) => {
                let config = async_nats::jetstream::object_store::Config {
                    bucket: bucket.to_string(),
                    description: Some("NoETL subscription spool (RFC #90 Phase 4)".to_string()),
                    ..Default::default()
                };
                match js.create_object_store(config).await {
                    Ok(created) => created,
                    Err(e) => {
                        return Err(ToolError::ExecutionFailed(format!(
                            "spool object store bucket '{bucket}' open/create failed: {e}"
                        )))
                    }
                }
            }
        };
        let bucket = bucket.to_string();
        Ok(Self { store, bucket })
    }
}
#[async_trait]
impl SpoolBackend for NatsObjectBackend {
fn kind(&self) -> &'static str {
"nats_object"
}
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
let key = item.object_key();
let meta = async_nats::jetstream::object_store::ObjectMetadata {
name: key.clone(),
description: Some(item.spool_ref()),
..Default::default()
};
let mut reader = std::io::Cursor::new(item.to_bytes());
self.store.put(meta, &mut reader).await.map_err(|e| {
ToolError::ExecutionFailed(format!(
"spool object_put '{key}' to '{}' failed: {e}",
self.bucket
))
})?;
Ok(())
}
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
use futures::StreamExt;
let mut stream = self.store.list().await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_list '{}' failed: {e}", self.bucket))
})?;
let mut metas = Vec::new();
while let Some(item) = stream.next().await {
match item {
Ok(info) if !info.deleted => {
metas.push(SpoolMeta {
key: info.name,
size: info.size as u64,
});
}
Ok(_) => {} Err(e) => tracing::warn!(bucket = %self.bucket, "spool list entry error: {e}"),
}
}
metas.sort_by(|a, b| a.key.cmp(&b.key));
Ok(metas)
}
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
use tokio::io::AsyncReadExt;
let mut object = self.store.get(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' failed: {e}"))
})?;
let mut buf = Vec::new();
object.read_to_end(&mut buf).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' read failed: {e}"))
})?;
SpoolItem::from_bytes(&buf)
}
async fn delete(&self, key: &str) -> Result<(), ToolError> {
self.store.delete(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_delete '{key}' failed: {e}"))
})
}
}
/// Spool backend that stores items as objects in a Google Cloud Storage bucket over HTTP.
#[cfg(feature = "gcs")]
#[derive(Clone)]
pub struct GcsBackend {
/// HTTP client used for every request.
client: reqwest::Client,
/// Token source; when `None`, requests are sent without an `Authorization` header.
auth: Option<crate::auth::GcpAuth>,
/// Target bucket name.
bucket: String,
/// Object-name prefix, either empty or ending in `/` (normalized by `norm_prefix`).
prefix: String,
/// Base URL of the storage API that request paths are appended to.
endpoint: String,
}
#[cfg(feature = "gcs")]
impl GcsBackend {
    /// Creates a backend for `bucket` under `prefix` that talks to
    /// `https://storage.googleapis.com` with GCP authentication enabled.
    ///
    /// No request is made here; the current implementation always returns `Ok`.
    pub async fn open(bucket: &str, prefix: &str) -> Result<Self, ToolError> {
        let backend = Self {
            client: reqwest::Client::new(),
            auth: Some(crate::auth::GcpAuth::new()),
            bucket: bucket.to_string(),
            prefix: Self::norm_prefix(prefix),
            endpoint: "https://storage.googleapis.com".to_string(),
        };
        Ok(backend)
    }

    /// Creates a backend against a custom `endpoint` (trailing slashes removed).
    ///
    /// With `use_adc == false` no authenticator is configured and requests carry no
    /// `Authorization` header.
    pub fn with_endpoint(bucket: &str, prefix: &str, endpoint: &str, use_adc: bool) -> Self {
        Self {
            client: reqwest::Client::new(),
            auth: if use_adc {
                Some(crate::auth::GcpAuth::new())
            } else {
                None
            },
            bucket: bucket.to_string(),
            prefix: Self::norm_prefix(prefix),
            endpoint: endpoint.trim_end_matches('/').to_string(),
        }
    }

    /// Returns `prefix` with a trailing `/` appended unless it is empty or already has one.
    fn norm_prefix(prefix: &str) -> String {
        let mut normalized = prefix.to_string();
        if !(normalized.is_empty() || normalized.ends_with('/')) {
            normalized.push('/');
        }
        normalized
    }

    /// Full object name for `key`: the normalized prefix followed by the key.
    fn name_for(&self, key: &str) -> String {
        [self.prefix.as_str(), key].concat()
    }

    /// Percent-encodes `name` for use as a single URL path segment.
    ///
    /// Only ASCII letters, digits and `-`, `.`, `_`, `~` pass through; every other byte,
    /// including `/`, becomes `%XX` with uppercase hex digits.
    fn enc_path(name: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";
        let mut out = String::with_capacity(name.len() * 3);
        for byte in name.bytes() {
            if byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~') {
                out.push(char::from(byte));
            } else {
                out.push('%');
                out.push(char::from(HEX[usize::from(byte >> 4)]));
                out.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
        out
    }

    /// Builds the `Authorization` header value, or `None` when no authenticator is set.
    ///
    /// # Errors
    /// Propagates the error from the token request.
    async fn auth_header(&self) -> Result<Option<String>, ToolError> {
        let Some(gcp) = &self.auth else {
            return Ok(None);
        };
        let token = gcp
            .get_token(&["https://www.googleapis.com/auth/devstorage.read_write"])
            .await?;
        Ok(Some(format!("Bearer {token}")))
    }
}
#[cfg(feature = "gcs")]
#[async_trait]
impl SpoolBackend for GcsBackend {
    /// Identifies this backend as `"gcs"`.
    fn kind(&self) -> &'static str {
        "gcs"
    }

    /// Uploads `item` as `<prefix><object_key>` with a media upload (`uploadType=media`).
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] when the request cannot be sent or the
    /// response status is not a success; token errors are propagated unchanged.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
        let name = self.name_for(&item.object_key());
        let url = format!("{}/upload/storage/v1/b/{}/o", self.endpoint, self.bucket);
        let params = [("uploadType", "media"), ("name", name.as_str())];
        let mut req = self
            .client
            .post(&url)
            .query(&params)
            .header("Content-Type", "application/json")
            .body(item.to_bytes());
        if let Some(auth) = self.auth_header().await? {
            req = req.header("Authorization", auth);
        }
        let resp = match req.send().await {
            Ok(resp) => resp,
            Err(e) => {
                return Err(ToolError::ExecutionFailed(format!(
                    "spool gcs put '{name}' failed: {e}"
                )))
            }
        };
        let status = resp.status();
        if status.is_success() {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool gcs put '{name}' to '{}' returned {status}: {body}",
            self.bucket
        )))
    }

    /// Lists every object under the prefix, following `nextPageToken` until exhausted,
    /// and returns the prefix-stripped keys sorted ascending.
    ///
    /// Objects whose name equals the prefix itself are skipped; an unparsable size is
    /// reported as `0`.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on transport or non-success status, and
    /// [`ToolError::Json`] when a page cannot be decoded.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
        let url = format!("{}/storage/v1/b/{}/o", self.endpoint, self.bucket);
        let mut metas = Vec::new();
        let mut page_token: Option<String> = None;
        loop {
            let mut query: Vec<(&str, String)> = Vec::with_capacity(2);
            query.push(("prefix", self.prefix.clone()));
            if let Some(token) = page_token.as_ref() {
                query.push(("pageToken", token.clone()));
            }
            let mut req = self.client.get(&url).query(&query);
            if let Some(auth) = self.auth_header().await? {
                req = req.header("Authorization", auth);
            }
            let resp = match req.send().await {
                Ok(resp) => resp,
                Err(e) => {
                    return Err(ToolError::ExecutionFailed(format!(
                        "spool gcs list '{}' failed: {e}",
                        self.bucket
                    )))
                }
            };
            let status = resp.status();
            if !status.is_success() {
                let body = resp.text().await.unwrap_or_default();
                return Err(ToolError::ExecutionFailed(format!(
                    "spool gcs list '{}' returned {status}: {body}",
                    self.bucket
                )));
            }
            let page: GcsListResponse = match resp.json().await {
                Ok(page) => page,
                Err(e) => {
                    return Err(ToolError::Json(format!(
                        "spool gcs list decode failed: {e}"
                    )))
                }
            };
            metas.extend(page.items.into_iter().filter_map(|obj| {
                let key = obj.name.strip_prefix(&self.prefix)?;
                if key.is_empty() {
                    return None;
                }
                Some(SpoolMeta {
                    key: key.to_string(),
                    size: obj.size.parse::<u64>().unwrap_or(0),
                })
            }));
            match page.next_page_token.filter(|token| !token.is_empty()) {
                Some(token) => page_token = Some(token),
                None => break,
            }
        }
        metas.sort_by(|left, right| left.key.cmp(&right.key));
        Ok(metas)
    }

    /// Downloads `<prefix><key>` (`alt=media`) and decodes it with [`SpoolItem::from_bytes`].
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on transport failure, non-success status,
    /// or when the body cannot be read.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
        let name = self.name_for(key);
        let encoded = Self::enc_path(&name);
        let url = format!("{}/storage/v1/b/{}/o/{}", self.endpoint, self.bucket, encoded);
        let mut req = self.client.get(&url).query(&[("alt", "media")]);
        if let Some(auth) = self.auth_header().await? {
            req = req.header("Authorization", auth);
        }
        let resp = match req.send().await {
            Ok(resp) => resp,
            Err(e) => {
                return Err(ToolError::ExecutionFailed(format!(
                    "spool gcs get '{name}' failed: {e}"
                )))
            }
        };
        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().await.unwrap_or_default();
            return Err(ToolError::ExecutionFailed(format!(
                "spool gcs get '{name}' returned {status}: {body}"
            )));
        }
        match resp.bytes().await {
            Ok(bytes) => SpoolItem::from_bytes(&bytes),
            Err(e) => Err(ToolError::ExecutionFailed(format!(
                "spool gcs get '{name}' read failed: {e}"
            ))),
        }
    }

    /// Deletes `<prefix><key>`; a `404 Not Found` response counts as success.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on transport failure or any other
    /// non-success status.
    async fn delete(&self, key: &str) -> Result<(), ToolError> {
        let name = self.name_for(key);
        let encoded = Self::enc_path(&name);
        let url = format!("{}/storage/v1/b/{}/o/{}", self.endpoint, self.bucket, encoded);
        let mut req = self.client.delete(&url);
        if let Some(auth) = self.auth_header().await? {
            req = req.header("Authorization", auth);
        }
        let resp = match req.send().await {
            Ok(resp) => resp,
            Err(e) => {
                return Err(ToolError::ExecutionFailed(format!(
                    "spool gcs delete '{name}' failed: {e}"
                )))
            }
        };
        let status = resp.status();
        if status.is_success() || status == reqwest::StatusCode::NOT_FOUND {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool gcs delete '{name}' returned {status}: {body}"
        )))
    }
}
/// One page of a GCS object listing, as decoded by `GcsBackend::list`.
#[cfg(feature = "gcs")]
#[derive(serde::Deserialize)]
struct GcsListResponse {
/// Objects on this page; defaults to empty when the field is absent.
#[serde(default)]
items: Vec<GcsObject>,
/// Token for the next page (`nextPageToken` in the JSON), if any.
#[serde(rename = "nextPageToken")]
next_page_token: Option<String>,
}
/// The subset of a listed GCS object that the spool needs.
#[cfg(feature = "gcs")]
#[derive(serde::Deserialize)]
struct GcsObject {
/// Full object name, including the backend prefix.
name: String,
/// Object size as text; defaults to an empty string and is parsed to `u64` by the caller.
#[serde(default)]
size: String,
}
/// Spool backend that stores items in an S3-compatible bucket using hand-signed
/// (`AWS4-HMAC-SHA256`) path-style HTTP requests.
#[cfg(feature = "s3")]
#[derive(Clone)]
pub struct S3Backend {
/// HTTP client used for every request.
client: reqwest::Client,
/// Target bucket name.
bucket: String,
/// Object-key prefix, either empty or ending in `/` (normalized by `norm_prefix`).
prefix: String,
/// Base URL of the service with trailing slashes removed.
endpoint: String,
/// Region used in the signing scope.
region: String,
/// Access key id placed in the `Credential` part of the `Authorization` header.
access_key: String,
/// Secret key used to derive the signing key.
secret_key: String,
/// Optional session token, sent and signed as `x-amz-security-token`.
session_token: Option<String>,
}
#[cfg(feature = "s3")]
impl S3Backend {
/// Creates a backend for `bucket` under `prefix`.
///
/// `prefix` is normalized to end in `/` (unless empty) and trailing slashes are
/// removed from `endpoint`. No request is made here.
pub fn new(
bucket: &str,
prefix: &str,
endpoint: &str,
region: &str,
access_key: &str,
secret_key: &str,
session_token: Option<String>,
) -> Self {
Self {
client: reqwest::Client::new(),
bucket: bucket.to_string(),
prefix: Self::norm_prefix(prefix),
endpoint: endpoint.trim_end_matches('/').to_string(),
region: region.to_string(),
access_key: access_key.to_string(),
secret_key: secret_key.to_string(),
session_token,
}
}
/// Returns `prefix` with a trailing `/` appended unless it is empty or already has one.
fn norm_prefix(prefix: &str) -> String {
if prefix.is_empty() || prefix.ends_with('/') {
prefix.to_string()
} else {
format!("{prefix}/")
}
}
/// Full object key for `key`: the normalized prefix followed by the key.
fn name_for(&self, key: &str) -> String {
format!("{}{}", self.prefix, key)
}
/// The endpoint with any leading `https://` / `http://` removed; used as the signed
/// `host` header value.
fn host(&self) -> String {
self.endpoint
.trim_start_matches("https://")
.trim_start_matches("http://")
.to_string()
}
/// Percent-encodes a URL path: ASCII letters, digits, `-`, `.`, `_`, `~` and `/` pass
/// through, every other byte becomes `%XX` with uppercase hex digits.
fn uri_encode_path(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for b in s.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' | b'/' => {
out.push(b as char)
}
_ => out.push_str(&format!("%{b:02X}")),
}
}
out
}
/// Percent-encodes a query key or value: same as `uri_encode_path` except that `/` is
/// encoded as well.
fn uri_encode_query(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for b in s.bytes() {
match b {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
out.push(b as char)
}
_ => out.push_str(&format!("%{b:02X}")),
}
}
out
}
/// Signs and sends one request to `/{bucket}` (empty `object_path`) or
/// `/{bucket}/{object_path}`.
///
/// The signed headers are `host`, `x-amz-content-sha256`, `x-amz-date` and, when a
/// session token is configured, `x-amz-security-token`. `content_type` is sent but not
/// signed, and the body is attached only when it is non-empty.
///
/// The response is returned whatever its status; callers check the status themselves.
///
/// # Errors
/// Returns [`ToolError::ExecutionFailed`] when the request cannot be sent.
async fn signed_request(
&self,
method: reqwest::Method,
object_path: &str,
query: &[(&str, String)],
body: Vec<u8>,
content_type: Option<&str>,
) -> Result<reqwest::Response, ToolError> {
let (amzdate, datestamp) = amz_dates();
let host = self.host();
let payload_hash = super::item::sha256_hex(&body);
// Path-style addressing: the bucket is the first path segment.
let canonical_path = if object_path.is_empty() {
format!("/{}", self.bucket)
} else {
format!("/{}/{}", self.bucket, object_path)
};
let canonical_uri = Self::uri_encode_path(&canonical_path);
// Canonical query string: encode keys and values, then sort the encoded pairs.
let mut q: Vec<(String, String)> = query
.iter()
.map(|(k, v)| (Self::uri_encode_query(k), Self::uri_encode_query(v)))
.collect();
q.sort();
let canonical_query = q
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join("&");
// Headers that take part in the signature, as lowercase names sorted by name.
let mut signed: Vec<(String, String)> = vec![
("host".to_string(), host.clone()),
("x-amz-content-sha256".to_string(), payload_hash.clone()),
("x-amz-date".to_string(), amzdate.clone()),
];
if let Some(tok) = &self.session_token {
signed.push(("x-amz-security-token".to_string(), tok.clone()));
}
signed.sort();
let canonical_headers = signed
.iter()
.map(|(k, v)| format!("{k}:{v}\n"))
.collect::<String>();
let signed_headers = signed
.iter()
.map(|(k, _)| k.as_str())
.collect::<Vec<_>>()
.join(";");
// Canonical request: method, URI, query, headers, signed header names, payload hash.
let canonical_request = format!(
"{}\n{}\n{}\n{}\n{}\n{}",
method.as_str(),
canonical_uri,
canonical_query,
canonical_headers,
signed_headers,
payload_hash,
);
let scope = format!("{datestamp}/{}/s3/aws4_request", self.region);
let string_to_sign = format!(
"AWS4-HMAC-SHA256\n{}\n{}\n{}",
amzdate,
scope,
super::item::sha256_hex(canonical_request.as_bytes()),
);
let key = sigv4_signing_key(&self.secret_key, &datestamp, &self.region, "s3");
let signature = hex_lower(&hmac_sha256(&key, string_to_sign.as_bytes()));
let authorization = format!(
"AWS4-HMAC-SHA256 Credential={}/{}, SignedHeaders={}, Signature={}",
self.access_key, scope, signed_headers, signature,
);
// The URL reuses the exact encoded path and query that were signed.
let mut url = format!("{}{}", self.endpoint, canonical_uri);
if !canonical_query.is_empty() {
url.push('?');
url.push_str(&canonical_query);
}
let mut req = self
.client
.request(method, &url)
.header("x-amz-content-sha256", &payload_hash)
.header("x-amz-date", &amzdate)
.header("Authorization", authorization);
if let Some(tok) = &self.session_token {
req = req.header("x-amz-security-token", tok);
}
if let Some(ct) = content_type {
req = req.header("Content-Type", ct);
}
if !body.is_empty() {
req = req.body(body);
}
req.send()
.await
.map_err(|e| ToolError::ExecutionFailed(format!("spool s3 request to '{}' failed: {e}", self.bucket)))
}
}
#[cfg(feature = "s3")]
#[async_trait]
impl SpoolBackend for S3Backend {
    /// Identifies this backend as `"s3"`.
    fn kind(&self) -> &'static str {
        "s3"
    }

    /// Uploads `item` as `<prefix><object_key>` with a signed `PUT`.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] when the request fails or the response
    /// status is not a success.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
        let name = self.name_for(&item.object_key());
        let payload = item.to_bytes();
        let resp = self
            .signed_request(
                reqwest::Method::PUT,
                &name,
                &[],
                payload,
                Some("application/json"),
            )
            .await?;
        let status = resp.status();
        if status.is_success() {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool s3 put '{name}' to '{}' returned {status}: {body}",
            self.bucket
        )))
    }

    /// Lists every object under the prefix (`list-type=2`), following continuation
    /// tokens until exhausted, and returns the prefix-stripped keys sorted ascending.
    ///
    /// Objects whose key equals the prefix itself are skipped.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on request failure, non-success status,
    /// or when a response body cannot be read.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
        let mut metas = Vec::new();
        let mut cont: Option<String> = None;
        loop {
            let mut query: Vec<(&str, String)> = Vec::with_capacity(3);
            query.push(("list-type", "2".to_string()));
            query.push(("prefix", self.prefix.clone()));
            if let Some(token) = cont.as_ref() {
                query.push(("continuation-token", token.clone()));
            }
            let resp = self
                .signed_request(reqwest::Method::GET, "", &query, Vec::new(), None)
                .await?;
            let status = resp.status();
            if !status.is_success() {
                let body = resp.text().await.unwrap_or_default();
                return Err(ToolError::ExecutionFailed(format!(
                    "spool s3 list '{}' returned {status}: {body}",
                    self.bucket
                )));
            }
            let xml = match resp.text().await {
                Ok(xml) => xml,
                Err(e) => {
                    return Err(ToolError::ExecutionFailed(format!(
                        "spool s3 list '{}' read failed: {e}",
                        self.bucket
                    )))
                }
            };
            let (objects, next) = parse_list_v2(&xml);
            metas.extend(objects.into_iter().filter_map(|(full_key, size)| {
                let bare = full_key.strip_prefix(&self.prefix)?;
                (!bare.is_empty()).then(|| SpoolMeta {
                    key: bare.to_string(),
                    size,
                })
            }));
            match next.filter(|token| !token.is_empty()) {
                Some(token) => cont = Some(token),
                None => break,
            }
        }
        metas.sort_by(|left, right| left.key.cmp(&right.key));
        Ok(metas)
    }

    /// Downloads `<prefix><key>` with a signed `GET` and decodes it with
    /// [`SpoolItem::from_bytes`].
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on request failure, non-success status,
    /// or when the body cannot be read.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
        let name = self.name_for(key);
        let resp = self
            .signed_request(reqwest::Method::GET, &name, &[], Vec::new(), None)
            .await?;
        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().await.unwrap_or_default();
            return Err(ToolError::ExecutionFailed(format!(
                "spool s3 get '{name}' returned {status}: {body}"
            )));
        }
        match resp.bytes().await {
            Ok(bytes) => SpoolItem::from_bytes(&bytes),
            Err(e) => Err(ToolError::ExecutionFailed(format!(
                "spool s3 get '{name}' read failed: {e}"
            ))),
        }
    }

    /// Deletes `<prefix><key>` with a signed `DELETE`; a `404 Not Found` response
    /// counts as success.
    ///
    /// # Errors
    /// Returns [`ToolError::ExecutionFailed`] on request failure or any other
    /// non-success status.
    async fn delete(&self, key: &str) -> Result<(), ToolError> {
        let name = self.name_for(key);
        let resp = self
            .signed_request(reqwest::Method::DELETE, &name, &[], Vec::new(), None)
            .await?;
        let status = resp.status();
        if status.is_success() || status == reqwest::StatusCode::NOT_FOUND {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool s3 delete '{name}' returned {status}: {body}"
        )))
    }
}
/// Extracts `(key, size)` pairs and the continuation token from a list response body.
///
/// This is a plain substring scan, not an XML parser: each `<Contents>…</Contents>`
/// block contributes one entry when it has a `<Key>`; a missing or unparsable `<Size>`
/// becomes `0`. Scanning stops at the first `<Contents>` that is never closed. The token
/// is the first `<NextContinuationToken>` found anywhere in the document.
#[cfg(feature = "s3")]
fn parse_list_v2(xml: &str) -> (Vec<(String, u64)>, Option<String>) {
    const OPEN: &str = "<Contents>";
    const CLOSE: &str = "</Contents>";

    let mut entries = Vec::new();
    let mut cursor = xml;
    while let Some((_, after_open)) = cursor.split_once(OPEN) {
        let Some((block, after_close)) = after_open.split_once(CLOSE) else {
            break;
        };
        if let Some(key) = xml_tag(block, "Key") {
            let size = match xml_tag(block, "Size") {
                Some(text) => text.parse::<u64>().unwrap_or(0),
                None => 0,
            };
            entries.push((key, size));
        }
        cursor = after_close;
    }
    (entries, xml_tag(xml, "NextContinuationToken"))
}
#[cfg(feature = "s3")]
/// Returns the decoded text of the first `<tag>…</tag>` element found in `haystack`.
///
/// This is a plain substring search: attributes, nesting and CDATA are not understood.
/// Entity references in the text are decoded — the five predefined XML entities
/// (`&amp;`, `&lt;`, `&gt;`, `&quot;`, `&apos;`) and numeric character references
/// (`&#NN;`, `&#xHH;`) — so an escaped object key comes back in the form it has to be
/// sent in for later requests. Unrecognized or malformed references are kept verbatim.
///
/// Returns `None` when the opening tag, or a closing tag after it, is missing.
fn xml_tag(haystack: &str, tag: &str) -> Option<String> {
    /// Decodes XML entity references in `raw` in a single left-to-right pass, so
    /// decoded output is never decoded a second time (`&amp;lt;` yields `&lt;`).
    fn unescape(raw: &str) -> String {
        let mut out = String::with_capacity(raw.len());
        let mut rest = raw;
        while let Some(amp) = rest.find('&') {
            out.push_str(&rest[..amp]);
            let tail = &rest[amp..];
            let decoded = tail.find(';').and_then(|semi| {
                let entity = &tail[1..semi];
                let ch = match entity {
                    "amp" => Some('&'),
                    "lt" => Some('<'),
                    "gt" => Some('>'),
                    "quot" => Some('"'),
                    "apos" => Some('\''),
                    _ => {
                        let code = if let Some(hex) = entity.strip_prefix("#x") {
                            u32::from_str_radix(hex, 16).ok()
                        } else if let Some(dec) = entity.strip_prefix('#') {
                            dec.parse::<u32>().ok()
                        } else {
                            None
                        };
                        code.and_then(char::from_u32)
                    }
                };
                ch.map(|c| (c, semi + 1))
            });
            match decoded {
                Some((c, consumed)) => {
                    out.push(c);
                    rest = &tail[consumed..];
                }
                None => {
                    // Not a reference we understand: keep the ampersand literally.
                    out.push('&');
                    rest = &tail[1..];
                }
            }
        }
        out.push_str(rest);
        out
    }

    let open = format!("<{tag}>");
    let close = format!("</{tag}>");
    let start = haystack.find(&open)? + open.len();
    let end = haystack[start..].find(&close)? + start;
    Some(unescape(&haystack[start..end]))
}
/// Computes the HMAC-SHA256 tag of `data` under `key` and returns it as raw bytes.
#[cfg(feature = "s3")]
fn hmac_sha256(key: &[u8], data: &[u8]) -> Vec<u8> {
    use hmac::{Hmac, Mac};
    use sha2::Sha256;
    let tag = <Hmac<Sha256> as Mac>::new_from_slice(key)
        .expect("HMAC accepts a key of any length")
        .chain_update(data)
        .finalize();
    tag.into_bytes().to_vec()
}
/// Derives the signing key by chaining HMAC-SHA256 over the date stamp, region, service
/// and the literal `aws4_request`, starting from the key `"AWS4" + secret`.
#[cfg(feature = "s3")]
fn sigv4_signing_key(secret: &str, datestamp: &str, region: &str, service: &str) -> Vec<u8> {
    let seed = format!("AWS4{secret}").into_bytes();
    [datestamp, region, service, "aws4_request"]
        .iter()
        .fold(seed, |key, part| hmac_sha256(&key, part.as_bytes()))
}
#[cfg(feature = "s3")]
/// Encodes `bytes` as lowercase hexadecimal, two digits per byte.
fn hex_lower(bytes: &[u8]) -> String {
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut encoded = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        encoded.push(char::from(DIGITS[usize::from(byte >> 4)]));
        encoded.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    encoded
}
/// Returns the current UTC time as `(YYYYMMDDTHHMMSSZ, YYYYMMDD)`, both taken from the
/// same instant.
#[cfg(feature = "s3")]
fn amz_dates() -> (String, String) {
    let now = chrono::Utc::now();
    let amzdate = now.format("%Y%m%dT%H%M%SZ").to_string();
    let datestamp = now.format("%Y%m%d").to_string();
    (amzdate, datestamp)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tools::source::PolledMessage;

    /// Builds a spool item for message `id` carrying `data`; the round-trip tests read
    /// `seq` back as `recv_seq` and `id` back as `message_id`.
    fn item(seq: u64, id: &str, data: serde_json::Value) -> SpoolItem {
        let msg = PolledMessage {
            id: id.to_string(),
            data,
            headers: serde_json::Map::new(),
            attributes: serde_json::Value::Null,
            metadata: serde_json::Value::Null,
            ack_id: None,
        };
        SpoolItem::new(
            "subscriptions/t",
            "nats",
            msg,
            None,
            seq,
            None,
            "default",
            "circuit_open",
            seq,
        )
    }

    /// Items put out of order are listed in key order, can be read back, and deleting
    /// the same key twice succeeds.
    #[tokio::test]
    async fn local_disk_put_list_get_delete_roundtrip() {
        let tmp = std::env::temp_dir().join(format!("noetl-spool-test-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&tmp);
        let backend = LocalDiskBackend::open(&tmp).await.unwrap();
        assert!(backend.is_empty().await.unwrap());
        backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
        backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
        backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
        let metas = backend.list().await.unwrap();
        assert_eq!(metas.len(), 3);
        let got = backend.get(&metas[0].key).await.unwrap();
        assert_eq!(got.recv_seq, 1);
        assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
        assert!(backend.total_bytes().await.unwrap() > 0);
        backend.delete(&metas[0].key).await.unwrap();
        assert_eq!(backend.len().await.unwrap(), 2);
        // Second delete of the same key must also succeed.
        backend.delete(&metas[0].key).await.unwrap();
        let _ = std::fs::remove_dir_all(&tmp);
    }

    /// Listing a directory that does not exist yields an empty result, not an error.
    #[tokio::test]
    async fn local_disk_list_missing_dir_is_empty() {
        let tmp = std::env::temp_dir().join(format!("noetl-spool-missing-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&tmp);
        let backend = LocalDiskBackend { dir: tmp.clone() };
        assert_eq!(backend.list().await.unwrap().len(), 0);
    }

    /// Putting two items that map to the same key leaves exactly one entry.
    #[tokio::test]
    async fn local_disk_overwrite_is_idempotent_on_key() {
        let tmp = std::env::temp_dir().join(format!("noetl-spool-idem-{}", std::process::id()));
        let _ = std::fs::remove_dir_all(&tmp);
        let backend = LocalDiskBackend::open(&tmp).await.unwrap();
        backend.put(&item(7, "same", serde_json::json!(1))).await.unwrap();
        backend.put(&item(7, "same", serde_json::json!(2))).await.unwrap();
        assert_eq!(backend.len().await.unwrap(), 1);
        let _ = std::fs::remove_dir_all(&tmp);
    }

    /// A non-empty prefix always ends in `/`; an empty prefix stays empty.
    #[cfg(feature = "gcs")]
    #[test]
    fn gcs_prefix_is_normalized_to_trailing_slash() {
        assert_eq!(GcsBackend::norm_prefix("subscriptions/orders"), "subscriptions/orders/");
        assert_eq!(GcsBackend::norm_prefix("subscriptions/orders/"), "subscriptions/orders/");
        assert_eq!(GcsBackend::norm_prefix(""), "");
    }

    /// The object name is the normalized prefix followed by the key.
    #[cfg(feature = "gcs")]
    #[test]
    fn gcs_name_for_joins_prefix_and_key() {
        let b = GcsBackend::with_endpoint("bkt", "sub/spool", "http://x", false);
        assert_eq!(b.name_for("00000000000000000001-abc"), "sub/spool/00000000000000000001-abc");
    }

    /// `enc_path` encodes `/` and reserved characters but leaves unreserved ones alone.
    #[cfg(feature = "gcs")]
    #[test]
    fn gcs_enc_path_encodes_slashes_and_reserved() {
        assert_eq!(GcsBackend::enc_path("a/b-c.d_e~f"), "a%2Fb-c.d_e~f");
        assert_eq!(GcsBackend::enc_path("k=1&v"), "k%3D1%26v");
        assert_eq!(GcsBackend::enc_path("AZaz09-._~"), "AZaz09-._~");
    }

    /// Live round trip against a real bucket; ignored by default and skipped unless
    /// `NOETL_GCS_TEST_BUCKET` is set.
    #[cfg(feature = "gcs")]
    #[tokio::test]
    #[ignore]
    async fn gcs_live_put_list_get_delete_roundtrip() {
        let Ok(bucket) = std::env::var("NOETL_GCS_TEST_BUCKET") else {
            eprintln!("skipping: NOETL_GCS_TEST_BUCKET unset");
            return;
        };
        let prefix = format!("noetl-spool-test/{}", std::process::id());
        let backend = GcsBackend::open(&bucket, &prefix).await.unwrap();
        assert!(backend.is_empty().await.unwrap(), "test prefix must start empty");
        backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
        backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
        backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
        let metas = backend.list().await.unwrap();
        assert_eq!(metas.len(), 3, "all three items listed");
        assert_eq!(backend.get(&metas[0].key).await.unwrap().recv_seq, 1);
        assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
        assert!(backend.total_bytes().await.unwrap() > 0);
        let got = backend.get(&metas[0].key).await.unwrap();
        assert_eq!(got.message_id, "a");
        backend.delete(&metas[0].key).await.unwrap();
        assert_eq!(backend.len().await.unwrap(), 2);
        // Second delete of the same key must also succeed.
        backend.delete(&metas[0].key).await.unwrap();
        // Clean up whatever is left under the test prefix.
        for m in backend.list().await.unwrap() {
            backend.delete(&m.key).await.unwrap();
        }
        assert!(backend.is_empty().await.unwrap());
    }

    /// Checks the key derivation against a fixed reference vector.
    #[cfg(feature = "s3")]
    #[test]
    fn s3_sigv4_signing_key_matches_aws_reference_vector() {
        let key = sigv4_signing_key(
            "wJalrXUtnFEMI/K7MDENG+bPxRfiCYEXAMPLEKEY",
            "20120215",
            "us-east-1",
            "iam",
        );
        assert_eq!(
            hex_lower(&key),
            "f4780e2d9f65fa895f9c67b32ce1baf0b0d8a43505a000a1a9e090d414db404d"
        );
    }

    /// Prefix normalization, name joining, and scheme stripping for the host.
    #[cfg(feature = "s3")]
    #[test]
    fn s3_prefix_normalized_and_name_joined() {
        let b = S3Backend::new(
            "bkt", "sub/spool", "http://minio:9000", "us-east-1", "ak", "sk", None,
        );
        assert_eq!(b.prefix, "sub/spool/");
        assert_eq!(
            b.name_for("00000000000000000001-abc"),
            "sub/spool/00000000000000000001-abc"
        );
        assert_eq!(b.host(), "minio:9000");
        assert_eq!(S3Backend::norm_prefix(""), "");
    }

    /// The path encoder keeps `/`; the query encoder escapes it along with `+` and `=`.
    #[cfg(feature = "s3")]
    #[test]
    fn s3_uri_encoders() {
        assert_eq!(S3Backend::uri_encode_path("a/b-c.d_e~f"), "a/b-c.d_e~f");
        assert_eq!(S3Backend::uri_encode_query("a/b=c"), "a%2Fb%3Dc");
        assert_eq!(S3Backend::uri_encode_query("aGVsbG8+/w=="), "aGVsbG8%2B%2Fw%3D%3D");
    }

    /// Keys, sizes and the continuation token are extracted; a document without
    /// `<Contents>` yields no objects and no token.
    #[cfg(feature = "s3")]
    #[test]
    fn s3_parse_list_v2_extracts_keys_sizes_and_token() {
        let xml = r#"<?xml version="1.0" encoding="UTF-8"?>
<ListBucketResult xmlns="http://s3.amazonaws.com/doc/2006-03-01/">
<Name>bkt</Name>
<Prefix>sub/spool/</Prefix>
<IsTruncated>true</IsTruncated>
<Contents><Key>sub/spool/00000000000000000001-a</Key><Size>120</Size><StorageClass>STANDARD</StorageClass></Contents>
<Contents><Key>sub/spool/00000000000000000002-b</Key><Size>240</Size></Contents>
<NextContinuationToken>1ueGcxLPRx1Tr/XYExampleToken</NextContinuationToken>
</ListBucketResult>"#;
        let (objs, next) = parse_list_v2(xml);
        assert_eq!(objs.len(), 2);
        assert_eq!(objs[0], ("sub/spool/00000000000000000001-a".to_string(), 120));
        assert_eq!(objs[1].1, 240);
        assert_eq!(next.as_deref(), Some("1ueGcxLPRx1Tr/XYExampleToken"));
        let (objs2, next2) =
            parse_list_v2("<ListBucketResult><IsTruncated>false</IsTruncated></ListBucketResult>");
        assert!(objs2.is_empty());
        assert!(next2.is_none());
    }

    /// Live round trip against a real bucket; ignored by default and skipped unless
    /// `NOETL_S3_TEST_BUCKET` is set. Access and secret keys are then required.
    #[cfg(feature = "s3")]
    #[tokio::test]
    #[ignore]
    async fn s3_live_put_list_get_delete_roundtrip() {
        let Ok(bucket) = std::env::var("NOETL_S3_TEST_BUCKET") else {
            eprintln!("skipping: NOETL_S3_TEST_BUCKET unset");
            return;
        };
        let endpoint = std::env::var("NOETL_S3_ENDPOINT")
            .unwrap_or_else(|_| "https://s3.us-east-1.amazonaws.com".to_string());
        let region = std::env::var("NOETL_S3_REGION").unwrap_or_else(|_| "us-east-1".to_string());
        let access = std::env::var("NOETL_S3_ACCESS_KEY").expect("NOETL_S3_ACCESS_KEY");
        let secret = std::env::var("NOETL_S3_SECRET_KEY").expect("NOETL_S3_SECRET_KEY");
        let prefix = format!("noetl-spool-test/{}", std::process::id());
        let backend = S3Backend::new(&bucket, &prefix, &endpoint, &region, &access, &secret, None);
        assert!(backend.is_empty().await.unwrap(), "test prefix must start empty");
        backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
        backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
        backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
        let metas = backend.list().await.unwrap();
        assert_eq!(metas.len(), 3, "all three items listed");
        assert_eq!(backend.get(&metas[0].key).await.unwrap().recv_seq, 1);
        assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
        assert!(backend.total_bytes().await.unwrap() > 0);
        let got = backend.get(&metas[0].key).await.unwrap();
        assert_eq!(got.message_id, "a");
        backend.delete(&metas[0].key).await.unwrap();
        assert_eq!(backend.len().await.unwrap(), 2);
        // Second delete of the same key must also succeed.
        backend.delete(&metas[0].key).await.unwrap();
        // Clean up whatever is left under the test prefix.
        for m in backend.list().await.unwrap() {
            backend.delete(&m.key).await.unwrap();
        }
        assert!(backend.is_empty().await.unwrap());
    }
}