use async_trait::async_trait;
use crate::error::ToolError;
use super::item::SpoolItem;
#[derive(Debug, Clone, PartialEq)]
pub struct SpoolMeta {
pub key: String,
pub size: u64,
}
#[async_trait]
pub trait SpoolBackend: Send + Sync {
fn kind(&self) -> &'static str;
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError>;
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError>;
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError>;
async fn delete(&self, key: &str) -> Result<(), ToolError>;
async fn total_bytes(&self) -> Result<u64, ToolError> {
Ok(self.list().await?.iter().map(|m| m.size).sum())
}
async fn len(&self) -> Result<usize, ToolError> {
Ok(self.list().await?.len())
}
async fn is_empty(&self) -> Result<bool, ToolError> {
Ok(self.len().await? == 0)
}
}
#[derive(Debug, Clone)]
pub struct LocalDiskBackend {
dir: std::path::PathBuf,
}
impl LocalDiskBackend {
pub async fn open(dir: impl Into<std::path::PathBuf>) -> Result<Self, ToolError> {
let dir = dir.into();
tokio::fs::create_dir_all(&dir).await.map_err(|e| {
ToolError::Io(format!("spool dir '{}' create failed: {e}", dir.display()))
})?;
Ok(Self { dir })
}
fn path_for(&self, key: &str) -> std::path::PathBuf {
self.dir.join(format!("{key}.json"))
}
}
#[async_trait]
impl SpoolBackend for LocalDiskBackend {
fn kind(&self) -> &'static str {
"local_disk"
}
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
let path = self.path_for(&item.object_key());
tokio::fs::write(&path, item.to_bytes()).await.map_err(|e| {
ToolError::Io(format!("spool write '{}' failed: {e}", path.display()))
})
}
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
let mut rd = match tokio::fs::read_dir(&self.dir).await {
Ok(rd) => rd,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(ToolError::Io(format!("spool list failed: {e}"))),
};
let mut metas = Vec::new();
while let Some(entry) = rd
.next_entry()
.await
.map_err(|e| ToolError::Io(format!("spool list entry failed: {e}")))?
{
let name = entry.file_name().to_string_lossy().into_owned();
let Some(key) = name.strip_suffix(".json") else {
continue;
};
let size = entry
.metadata()
.await
.map(|m| m.len())
.unwrap_or(0);
metas.push(SpoolMeta { key: key.to_string(), size });
}
metas.sort_by(|a, b| a.key.cmp(&b.key));
Ok(metas)
}
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
let path = self.path_for(key);
let bytes = tokio::fs::read(&path).await.map_err(|e| {
ToolError::Io(format!("spool read '{}' failed: {e}", path.display()))
})?;
SpoolItem::from_bytes(&bytes)
}
async fn delete(&self, key: &str) -> Result<(), ToolError> {
let path = self.path_for(key);
match tokio::fs::remove_file(&path).await {
Ok(()) => Ok(()),
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), Err(e) => Err(ToolError::Io(format!(
"spool delete '{}' failed: {e}",
path.display()
))),
}
}
}
#[derive(Clone)]
pub struct NatsObjectBackend {
store: async_nats::jetstream::object_store::ObjectStore,
bucket: String,
}
impl NatsObjectBackend {
pub async fn open(
js: &async_nats::jetstream::Context,
bucket: &str,
) -> Result<Self, ToolError> {
let store = match js.get_object_store(bucket).await {
Ok(s) => s,
Err(_) => js
.create_object_store(async_nats::jetstream::object_store::Config {
bucket: bucket.to_string(),
description: Some("NoETL subscription spool (RFC #90 Phase 4)".to_string()),
..Default::default()
})
.await
.map_err(|e| {
ToolError::ExecutionFailed(format!(
"spool object store bucket '{bucket}' open/create failed: {e}"
))
})?,
};
Ok(Self {
store,
bucket: bucket.to_string(),
})
}
}
#[async_trait]
impl SpoolBackend for NatsObjectBackend {
fn kind(&self) -> &'static str {
"nats_object"
}
async fn put(&self, item: &SpoolItem) -> Result<(), ToolError> {
let key = item.object_key();
let meta = async_nats::jetstream::object_store::ObjectMetadata {
name: key.clone(),
description: Some(item.spool_ref()),
..Default::default()
};
let mut reader = std::io::Cursor::new(item.to_bytes());
self.store.put(meta, &mut reader).await.map_err(|e| {
ToolError::ExecutionFailed(format!(
"spool object_put '{key}' to '{}' failed: {e}",
self.bucket
))
})?;
Ok(())
}
async fn list(&self) -> Result<Vec<SpoolMeta>, ToolError> {
use futures::StreamExt;
let mut stream = self.store.list().await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_list '{}' failed: {e}", self.bucket))
})?;
let mut metas = Vec::new();
while let Some(item) = stream.next().await {
match item {
Ok(info) if !info.deleted => {
metas.push(SpoolMeta {
key: info.name,
size: info.size as u64,
});
}
Ok(_) => {} Err(e) => tracing::warn!(bucket = %self.bucket, "spool list entry error: {e}"),
}
}
metas.sort_by(|a, b| a.key.cmp(&b.key));
Ok(metas)
}
async fn get(&self, key: &str) -> Result<SpoolItem, ToolError> {
use tokio::io::AsyncReadExt;
let mut object = self.store.get(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' failed: {e}"))
})?;
let mut buf = Vec::new();
object.read_to_end(&mut buf).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_get '{key}' read failed: {e}"))
})?;
SpoolItem::from_bytes(&buf)
}
async fn delete(&self, key: &str) -> Result<(), ToolError> {
self.store.delete(key).await.map_err(|e| {
ToolError::ExecutionFailed(format!("spool object_delete '{key}' failed: {e}"))
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::tools::source::PolledMessage;
fn item(seq: u64, id: &str, data: serde_json::Value) -> SpoolItem {
let msg = PolledMessage {
id: id.to_string(),
data,
headers: serde_json::Map::new(),
attributes: serde_json::Value::Null,
metadata: serde_json::Value::Null,
ack_id: None,
};
SpoolItem::new("subscriptions/t", "nats", msg, None, seq, None, "default", "circuit_open", seq)
}
#[tokio::test]
async fn local_disk_put_list_get_delete_roundtrip() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-test-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&tmp);
let backend = LocalDiskBackend::open(&tmp).await.unwrap();
assert!(backend.is_empty().await.unwrap());
backend.put(&item(2, "b", serde_json::json!({"v": 2}))).await.unwrap();
backend.put(&item(1, "a", serde_json::json!({"v": 1}))).await.unwrap();
backend.put(&item(3, "c", serde_json::json!({"v": 3}))).await.unwrap();
let metas = backend.list().await.unwrap();
assert_eq!(metas.len(), 3);
let got = backend.get(&metas[0].key).await.unwrap();
assert_eq!(got.recv_seq, 1);
assert_eq!(backend.get(&metas[2].key).await.unwrap().recv_seq, 3);
assert!(backend.total_bytes().await.unwrap() > 0);
backend.delete(&metas[0].key).await.unwrap();
assert_eq!(backend.len().await.unwrap(), 2);
backend.delete(&metas[0].key).await.unwrap();
let _ = std::fs::remove_dir_all(&tmp);
}
#[tokio::test]
async fn local_disk_list_missing_dir_is_empty() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-missing-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&tmp);
let backend = LocalDiskBackend { dir: tmp.clone() };
assert_eq!(backend.list().await.unwrap().len(), 0);
}
#[tokio::test]
async fn local_disk_overwrite_is_idempotent_on_key() {
let tmp = std::env::temp_dir().join(format!("noetl-spool-idem-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&tmp);
let backend = LocalDiskBackend::open(&tmp).await.unwrap();
backend.put(&item(7, "same", serde_json::json!(1))).await.unwrap();
backend.put(&item(7, "same", serde_json::json!(2))).await.unwrap();
assert_eq!(backend.len().await.unwrap(), 1);
let _ = std::fs::remove_dir_all(&tmp);
}
}