use super::context::PipelineCtx;
use std::future::Future;
use std::pin::Pin;
use std::time::Duration;
/// State a pipeline can be in.
///
/// This file only declares the variants; which transitions are legal and who
/// performs them is decided by code outside this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipelineState {
    Initializing,
    Running,
    Paused,
    Draining,
    Stopped,
    Error,
}
/// Tunable limits and timeouts for a pipeline.
///
/// Start from [`PipelineSettings::default`] and adjust with the `with_*`
/// builders, or assign the public fields directly. This type only stores the
/// values; nothing in this file enforces them.
#[derive(Debug, Clone)]
pub struct PipelineSettings {
    /// Maximum number of concurrent executions (default: 100).
    pub max_concurrent_executions: u32,
    /// Timeout for an execution (default: 60 s).
    pub execution_timeout: Duration,
    /// Circuit-breaker threshold (default: 0.05).
    // NOTE(review): presumably a failure ratio in 0.0..=1.0 — confirm with the consumer.
    pub circuit_breaker_threshold: f64,
    /// Window paired with the circuit-breaker threshold (default: 60 s).
    pub circuit_breaker_window: Duration,
    /// Maximum number of concurrent versions (default: 5).
    pub max_concurrent_versions: u32,
    /// Drain timeout (default: 30 min).
    pub drain_timeout: Duration,
    /// Drain grace period (default: 5 min).
    pub drain_grace_period: Duration,
}

impl Default for PipelineSettings {
    /// Builds the stock settings; each value is listed on the matching field.
    fn default() -> Self {
        // Spelled out once so the minute-based durations below read naturally.
        const SECS_PER_MINUTE: u64 = 60;

        Self {
            max_concurrent_executions: 100,
            execution_timeout: Duration::from_secs(SECS_PER_MINUTE),
            circuit_breaker_threshold: 0.05,
            circuit_breaker_window: Duration::from_secs(SECS_PER_MINUTE),
            max_concurrent_versions: 5,
            drain_timeout: Duration::from_secs(30 * SECS_PER_MINUTE),
            drain_grace_period: Duration::from_secs(5 * SECS_PER_MINUTE),
        }
    }
}

impl PipelineSettings {
    /// Returns the settings with `max_concurrent_executions` replaced by `max`.
    pub fn with_concurrency(self, max: u32) -> Self {
        Self {
            max_concurrent_executions: max,
            ..self
        }
    }

    /// Returns the settings with `execution_timeout` replaced by `timeout`.
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            execution_timeout: timeout,
            ..self
        }
    }

    /// Returns the settings with both circuit-breaker fields replaced.
    ///
    /// No validation is applied to `threshold` or `window`.
    pub fn with_circuit_breaker(self, threshold: f64, window: Duration) -> Self {
        Self {
            circuit_breaker_threshold: threshold,
            circuit_breaker_window: window,
            ..self
        }
    }
}
/// Configuration for a single pipeline: identity, driver name, settings and a
/// free-form YAML payload.
#[derive(Debug, Clone)]
pub struct PipelineConfig {
    /// Identifier given to [`PipelineConfig::new`].
    pub id: String,
    /// Driver name; an empty string until `with_driver` is called.
    pub driver: String,
    /// Limits and timeouts; `PipelineSettings::default()` unless replaced.
    pub settings: PipelineSettings,
    /// Arbitrary YAML value read by the `get_*` accessors; `Null` by default.
    pub config: serde_yaml::Value,
}
impl PipelineConfig {
    /// Creates a config for `id` with an empty driver name, default settings
    /// and a `Null` YAML payload.
    pub fn new(id: impl Into<String>) -> Self {
        let id = id.into();
        Self {
            id,
            driver: String::new(),
            settings: PipelineSettings::default(),
            config: serde_yaml::Value::Null,
        }
    }

    /// Returns the config with `driver` replaced.
    pub fn with_driver(self, driver: impl Into<String>) -> Self {
        Self {
            driver: driver.into(),
            ..self
        }
    }

    /// Returns the config with `settings` replaced.
    pub fn with_settings(self, settings: PipelineSettings) -> Self {
        Self { settings, ..self }
    }

