use async_trait::async_trait;
use crate::error::ToolError;
use super::item::SpoolItem;
/// Listing entry describing one spooled object.
///
/// Returned by [`SpoolBackend::list`]; carries just enough information to
/// order entries by key and to account for the space they occupy.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq` (both fields
/// support them) so entries can be deduplicated or used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct SpoolMeta {
    /// Backend-relative key of the object, without any directory, prefix or
    /// file-extension decoration added by the backend.
    pub key: String,
    /// Object size in bytes as reported by the backend; backends in this file
    /// fall back to `0` when the size cannot be determined.
    pub size: u64,
}
/// Storage abstraction for spooled items.
///
/// Implementors provide the primitive operations (`kind`, `put`, `list`,
/// `get`, `delete`). The aggregate helpers (`total_bytes`, `len`, `is_empty`)
/// have default implementations built on `list`, so each of them costs a full
/// listing unless a backend overrides it.
#[async_trait]
pub trait SpoolBackend: Send + Sync {
    /// Short static identifier of the backend implementation.
    fn kind(&self) -> &'static str;

    /// Stores `item` in the backend.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError>;

    /// Returns metadata for the items currently stored.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError>;

    /// Loads the item stored under `key`.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError>;

    /// Removes the item stored under `key`.
    async fn delete(&self, key: &str) -> Result<(), ToolError>;

    /// Sum of the `size` of every listed item.
    async fn total_bytes(&self) -> Result<u64, ToolError> {
        let metas = self.list().await?;
        let total: u64 = metas.iter().map(|meta| meta.size).sum();
        Ok(total)
    }

    /// Number of listed items.
    async fn len(&self) -> Result<usize, ToolError> {
        let metas = self.list().await?;
        Ok(metas.len())
    }

    /// `true` when `len` reports zero items.
    async fn is_empty(&self) -> Result<bool, ToolError> {
        let count = self.len().await?;
        Ok(count == 0)
    }
}
/// Spool backend that keeps each item as a file inside a local directory.
#[derive(Debug, Clone)]
pub struct LocalDiskBackend {
/// Directory holding the spool files; an item with key `k` lives at `<dir>/k.json`.
dir: std::path::PathBuf,
}
impl LocalDiskBackend {
    /// Opens a disk-backed spool rooted at `dir`, creating the directory and
    /// any missing parents first.
    ///
    /// # Errors
    /// Returns `ToolError::Io` when the directory cannot be created.
    pub async fn open(dir: impl Into<std::path::PathBuf>) -> Result<Self, ToolError> {
        let root: std::path::PathBuf = dir.into();
        match tokio::fs::create_dir_all(&root).await {
            Ok(()) => Ok(Self { dir: root }),
            Err(e) => Err(ToolError::Io(format!(
                "spool dir '{}' create failed: {e}",
                root.display()
            ))),
        }
    }

