pub mod azure;
pub mod gcs;
mod gcs_auth;
pub mod local;
pub mod placeholder;
pub mod s3;
pub mod stdout;
use std::path::Path;
use crate::config::DestinationConfig;
use crate::error::Result;
/// How a destination backend makes written data visible.
///
/// Reported by a backend through [`DestinationCapabilities::commit_protocol`].
// NOTE(review): the per-variant descriptions below are inferred from the
// variant names only; confirm against the individual backends.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WriteCommitProtocol {
    /// Presumably: the object appears at the destination all at once.
    Atomic,
    /// Presumably: the object becomes final only when the write is closed.
    FinalizeOnClose,
    /// Presumably: bytes are emitted as they are produced, with no commit step.
    Streaming,
}
/// Self-reported write semantics of a [`Destination`] backend.
///
/// Obtained from [`Destination::capabilities`] and summarised in the logs by
/// [`log_capabilities`].
#[derive(Clone, Debug)]
pub struct DestinationCapabilities {
    /// Commit protocol the backend uses for writes.
    pub commit_protocol: WriteCommitProtocol,
    /// Logged as `idempotent`.
    // NOTE(review): presumably means re-writing the same key is harmless — confirm.
    pub idempotent_overwrite: bool,
    /// When `false` and retries are configured, [`log_capabilities`] warns
    /// that partial artifacts may need manual cleanup after a failure.
    pub retry_safe: bool,
    /// Logged as `partial_risk`.
    // NOTE(review): presumably flags that a failed write can leave partial data — confirm.
    pub partial_write_risk: bool,
}
/// Metadata describing a single object at a destination.
///
/// This is the item type returned by [`Destination::list_prefix`] and
/// [`Destination::head`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ObjectMeta {
    /// Key identifying the object at the destination.
    pub key: String,
    /// Object size in bytes.
    pub size_bytes: u64,
}
/// A backend that exported files can be written to.
///
/// Implementors must provide [`write`](Destination::write) and
/// [`capabilities`](Destination::capabilities). The remaining operations are
/// optional: their default implementations ignore their arguments and return
/// a "not supported" error, so a backend only overrides what it can do.
pub trait Destination: Send + Sync {
    /// Writes the file at `local_path` to the destination under `remote_key`.
    ///
    /// # Errors
    ///
    /// Returns whatever error the backend reports for the failed write.
    fn write(&self, local_path: &Path, remote_key: &str) -> Result<()>;

    /// Describes the write semantics of this backend.
    fn capabilities(&self) -> DestinationCapabilities;

    /// Lists objects for `prefix`.
    ///
    /// # Errors
    ///
    /// The default implementation always fails: listing is not supported.
    fn list_prefix(&self, prefix: &str) -> Result<Vec<ObjectMeta>> {
        // Deliberately unused; the parameter name is kept for implementors.
        let _ = prefix;
        Err(anyhow::anyhow!(
            "list_prefix is not supported by this destination backend"
        ))
    }

    /// Reads the object stored under `key` into memory.
    ///
    /// # Errors
    ///
    /// The default implementation always fails: reading is not supported.
    fn read(&self, key: &str) -> Result<Vec<u8>> {
        let _ = key;
        Err(anyhow::anyhow!(
            "read is not supported by this destination backend"
        ))
    }

    /// Looks up metadata for the object stored under `key`.
    // NOTE(review): `Ok(None)` presumably means "no such object" — confirm
    // against the overriding backends.
    ///
    /// # Errors
    ///
    /// The default implementation always fails: head is not supported.
    fn head(&self, key: &str) -> Result<Option<ObjectMeta>> {
        let _ = key;
        Err(anyhow::anyhow!(
            "head is not supported by this destination backend"
        ))
    }