    /// Returns the config with the YAML payload replaced.
    pub fn with_config(self, config: serde_yaml::Value) -> Self {
        Self { config, ..self }
    }

    /// Looks up `key` in the YAML payload and returns it as a string slice.
    ///
    /// `None` when the lookup yields nothing or the value is not a string.
    pub fn get_string(&self, key: &str) -> Option<&str> {
        self.config.get(key)?.as_str()
    }

    /// Looks up `key` in the YAML payload and returns it as an `i64`.
    ///
    /// `None` when the lookup yields nothing or `as_i64` rejects the value.
    pub fn get_i64(&self, key: &str) -> Option<i64> {
        self.config.get(key)?.as_i64()
    }

    /// Looks up `key` in the YAML payload and returns it as an `f64`.
    ///
    /// `None` when the lookup yields nothing or `as_f64` rejects the value.
    pub fn get_f64(&self, key: &str) -> Option<f64> {
        self.config.get(key)?.as_f64()
    }

    /// Looks up `key` in the YAML payload and returns it as a `bool`.
    ///
    /// `None` when the lookup yields nothing or the value is not a boolean.
    pub fn get_bool(&self, key: &str) -> Option<bool> {
        self.config.get(key)?.as_bool()
    }
}
/// Boxed, pinned, `Send` future resolving to `()` that may borrow for `'a`;
/// the return type of every [`PipelineHook`] callback.
pub type HookFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
/// Callbacks for pipeline-level and trace-level events.
///
/// Every method has a default body that discards its arguments and returns a
/// future that is already complete, so implementors override only the
/// callbacks they care about. The code that invokes these callbacks is not in
/// this file; the event each one corresponds to is taken from its name.
pub trait PipelineHook: Send + Sync {
    /// Pipeline-start callback. Default: no-op.
    fn on_start<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
        let _ = ctx;
        Box::pin(std::future::ready(()))
    }

    /// Pipeline-drain callback. Default: no-op.
    fn on_drain<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
        let _ = ctx;
        Box::pin(std::future::ready(()))
    }

    /// Pipeline-stop callback. Default: no-op.
    fn on_stop<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
        let _ = ctx;
        Box::pin(std::future::ready(()))
    }

    /// Pipeline-error callback; `error` carries the message. Default: no-op.
    fn on_error<'a>(&'a self, ctx: PipelineCtx, error: &'a str) -> HookFuture<'a> {
        let _ = (ctx, error);
        Box::pin(std::future::ready(()))
    }

    /// Trace-completed callback for `trace_id`. Default: no-op.
    fn on_trace_complete<'a>(
        &'a self,
        ctx: PipelineCtx,
        trace_id: crate::types::TraceId,
    ) -> HookFuture<'a> {
        let _ = (ctx, trace_id);
        Box::pin(std::future::ready(()))
    }

    /// Trace-failed callback for `trace_id`; `error` carries the message.
    /// Default: no-op.
    fn on_trace_failed<'a>(
        &'a self,
        ctx: PipelineCtx,
        trace_id: crate::types::TraceId,
        error: &'a str,
    ) -> HookFuture<'a> {
        let _ = (ctx, trace_id, error);
        Box::pin(std::future::ready(()))
    }
}
/// Hook that overrides nothing, so every callback falls through to the
/// default body declared on [`PipelineHook`].
// NOTE(review): the reason for allowing `dead_code` is not visible in this
// file — presumably the type is only constructed from tests; confirm the
// attribute is still needed.
#[allow(dead_code)]
pub struct DefaultPipelineHook;
impl PipelineHook for DefaultPipelineHook {}
#[cfg(test)]
mod pipeline_hook_tests {
    use super::*;

