xerv_core/traits/
pipeline.rs

1//! Pipeline configuration and lifecycle hooks.
2
3use super::context::PipelineCtx;
4use std::future::Future;
5use std::pin::Pin;
6use std::time::Duration;
7
/// Lifecycle state of a pipeline.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PipelineState {
    /// Still initializing.
    Initializing,
    /// Running; new traces are accepted.
    Running,
    /// Paused: new traces are rejected, existing ones keep being processed.
    Paused,
    /// Draining: new traces are rejected while existing ones run to completion.
    Draining,
    /// Stopped.
    Stopped,
    /// An error has been encountered.
    Error,
}
24
/// Pipeline runtime settings.
///
/// Start from [`PipelineSettings::default`] and adjust individual values
/// either through the public fields or the chainable `with_*` methods.
#[derive(Debug, Clone)]
pub struct PipelineSettings {
    /// Maximum concurrent trace executions.
    pub max_concurrent_executions: u32,
    /// Execution timeout.
    pub execution_timeout: Duration,
    /// Error rate threshold for circuit breaker (0.0 to 1.0).
    ///
    /// The range is documentation only; nothing in this type validates it.
    pub circuit_breaker_threshold: f64,
    /// Window for measuring error rate.
    pub circuit_breaker_window: Duration,
    /// Maximum concurrent versions during deployment.
    pub max_concurrent_versions: u32,
    /// Drain timeout for deployments.
    pub drain_timeout: Duration,
    /// Grace period before hard drain.
    pub drain_grace_period: Duration,
}

impl Default for PipelineSettings {
    /// Returns the baseline settings:
    ///
    /// * 100 concurrent executions, 60 s execution timeout
    /// * circuit breaker at a 5% error rate over a 60 s window
    /// * 5 concurrent versions, 30 min drain timeout, 5 min drain grace period
    fn default() -> Self {
        Self {
            max_concurrent_executions: 100,
            execution_timeout: Duration::from_secs(60),
            circuit_breaker_threshold: 0.05, // 5%
            circuit_breaker_window: Duration::from_secs(60),
            max_concurrent_versions: 5,
            drain_timeout: Duration::from_secs(30 * 60), // 30 minutes
            drain_grace_period: Duration::from_secs(5 * 60), // 5 minutes
        }
    }
}

impl PipelineSettings {
    /// Returns the settings with `max_concurrent_executions` replaced by `max`.
    ///
    /// The method consumes `self`, so the returned value must be kept.
    #[must_use = "builder methods return the updated settings instead of modifying in place"]
    pub fn with_concurrency(self, max: u32) -> Self {
        Self {
            max_concurrent_executions: max,
            ..self
        }
    }

    /// Returns the settings with `execution_timeout` replaced by `timeout`.
    ///
    /// The method consumes `self`, so the returned value must be kept.
    #[must_use = "builder methods return the updated settings instead of modifying in place"]
    pub fn with_timeout(self, timeout: Duration) -> Self {
        Self {
            execution_timeout: timeout,
            ..self
        }
    }

