Skip to main content

enki_next/workflow/
types.rs

1use crate::agent::ExecutionStep;
2use crate::tooling::types::WorkflowToolContext;
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value, json};
6use std::collections::BTreeMap;
7use std::sync::Arc;
8
9#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
10#[serde(rename_all = "snake_case")]
11pub enum WorkflowFailurePolicy {
12    ContinueBestEffort,
13    FailWorkflow,
14    PauseForIntervention,
15}
16
17impl Default for WorkflowFailurePolicy {
18    fn default() -> Self {
19        Self::ContinueBestEffort
20    }
21}
22
23#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
24pub struct RetryPolicy {
25    pub max_attempts: usize,
26}
27
28impl Default for RetryPolicy {
29    fn default() -> Self {
30        Self { max_attempts: 1 }
31    }
32}
33
34#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
35#[serde(tag = "type", content = "value", rename_all = "snake_case")]
36pub enum TaskTarget {
37    AgentId(String),
38    Capabilities(Vec<String>),
39}
40
41#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
42pub struct TaskDefinition {
43    pub id: String,
44    pub target: TaskTarget,
45    pub prompt: String,
46    #[serde(default)]
47    pub input_bindings: BTreeMap<String, String>,
48    pub input_transform: Option<String>,
49    pub output_transform: Option<String>,
50    pub output_key: Option<String>,
51    pub retry_policy: Option<RetryPolicy>,
52    pub failure_policy: Option<WorkflowFailurePolicy>,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
56#[serde(tag = "kind", rename_all = "snake_case")]
57pub enum WorkflowNodeKind {
58    Task {
59        task_id: Option<String>,
60        task: Option<TaskDefinition>,
61    },
62    Decision {
63        condition: String,
64    },
65    HumanGate {
66        prompt: String,
67    },
68    Transform {
69        transform_id: String,
70        input_key: Option<String>,
71    },
72    Join,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
76pub struct WorkflowNodeDefinition {
77    pub id: String,
78    #[serde(flatten)]
79    pub kind: WorkflowNodeKind,
80    pub output_key: Option<String>,
81    pub retry_policy: Option<RetryPolicy>,
82    pub failure_policy: Option<WorkflowFailurePolicy>,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
86#[serde(tag = "type", content = "value", rename_all = "snake_case")]
87pub enum WorkflowEdgeTransition {
88    Always,
89    OnSuccess,
90    OnFailure,
91    Condition(String),
92}
93
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
95pub struct WorkflowEdgeDefinition {
96    pub from: String,
97    pub to: String,
98    pub transition: WorkflowEdgeTransition,
99}
100
101#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
102pub struct WorkflowDefinition {
103    pub id: String,
104    pub name: String,
105    #[serde(default)]
106    pub nodes: Vec<WorkflowNodeDefinition>,
107    #[serde(default)]
108    pub edges: Vec<WorkflowEdgeDefinition>,
109    pub retry_policy: Option<RetryPolicy>,
110    pub failure_policy: Option<WorkflowFailurePolicy>,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
114pub struct WorkflowContext {
115    #[serde(default)]
116    pub values: BTreeMap<String, Value>,
117}
118
119impl WorkflowContext {
120    pub fn insert(&mut self, key: impl Into<String>, value: Value) {
121        self.values.insert(key.into(), value);
122    }
123
124    pub fn get(&self, key: &str) -> Option<&Value> {
125        self.values.get(key)
126    }
127
128    pub fn to_value(&self) -> Value {
129        let mut map = Map::new();
130        for (key, value) in &self.values {
131            map.insert(key.clone(), value.clone());
132        }
133        Value::Object(map)
134    }
135
136    pub fn lookup_path(&self, path: &str) -> Option<Value> {
137        if path.is_empty() {
138            return None;
139        }
140
141        let mut segments = path.split('.');
142        let first = segments.next()?;
143        let mut current = self.values.get(first)?.clone();
144        for segment in segments {
145            current = match current {
146                Value::Object(map) => map.get(segment)?.clone(),
147                _ => return None,
148            };
149        }
150        Some(current)
151    }
152}
153
154#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
155#[serde(rename_all = "snake_case")]
156pub enum NodeStatus {
157    Pending,
158    Running,
159    Completed,
160    Failed,
161    Skipped,
162    Paused,
163}
164
165impl NodeStatus {
166    pub fn is_terminal(&self) -> bool {
167        matches!(self, Self::Completed | Self::Failed | Self::Skipped)
168    }
169}
170
171#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
172pub struct NodeRunState {
173    pub node_id: String,
174    pub status: NodeStatus,
175    pub attempts: usize,
176    pub started_at: Option<u128>,
177    pub completed_at: Option<u128>,
178    pub last_error: Option<String>,
179    pub output_key: String,
180    pub output: Option<Value>,
181    #[serde(default)]
182    pub activated_incoming: Vec<String>,
183    pub session_id: Option<String>,
184}
185
186#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
187#[serde(rename_all = "snake_case")]
188pub enum InterventionStatus {
189    Pending,
190    Resolved,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
194pub struct InterventionRequest {
195    pub id: String,
196    pub workflow_id: String,
197    pub run_id: String,
198    pub node_id: String,
199    pub prompt: String,
200    pub reason: String,
201    pub response: Option<String>,
202    pub status: InterventionStatus,
203    pub created_at: u128,
204    pub resolved_at: Option<u128>,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
208#[serde(rename_all = "snake_case")]
209pub enum WorkflowStatus {
210    Pending,
211    Running,
212    Paused,
213    Failed,
214    Completed,
215    CompletedWithFailures,
216}
217
218#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
219pub struct WorkflowRunState {
220    pub workflow_id: String,
221    pub run_id: String,
222    pub status: WorkflowStatus,
223    pub created_at: u128,
224    pub updated_at: u128,
225    pub input: Value,
226    pub context: WorkflowContext,
227    pub node_states: BTreeMap<String, NodeRunState>,
228    #[serde(default)]
229    pub pending_interventions: Vec<InterventionRequest>,
230    #[serde(default)]
231    pub failed_nodes: Vec<String>,
232}
233
234#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
235pub struct WorkflowRequest {
236    pub workflow_id: String,
237    #[serde(default = "default_input")]
238    pub input: Value,
239}
240
241fn default_input() -> Value {
242    json!({})
243}
244
245impl WorkflowRequest {
246    pub fn new(workflow_id: impl Into<String>, input: Value) -> Self {
247        Self {
248            workflow_id: workflow_id.into(),
249            input,
250        }
251    }
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
255pub struct WorkflowResponse {
256    pub workflow_id: String,
257    pub run_id: String,
258    pub status: WorkflowStatus,
259    pub context: WorkflowContext,
260    #[serde(default)]
261    pub events: Vec<WorkflowEvent>,
262}
263
264#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
265#[serde(tag = "type", rename_all = "snake_case")]
266pub enum WorkflowEvent {
267    WorkflowStarted {
268        workflow_id: String,
269        run_id: String,
270    },
271    NodeReady {
272        node_id: String,
273    },
274    NodeStarted {
275        node_id: String,
276        attempt: usize,
277    },
278    NodeCompleted {
279        node_id: String,
280        output_key: String,
281    },
282    NodeFailed {
283        node_id: String,
284        error: String,
285    },
286    NodeRetryScheduled {
287        node_id: String,
288        attempt: usize,
289        error: String,
290    },
291    NodeSkipped {
292        node_id: String,
293    },
294    InterventionRequested {
295        intervention_id: String,
296        node_id: String,
297        reason: String,
298    },
299    InterventionResolved {
300        intervention_id: String,
301        node_id: String,
302    },
303    WorkflowPaused {
304        run_id: String,
305        reason: String,
306    },
307    WorkflowCompleted {
308        run_id: String,
309        status: WorkflowStatus,
310    },
311}
312
313#[async_trait(?Send)]
314pub trait WorkflowEventListener: 'static {
315    async fn on_event(&self, event: &WorkflowEvent) -> Result<(), String>;
316}
317
318#[derive(Debug, Clone, PartialEq)]
319pub struct WorkflowTaskResult {
320    pub content: String,
321    pub value: Value,
322    pub agent_id: String,
323    pub steps: Vec<ExecutionStep>,
324}
325
326#[async_trait(?Send)]
327pub trait WorkflowTaskRunner: 'static {
328    async fn run_task(
329        &self,
330        target: &TaskTarget,
331        metadata: &WorkflowToolContext,
332        workspace_dir: &std::path::Path,
333        prompt: &str,
334    ) -> Result<WorkflowTaskResult, String>;
335}
336
337#[async_trait(?Send)]
338pub trait WorkflowTransform: 'static {
339    async fn apply(&self, input: &Value, context: &WorkflowContext) -> Result<Value, String>;
340}
341
342pub type TransformRegistry = BTreeMap<String, Arc<dyn WorkflowTransform>>;