    /// Maps a spool key to the path of its file: `<dir>/<key>.json`.
    ///
    /// NOTE(review): `key` is joined verbatim; this assumes keys never contain
    /// path separators — confirm against how keys are produced.
    fn path_for(&self, key: &str) -> std::path::PathBuf {
        let file_name = format!("{key}.json");
        self.dir.join(file_name)
    }
}
#[async_trait]
impl SpoolBackend for LocalDiskBackend {
fn kind(&self) -> &'static str {
"local_disk"
}
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
let path = self.path_for(&item.object_key());
tokio::fs::write(&path, item.to_bytes()).await.map_err(|e| {
ToolError::Io(format!("spool write '{}' failed: {e}", path.display()))
})
}
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
let mut rd = match tokio::fs::read_dir(&self.dir).await {
Ok(rd) => rd,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(ToolError::Io(format!("spool list failed: {e}"))),
};
let mut metas = Vec::new();
while let Some(entry) = rd
.next_entry()
.await
.map_err(|e| ToolError::Io(format!("spool list entry failed: {e}")))?
{
let name = entry.file_name().to_string_lossy().into_owned();
let Some(key) = name.strip_suffix(".json") else {
continue;
};
let size = entry
.metadata()
.await
.map(|m| m.len())
.unwrap_or(0);
metas.push(SpoolMeta { key: key.to_string(), size });
}
metas.sort_by(|a, b| a.key.cmp(&b.key));
Ok(metas)
}
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
let path = self.path_for(key);
let bytes = tokio::fs::read(&path).await.map_err(|e| {
ToolError::Io(format!("spool read '{}' failed: {e}", path.display()))
})?;
SpoolItem::from_bytes(&bytes)
}
async fn delete(&self, key: &str) -> Result<(), ToolError> {
let path = self.path_for(key);
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => Err(ToolError::Io(format!(
"spool delete '{}' failed: {e}",
path.display()
))),
}
}
}
/// Spool backend that stores items in a NATS JetStream object store bucket.
#[derive(Clone)]
pub struct NatsObjectBackend {
/// Handle to the object store bucket that every operation goes through.
store: async_nats::jetstream::object_store::ObjectStore,
/// Bucket name, kept for error and log messages.
bucket: String,
}
impl NatsObjectBackend {
    /// Opens the object store bucket named `bucket`, creating it when the
    /// lookup fails.
    ///
    /// Any lookup error (not only "bucket missing") leads to a create attempt;
    /// only the create error is reported.
    ///
    /// # Errors
    /// Returns `ToolError::ExecutionFailed` when the bucket can be neither
    /// opened nor created.
    pub async fn open(
        js: &async_nats::jetstream::Context,
        bucket: &str,
    ) -> Result<Self, ToolError> {
        // Fast path: the bucket already exists.
        if let Ok(store) = js.get_object_store(bucket).await {
            return Ok(Self {
                store,
                bucket: bucket.to_string(),
            });
        }

        let config = async_nats::jetstream::object_store::Config {
            bucket: bucket.to_string(),
            description: Some("NoETL subscription spool (RFC #90 Phase 4)".to_string()),
            ..Default::default()
        };
        let store = match js.create_object_store(config).await {
            Ok(created) => created,
            Err(e) => {
                return Err(ToolError::ExecutionFailed(format!(
                    "spool object store bucket '{bucket}' open/create failed: {e}"
                )))
            }
        };
        Ok(Self {
            store,
            bucket: bucket.to_string(),
        })
    }
}
#[async_trait]
impl SpoolBackend for NatsObjectBackend {
fn kind(&self) -> &'static str {
"nats_object"
}
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
let key = item.object_key();
let meta = async_nats::jetstream::object_store::ObjectMetadata {
name: key.clone(),
description: Some(item.spool_ref()),
..Default::default()
};
let mut reader = std::io::Cursor::new(item.to_bytes());
self.store.put(meta, &mut reader).await.map_err(|e| {
ToolError::ExecutionFailed(format!(
"spool object_put '{key}' to '{}' failed: {e}",
self.bucket
))
})?;
Ok(())
}
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
use futures::StreamExt;
let mut stream = self.store.list().await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_list '{}' failed: {e}", self.bucket))
})?;
let mut metas = Vec::new();
while let Some(item) = stream.next().await {
match item {
Ok(info) if !info.deleted => {
metas.push(SpoolMeta {
key: info.name,
size: info.size as u64,
});
}
Ok(_) => {} Err(e) => tracing::warn!(bucket = %self.bucket, "spool list entry error: {e}"),
}
}
metas.sort_by(|a, b| a.key.cmp(&b.key));
Ok(metas)
}
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
use tokio::io::AsyncReadExt;
let mut object = self.store.get(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' failed: {e}"))
})?;
let mut buf = Vec::new();
object.read_to_end(&mut buf).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' read failed: {e}"))
})?;
SpoolItem::from_bytes(&buf)
}
async fn delete(&self, key: &str) -> Result<(), ToolError> {
self.store.delete(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_delete '{key}' failed: {e}"))
})
}
}
/// Spool backend that stores items as objects in a Google Cloud Storage
/// bucket via HTTP requests. Only compiled with the `gcs` feature.
#[cfg(feature = "gcs")]
#[derive(Clone)]
pub struct GcsBackend {
/// HTTP client used for every request.
client: reqwest::Client,
/// Token source for the `Authorization` header; `None` sends requests unauthenticated.
auth: Option<crate::auth::GcpAuth>,
/// Name of the bucket holding the spool objects.
bucket: String,
/// Object-name prefix, normalized to be either empty or end with `/`.
prefix: String,
/// Base URL of the storage API that request paths are appended to.
endpoint: String,
}
#[cfg(feature = "gcs")]
impl GcsBackend {
    /// Creates a backend for `bucket`/`prefix` against the public GCS
    /// endpoint, with authentication enabled.
    ///
    /// No request is made here; the visible code never returns an error.
    pub async fn open(bucket: &str, prefix: &str) -> Result<Self, ToolError> {
        Ok(Self::with_endpoint(
            bucket,
            prefix,
            "https://storage.googleapis.com",
            true,
        ))
    }

