use async_trait::async_trait;
use serde::{Deserialize, Deserializer, Serialize};
use serde_yaml::Value;
use std::collections::HashMap;
use thiserror::Error;
use tokio::sync::oneshot;
use tokio::time::Duration;
pub mod config;
pub use runtime::Runtime;
pub(crate) mod modules;
mod runtime;
/// Identifier string reserved for the shutdown signal.
// NOTE(review): presumably compared against message identifiers by the runtime to
// recognise a shutdown request — confirm against the callers in `runtime`.
pub const SHUTDOWN_MESSAGE_ID: &str = "SHUTDOWN_SIGNAL";
pub(crate) fn deserialize_optional_duration<'de, D>(
deserializer: D,
) -> Result<Option<Duration>, D::Error>
where
D: Deserializer<'de>,
{
let opt: Option<String> = Option::deserialize(deserializer)?;
match opt {
Some(s) => parse_duration::parse(&s)
.map(Some)
.map_err(serde::de::Error::custom),
None => Ok(None),
}
}
pub(crate) fn deserialize_duration<'de, D>(deserializer: D) -> Result<Duration, D::Error>
where
D: Deserializer<'de>,
{
let s: String = String::deserialize(deserializer)?;
parse_duration::parse(&s).map_err(serde::de::Error::custom)
}
/// Optional batching limits read from configuration.
///
/// Every field may be omitted. The `effective_*` accessors on this type supply
/// the fallback value used when a field is `None`.
///
/// `Debug` is derived so the policy can be logged and embedded in other
/// `Debug` types, matching the other configuration types in this file.
#[derive(Deserialize, Default, Clone, Debug)]
pub struct BatchingPolicy {
    /// Time limit for a batch, written in configuration as a human-readable
    /// duration string such as `"10s"`, `"100ms"` or `"1m 30s"`.
    #[serde(default, deserialize_with = "deserialize_optional_duration")]
    pub duration: Option<Duration>,
    /// Batch size limit.
    // NOTE(review): presumably a message count — confirm against the batching code.
    pub size: Option<usize>,
    /// Upper bound on the batch payload, in bytes.
    pub max_batch_bytes: Option<usize>,
}
impl BatchingPolicy {
    /// Fallback for `size` when the configuration leaves it unset.
    const FALLBACK_SIZE: usize = 500;
    /// Fallback for `duration` when the configuration leaves it unset.
    const FALLBACK_DURATION: Duration = Duration::from_secs(10);
    /// Fallback for `max_batch_bytes` when unset: 10 MiB (10_485_760 bytes).
    const FALLBACK_MAX_BATCH_BYTES: usize = 10 * 1024 * 1024;

    /// Returns the configured `size`, or 500 when none was given.
    pub fn effective_size(&self) -> usize {
        self.size.unwrap_or(Self::FALLBACK_SIZE)
    }

    /// Returns the configured `duration`, or ten seconds when none was given.
    pub fn effective_duration(&self) -> Duration {
        self.duration.unwrap_or(Self::FALLBACK_DURATION)
    }

