Skip to main content

dbx_core/automation/
executor.rs

//! Execution engine for callable objects
//!
//! Unified execution engine that runs UDFs, triggers, and scheduled jobs.
4
5use super::callable::{Callable, ExecutionContext, Value};
6use crate::error::{DbxError, DbxResult};
7use std::collections::HashMap;
8use std::sync::{Arc, RwLock};
9use std::time::{Duration, Instant};
10
11/// 통합 실행 엔진
12pub struct ExecutionEngine {
13    /// 등록된 callable 객체들
14    callables: RwLock<HashMap<String, Arc<dyn Callable>>>,
15    /// 메트릭 수집기
16    metrics: Arc<RwLock<ExecutionMetrics>>,
17}
18
19impl ExecutionEngine {
20    pub fn new() -> Self {
21        Self {
22            callables: RwLock::new(HashMap::new()),
23            metrics: Arc::new(RwLock::new(ExecutionMetrics::new())),
24        }
25    }
26
27    /// Callable 등록
28    pub fn register(&self, callable: Arc<dyn Callable>) -> DbxResult<()> {
29        let name = callable.name().to_string();
30        let mut callables = self.callables.write().map_err(|_| DbxError::LockPoisoned)?;
31
32        if callables.contains_key(&name) {
33            return Err(DbxError::DuplicateCallable(name));
34        }
35
36        callables.insert(name, callable);
37        Ok(())
38    }
39
40    /// Callable 등록 해제
41    pub fn unregister(&self, name: &str) -> DbxResult<()> {
42        let mut callables = self.callables.write().map_err(|_| DbxError::LockPoisoned)?;
43
44        callables
45            .remove(name)
46            .ok_or_else(|| DbxError::CallableNotFound(name.to_string()))?;
47
48        Ok(())
49    }
50
51    /// Callable 실행
52    pub fn execute(&self, name: &str, ctx: &ExecutionContext, args: &[Value]) -> DbxResult<Value> {
53        // Callable 조회
54        let callables = self.callables.read().map_err(|_| DbxError::LockPoisoned)?;
55
56        let callable = callables
57            .get(name)
58            .ok_or_else(|| DbxError::CallableNotFound(name.to_string()))?
59            .clone();
60
61        drop(callables); // 락 해제
62
63        // 메트릭 시작
64        let start = Instant::now();
65
66        // 실행
67        let result = callable.call(ctx, args);
68
69        // 메트릭 기록
70        let elapsed = start.elapsed();
71        let success = result.is_ok();
72
73        if let Ok(mut metrics) = self.metrics.write() {
74            metrics.record(name, elapsed, success);
75        }
76
77        result
78    }
79
80    /// 등록된 callable 목록
81    pub fn list(&self) -> DbxResult<Vec<String>> {
82        let callables = self.callables.read().map_err(|_| DbxError::LockPoisoned)?;
83
84        Ok(callables.keys().cloned().collect())
85    }
86
87    /// 메트릭 조회
88    pub fn metrics(&self) -> DbxResult<ExecutionMetrics> {
89        let metrics = self.metrics.read().map_err(|_| DbxError::LockPoisoned)?;
90
91        Ok(metrics.clone())
92    }
93}
94
95impl Default for ExecutionEngine {
96    fn default() -> Self {
97        Self::new()
98    }
99}
100
/// Execution metrics, tracked per callable name.
///
/// Every map is keyed by callable name. [`ExecutionMetrics::record`] creates
/// the `call_counts` and `total_durations` entries on the first recorded call
/// for a name, and the `error_counts` entry on the first recorded failure.
#[derive(Debug, Clone)]
pub struct ExecutionMetrics {
    /// Number of recorded calls per callable.
    pub call_counts: HashMap<String, u64>,
    /// Accumulated execution time per callable.
    pub total_durations: HashMap<String, Duration>,
    /// Number of recorded failed calls per callable.
    pub error_counts: HashMap<String, u64>,
}

impl ExecutionMetrics {
    /// Creates an empty metrics set.
    pub fn new() -> Self {
        Self {
            call_counts: HashMap::new(),
            total_durations: HashMap::new(),
            error_counts: HashMap::new(),
        }
    }

    /// Records one finished call of `name` that took `duration`.
    ///
    /// The call count and total duration are always updated; the error count
    /// is updated only when `success` is `false`.
    ///
    /// All accumulators saturate instead of overflowing, so this method never
    /// panics. That matters for callers that record while holding a lock: a
    /// panic there would poison the lock.
    pub fn record(&mut self, name: &str, duration: Duration, success: bool) {
        Self::bump_counter(&mut self.call_counts, name);

        // Look up by `&str` first so the key is only allocated on first sight.
        match self.total_durations.get_mut(name) {
            Some(total) => *total = total.saturating_add(duration),
            None => {
                self.total_durations.insert(name.to_owned(), duration);
            }
        }

        if !success {
            Self::bump_counter(&mut self.error_counts, name);
        }
    }

