use faucet_core::DEFAULT_BATCH_SIZE;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Encoding of the objects read from the bucket.
///
/// Serialized in `snake_case`: `"json_lines"`, `"json_array"`, `"raw_text"`.
/// The enum is fieldless, so it is cheap to copy and can be compared directly
/// with `==` instead of `matches!`.
#[derive(
    Debug, Clone, Copy, Default, PartialEq, Eq, Hash, Serialize, Deserialize, JsonSchema,
)]
#[serde(rename_all = "snake_case")]
pub enum S3FileFormat {
    /// JSON Lines (`"json_lines"`). This is the default variant.
    #[default]
    JsonLines,
    /// JSON array (`"json_array"`).
    JsonArray,
    /// Raw text (`"raw_text"`).
    RawText,
}
/// Configuration for reading objects from an S3 (or S3-compatible) bucket.
///
/// Build one with [`S3SourceConfig::new`] and its chained setters, or deserialize it.
/// When deserializing, every field except `bucket` may be omitted. An omitted field
/// falls back to the same value [`S3SourceConfig::new`] uses, so a minimal document
/// such as `{"bucket": "b"}` is accepted.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct S3SourceConfig {
    /// Name of the bucket to read from.
    pub bucket: String,
    /// Optional object-key prefix. `None` when omitted.
    pub prefix: Option<String>,
    /// Optional region name. `None` when omitted.
    pub region: Option<String>,
    /// Optional override of the S3 endpoint URL (e.g. `http://localhost:9000`).
    /// `None` when omitted.
    pub endpoint_url: Option<String>,
    /// How object contents are encoded. Defaults to [`S3FileFormat::JsonLines`].
    // Without `default` here, deserialization required this field even though
    // `new()` and `S3FileFormat::default()` already agree on a value.
    #[serde(default)]
    pub file_format: S3FileFormat,
    /// Optional upper bound on the number of objects. `None` when omitted.
    pub max_objects: Option<usize>,
    /// Concurrency level. Defaults to 10, matching [`S3SourceConfig::new`].
    #[serde(default = "default_concurrency")]
    pub concurrency: usize,
    /// Records per batch. Defaults to [`DEFAULT_BATCH_SIZE`].
    // `0` passes `faucet_core::validate_batch_size` (the tests treat it as a
    // no-batching sentinel).
    #[serde(default = "default_batch_size")]
    pub batch_size: usize,
    /// Compression settings. Defaults to `CompressionConfig::default()`.
    #[cfg(feature = "compression")]
    #[serde(default)]
    pub compression: faucet_core::CompressionConfig,
}

/// Serde default for [`S3SourceConfig::batch_size`].
fn default_batch_size() -> usize {
    DEFAULT_BATCH_SIZE
}

/// Serde default for [`S3SourceConfig::concurrency`]. Keep in sync with the value
/// used by [`S3SourceConfig::new`].
fn default_concurrency() -> usize {
    10
}
impl S3SourceConfig {
    /// Creates a config for `bucket` with every other setting at its default.
    ///
    /// The defaults are:
    /// - no prefix, region, endpoint override or object cap;
    /// - [`S3FileFormat::JsonLines`];
    /// - a concurrency of 10;
    /// - [`DEFAULT_BATCH_SIZE`] records per batch;
    /// - with the `compression` feature, `CompressionConfig::default()`.
    #[must_use]
    pub fn new(bucket: impl Into<String>) -> Self {
        Self {
            bucket: bucket.into(),
            prefix: None,
            region: None,
            endpoint_url: None,
            file_format: S3FileFormat::default(),
            max_objects: None,
            concurrency: 10,
            batch_size: DEFAULT_BATCH_SIZE,
            #[cfg(feature = "compression")]
            compression: faucet_core::CompressionConfig::default(),
        }
    }

    // Every setter below consumes `self` and returns the updated value.
    // `#[must_use]` makes the compiler warn on `cfg.prefix("x");`, a call that
    // would otherwise silently discard the configured value.

    /// Sets the object-key prefix.
    #[must_use]
    pub fn prefix(mut self, prefix: impl Into<String>) -> Self {
        self.prefix = Some(prefix.into());
        self
    }

    /// Sets the region name.
    #[must_use]
    pub fn region(mut self, region: impl Into<String>) -> Self {
        self.region = Some(region.into());
        self
    }

    /// Sets an explicit endpoint URL.
    #[must_use]
    pub fn endpoint_url(mut self, url: impl Into<String>) -> Self {
        self.endpoint_url = Some(url.into());
        self
    }

    /// Sets the encoding of the objects.
    #[must_use]
    pub fn file_format(mut self, format: S3FileFormat) -> Self {
        self.file_format = format;
        self
    }