    /// Returns the settings with both circuit breaker values replaced:
    /// `threshold` becomes the error rate threshold and `window` the
    /// measurement window.
    ///
    /// `threshold` is stored as given; no range check is applied.
    #[must_use = "builder methods return the updated settings instead of modifying in place"]
    pub fn with_circuit_breaker(self, threshold: f64, window: Duration) -> Self {
        Self {
            circuit_breaker_threshold: threshold,
            circuit_breaker_window: window,
            ..self
        }
    }
}
78
79/// Configuration for a pipeline.
80#[derive(Debug, Clone)]
81pub struct PipelineConfig {
82    /// Pipeline ID.
83    pub id: String,
84    /// Pipeline driver (Rust struct name).
85    pub driver: String,
86    /// Runtime settings.
87    pub settings: PipelineSettings,
88    /// Global configuration values (from YAML).
89    pub config: serde_yaml::Value,
90}
91
92impl PipelineConfig {
93    /// Create a new pipeline config.
94    pub fn new(id: impl Into<String>) -> Self {
95        Self {
96            id: id.into(),
97            driver: String::new(),
98            settings: PipelineSettings::default(),
99            config: serde_yaml::Value::Null,
100        }
101    }
102
103    /// Set the driver.
104    pub fn with_driver(mut self, driver: impl Into<String>) -> Self {
105        self.driver = driver.into();
106        self
107    }
108
109    /// Set the settings.
110    pub fn with_settings(mut self, settings: PipelineSettings) -> Self {
111        self.settings = settings;
112        self
113    }
114
115    /// Set the config.
116    pub fn with_config(mut self, config: serde_yaml::Value) -> Self {
117        self.config = config;
118        self
119    }
120
121    /// Get a config value as string.
122    pub fn get_string(&self, key: &str) -> Option<&str> {
123        self.config.get(key).and_then(|v| v.as_str())
124    }
125
126    /// Get a config value as i64.
127    pub fn get_i64(&self, key: &str) -> Option<i64> {
128        self.config.get(key).and_then(|v| v.as_i64())
129    }
130
131    /// Get a config value as f64.
132    pub fn get_f64(&self, key: &str) -> Option<f64> {
133        self.config.get(key).and_then(|v| v.as_f64())
134    }
135
136    /// Get a config value as bool.
137    pub fn get_bool(&self, key: &str) -> Option<bool> {
138        self.config.get(key).and_then(|v| v.as_bool())
139    }
140}
141
/// Future type returned by every [`PipelineHook`] method: a pinned, boxed,
/// `Send` future that resolves to `()` and may borrow for `'a`.
pub type HookFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
144
145/// Lifecycle hooks for a pipeline.
146///
147/// These hooks allow pipelines to execute code at key lifecycle events.
148pub trait PipelineHook: Send + Sync {
149    /// Called when the pipeline starts.
150    fn on_start<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
151        let _ = ctx;
152        Box::pin(async {})
153    }
154
155    /// Called when the pipeline begins draining (preparing for shutdown/upgrade).
156    fn on_drain<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
157        let _ = ctx;
158        Box::pin(async {})
159    }
160
161    /// Called when the pipeline stops.
162    fn on_stop<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
163        let _ = ctx;
164        Box::pin(async {})
165    }
166
167    /// Called when the pipeline encounters an error.
168    fn on_error<'a>(&'a self, ctx: PipelineCtx, error: &'a str) -> HookFuture<'a> {
169        let _ = (ctx, error);
170        Box::pin(async {})
171    }
172
173    /// Called when a trace completes successfully.
174    fn on_trace_complete<'a>(
175        &'a self,
176        ctx: PipelineCtx,
177        trace_id: crate::types::TraceId,
178    ) -> HookFuture<'a> {
179        let _ = (ctx, trace_id);
180        Box::pin(async {})
181    }
182
183    /// Called when a trace fails.
184    fn on_trace_failed<'a>(
185        &'a self,
186        ctx: PipelineCtx,
187        trace_id: crate::types::TraceId,
188        error: &'a str,
189    ) -> HookFuture<'a> {
190        let _ = (ctx, trace_id, error);
191        Box::pin(async {})
192    }
193}
194
/// A default pipeline hook implementation that does nothing.
///
/// This is provided as a convenience for users who need a no-op hook
/// implementation. It's useful when you need to satisfy a `PipelineHook`
/// bound but don't need any custom hook behavior.
///
/// The type is a field-less unit struct. It derives `Debug`, `Clone`, `Copy`
/// and `Default`, so it can be logged, duplicated freely, and created with
/// either `DefaultPipelineHook` or `DefaultPipelineHook::default()`.
///
/// # Example
///
/// ```ignore
/// use xerv_core::traits::pipeline::{DefaultPipelineHook, PipelineHook};
///
/// fn setup_pipeline(hook: Option<Box<dyn PipelineHook>>) {
///     let hook = hook.unwrap_or_else(|| Box::new(DefaultPipelineHook));
///     // ...
/// }
/// ```
// NOTE(review): the reason for this `allow` is not visible in this file;
// presumably nothing inside the crate constructs the hook — confirm, and
// drop the attribute if it is no longer needed.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultPipelineHook;
213
// Intentionally empty: every hook falls back to the provided method body
// declared on `PipelineHook`.
impl PipelineHook for DefaultPipelineHook {}
215
#[cfg(test)]
mod pipeline_hook_tests {
    use super::*;

    /// Drives every hook of `DefaultPipelineHook` to completion; the test
    /// passes as long as none of the returned futures panics.
    #[tokio::test]
    async fn default_hook_does_nothing() {
        let ctx = PipelineCtx::new("test_pipeline", 1);
        let hook = DefaultPipelineHook;

        // Pipeline-level lifecycle events.
        hook.on_start(ctx.clone()).await;
        hook.on_drain(ctx.clone()).await;
        hook.on_stop(ctx.clone()).await;
        hook.on_error(ctx.clone(), "test error").await;

        // Per-trace events, each with its own fresh trace id.
        let completed = crate::types::TraceId::new();
        hook.on_trace_complete(ctx.clone(), completed).await;

        let failed = crate::types::TraceId::new();
        hook.on_trace_failed(ctx, failed, "test failure").await;
    }
}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every field of the default settings, not just the first two.
    #[test]
    fn pipeline_settings_default() {
        let settings = PipelineSettings::default();
        assert_eq!(settings.max_concurrent_executions, 100);
        assert_eq!(settings.execution_timeout, Duration::from_secs(60));
        assert!((settings.circuit_breaker_threshold - 0.05).abs() < f64::EPSILON);
        assert_eq!(settings.circuit_breaker_window, Duration::from_secs(60));
        assert_eq!(settings.max_concurrent_versions, 5);
        assert_eq!(settings.drain_timeout, Duration::from_secs(30 * 60));
        assert_eq!(settings.drain_grace_period, Duration::from_secs(5 * 60));
    }

    /// Builds a config through the builder chain and checks both the plain
    /// fields and the typed YAML getters, including their miss paths.
    #[test]
    fn pipeline_config_creation() {
        let mut config_map = serde_yaml::Mapping::new();
        config_map.insert(
            serde_yaml::Value::String("api_key".to_string()),
            serde_yaml::Value::String("sk_test_123".to_string()),
        );
        config_map.insert(
            serde_yaml::Value::String("threshold".to_string()),
            serde_yaml::Value::Number(0.8.into()),
        );

        let config = PipelineConfig::new("order_processing")
            .with_driver("OrderPipeline")
            .with_config(serde_yaml::Value::Mapping(config_map));

        assert_eq!(config.id, "order_processing");
        assert_eq!(config.driver, "OrderPipeline");
        assert_eq!(config.get_string("api_key"), Some("sk_test_123"));

        // The numeric fixture was previously inserted but never checked.
        assert_eq!(config.get_f64("threshold"), Some(0.8));

        // A missing key or a value of another type yields `None`.
        assert_eq!(config.get_string("missing"), None);
        assert_eq!(config.get_i64("api_key"), None);
        assert_eq!(config.get_bool("threshold"), None);
    }
}
268}