    /// Returns the mean execution time of `name`.
    ///
    /// Returns `None` when `name` has no recorded duration, no recorded call
    /// count, or a call count of zero.
    pub fn avg_duration(&self, name: &str) -> Option<Duration> {
        const NANOS_PER_SEC: u128 = 1_000_000_000;

        let total = *self.total_durations.get(name)?;
        let count = *self.call_counts.get(name)?;

        if count == 0 {
            return None;
        }

        // Divide in u128 nanoseconds: narrowing the u64 count to u32 would
        // give wrong averages past u32::MAX calls and a division by zero when
        // the count is a multiple of 2^32.
        let avg_nanos = total.as_nanos() / u128::from(count);

        // The average never exceeds `total`, so the whole seconds fit in u64;
        // the remainder is below one second and fits in u32.
        Some(Duration::new(
            (avg_nanos / NANOS_PER_SEC) as u64,
            (avg_nanos % NANOS_PER_SEC) as u32,
        ))
    }

    /// Returns the fraction of recorded calls of `name` that succeeded, in
    /// the range `0.0..=1.0`.
    ///
    /// Returns `None` when `name` has no recorded calls. A missing error
    /// entry counts as zero failures.
    pub fn success_rate(&self, name: &str) -> Option<f64> {
        let total = *self.call_counts.get(name)?;

        if total == 0 {
            return None;
        }

        let errors = self.error_counts.get(name).copied().unwrap_or(0);

        // The maps are public and can be edited independently, so clamp
        // rather than underflow if more errors than calls were stored.
        Some(total.saturating_sub(errors) as f64 / total as f64)
    }

    /// Adds one to the counter stored under `name`, creating it at 1 if absent.
    fn bump_counter(counters: &mut HashMap<String, u64>, name: &str) {
        if let Some(count) = counters.get_mut(name) {
            *count = count.saturating_add(1);
        } else {
            counters.insert(name.to_owned(), 1);
        }
    }
}
162
163impl Default for ExecutionMetrics {
164    fn default() -> Self {
165        Self::new()
166    }
167}
168
#[cfg(test)]
mod tests {
    use super::*;
    use crate::automation::callable::{DataType, Signature};

    /// Minimal callable used as a fixture: one `Int` parameter, `Int` result.
    struct TestCallable {
        name: String,
        signature: Signature,
    }

    impl TestCallable {
        /// Builds a fixture callable registered under `name`.
        fn new(name: impl Into<String>) -> Self {
            let signature = Signature {
                params: vec![DataType::Int],
                return_type: DataType::Int,
                is_variadic: false,
            };
            Self {
                name: name.into(),
                signature,
            }
        }
    }

    impl Callable for TestCallable {
        /// Echoes the first argument, or `Value::Null` when called without any.
        fn call(&self, _ctx: &ExecutionContext, args: &[Value]) -> DbxResult<Value> {
            let echoed = match args.first() {
                Some(first) => first.clone(),
                None => Value::Null,
            };
            Ok(echoed)
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn signature(&self) -> &Signature {
            &self.signature
        }
    }

    /// A registered callable can be executed and returns its result.
    #[test]
    fn test_register_and_execute() {
        let engine = ExecutionEngine::new();

        // Register the fixture.
        engine
            .register(Arc::new(TestCallable::new("test_func")))
            .unwrap();

        // Execute it against an in-memory database context.
        let database = crate::engine::Database::open_in_memory().unwrap();
        let ctx = ExecutionContext::new(Arc::new(database));
        let returned = engine
            .execute("test_func", &ctx, &[Value::Int(42)])
            .unwrap();

        assert_eq!(returned.as_i64().unwrap(), 42);
    }

    /// Registering the same name twice is rejected.
    #[test]
    fn test_duplicate_registration() {
        let engine = ExecutionEngine::new();
        let first = Arc::new(TestCallable::new("test_func"));
        let second = Arc::new(TestCallable::new("test_func"));

        engine.register(first).unwrap();

        assert!(engine.register(second).is_err());
    }

    /// Every executed call is counted in the metrics.
    #[test]
    fn test_metrics() {
        let engine = ExecutionEngine::new();
        engine
            .register(Arc::new(TestCallable::new("test_func")))
            .unwrap();

        let database = crate::engine::Database::open_in_memory().unwrap();
        let ctx = ExecutionContext::new(Arc::new(database));

        // Run the callable several times; individual results are irrelevant here.
        (0..10).for_each(|_| {
            let _ = engine.execute("test_func", &ctx, &[Value::Int(42)]);
        });

        let snapshot = engine.metrics().unwrap();
        assert_eq!(snapshot.call_counts.get("test_func").copied(), Some(10));
    }
}