    /// Moves the object at `from` to `to`.
    ///
    /// # Errors
    ///
    /// The default implementation always fails: move is not supported.
    fn r#move(&self, from: &str, to: &str) -> Result<()> {
        let _ = (from, to);
        Err(anyhow::anyhow!(
            "move is not supported by this destination backend"
        ))
    }
}
pub fn log_capabilities(export_name: &str, dest: &dyn Destination, max_retries: u32) {
let caps = dest.capabilities();
log::debug!(
"export '{}': destination commit_protocol={:?} idempotent={} retry_safe={} partial_risk={}",
export_name,
caps.commit_protocol,
caps.idempotent_overwrite,
caps.retry_safe,
caps.partial_write_risk,
);
if !caps.retry_safe && max_retries > 0 {
log::warn!(
"export '{}': destination is not retry-safe (max_retries={}); \
partial artifacts may exist at destination on failure — manual cleanup may be needed",
export_name,
max_retries,
);
}
}
/// Builds the destination backend selected by `config.destination_type`.
///
/// Every backend except stdout is constructed from `config`; the stdout
/// backend takes no configuration.
///
/// # Errors
///
/// Propagates the error returned by the selected backend's constructor.
pub fn create_destination(config: &DestinationConfig) -> Result<Box<dyn Destination>> {
    use crate::config::DestinationType as Kind;

    let backend: Box<dyn Destination> = match config.destination_type {
        Kind::Local => Box::new(local::LocalDestination::new(config)?),
        Kind::S3 => Box::new(s3::S3Destination::new(config)?),
        Kind::Gcs => Box::new(gcs::GcsDestination::new(config)?),
        Kind::Azure => Box::new(azure::AzureDestination::new(config)?),
        Kind::Stdout => Box::new(stdout::StdoutDestination::new()?),
    };

    Ok(backend)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType};
    use std::path::Path;

    /// Destination stub that accepts every write and reports fixed capabilities.
    struct MockDest {
        caps: DestinationCapabilities,
    }

    impl MockDest {
        /// Builds a mock reporting exactly the given capability values.
        fn with_caps(
            commit_protocol: WriteCommitProtocol,
            idempotent_overwrite: bool,
            retry_safe: bool,
            partial_write_risk: bool,
        ) -> Self {
            Self {
                caps: DestinationCapabilities {
                    commit_protocol,
                    idempotent_overwrite,
                    retry_safe,
                    partial_write_risk,
                },
            }
        }
    }

    impl Destination for MockDest {
        fn write(&self, _local: &Path, _key: &str) -> Result<()> {
            Ok(())
        }

        fn capabilities(&self) -> DestinationCapabilities {
            self.caps.clone()
        }
    }

    /// Mock with atomic, idempotent, retry-safe writes and no partial-write risk.
    fn atomic_safe() -> MockDest {
        MockDest::with_caps(WriteCommitProtocol::Atomic, true, true, false)
    }

    /// Mock with streaming, non-idempotent, non-retry-safe writes that may
    /// leave partial data.
    fn streaming_unsafe() -> MockDest {
        MockDest::with_caps(WriteCommitProtocol::Streaming, false, false, true)
    }

    // The three `log_capabilities_*` tests are smoke tests: they only check
    // that logging does not panic for each capability/retry combination.

    #[test]
    fn log_capabilities_retry_safe_no_panic() {
        let dest = atomic_safe();
        log_capabilities("orders", &dest, 3);
    }

    #[test]
    fn log_capabilities_not_retry_safe_with_retries_no_panic() {
        let dest = streaming_unsafe();
        log_capabilities("orders", &dest, 3);
    }

    #[test]
    fn log_capabilities_zero_retries_no_panic() {
        let dest = streaming_unsafe();
        log_capabilities("orders", &dest, 0);
    }

    #[test]
    fn create_destination_local_succeeds() {
        // Keep the temp dir alive for the whole test so the path stays valid.
        let dir = tempfile::TempDir::new().unwrap();
        let local_path = dir.path().to_str().unwrap().to_string();
        let config = DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(local_path),
            ..Default::default()
        };

        let caps = create_destination(&config).unwrap().capabilities();

        assert_eq!(caps.commit_protocol, WriteCommitProtocol::Atomic);
        assert!(caps.idempotent_overwrite);
    }

    #[test]
    fn create_destination_stdout_succeeds() {
        let config = DestinationConfig {
            destination_type: DestinationType::Stdout,
            ..Default::default()
        };

        let caps = create_destination(&config).unwrap().capabilities();

        assert_eq!(caps.commit_protocol, WriteCommitProtocol::Streaming);
    }
}