    /// Creates a backend against a custom `endpoint` (trailing `/` removed).
    ///
    /// When `use_adc` is `false` no auth provider is configured and requests
    /// are sent without an `Authorization` header.
    pub fn with_endpoint(bucket: &str, prefix: &str, endpoint: &str, use_adc: bool) -> Self {
        let client = reqwest::Client::new();
        let auth = if use_adc {
            Some(crate::auth::GcpAuth::new())
        } else {
            None
        };
        Self {
            client,
            auth,
            bucket: bucket.to_string(),
            prefix: Self::norm_prefix(prefix),
            endpoint: endpoint.trim_end_matches('/').to_string(),
        }
    }

    /// Normalizes a prefix so it is either empty or ends with exactly the
    /// caller's text plus a trailing `/`.
    fn norm_prefix(prefix: &str) -> String {
        let mut normalized = prefix.to_string();
        if !normalized.is_empty() && !normalized.ends_with('/') {
            normalized.push('/');
        }
        normalized
    }

    /// Full object name for `key`: the normalized prefix followed by the key.
    fn name_for(&self, key: &str) -> String {
        let mut name = String::with_capacity(self.prefix.len() + key.len());
        name.push_str(&self.prefix);
        name.push_str(key);
        name
    }

    /// Percent-encodes `name` for use as a single URL path segment.
    ///
    /// ASCII letters, digits and `-`, `.`, `_`, `~` pass through; every other
    /// byte (including `/`) becomes `%XX` with uppercase hex digits.
    fn enc_path(name: &str) -> String {
        const HEX: &[u8; 16] = b"0123456789ABCDEF";

        let mut encoded = String::with_capacity(name.len() * 3);
        for byte in name.bytes() {
            let unreserved =
                byte.is_ascii_alphanumeric() || matches!(byte, b'-' | b'.' | b'_' | b'~');
            if unreserved {
                encoded.push(char::from(byte));
            } else {
                encoded.push('%');
                encoded.push(char::from(HEX[usize::from(byte >> 4)]));
                encoded.push(char::from(HEX[usize::from(byte & 0x0F)]));
            }
        }
        encoded
    }

    /// Builds the `Authorization` header value, or `None` when no auth
    /// provider is configured.
    ///
    /// # Errors
    /// Propagates the error from the token request.
    async fn auth_header(&self) -> Result<Option<String>, ToolError> {
        let Some(gcp) = &self.auth else {
            return Ok(None);
        };
        let token = gcp
            .get_token(&["https://www.googleapis.com/auth/devstorage.read_write"])
            .await?;
        Ok(Some(format!("Bearer {token}")))
    }
}
#[cfg(feature = "gcs")]
impl GcsBackend {
    /// Adds the `Authorization` header to `req` when an auth provider is
    /// configured; otherwise returns `req` untouched.
    async fn authorized(
        &self,
        req: reqwest::RequestBuilder,
    ) -> Result<reqwest::RequestBuilder, ToolError> {
        match self.auth_header().await? {
            Some(auth) => Ok(req.header("Authorization", auth)),
            None => Ok(req),
        }
    }

