xerv_core/traits/
pipeline.rs1use super::context::PipelineCtx;
4use std::future::Future;
5use std::pin::Pin;
6use std::time::Duration;
7
/// Lifecycle state of a pipeline.
///
/// The type is a plain, fieldless enum: it is `Copy`, comparable with `==`,
/// and hashable so it can be used as a `HashMap`/`HashSet` key (for example
/// when counting pipelines per state).
///
/// NOTE(review): the per-variant descriptions are inferred from the variant
/// names alone; confirm them against the code that drives state transitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum PipelineState {
    /// The pipeline is being set up.
    Initializing,
    /// The pipeline is running.
    Running,
    /// The pipeline is paused.
    Paused,
    /// The pipeline is draining (presumably finishing in-flight work — confirm).
    Draining,
    /// The pipeline has stopped.
    Stopped,
    /// The pipeline is in an error state.
    Error,
}
24
/// Tunable limits and timeouts attached to a pipeline.
///
/// Start from [`PipelineSettings::default`] and adjust with the `with_*`
/// builder methods, or assign the public fields directly.
#[derive(Debug, Clone)]
pub struct PipelineSettings {
    /// Limit on concurrent executions. Defaults to `100`.
    pub max_concurrent_executions: u32,
    /// Timeout for an execution. Defaults to 60 seconds.
    pub execution_timeout: Duration,
    /// Circuit-breaker threshold. Defaults to `0.05`.
    ///
    /// NOTE(review): presumably a failure-rate fraction in `0.0..=1.0`; this
    /// type performs no range validation — confirm with the consumer.
    pub circuit_breaker_threshold: f64,
    /// Circuit-breaker window. Defaults to 60 seconds.
    pub circuit_breaker_window: Duration,
    /// Limit on concurrent versions. Defaults to `5`.
    pub max_concurrent_versions: u32,
    /// Drain timeout. Defaults to 30 minutes.
    pub drain_timeout: Duration,
    /// Drain grace period. Defaults to 5 minutes.
    pub drain_grace_period: Duration,
}

impl Default for PipelineSettings {
    /// Returns the baseline settings documented on each field.
    fn default() -> Self {
        Self {
            max_concurrent_executions: 100,
            execution_timeout: Duration::from_secs(60),
            circuit_breaker_threshold: 0.05,
            circuit_breaker_window: Duration::from_secs(60),
            max_concurrent_versions: 5,
            drain_timeout: Duration::from_secs(30 * 60),
            drain_grace_period: Duration::from_secs(5 * 60),
        }
    }
}

impl PipelineSettings {
    /// Replaces `max_concurrent_executions` with `max` and returns the
    /// updated settings.
    ///
    /// The method consumes `self`; discarding the return value loses the
    /// change, hence `#[must_use]`.
    #[must_use]
    pub fn with_concurrency(mut self, max: u32) -> Self {
        self.max_concurrent_executions = max;
        self
    }

    /// Replaces `execution_timeout` with `timeout` and returns the updated
    /// settings.
    #[must_use]
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        self.execution_timeout = timeout;
        self
    }

    /// Replaces both circuit-breaker fields and returns the updated settings.
    ///
    /// `threshold` is stored as given; no range check is applied.
    #[must_use]
    pub fn with_circuit_breaker(mut self, threshold: f64, window: Duration) -> Self {
        self.circuit_breaker_threshold = threshold;
        self.circuit_breaker_window = window;
        self
    }
}
78
79#[derive(Debug, Clone)]
81pub struct PipelineConfig {
82 pub id: String,
84 pub driver: String,
86 pub settings: PipelineSettings,
88 pub config: serde_yaml::Value,
90}
91
92impl PipelineConfig {
93 pub fn new(id: impl Into<String>) -> Self {
95 Self {
96 id: id.into(),
97 driver: String::new(),
98 settings: PipelineSettings::default(),
99 config: serde_yaml::Value::Null,
100 }
101 }
102
103 pub fn with_driver(mut self, driver: impl Into<String>) -> Self {
105 self.driver = driver.into();
106 self
107 }
108
109 pub fn with_settings(mut self, settings: PipelineSettings) -> Self {
111 self.settings = settings;
112 self
113 }
114
115 pub fn with_config(mut self, config: serde_yaml::Value) -> Self {
117 self.config = config;
118 self
119 }
120
121 pub fn get_string(&self, key: &str) -> Option<&str> {
123 self.config.get(key).and_then(|v| v.as_str())
124 }
125
126 pub fn get_i64(&self, key: &str) -> Option<i64> {
128 self.config.get(key).and_then(|v| v.as_i64())
129 }
130
131 pub fn get_f64(&self, key: &str) -> Option<f64> {
133 self.config.get(key).and_then(|v| v.as_f64())
134 }
135
136 pub fn get_bool(&self, key: &str) -> Option<bool> {
138 self.config.get(key).and_then(|v| v.as_bool())
139 }
140}
141
/// Boxed, pinned, `Send` future that resolves to `()` and may borrow for `'a`.
///
/// This is the return type of every [`PipelineHook`] callback.
pub type HookFuture<'a> = Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
144
145pub trait PipelineHook: Send + Sync {
149 fn on_start<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
151 let _ = ctx;
152 Box::pin(async {})
153 }
154
155 fn on_drain<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
157 let _ = ctx;
158 Box::pin(async {})
159 }
160
161 fn on_stop<'a>(&'a self, ctx: PipelineCtx) -> HookFuture<'a> {
163 let _ = ctx;
164 Box::pin(async {})
165 }
166
167 fn on_error<'a>(&'a self, ctx: PipelineCtx, error: &'a str) -> HookFuture<'a> {
169 let _ = (ctx, error);
170 Box::pin(async {})
171 }
172
173 fn on_trace_complete<'a>(
175 &'a self,
176 ctx: PipelineCtx,
177 trace_id: crate::types::TraceId,
178 ) -> HookFuture<'a> {
179 let _ = (ctx, trace_id);
180 Box::pin(async {})
181 }
182
183 fn on_trace_failed<'a>(
185 &'a self,
186 ctx: PipelineCtx,
187 trace_id: crate::types::TraceId,
188 error: &'a str,
189 ) -> HookFuture<'a> {
190 let _ = (ctx, trace_id, error);
191 Box::pin(async {})
192 }
193}
194
/// Hook type that carries no state.
///
/// It is a unit struct, so it is free to copy, compare by debug output, and
/// build through `Default`.
// NOTE(review): `dead_code` is presumably allowed because nothing outside the
// tests constructs this type in some build configurations — confirm it is
// still required.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultPipelineHook;
213
// Intentionally empty: every callback falls back to the default body
// provided by the `PipelineHook` trait.
impl PipelineHook for DefaultPipelineHook {}
215
#[cfg(test)]
mod pipeline_hook_tests {
    use super::*;
    use crate::types::TraceId;