    /// Sets an upper bound on the number of objects.
    #[must_use]
    pub fn max_objects(mut self, max: usize) -> Self {
        self.max_objects = Some(max);
        self
    }

    /// Sets the concurrency level. The value is stored as-is, without validation.
    #[must_use]
    pub fn concurrency(mut self, concurrency: usize) -> Self {
        self.concurrency = concurrency;
        self
    }

    /// Sets the batch size.
    ///
    /// The value is stored as-is. Range checking is left to
    /// `faucet_core::validate_batch_size`.
    #[must_use]
    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
        self.batch_size = batch_size;
        self
    }

    /// Sets the compression configuration.
    #[cfg(feature = "compression")]
    #[must_use]
    pub fn compression(mut self, c: faucet_core::CompressionConfig) -> Self {
        self.compression = c;
        self
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_config() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.bucket, "my-bucket");
        assert!(config.prefix.is_none());
        assert!(config.region.is_none());
        assert!(config.endpoint_url.is_none());
        assert!(matches!(config.file_format, S3FileFormat::JsonLines));
        assert!(config.max_objects.is_none());
        assert_eq!(config.concurrency, 10);
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn builder_methods() {
        let config = S3SourceConfig::new("my-bucket")
            .prefix("data/")
            .region("us-west-2")
            .endpoint_url("http://localhost:9000")
            .file_format(S3FileFormat::JsonArray)
            .max_objects(10)
            .concurrency(4);
        assert_eq!(config.bucket, "my-bucket");
        assert_eq!(config.prefix.as_deref(), Some("data/"));
        assert_eq!(config.region.as_deref(), Some("us-west-2"));
        assert_eq!(
            config.endpoint_url.as_deref(),
            Some("http://localhost:9000")
        );
        assert!(matches!(config.file_format, S3FileFormat::JsonArray));
        assert_eq!(config.max_objects, Some(10));
        assert_eq!(config.concurrency, 4);
    }

    #[test]
    fn file_format_default_is_json_lines() {
        let format = S3FileFormat::default();
        assert!(matches!(format, S3FileFormat::JsonLines));
    }

    #[test]
    fn batch_size_defaults_to_default_batch_size() {
        let config = S3SourceConfig::new("my-bucket");
        assert_eq!(config.batch_size, faucet_core::DEFAULT_BATCH_SIZE);
    }

    #[test]
    fn with_batch_size_overrides_default() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(500);
        assert_eq!(config.batch_size, 500);
    }

    #[test]
    fn batch_size_zero_is_accepted_as_no_batching_sentinel() {
        let config = S3SourceConfig::new("my-bucket").with_batch_size(0);
        assert_eq!(config.batch_size, 0);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_ok());
    }

    #[test]
    fn batch_size_above_max_is_rejected_by_validate_batch_size() {
        let config =
            S3SourceConfig::new("my-bucket").with_batch_size(faucet_core::MAX_BATCH_SIZE + 1);
        assert!(faucet_core::validate_batch_size(config.batch_size).is_err());
    }

    #[test]
    fn batch_size_deserializes_from_json() {
        let json = r#"{
            "bucket": "my-bucket",
            "prefix": null,
            "region": null,
            "endpoint_url": null,
            "file_format": "json_lines",
            "max_objects": null,
            "concurrency": 10,
            "batch_size": 250
        }"#;
        let config: S3SourceConfig = serde_json::from_str(json).unwrap();
        assert_eq!(config.batch_size, 250);
    }

    #[test]
    fn serde_round_trip_preserves_fields() {
        let original = S3SourceConfig::new("my-bucket")
            .prefix("logs/")
            .region("eu-central-1")
            .endpoint_url("http://localhost:9000")
            .file_format(S3FileFormat::RawText)
            .max_objects(3)
            .concurrency(2)
            .with_batch_size(64);
        let json = serde_json::to_string(&original).unwrap();
        let decoded: S3SourceConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(decoded.bucket, original.bucket);
        assert_eq!(decoded.prefix, original.prefix);
        assert_eq!(decoded.region, original.region);
        assert_eq!(decoded.endpoint_url, original.endpoint_url);
        assert!(matches!(decoded.file_format, S3FileFormat::RawText));
        assert_eq!(decoded.max_objects, original.max_objects);
        assert_eq!(decoded.concurrency, original.concurrency);
        assert_eq!(decoded.batch_size, original.batch_size);
    }

    #[cfg(feature = "compression")]
    #[test]
    fn compression_default_is_auto() {
        let cfg = S3SourceConfig::new("bucket");
        assert_eq!(cfg.compression, faucet_core::CompressionConfig::Auto);
    }
}