    /// URL of the object resource for the full object name `name`.
    fn object_url(&self, name: &str) -> String {
        format!(
            "{}/storage/v1/b/{}/o/{}",
            self.endpoint,
            self.bucket,
            Self::enc_path(name)
        )
    }
}

/// GCS-backed spool: one object per item under the configured prefix.
#[cfg(feature = "gcs")]
#[async_trait]
impl SpoolBackend for GcsBackend {
    /// Identifies this backend as `"gcs"`.
    fn kind(&self) -> &'static str {
        "gcs"
    }

    /// Uploads `item` with a media upload, named `<prefix><object key>`.
    ///
    /// # Errors
    /// Returns `ToolError::ExecutionFailed` when the request cannot be sent or
    /// the response status is not a success; token errors are propagated.
    async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
        let name = self.name_for(&item.object_key());
        let url = format!("{}/upload/storage/v1/b/{}/o", self.endpoint, self.bucket);
        let upload = self
            .client
            .post(&url)
            .query(&[("uploadType", "media"), ("name", name.as_str())])
            .header("Content-Type", "application/json")
            .body(item.to_bytes());

        let resp = self.authorized(upload).await?.send().await.map_err(|e| {
            ToolError::ExecutionFailed(format!("spool gcs put '{name}' failed: {e}"))
        })?;

        let status = resp.status();
        if status.is_success() {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool gcs put '{name}' to '{}' returned {status}: {body}",
            self.bucket
        )))
    }

    /// Lists every object under the prefix, following page tokens until the
    /// listing is exhausted, and returns the entries sorted by key.
    ///
    /// Keys are object names with the prefix removed; an object whose name is
    /// exactly the prefix is skipped. An unparsable `size` is reported as `0`.
    ///
    /// # Errors
    /// Returns `ToolError::ExecutionFailed` for transport or status failures
    /// and `ToolError::Json` when a page cannot be decoded.
    async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
        let url = format!("{}/storage/v1/b/{}/o", self.endpoint, self.bucket);
        let mut metas = Vec::new();
        let mut page_token: Option<String> = None;

        loop {
            let mut query: Vec<(&str, String)> = Vec::with_capacity(2);
            query.push(("prefix", self.prefix.clone()));
            if let Some(tok) = page_token.as_ref() {
                query.push(("pageToken", tok.clone()));
            }

            let listing = self.client.get(&url).query(&query);
            let resp = self.authorized(listing).await?.send().await.map_err(|e| {
                ToolError::ExecutionFailed(format!("spool gcs list '{}' failed: {e}", self.bucket))
            })?;

            let status = resp.status();
            if !status.is_success() {
                let body = resp.text().await.unwrap_or_default();
                return Err(ToolError::ExecutionFailed(format!(
                    "spool gcs list '{}' returned {status}: {body}",
                    self.bucket
                )));
            }

            let page: GcsListResponse = resp.json().await.map_err(|e| {
                ToolError::Json(format!("spool gcs list decode failed: {e}"))
            })?;

            metas.extend(page.items.into_iter().filter_map(|obj| {
                let key = obj.name.strip_prefix(&self.prefix)?;
                if key.is_empty() {
                    return None;
                }
                Some(SpoolMeta {
                    key: key.to_string(),
                    size: obj.size.parse::<u64>().unwrap_or(0),
                })
            }));

            // A missing or empty token marks the last page.
            page_token = page.next_page_token.filter(|tok| !tok.is_empty());
            if page_token.is_none() {
                break;
            }
        }

        metas.sort_by(|left, right| left.key.cmp(&right.key));
        Ok(metas)
    }

    /// Downloads (`alt=media`) and decodes the object stored under `key`.
    ///
    /// # Errors
    /// Returns `ToolError::ExecutionFailed` for transport, status or body-read
    /// failures, or whatever error `SpoolItem::from_bytes` produces.
    async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
        let name = self.name_for(key);
        let url = self.object_url(&name);
        let download = self.client.get(&url).query(&[("alt", "media")]);

        let resp = self.authorized(download).await?.send().await.map_err(|e| {
            ToolError::ExecutionFailed(format!("spool gcs get '{name}' failed: {e}"))
        })?;

        let status = resp.status();
        if !status.is_success() {
            let body = resp.text().await.unwrap_or_default();
            return Err(ToolError::ExecutionFailed(format!(
                "spool gcs get '{name}' returned {status}: {body}"
            )));
        }

        let bytes = resp.bytes().await.map_err(|e| {
            ToolError::ExecutionFailed(format!("spool gcs get '{name}' read failed: {e}"))
        })?;
        SpoolItem::from_bytes(&bytes)
    }

    /// Deletes the object stored under `key`; a 404 response counts as success.
    ///
    /// # Errors
    /// Returns `ToolError::ExecutionFailed` for transport failures or any
    /// status that is neither a success nor 404.
    async fn delete(&self, key: &str) -> Result<(), ToolError> {
        let name = self.name_for(key);
        let url = self.object_url(&name);
        let removal = self.client.delete(&url);

        let resp = self.authorized(removal).await?.send().await.map_err(|e| {
            ToolError::ExecutionFailed(format!("spool gcs delete '{name}' failed: {e}"))
        })?;

        let status = resp.status();
        if status.is_success() || status == reqwest::StatusCode::NOT_FOUND {
            return Ok(());
        }
        let body = resp.text().await.unwrap_or_default();
        Err(ToolError::ExecutionFailed(format!(
            "spool gcs delete '{name}' returned {status}: {body}"
        )))
    }
}
/// The fields of an object-listing response page that the GCS `list`
/// implementation reads; other fields in the payload are ignored.
#[cfg(feature = "gcs")]
#[derive(serde::Deserialize)]
struct GcsListResponse {
/// Objects on this page; defaults to empty when the field is absent.
#[serde(default)]
items: Vec<GcsObject>,
/// Token for requesting the next page; absent (or empty) on the last page.
#[serde(rename = "nextPageToken")]
next_page_token: Option<String>,
}
/// One object entry of a listing page, reduced to the fields `list` needs.
#[cfg(feature = "gcs")]
#[derive(serde::Deserialize)]
struct GcsObject {
/// Full object name, including the spool prefix.
name: String,
/// Object size as a string; `list` parses it as `u64` and falls back to `0`.
/// Defaults to an empty string when the field is absent.
#[serde(default)]
size: String,
}
// Unit tests for the spool backends. The local-disk tests run against a
// per-process temp directory; the GCS tests are compiled only with the `gcs`
// feature, and the live GCS round-trip is `#[ignore]`d by default.
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::source::PolledMessage;
// Builds a spool item wrapping a minimal `PolledMessage` with the given id and payload.
// `seq` is passed in two positions of `SpoolItem::new`; the tests below read it back
// through `recv_seq`. NOTE(review): the meaning of the remaining positional arguments
// is defined by `SpoolItem::new`, which is not visible here.
fn item(seq: u64, id: &str, data: serde_json::Value) -> SpoolItem {
let msg = PolledMessage {
id: id.to_string(),
data,
headers: serde_json::Map::new(),
attributes: serde_json::Value::Null,
metadata: serde_json::Value::Null,
ack_id: None,
};
SpoolItem::new("subscriptions/t", "nats", msg, None, seq, None, "default", "circuit_open", seq)
}
// Full put/list/get/delete cycle on the disk backend.
#[tokio::test]
async fn local_disk_put_list_get_delete_roundtrip() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-test-{}", std::process::id()));
// Start from a clean slate in case an earlier run left the directory behind.
let _ = std::fs::remove_dir_all(&tmp);
let backend = LocalDiskBackend::open(&tmp).await.unwrap();
assert!(backend.is_empty().await.unwrap());
// Items are inserted out of sequence order on purpose.
backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
let metas = backend.list().await.unwrap();
assert_eq!(metas.len(), 3);
// `list` sorts by key; the first and last entries are expected to be seq 1 and 3
// (presumably because object keys sort by sequence — confirm against `object_key`).
let got = backend.get(&metas[0].key).await.unwrap();
assert_eq!(got.recv_seq, 1);
assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
assert!(backend.total_bytes().await.unwrap() > 0);
backend.delete(&metas[0].key).await.unwrap();
assert_eq!(backend.len().await.unwrap(), 2);
// Deleting the same key again must still succeed.
backend.delete(&metas[0].key).await.unwrap();
let _ = std::fs::remove_dir_all(&tmp);
}
// Listing a directory that does not exist yields an empty result instead of an error.
#[tokio::test]
async fn local_disk_list_missing_dir_is_empty() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-missing-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&tmp);
// Constructed directly (not via `open`) so the directory is never created.
let backend = LocalDiskBackend { dir: tmp.clone() };
assert_eq!(backend.list().await.unwrap().len(), 0);
}
// Two puts with the same sequence and id must leave a single entry.
#[tokio::test]
async fn local_disk_overwrite_is_idempotent_on_key() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-idem-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&tmp);
let backend = LocalDiskBackend::open(&tmp).await.unwrap();
backend.put(&item(7, "same", serde_json::json!(1))).await.unwrap();
backend.put(&item(7, "same", serde_json::json!(2))).await.unwrap();
assert_eq!(backend.len().await.unwrap(), 1);
let _ = std::fs::remove_dir_all(&tmp);
}
// A non-empty prefix always ends with one `/`; an empty prefix stays empty.
#[cfg(feature = "gcs")]
#[test]
fn gcs_prefix_is_normalized_to_trailing_slash() {
assert_eq!(GcsBackend::norm_prefix("subscriptions/orders"), "subscriptions/orders/");
assert_eq!(GcsBackend::norm_prefix("subscriptions/orders/"), "subscriptions/orders/");
assert_eq!(GcsBackend::norm_prefix(""), "");
}
// The object name is the normalized prefix followed by the key.
#[cfg(feature = "gcs")]
#[test]
fn gcs_name_for_joins_prefix_and_key() {
let b = GcsBackend::with_endpoint("bkt", "sub/spool", "http://x", false);
assert_eq!(b.name_for("00000000000000000001-abc"), "sub/spool/00000000000000000001-abc");
}
// `/` and other reserved characters are percent-encoded; unreserved ones pass through.
#[cfg(feature = "gcs")]
#[test]
fn gcs_enc_path_encodes_slashes_and_reserved() {
assert_eq!(GcsBackend::enc_path("a/b-c.d_e~f"), "a%2Fb-c.d_e~f");
assert_eq!(GcsBackend::enc_path("k=1&v"), "k%3D1%26v");
assert_eq!(GcsBackend::enc_path("AZaz09-._~"), "AZaz09-._~");
}
// Live round-trip against a real bucket. Ignored by default; when run explicitly it
// still returns early unless NOETL_GCS_TEST_BUCKET is set.
#[cfg(feature = "gcs")]
#[tokio::test]
#[ignore]
async fn gcs_live_put_list_get_delete_roundtrip() {
let Ok(bucket) = std::env::var("NOETL_GCS_TEST_BUCKET") else {
eprintln!("skipping: NOETL_GCS_TEST_BUCKET unset");
return;
};
// Per-process prefix keeps separate runs from seeing each other's objects.
let prefix = format!("noetl-spool-test/{}", std::process::id());
let backend = GcsBackend::open(&bucket, &prefix).await.unwrap();
assert!(backend.is_empty().await.unwrap(), "test prefix must start empty");
backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
let metas = backend.list().await.unwrap();
assert_eq!(metas.len(), 3, "all three items listed");
assert_eq!(backend.get(&metas[0].key).await.unwrap().recv_seq, 1);
assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
assert!(backend.total_bytes().await.unwrap() > 0);
let got = backend.get(&metas[0].key).await.unwrap();
assert_eq!(got.message_id, "a");
backend.delete(&metas[0].key).await.unwrap();
assert_eq!(backend.len().await.unwrap(), 2);
// Second delete of the same key exercises the 404-is-success path.
backend.delete(&metas[0].key).await.unwrap();
// Clean up whatever is left under the test prefix.
for m in backend.list().await.unwrap() {
backend.delete(&m.key).await.unwrap();
}
assert!(backend.is_empty().await.unwrap());
}
}