    /// Returns the configured `max_batch_bytes`, or 10 MiB when none was given.
    pub fn effective_max_batch_bytes(&self) -> usize {
        self.max_batch_bytes
            .unwrap_or(Self::FALLBACK_MAX_BATCH_BYTES)
    }
}
/// How the wait between retries grows with the attempt number.
///
/// Variant names are (de)serialized in snake_case (`constant`, `linear`,
/// `exponential`). `Exponential` is the default.
#[derive(Deserialize, Serialize, Clone, Debug, Default, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum BackoffStrategy {
/// The wait is `initial_wait` for every attempt.
Constant,
/// The wait is `initial_wait` multiplied by `attempt + 1`.
Linear,
/// The wait is `initial_wait` multiplied by `2^attempt`.
#[default]
Exponential,
}
/// Serde default for `RetryPolicy::max_retries`: three retries.
fn default_max_retries() -> u32 {
    const DEFAULT_MAX_RETRIES: u32 = 3;
    DEFAULT_MAX_RETRIES
}
/// Serde default for `RetryPolicy::initial_wait`: one second.
fn default_initial_wait() -> Duration {
    const DEFAULT_INITIAL_WAIT_SECS: u64 = 1;
    Duration::from_secs(DEFAULT_INITIAL_WAIT_SECS)
}
/// Serde default for `RetryPolicy::max_wait`: thirty seconds.
fn default_max_wait() -> Duration {
    const DEFAULT_MAX_WAIT_SECS: u64 = 30;
    Duration::from_secs(DEFAULT_MAX_WAIT_SECS)
}
/// Retry configuration: how many times to retry and how long to wait between
/// attempts.
///
/// Every field has a serde default, so an empty mapping deserializes to
/// 3 retries, a 1 s initial wait, a 30 s maximum wait and exponential backoff.
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct RetryPolicy {
/// Maximum number of retries; defaults to 3.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
/// Base wait used by `compute_wait`; parsed from a human-readable duration
/// string (e.g. `"2s"`) and defaults to one second.
#[serde(
default = "default_initial_wait",
deserialize_with = "deserialize_duration"
)]
pub initial_wait: Duration,
/// Upper bound applied to every computed wait; parsed from a human-readable
/// duration string and defaults to thirty seconds.
#[serde(
default = "default_max_wait",
deserialize_with = "deserialize_duration"
)]
pub max_wait: Duration,
/// Growth strategy for the wait; defaults to `BackoffStrategy::Exponential`.
#[serde(default)]
pub backoff: BackoffStrategy,
}
impl RetryPolicy {
    /// Returns how long to wait before the retry numbered `attempt` (zero-based).
    ///
    /// * `Constant` — always `initial_wait`.
    /// * `Linear` — `initial_wait * (attempt + 1)`.
    /// * `Exponential` — `initial_wait * 2^attempt`.
    ///
    /// The result never exceeds `max_wait`. All arithmetic saturates, so
    /// arbitrarily large attempt numbers (including `u32::MAX`) neither panic
    /// nor wrap around to a shorter wait.
    pub fn compute_wait(&self, attempt: u32) -> Duration {
        let wait = match self.backoff {
            BackoffStrategy::Constant => self.initial_wait,
            // `attempt + 1` would overflow at u32::MAX (panic in debug builds,
            // wrap to 0 — i.e. a zero wait — in release), so saturate instead.
            BackoffStrategy::Linear => self.initial_wait.saturating_mul(attempt.saturating_add(1)),
            BackoffStrategy::Exponential => {
                // A shift of 32 or more has no u32 result; treat it as the
                // largest possible multiplier and let `max_wait` cap it.
                let factor = 1u32.checked_shl(attempt).unwrap_or(u32::MAX);
                self.initial_wait.saturating_mul(factor)
            }
        };
        wait.min(self.max_wait)
    }
}
/// Kind of a [`Message`]: an ordinary message or a stream boundary marker.
#[derive(Serialize, Deserialize, Default, Clone, Debug, PartialEq)]
pub enum MessageType {
/// Ordinary message; this is the default kind.
#[default]
Default,
/// Marks the start of a stream.
// NOTE(review): the String is presumably the stream identifier — confirm.
BeginStream(String),
/// Marks the end of a stream.
// NOTE(review): the String is presumably the stream identifier — confirm.
EndStream(String),
}
/// A single unit of data moved through the pipeline traits in this file.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct Message {
/// Raw payload bytes.
pub bytes: Vec<u8>,
/// Arbitrary key/value metadata; values are `serde_yaml::Value`.
pub metadata: HashMap<String, Value>,
/// Whether this is an ordinary message or a stream boundary marker.
pub message_type: MessageType,
/// Optional stream identifier.
// NOTE(review): presumably ties the message to a BeginStream/EndStream pair — confirm.
pub stream_id: Option<String>,
}
/// A batch of messages, represented as a plain `Vec<Message>`.
pub type MessageBatch = Vec<Message>;
/// One snapshot of pipeline metrics, passed to [`Metrics::record`].
///
/// All counters default to zero and the optional system readings default to
/// `None`.
// NOTE(review): the exact meaning and units of each field are inferred from the
// field names only; confirm against the code that populates this struct.
#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)]
pub struct MetricEntry {
// --- message counters ---
pub total_received: u64,
pub total_completed: u64,
pub total_process_errors: u64,
pub total_output_errors: u64,
pub total_filtered: u64,
// --- stream / de-duplication counters ---
pub streams_started: u64,
pub streams_completed: u64,
pub duplicates_rejected: u64,
pub stale_entries_removed: u64,
// --- current load ---
pub in_flight: usize,
pub throughput_per_sec: f64,
// --- optional system readings ---
pub cpu_usage_percent: Option<f32>,
pub memory_used_bytes: Option<u64>,
pub memory_total_bytes: Option<u64>,
// --- byte volume ---
pub input_bytes: u64,
pub output_bytes: u64,
pub bytes_per_sec: f64,
// --- latency, in milliseconds per the field names ---
pub latency_avg_ms: f64,
pub latency_min_ms: f64,
pub latency_max_ms: f64,
// --- retry counters ---
pub total_retries: u64,
pub total_retries_exhausted: u64,
}
/// Sending half of a one-shot channel that carries a single [`Status`].
pub type CallbackChan = oneshot::Sender<Status>;
/// Creates a fresh one-shot channel for reporting a single [`Status`].
///
/// Returns the `(sender, receiver)` pair; the sender has the same type as
/// [`CallbackChan`].
pub fn new_callback_chan() -> (oneshot::Sender<Status>, oneshot::Receiver<Status>) {
    let (sender, receiver) = oneshot::channel::<Status>();
    (sender, receiver)
}
/// Outcome reported through a [`CallbackChan`].
#[derive(Clone, Debug)]
pub enum Status {
/// Processing finished without error.
Processed,
/// Processing failed; carries one or more error strings.
Errored(Vec<String>),
}
/// Asynchronous shutdown hook required by every other component trait in this
/// file.
#[async_trait]
pub trait Closer {
/// Releases any resources held by the component.
///
/// The default implementation does nothing and returns `Ok(())`.
async fn close(&mut self) -> Result<(), Error> {
Ok(())
}
}
/// A source that yields one message per read.
#[async_trait]
pub trait Input: Closer {
/// Reads the next message, optionally paired with a callback channel.
// NOTE(review): presumably the callback receives the final `Status` of the
// message, and `Error::EndOfInput` signals exhaustion — confirm with the runtime.
async fn read(&mut self) -> Result<(Message, Option<CallbackChan>), Error>;
}
/// A source that yields a whole batch of messages per read.
#[async_trait]
pub trait InputBatch: Closer {
/// Reads the next batch, optionally paired with a single callback channel
/// for the batch as a whole.
// NOTE(review): presumably the callback receives the batch's final `Status` — confirm.
async fn read_batch(&mut self) -> Result<(MessageBatch, Option<CallbackChan>), Error>;
}
/// A sink that accepts one message per write.
#[async_trait]
pub trait Output: Closer {
/// Writes a single message, taking ownership of it.
async fn write(&mut self, message: Message) -> Result<(), Error>;
}
/// A sink that accepts a whole batch of messages per write and advertises its
/// preferred batching limits.
#[async_trait]
pub trait OutputBatch: Closer {
    /// Writes one batch of messages, taking ownership of it.
    async fn write_batch(&mut self, message_batch: MessageBatch) -> Result<(), Error>;

