oxify-engine 0.1.0

Workflow execution engine for OxiFY - DAG orchestration, scheduling, and state management
//! Execution metrics and statistics
//!
//! This module provides metrics collection for workflow execution monitoring.
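//!
//! # Example
//!
//! A minimal usage sketch; it assumes this module is exposed as
//! `oxify_engine::metrics` (the exact path may differ in the crate).
//!
//! ```
//! use oxify_engine::metrics::ExecutionMetrics;
//! use std::time::Duration;
//! use uuid::Uuid;
//!
//! let metrics = ExecutionMetrics::new();
//!
//! // Track a workflow execution with three nodes.
//! let workflow_id = Uuid::new_v4();
//! let execution_id = Uuid::new_v4();
//! metrics.record_workflow_started(workflow_id, execution_id, 3);
//!
//! // Record node activity as the engine executes each level.
//! metrics.record_node_started(execution_id);
//! metrics.record_node_completed(Duration::from_millis(42));
//!
//! metrics.record_workflow_completed(execution_id);
//!
//! // Take a point-in-time snapshot for reporting.
//! let stats = metrics.get_stats();
//! assert_eq!(stats.successful_workflows, 1);
//! ```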

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
use uuid::Uuid;

/// Execution metrics collector
///
/// Counters are atomic and the duration/active-execution collections are
/// guarded by `RwLock`, so a single collector can be shared across threads
/// (for example behind an `Arc`).
#[derive(Debug, Default)]
pub struct ExecutionMetrics {
    /// Total workflows executed
    total_workflows: AtomicU64,

    /// Successful workflow completions
    successful_workflows: AtomicU64,

    /// Failed workflow executions
    failed_workflows: AtomicU64,

    /// Total nodes executed
    total_nodes: AtomicU64,

    /// Successful node completions
    successful_nodes: AtomicU64,

    /// Failed node executions
    failed_nodes: AtomicU64,

    /// Retried nodes
    retried_nodes: AtomicU64,

    /// Timed out nodes
    timed_out_nodes: AtomicU64,

    /// Checkpoints created
    checkpoints_created: AtomicU64,

    /// Executions resumed from checkpoint
    executions_resumed: AtomicU64,

    /// Per-workflow execution times (in milliseconds)
    workflow_durations: Arc<RwLock<Vec<u64>>>,

    /// Per-node execution times (in milliseconds)
    node_durations: Arc<RwLock<Vec<u64>>>,

    /// Active executions
    active_executions: Arc<RwLock<HashMap<Uuid, ExecutionInfo>>>,
}

/// Information about an active execution
#[derive(Debug, Clone)]
pub struct ExecutionInfo {
    /// Workflow ID
    pub workflow_id: Uuid,

    /// Execution ID
    pub execution_id: Uuid,

    /// Start time
    pub started_at: Instant,

    /// Current level being executed
    pub current_level: usize,

    /// Completed nodes count
    pub completed_nodes: usize,

    /// Total nodes count
    pub total_nodes: usize,
}

/// Execution statistics snapshot
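///
/// A hypothetical serialization sketch; it assumes `serde_json` is available
/// to the crate and that this type is exposed as
/// `oxify_engine::metrics::ExecutionStats`:
///
/// ```
/// use oxify_engine::metrics::ExecutionStats;
///
/// let stats = ExecutionStats::default();
/// let json = serde_json::to_string(&stats).expect("stats serialize to JSON");
/// assert!(json.contains("total_workflows"));
/// ```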
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ExecutionStats {
    /// Total workflows executed
    pub total_workflows: u64,

    /// Successful workflow completions
    pub successful_workflows: u64,

    /// Failed workflow executions
    pub failed_workflows: u64,

    /// Success rate (0.0 - 1.0)
    pub workflow_success_rate: f64,

    /// Total nodes executed
    pub total_nodes: u64,

    /// Successful node completions
    pub successful_nodes: u64,

    /// Failed node executions
    pub failed_nodes: u64,

    /// Retried nodes
    pub retried_nodes: u64,

    /// Timed out nodes
    pub timed_out_nodes: u64,

    /// Checkpoints created
    pub checkpoints_created: u64,

    /// Executions resumed
    pub executions_resumed: u64,

    /// Average workflow duration (milliseconds)
    pub avg_workflow_duration_ms: f64,

    /// P50 workflow duration (milliseconds)
    pub p50_workflow_duration_ms: u64,

    /// P95 workflow duration (milliseconds)
    pub p95_workflow_duration_ms: u64,

    /// P99 workflow duration (milliseconds)
    pub p99_workflow_duration_ms: u64,

    /// Average node duration (milliseconds)
    pub avg_node_duration_ms: f64,

    /// Currently active executions
    pub active_executions: usize,
}

impl ExecutionMetrics {
    /// Create a new metrics collector
    pub fn new() -> Self {
        Self::default()
    }

    /// Record workflow started
    pub fn record_workflow_started(
        &self,
        workflow_id: Uuid,
        execution_id: Uuid,
        total_nodes: usize,
    ) {
        self.total_workflows.fetch_add(1, Ordering::Relaxed);

        let info = ExecutionInfo {
            workflow_id,
            execution_id,
            started_at: Instant::now(),
            current_level: 0,
            completed_nodes: 0,
            total_nodes,
        };

        self.active_executions
            .write()
            .unwrap()
            .insert(execution_id, info);
    }

    /// Record workflow completed successfully
    pub fn record_workflow_completed(&self, execution_id: Uuid) {
        self.successful_workflows.fetch_add(1, Ordering::Relaxed);

        if let Some(info) = self
            .active_executions
            .write()
            .unwrap()
            .remove(&execution_id)
        {
            let duration_ms = info.started_at.elapsed().as_millis() as u64;
            self.workflow_durations.write().unwrap().push(duration_ms);
        }
    }

    /// Record workflow failed
    pub fn record_workflow_failed(&self, execution_id: Uuid) {
        self.failed_workflows.fetch_add(1, Ordering::Relaxed);

        if let Some(info) = self
            .active_executions
            .write()
            .unwrap()
            .remove(&execution_id)
        {
            let duration_ms = info.started_at.elapsed().as_millis() as u64;
            self.workflow_durations.write().unwrap().push(duration_ms);
        }
    }

    /// Record node started.
    ///
    /// Increments the total node counter and advances the per-execution
    /// progress counter (node completions are not keyed by execution ID, so
    /// progress is tracked when a node starts).
    pub fn record_node_started(&self, execution_id: Uuid) {
        self.total_nodes.fetch_add(1, Ordering::Relaxed);

        if let Some(info) = self
            .active_executions
            .write()
            .unwrap()
            .get_mut(&execution_id)
        {
            info.completed_nodes += 1;
        }
    }

    /// Record node completed successfully
    pub fn record_node_completed(&self, duration: Duration) {
        self.successful_nodes.fetch_add(1, Ordering::Relaxed);
        self.node_durations
            .write()
            .unwrap()
            .push(duration.as_millis() as u64);
    }

    /// Record node failed
    pub fn record_node_failed(&self) {
        self.failed_nodes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record node retry
    pub fn record_node_retry(&self) {
        self.retried_nodes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record node timeout
    pub fn record_node_timeout(&self) {
        self.timed_out_nodes.fetch_add(1, Ordering::Relaxed);
    }

    /// Record checkpoint created
    pub fn record_checkpoint_created(&self) {
        self.checkpoints_created.fetch_add(1, Ordering::Relaxed);
    }

    /// Record execution resumed
    pub fn record_execution_resumed(&self) {
        self.executions_resumed.fetch_add(1, Ordering::Relaxed);
    }

    /// Update current level for an execution
    pub fn update_level(&self, execution_id: Uuid, level: usize) {
        if let Some(info) = self
            .active_executions
            .write()
            .unwrap()
            .get_mut(&execution_id)
        {
            info.current_level = level;
        }
    }

    /// Get current statistics
    pub fn get_stats(&self) -> ExecutionStats {
        let total_workflows = self.total_workflows.load(Ordering::Relaxed);
        let successful_workflows = self.successful_workflows.load(Ordering::Relaxed);
        let failed_workflows = self.failed_workflows.load(Ordering::Relaxed);

        let workflow_success_rate = if total_workflows > 0 {
            successful_workflows as f64 / total_workflows as f64
        } else {
            0.0
        };

        let workflow_durations = self.workflow_durations.read().unwrap();
        let (avg_workflow, p50_workflow, p95_workflow, p99_workflow) =
            calculate_percentiles(&workflow_durations);

        let node_durations = self.node_durations.read().unwrap();
        let (avg_node, _, _, _) = calculate_percentiles(&node_durations);

        ExecutionStats {
            total_workflows,
            successful_workflows,
            failed_workflows,
            workflow_success_rate,
            total_nodes: self.total_nodes.load(Ordering::Relaxed),
            successful_nodes: self.successful_nodes.load(Ordering::Relaxed),
            failed_nodes: self.failed_nodes.load(Ordering::Relaxed),
            retried_nodes: self.retried_nodes.load(Ordering::Relaxed),
            timed_out_nodes: self.timed_out_nodes.load(Ordering::Relaxed),
            checkpoints_created: self.checkpoints_created.load(Ordering::Relaxed),
            executions_resumed: self.executions_resumed.load(Ordering::Relaxed),
            avg_workflow_duration_ms: avg_workflow,
            p50_workflow_duration_ms: p50_workflow,
            p95_workflow_duration_ms: p95_workflow,
            p99_workflow_duration_ms: p99_workflow,
            avg_node_duration_ms: avg_node,
            active_executions: self.active_executions.read().unwrap().len(),
        }
    }

    /// Get list of active executions
    pub fn get_active_executions(&self) -> Vec<ExecutionInfo> {
        self.active_executions
            .read()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }

    /// Reset all metrics
    pub fn reset(&self) {
        self.total_workflows.store(0, Ordering::Relaxed);
        self.successful_workflows.store(0, Ordering::Relaxed);
        self.failed_workflows.store(0, Ordering::Relaxed);
        self.total_nodes.store(0, Ordering::Relaxed);
        self.successful_nodes.store(0, Ordering::Relaxed);
        self.failed_nodes.store(0, Ordering::Relaxed);
        self.retried_nodes.store(0, Ordering::Relaxed);
        self.timed_out_nodes.store(0, Ordering::Relaxed);
        self.checkpoints_created.store(0, Ordering::Relaxed);
        self.executions_resumed.store(0, Ordering::Relaxed);
        self.workflow_durations.write().unwrap().clear();
        self.node_durations.write().unwrap().clear();
        self.active_executions.write().unwrap().clear();
    }
}

/// Calculate the average and the p50/p95/p99 percentiles from a list of values.
///
/// Percentiles are selected by index on the sorted values; see `percentile`.
fn calculate_percentiles(values: &[u64]) -> (f64, u64, u64, u64) {
    if values.is_empty() {
        return (0.0, 0, 0, 0);
    }

    let mut sorted = values.to_vec();
    sorted.sort_unstable();

    let avg = sorted.iter().sum::<u64>() as f64 / sorted.len() as f64;
    let p50 = percentile(&sorted, 50);
    let p95 = percentile(&sorted, 95);
    let p99 = percentile(&sorted, 99);

    (avg, p50, p95, p99)
}

/// Calculate a specific percentile from a pre-sorted slice by picking the
/// element at index `p * len / 100` (integer division), clamped to the last
/// element.
fn percentile(sorted: &[u64], p: usize) -> u64 {
    if sorted.is_empty() {
        return 0;
    }

    let idx = (p * sorted.len() / 100).min(sorted.len() - 1);
    sorted[idx]
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_metrics_workflow_tracking() {
        let metrics = ExecutionMetrics::new();

        let workflow_id = Uuid::new_v4();
        let execution_id = Uuid::new_v4();

        metrics.record_workflow_started(workflow_id, execution_id, 5);

        let active = metrics.get_active_executions();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].execution_id, execution_id);

        metrics.record_workflow_completed(execution_id);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_workflows, 1);
        assert_eq!(stats.successful_workflows, 1);
        assert_eq!(stats.active_executions, 0);
    }

    #[test]
    fn test_metrics_node_tracking() {
        let metrics = ExecutionMetrics::new();

        metrics.record_node_completed(Duration::from_millis(100));
        metrics.record_node_completed(Duration::from_millis(200));
        metrics.record_node_failed();
        metrics.record_node_retry();

        let stats = metrics.get_stats();
        assert_eq!(stats.successful_nodes, 2);
        assert_eq!(stats.failed_nodes, 1);
        assert_eq!(stats.retried_nodes, 1);
    }

    #[test]
    fn test_metrics_reset() {
        let metrics = ExecutionMetrics::new();

        let workflow_id = Uuid::new_v4();
        let execution_id = Uuid::new_v4();

        metrics.record_workflow_started(workflow_id, execution_id, 5);
        metrics.record_workflow_completed(execution_id);

        let stats = metrics.get_stats();
        assert_eq!(stats.total_workflows, 1);

        metrics.reset();

        let stats = metrics.get_stats();
        assert_eq!(stats.total_workflows, 0);
    }

    #[test]
    fn test_percentile_calculation() {
        let values = vec![10, 20, 30, 40, 50, 60, 70, 80, 90, 100];
        let (avg, p50, p95, p99) = calculate_percentiles(&values);

        assert!((avg - 55.0).abs() < 0.001);
        // With 10 values, p50 at idx (50*10/100)=5 is the 6th value (60)
        assert_eq!(p50, 60);
        assert_eq!(p95, 100);
        assert_eq!(p99, 100);
    }
}