    /// Awaits every default callback once; the test passes as long as none of
    /// them panics.
    #[tokio::test]
    async fn default_hook_does_nothing() {
        let noop = DefaultPipelineHook;
        let pipeline_ctx = PipelineCtx::new("test_pipeline", 1);

        // Pipeline-level callbacks.
        noop.on_start(pipeline_ctx.clone()).await;
        noop.on_drain(pipeline_ctx.clone()).await;
        noop.on_stop(pipeline_ctx.clone()).await;
        noop.on_error(pipeline_ctx.clone(), "test error").await;

        // Trace-level callbacks, each with its own fresh trace id.
        let completed_trace = crate::types::TraceId::new();
        noop.on_trace_complete(pipeline_ctx.clone(), completed_trace)
            .await;

        let failed_trace = crate::types::TraceId::new();
        noop.on_trace_failed(pipeline_ctx, failed_trace, "test failure")
            .await;
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every field of the default settings, not just the first two.
    #[test]
    fn pipeline_settings_default() {
        let settings = PipelineSettings::default();
        assert_eq!(settings.max_concurrent_executions, 100);
        assert_eq!(settings.execution_timeout, Duration::from_secs(60));
        assert!((settings.circuit_breaker_threshold - 0.05).abs() < f64::EPSILON);
        assert_eq!(settings.circuit_breaker_window, Duration::from_secs(60));
        assert_eq!(settings.max_concurrent_versions, 5);
        assert_eq!(settings.drain_timeout, Duration::from_secs(30 * 60));
        assert_eq!(settings.drain_grace_period, Duration::from_secs(5 * 60));
    }

    /// Each builder replaces its own field(s) and leaves the rest at their
    /// defaults.
    #[test]
    fn pipeline_settings_builders() {
        let settings = PipelineSettings::default()
            .with_concurrency(8)
            .with_timeout(Duration::from_secs(5))
            .with_circuit_breaker(0.25, Duration::from_secs(30));

        assert_eq!(settings.max_concurrent_executions, 8);
        assert_eq!(settings.execution_timeout, Duration::from_secs(5));
        assert!((settings.circuit_breaker_threshold - 0.25).abs() < f64::EPSILON);
        assert_eq!(settings.circuit_breaker_window, Duration::from_secs(30));

        // Fields without a builder call keep their default values.
        assert_eq!(settings.max_concurrent_versions, 5);
        assert_eq!(settings.drain_timeout, Duration::from_secs(30 * 60));
        assert_eq!(settings.drain_grace_period, Duration::from_secs(5 * 60));
    }

    /// Builds a config with a two-entry YAML mapping and checks the typed
    /// accessors, including the wrong-type and missing-key paths.
    #[test]
    fn pipeline_config_creation() {
        let mut config_map = serde_yaml::Mapping::new();
        config_map.insert(
            serde_yaml::Value::String("api_key".to_string()),
            serde_yaml::Value::String("sk_test_123".to_string()),
        );
        config_map.insert(
            serde_yaml::Value::String("threshold".to_string()),
            serde_yaml::Value::Number(0.8.into()),
        );
        let config = PipelineConfig::new("order_processing")
            .with_driver("OrderPipeline")
            .with_settings(PipelineSettings::default().with_concurrency(3))
            .with_config(serde_yaml::Value::Mapping(config_map));

        assert_eq!(config.id, "order_processing");
        assert_eq!(config.driver, "OrderPipeline");
        assert_eq!(config.settings.max_concurrent_executions, 3);
        assert_eq!(config.get_string("api_key"), Some("sk_test_123"));
        // The `threshold` entry was previously inserted but never checked.
        assert_eq!(config.get_f64("threshold"), Some(0.8));

        // A value of the wrong type yields `None` instead of being coerced.
        assert_eq!(config.get_i64("threshold"), None);
        assert_eq!(config.get_bool("api_key"), None);
        assert_eq!(config.get_string("threshold"), None);

        // So does a key that is not present at all.
        assert_eq!(config.get_string("missing"), None);
    }

    /// A freshly created config has an empty driver and a `Null` payload, so
    /// every accessor returns `None`.
    #[test]
    fn pipeline_config_without_payload() {
        let config = PipelineConfig::new("bare");

        assert_eq!(config.id, "bare");
        assert!(config.driver.is_empty());
        assert_eq!(config.get_string("api_key"), None);
        assert_eq!(config.get_i64("api_key"), None);
        assert_eq!(config.get_f64("api_key"), None);
        assert_eq!(config.get_bool("api_key"), None);
    }
}