    /// Batch size limit for this output; 500 unless overridden.
    async fn batch_size(&self) -> usize {
        const DEFAULT_BATCH_SIZE: usize = 500;
        DEFAULT_BATCH_SIZE
    }

    /// Batching interval for this output; ten seconds unless overridden.
    async fn interval(&self) -> Duration {
        const DEFAULT_INTERVAL_SECS: u64 = 10;
        Duration::from_secs(DEFAULT_INTERVAL_SECS)
    }

    /// Byte limit per batch for this output; 10 MiB (10_485_760 bytes) unless
    /// overridden.
    async fn max_batch_bytes(&self) -> usize {
        const DEFAULT_MAX_BATCH_BYTES: usize = 10 * 1024 * 1024;
        DEFAULT_MAX_BATCH_BYTES
    }
}
/// A transformation step applied to individual messages.
#[async_trait]
pub trait Processor: Closer {
/// Processes one message and returns the resulting batch; because the
/// result is a `Vec`, one input may yield zero, one or many messages.
async fn process(&self, message: Message) -> Result<MessageBatch, Error>;
}
/// A destination for [`MetricEntry`] snapshots. Implementors must be `Send`.
#[async_trait]
pub trait Metrics: Closer + Send {
/// Records one metrics snapshot. This method is synchronous.
fn record(&mut self, metric: MetricEntry);
}
/// Error type returned by the traits and helpers declared in this file.
///
/// The variants carrying `#[from]` can be produced with `?` directly from
/// `serde_yaml::Error`, `serde_json::Error` and `flume::RecvError`; the
/// wrapped error is also exposed as the source.
#[derive(Debug, Error)]
pub enum Error {
/// Wraps a `serde_yaml::Error`.
#[error("Unable to serialize YAML object")]
UnableToSerializeYamlObject(
#[from]
#[source]
serde_yaml::Error,
),
/// Wraps a `serde_json::Error`.
#[error("Unable to serialize JSON object")]
UnableToSerializeJsonObject(
#[from]
#[source]
serde_json::Error,
),
#[error("Validation error: {0}")]
Validation(String),
#[error("Execution error: {0}")]
ExecutionError(String),
#[error("End of input reached")]
EndOfInput,
#[error("Internal server error: unable to secure lock")]
UnableToSecureLock,
#[error("Duplicate registered name: {0}")]
DuplicateRegisteredName(String),
#[error("Invalid validation schema: {0}")]
InvalidValidationSchema(String),
#[error("Configuration validation failed: {0}")]
ConfigFailedValidation(String),
#[error("Configuration item not found: {0}")]
ConfigurationItemNotFound(String),
#[error("Not yet implemented")]
NotYetImplemented,
// NOTE(review): the display text says "Pipeline processing error" although the
// variant is named for a channel send failure — confirm which one is intended.
#[error("Pipeline processing error: {0}")]
UnableToSendToChannel(String),
/// Wraps a `flume::RecvError`.
#[error("Channel receive error")]
RecvChannelError(
#[from]
#[source]
flume::RecvError,
),
#[error("Processor failure: {0}")]
ProcessingError(String),
#[error("Conditional check failed")]
ConditionalCheckfailed,
#[error("Input error: {0}")]
InputError(String),
#[error("Output error: {0}")]
OutputError(String),
#[error("No input to return")]
NoInputToReturn,
// NOTE(review): presumably tells retry logic not to retry — confirm with the
// code that applies `RetryPolicy`.
#[error("unretryable error: {0}")]
UnRetryable(String),
}
// Unit tests for the configuration types in this file: duration parsing for
// `BatchingPolicy`, serde defaults for `RetryPolicy`, the three backoff
// strategies of `compute_wait`, and the display text of `Error::UnRetryable`.
#[cfg(test)]
mod tests {
use super::*;
// A duration given in seconds ("10s") is parsed alongside an explicit size.
#[test]
fn test_batching_policy_deserialize_seconds() {
let yaml = r#"
duration: "10s"
size: 500
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert_eq!(policy.duration, Some(Duration::from_secs(10)));
assert_eq!(policy.size, Some(500));
}
// A duration given in milliseconds ("100ms") is parsed.
#[test]
fn test_batching_policy_deserialize_milliseconds() {
let yaml = r#"
duration: "100ms"
size: 100
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert_eq!(policy.duration, Some(Duration::from_millis(100)));
assert_eq!(policy.size, Some(100));
}
// A duration given in minutes ("5m") is parsed; an omitted size stays None.
#[test]
fn test_batching_policy_deserialize_minutes() {
let yaml = r#"
duration: "5m"
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert_eq!(policy.duration, Some(Duration::from_secs(300)));
assert!(policy.size.is_none());
}
// A compound duration ("1m 30s") adds up to 90 seconds.
#[test]
fn test_batching_policy_deserialize_complex_duration() {
let yaml = r#"
duration: "1m 30s"
size: 1000
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert_eq!(policy.duration, Some(Duration::from_secs(90)));
assert_eq!(policy.size, Some(1000));
}
// An omitted duration deserializes to None rather than failing.
#[test]
fn test_batching_policy_deserialize_no_duration() {
let yaml = r#"
size: 250
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert!(policy.duration.is_none());
assert_eq!(policy.size, Some(250));
}
// With nothing configured, the effective size and duration are 500 and 10 s.
#[test]
fn test_batching_policy_effective_defaults() {
let policy = BatchingPolicy::default();
assert_eq!(policy.effective_size(), 500);
assert_eq!(policy.effective_duration(), Duration::from_secs(10));
}
// Configured values take precedence over the fallbacks.
#[test]
fn test_batching_policy_effective_with_values() {
let yaml = r#"
duration: "30s"
size: 1000
"#;
let policy: BatchingPolicy = serde_yaml::from_str(yaml).unwrap();
assert_eq!(policy.effective_size(), 1000);
assert_eq!(policy.effective_duration(), Duration::from_secs(30));
}
// `deserialize_duration` accepts a seconds string on a required field.
#[test]
fn test_deserialize_duration_valid() {
#[derive(Deserialize)]
struct T {
#[serde(deserialize_with = "deserialize_duration")]
d: Duration,
}
let t: T = serde_yaml::from_str("d: \"5s\"").expect("failed to deserialize");
assert_eq!(t.d, Duration::from_secs(5));
}
// `deserialize_duration` accepts a milliseconds string on a required field.
#[test]
fn test_deserialize_duration_millis() {
#[derive(Deserialize)]
struct T {
#[serde(deserialize_with = "deserialize_duration")]
d: Duration,
}
let t: T = serde_yaml::from_str("d: \"100ms\"").expect("failed to deserialize");
assert_eq!(t.d, Duration::from_millis(100));
}
// An empty mapping yields the documented RetryPolicy defaults.
#[test]
fn test_retry_policy_defaults() {
let policy: RetryPolicy = serde_yaml::from_str("{}").expect("failed to deserialize");
assert_eq!(policy.max_retries, 3);
assert_eq!(policy.initial_wait, Duration::from_secs(1));
assert_eq!(policy.max_wait, Duration::from_secs(30));
assert!(matches!(policy.backoff, BackoffStrategy::Exponential));
}
// Every RetryPolicy field can be overridden; "linear" selects Linear backoff.
#[test]
fn test_retry_policy_custom() {
let yaml = r#"
max_retries: 5
initial_wait: "2s"
max_wait: "60s"
backoff: "linear"
"#;
let policy: RetryPolicy = serde_yaml::from_str(yaml).expect("failed to deserialize");
assert_eq!(policy.max_retries, 5);
assert_eq!(policy.initial_wait, Duration::from_secs(2));
assert_eq!(policy.max_wait, Duration::from_secs(60));
assert!(matches!(policy.backoff, BackoffStrategy::Linear));
}
// "constant" selects Constant backoff.
#[test]
fn test_retry_policy_constant_backoff() {
let yaml = r#"
max_retries: 3
initial_wait: "2s"
backoff: "constant"
"#;
let policy: RetryPolicy = serde_yaml::from_str(yaml).expect("failed to deserialize");
assert!(matches!(policy.backoff, BackoffStrategy::Constant));
}
// Exponential backoff doubles per attempt and is capped at max_wait.
#[test]
fn test_compute_wait_exponential() {
let policy = RetryPolicy {
max_retries: 5,
initial_wait: Duration::from_secs(1),
max_wait: Duration::from_secs(30),
backoff: BackoffStrategy::Exponential,
};
assert_eq!(policy.compute_wait(0), Duration::from_secs(1));
assert_eq!(policy.compute_wait(1), Duration::from_secs(2));
assert_eq!(policy.compute_wait(2), Duration::from_secs(4));
assert_eq!(policy.compute_wait(3), Duration::from_secs(8));
assert_eq!(policy.compute_wait(10), Duration::from_secs(30));
}
// Linear backoff grows by initial_wait per attempt and is capped at max_wait.
#[test]
fn test_compute_wait_linear() {
let policy = RetryPolicy {
max_retries: 5,
initial_wait: Duration::from_secs(2),
max_wait: Duration::from_secs(30),
backoff: BackoffStrategy::Linear,
};
assert_eq!(policy.compute_wait(0), Duration::from_secs(2));
assert_eq!(policy.compute_wait(1), Duration::from_secs(4));
assert_eq!(policy.compute_wait(2), Duration::from_secs(6));
assert_eq!(policy.compute_wait(20), Duration::from_secs(30));
}
// Constant backoff returns initial_wait regardless of the attempt number.
#[test]
fn test_compute_wait_constant() {
let policy = RetryPolicy {
max_retries: 5,
initial_wait: Duration::from_secs(3),
max_wait: Duration::from_secs(30),
backoff: BackoffStrategy::Constant,
};
assert_eq!(policy.compute_wait(0), Duration::from_secs(3));
assert_eq!(policy.compute_wait(1), Duration::from_secs(3));
assert_eq!(policy.compute_wait(5), Duration::from_secs(3));
}
// The UnRetryable variant formats with its lower-case prefix and payload.
#[test]
fn test_unretryable_error() {
let err = Error::UnRetryable("auth failed".into());
assert_eq!(format!("{err}"), "unretryable error: auth failed");
assert!(matches!(err, Error::UnRetryable(_)));
}
}