    /// Awaits every callback of `DefaultPipelineHook` once; the test passes
    /// as long as each returned future completes without panicking.
    #[tokio::test]
    async fn default_hook_does_nothing() {
        let noop_hook = DefaultPipelineHook;
        let base_ctx = PipelineCtx::new("test_pipeline", 1);

        // Pipeline-level callbacks.
        noop_hook.on_start(base_ctx.clone()).await;
        noop_hook.on_drain(base_ctx.clone()).await;
        noop_hook.on_stop(base_ctx.clone()).await;
        noop_hook.on_error(base_ctx.clone(), "test error").await;

        // Trace-level callbacks; the last call takes ownership of the context.
        noop_hook
            .on_trace_complete(base_ctx.clone(), TraceId::new())
            .await;
        noop_hook
            .on_trace_failed(base_ctx, TraceId::new(), "test failure")
            .await;
    }
}
236
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins every field produced by `PipelineSettings::default()`.
    #[test]
    fn pipeline_settings_default() {
        let settings = PipelineSettings::default();
        assert_eq!(settings.max_concurrent_executions, 100);
        assert_eq!(settings.execution_timeout, Duration::from_secs(60));
        assert_eq!(settings.circuit_breaker_threshold, 0.05);
        assert_eq!(settings.circuit_breaker_window, Duration::from_secs(60));
        assert_eq!(settings.max_concurrent_versions, 5);
        assert_eq!(settings.drain_timeout, Duration::from_secs(30 * 60));
        assert_eq!(settings.drain_grace_period, Duration::from_secs(5 * 60));
    }

    /// Each builder method overrides its own field(s) and leaves the rest at
    /// their defaults.
    #[test]
    fn pipeline_settings_builders() {
        let settings = PipelineSettings::default()
            .with_concurrency(8)
            .with_timeout(Duration::from_secs(5))
            .with_circuit_breaker(0.25, Duration::from_secs(10));

        assert_eq!(settings.max_concurrent_executions, 8);
        assert_eq!(settings.execution_timeout, Duration::from_secs(5));
        assert_eq!(settings.circuit_breaker_threshold, 0.25);
        assert_eq!(settings.circuit_breaker_window, Duration::from_secs(10));
        assert_eq!(settings.max_concurrent_versions, 5);
    }

    /// Builds a config from a YAML mapping and checks the fields and the
    /// typed accessors, including the `None` paths.
    #[test]
    fn pipeline_config_creation() {
        let mut config_map = serde_yaml::Mapping::new();
        config_map.insert(
            serde_yaml::Value::String("api_key".to_string()),
            serde_yaml::Value::String("sk_test_123".to_string()),
        );
        config_map.insert(
            serde_yaml::Value::String("threshold".to_string()),
            serde_yaml::Value::Number(0.8.into()),
        );

        let config = PipelineConfig::new("order_processing")
            .with_driver("OrderPipeline")
            .with_config(serde_yaml::Value::Mapping(config_map));

        assert_eq!(config.id, "order_processing");
        assert_eq!(config.driver, "OrderPipeline");
        assert_eq!(config.settings.max_concurrent_executions, 100);
        assert_eq!(config.get_string("api_key"), Some("sk_test_123"));
        // The numeric entry must be readable as a float but not as a string.
        assert_eq!(config.get_f64("threshold"), Some(0.8));
        assert_eq!(config.get_string("threshold"), None);
        // Absent keys and wrong-typed lookups yield `None`.
        assert_eq!(config.get_string("missing"), None);
        assert_eq!(config.get_bool("api_key"), None);
    }

    /// A freshly created config has an empty driver and a `Null` YAML value,
    /// so every accessor returns `None`.
    #[test]
    fn pipeline_config_new_is_empty() {
        let config = PipelineConfig::new("empty");
        assert_eq!(config.id, "empty");
        assert!(config.driver.is_empty());
        assert!(config.config.is_null());
        assert_eq!(config.get_string("anything"), None);
        assert_eq!(config.get_i64("anything"), None);
    }
}
268}