1use crate::agent::ExecutionStep;
2use crate::tooling::types::WorkflowToolContext;
3use async_trait::async_trait;
4use serde::{Deserialize, Serialize};
5use serde_json::{Map, Value, json};
6use std::collections::BTreeMap;
7use std::sync::Arc;
8
/// Policy selecting how a failure is handled.
///
/// Variants are (de)serialized in `snake_case`, e.g. `"continue_best_effort"`.
///
/// NOTE(review): the per-variant behavior described below is inferred from the
/// variant names; the code that interprets this policy is not in this file —
/// confirm against the workflow executor.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowFailurePolicy {
    /// Presumably: keep running whatever can still run after a failure.
    ContinueBestEffort,
    /// Presumably: mark the whole workflow as failed.
    FailWorkflow,
    /// Presumably: pause the workflow and wait for an intervention.
    PauseForIntervention,
}
16
17impl Default for WorkflowFailurePolicy {
18 fn default() -> Self {
19 Self::ContinueBestEffort
20 }
21}
22
/// Retry configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct RetryPolicy {
    /// Attempt budget.
    ///
    /// NOTE(review): presumably this counts the first attempt as well (so `1`
    /// means "no retries") — confirm against the executor.
    pub max_attempts: usize,
}
27
28impl Default for RetryPolicy {
29 fn default() -> Self {
30 Self { max_attempts: 1 }
31 }
32}
33
/// Selector for what should run a task.
///
/// Uses serde's adjacently tagged representation with `snake_case` variant
/// names, e.g. `{"type": "agent_id", "value": "planner"}` or
/// `{"type": "capabilities", "value": ["search", "summarize"]}`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum TaskTarget {
    /// Target identified by a single string id.
    AgentId(String),
    /// Target described by a list of capability strings.
    ///
    /// NOTE(review): how capabilities are matched to an agent is decided
    /// elsewhere — not visible in this file.
    Capabilities(Vec<String>),
}
40
/// Declarative description of a single task.
///
/// NOTE(review): the field semantics noted below are inferred from names and
/// types; the code that consumes them is not in this file — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct TaskDefinition {
    /// Identifier of the task.
    pub id: String,
    /// What should execute the task.
    pub target: TaskTarget,
    /// Prompt text associated with the task.
    pub prompt: String,
    /// String-to-string bindings; deserializes to an empty map when absent.
    /// Presumably maps an input name to a context path — confirm.
    #[serde(default)]
    pub input_bindings: BTreeMap<String, String>,
    /// Optional name of a transform; presumably applied to the task input.
    pub input_transform: Option<String>,
    /// Optional name of a transform; presumably applied to the task output.
    pub output_transform: Option<String>,
    /// Optional key; presumably where the output is stored in the context.
    pub output_key: Option<String>,
    /// Optional task-level retry policy.
    pub retry_policy: Option<RetryPolicy>,
    /// Optional task-level failure policy.
    pub failure_policy: Option<WorkflowFailurePolicy>,
}
54
/// The kind of a workflow node together with its kind-specific fields.
///
/// Internally tagged for serde: the variant name is stored in a `"kind"` field
/// in `snake_case` (e.g. `{"kind": "human_gate", "prompt": "..."}`), next to
/// the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum WorkflowNodeKind {
    /// A task node. Both fields are optional.
    ///
    /// NOTE(review): presumably a node supplies either a reference
    /// (`task_id`) or an inline definition (`task`); nothing in this file
    /// enforces that — confirm where definitions are validated.
    Task {
        /// Optional id of a task.
        task_id: Option<String>,
        /// Optional inline task definition.
        task: Option<TaskDefinition>,
    },
    /// A decision node carrying a condition string.
    Decision {
        /// Condition text; its syntax is defined by whatever evaluates it.
        condition: String,
    },
    /// A node carrying a prompt intended for a human.
    HumanGate {
        /// Prompt text.
        prompt: String,
    },
    /// A node that names a transform to apply.
    Transform {
        /// Id of the transform.
        transform_id: String,
        /// Optional key; presumably selects the transform's input — confirm.
        input_key: Option<String>,
    },
    /// A join node; carries no extra data.
    Join,
}
74
/// A node of a workflow graph.
///
/// `kind` is flattened by serde, so the `"kind"` tag and the kind-specific
/// fields appear at the same level as `id` in the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowNodeDefinition {
    /// Identifier of the node.
    pub id: String,
    /// Node kind and its kind-specific fields (flattened into this struct).
    #[serde(flatten)]
    pub kind: WorkflowNodeKind,
    /// Optional key; presumably where the node's output is stored — confirm.
    pub output_key: Option<String>,
    /// Optional node-level retry policy.
    pub retry_policy: Option<RetryPolicy>,
    /// Optional node-level failure policy.
    pub failure_policy: Option<WorkflowFailurePolicy>,
}
84
/// Kind of transition attached to an edge.
///
/// Uses serde's adjacently tagged representation with `snake_case` variant
/// names: the variant goes in `"type"` and, for [`Self::Condition`], the
/// payload goes in `"value"`.
///
/// NOTE(review): when each transition fires is decided by the executor, which
/// is not in this file; the variant notes are inferred from the names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum WorkflowEdgeTransition {
    /// Presumably: always taken.
    Always,
    /// Presumably: taken when the source node succeeded.
    OnSuccess,
    /// Presumably: taken when the source node failed.
    OnFailure,
    /// Carries a condition string.
    Condition(String),
}
93
/// A directed edge between two workflow nodes.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WorkflowEdgeDefinition {
    /// Source endpoint; presumably a node id — confirm.
    pub from: String,
    /// Destination endpoint; presumably a node id — confirm.
    pub to: String,
    /// Transition attached to this edge.
    pub transition: WorkflowEdgeTransition,
}
100
/// A complete workflow: identity, nodes, edges, and optional policies.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowDefinition {
    /// Identifier of the workflow.
    pub id: String,
    /// Name of the workflow.
    pub name: String,
    /// Nodes of the workflow; deserializes to an empty list when absent.
    #[serde(default)]
    pub nodes: Vec<WorkflowNodeDefinition>,
    /// Edges of the workflow; deserializes to an empty list when absent.
    #[serde(default)]
    pub edges: Vec<WorkflowEdgeDefinition>,
    /// Optional workflow-level retry policy.
    pub retry_policy: Option<RetryPolicy>,
    /// Optional workflow-level failure policy.
    pub failure_policy: Option<WorkflowFailurePolicy>,
}
112
/// Key/value store of JSON values, keyed by string.
///
/// Backed by a `BTreeMap`, so iteration is ordered by key. `Default` yields an
/// empty context.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
pub struct WorkflowContext {
    /// Stored entries; deserializes to an empty map when absent.
    #[serde(default)]
    pub values: BTreeMap<String, Value>,
}
118
119impl WorkflowContext {
120 pub fn insert(&mut self, key: impl Into<String>, value: Value) {
121 self.values.insert(key.into(), value);
122 }
123
124 pub fn get(&self, key: &str) -> Option<&Value> {
125 self.values.get(key)
126 }
127
128 pub fn to_value(&self) -> Value {
129 let mut map = Map::new();
130 for (key, value) in &self.values {
131 map.insert(key.clone(), value.clone());
132 }
133 Value::Object(map)
134 }
135
136 pub fn lookup_path(&self, path: &str) -> Option<Value> {
137 if path.is_empty() {
138 return None;
139 }
140
141 let mut segments = path.split('.');
142 let first = segments.next()?;
143 let mut current = self.values.get(first)?.clone();
144 for segment in segments {
145 current = match current {
146 Value::Object(map) => map.get(segment)?.clone(),
147 _ => return None,
148 };
149 }
150 Some(current)
151 }
152}
153
/// Execution status of a single node.
///
/// Variants are (de)serialized in `snake_case`, e.g. `"completed"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NodeStatus {
    /// Not started yet.
    Pending,
    /// Currently executing.
    Running,
    /// Finished successfully.
    Completed,
    /// Finished with an error.
    Failed,
    /// Was not executed.
    Skipped,
    /// Execution is suspended.
    Paused,
}
164
165impl NodeStatus {
166 pub fn is_terminal(&self) -> bool {
167 matches!(self, Self::Completed | Self::Failed | Self::Skipped)
168 }
169}
170
/// Runtime state recorded for one node during a workflow run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct NodeRunState {
    /// Id of the node this state belongs to.
    pub node_id: String,
    /// Current status of the node.
    pub status: NodeStatus,
    /// Attempt counter for the node.
    pub attempts: usize,
    /// Optional start timestamp.
    /// NOTE(review): unit/epoch not established in this file — confirm.
    pub started_at: Option<u128>,
    /// Optional completion timestamp (same caveat on unit as `started_at`).
    pub completed_at: Option<u128>,
    /// Most recent error message, if any.
    pub last_error: Option<String>,
    /// Key associated with the node's output.
    pub output_key: String,
    /// Output value, if one has been recorded.
    pub output: Option<Value>,
    /// List of strings; deserializes to an empty list when absent.
    /// Presumably ids of incoming edges/nodes that were activated — confirm.
    #[serde(default)]
    pub activated_incoming: Vec<String>,
    /// Optional session id associated with the node.
    pub session_id: Option<String>,
}
185
/// Status of an intervention request.
///
/// Variants are (de)serialized in `snake_case` (`"pending"`, `"resolved"`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum InterventionStatus {
    /// Not resolved yet.
    Pending,
    /// Has been resolved.
    Resolved,
}
192
/// A request for intervention tied to a specific node of a workflow run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct InterventionRequest {
    /// Identifier of this request.
    pub id: String,
    /// Id of the workflow the request belongs to.
    pub workflow_id: String,
    /// Id of the run the request belongs to.
    pub run_id: String,
    /// Id of the node the request concerns.
    pub node_id: String,
    /// Prompt text of the request.
    pub prompt: String,
    /// Reason text of the request.
    pub reason: String,
    /// Response text, if one has been provided.
    pub response: Option<String>,
    /// Whether the request is pending or resolved.
    pub status: InterventionStatus,
    /// Creation timestamp.
    /// NOTE(review): unit/epoch not established in this file — confirm.
    pub created_at: u128,
    /// Resolution timestamp, if any (same caveat on unit as `created_at`).
    pub resolved_at: Option<u128>,
}
206
/// Overall status of a workflow run.
///
/// Variants are (de)serialized in `snake_case`, e.g.
/// `"completed_with_failures"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum WorkflowStatus {
    /// Not started yet.
    Pending,
    /// Currently executing.
    Running,
    /// Execution is suspended.
    Paused,
    /// Finished with a failure.
    Failed,
    /// Finished successfully.
    Completed,
    /// Finished, but with failures.
    /// NOTE(review): presumably the outcome of a best-effort run in which
    /// some nodes failed — confirm against the executor.
    CompletedWithFailures,
}
217
/// Serializable state of one workflow run.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowRunState {
    /// Id of the workflow being run.
    pub workflow_id: String,
    /// Id of this run.
    pub run_id: String,
    /// Overall status of the run.
    pub status: WorkflowStatus,
    /// Creation timestamp.
    /// NOTE(review): unit/epoch not established in this file — confirm.
    pub created_at: u128,
    /// Last-update timestamp (same caveat on unit as `created_at`).
    pub updated_at: u128,
    /// Input value of the run.
    pub input: Value,
    /// Context values of the run.
    pub context: WorkflowContext,
    /// Node states keyed by string; presumably the node id — confirm.
    pub node_states: BTreeMap<String, NodeRunState>,
    /// Intervention requests; deserializes to an empty list when absent.
    /// NOTE(review): whether resolved requests are removed from this list is
    /// not visible here.
    #[serde(default)]
    pub pending_interventions: Vec<InterventionRequest>,
    /// List of strings, presumably ids of failed nodes; deserializes to an
    /// empty list when absent.
    #[serde(default)]
    pub failed_nodes: Vec<String>,
}
233
/// Request naming a workflow and carrying its input value.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowRequest {
    /// Id of the workflow the request refers to.
    pub workflow_id: String,
    /// Input value; when absent during deserialization it is filled in by
    /// `default_input`.
    #[serde(default = "default_input")]
    pub input: Value,
}
240
/// Serde default for `WorkflowRequest::input`: an empty JSON object (`{}`).
fn default_input() -> Value {
    json!({})
}
244
245impl WorkflowRequest {
246 pub fn new(workflow_id: impl Into<String>, input: Value) -> Self {
247 Self {
248 workflow_id: workflow_id.into(),
249 input,
250 }
251 }
252}
253
/// Response describing a workflow run: its identity, status, context, and a
/// list of events.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct WorkflowResponse {
    /// Id of the workflow.
    pub workflow_id: String,
    /// Id of the run.
    pub run_id: String,
    /// Status of the run.
    pub status: WorkflowStatus,
    /// Context values of the run.
    pub context: WorkflowContext,
    /// Events attached to the response; deserializes to an empty list when
    /// absent.
    #[serde(default)]
    pub events: Vec<WorkflowEvent>,
}
263
/// Event describing something that happened to a workflow run or one of its
/// nodes.
///
/// Internally tagged for serde: the variant name is stored in a `"type"` field
/// in `snake_case` next to the variant's fields, e.g.
/// `{"type": "node_started", "node_id": "n1", "attempt": 1}`.
///
/// NOTE(review): when each event is emitted is decided by code outside this
/// file; the variant notes below follow the variant names.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum WorkflowEvent {
    /// A workflow run started.
    WorkflowStarted {
        /// Id of the workflow.
        workflow_id: String,
        /// Id of the run.
        run_id: String,
    },
    /// A node became ready.
    NodeReady {
        /// Id of the node.
        node_id: String,
    },
    /// A node attempt started.
    NodeStarted {
        /// Id of the node.
        node_id: String,
        /// Attempt number (NOTE(review): 0- or 1-based not visible here).
        attempt: usize,
    },
    /// A node completed.
    NodeCompleted {
        /// Id of the node.
        node_id: String,
        /// Key associated with the node's output.
        output_key: String,
    },
    /// A node failed.
    NodeFailed {
        /// Id of the node.
        node_id: String,
        /// Error message.
        error: String,
    },
    /// A retry was scheduled for a node.
    NodeRetryScheduled {
        /// Id of the node.
        node_id: String,
        /// Attempt number (NOTE(review): whether this is the failed or the
        /// upcoming attempt is not visible here).
        attempt: usize,
        /// Error message.
        error: String,
    },
    /// A node was skipped.
    NodeSkipped {
        /// Id of the node.
        node_id: String,
    },
    /// An intervention was requested.
    InterventionRequested {
        /// Id of the intervention.
        intervention_id: String,
        /// Id of the node concerned.
        node_id: String,
        /// Reason text.
        reason: String,
    },
    /// An intervention was resolved.
    InterventionResolved {
        /// Id of the intervention.
        intervention_id: String,
        /// Id of the node concerned.
        node_id: String,
    },
    /// The workflow run was paused.
    WorkflowPaused {
        /// Id of the run.
        run_id: String,
        /// Reason text.
        reason: String,
    },
    /// The workflow run finished with the given status.
    WorkflowCompleted {
        /// Id of the run.
        run_id: String,
        /// Status carried by the event.
        status: WorkflowStatus,
    },
}
312
/// Receiver of [`WorkflowEvent`]s.
///
/// Declared with `#[async_trait(?Send)]`, so the future returned by
/// `on_event` is not required to be `Send`. Implementors must be `'static`.
#[async_trait(?Send)]
pub trait WorkflowEventListener: 'static {
    /// Handles a single event.
    ///
    /// Returns `Err` with a message when the listener fails.
    /// NOTE(review): how callers react to an `Err` is not visible in this
    /// file.
    async fn on_event(&self, event: &WorkflowEvent) -> Result<(), String>;
}
317
/// Result returned by a [`WorkflowTaskRunner`] for one task.
///
/// Unlike most types in this file it does not derive `Serialize` or
/// `Deserialize`.
#[derive(Debug, Clone, PartialEq)]
pub struct WorkflowTaskResult {
    /// Textual content of the result.
    pub content: String,
    /// JSON value of the result.
    /// NOTE(review): how this relates to `content` is not visible here.
    pub value: Value,
    /// Id of an agent; presumably the one that executed the task — confirm.
    pub agent_id: String,
    /// Execution steps attached to the result.
    pub steps: Vec<ExecutionStep>,
}
325
/// Executor of workflow tasks.
///
/// Declared with `#[async_trait(?Send)]`, so the future returned by
/// `run_task` is not required to be `Send`. Implementors must be `'static`.
#[async_trait(?Send)]
pub trait WorkflowTaskRunner: 'static {
    /// Runs one task.
    ///
    /// * `target` - what should execute the task.
    /// * `metadata` - workflow tool context passed along with the task.
    /// * `workspace_dir` - filesystem path supplied to the runner.
    /// * `prompt` - prompt text for the task.
    ///
    /// Returns the task result, or `Err` with a message on failure.
    async fn run_task(
        &self,
        target: &TaskTarget,
        metadata: &WorkflowToolContext,
        workspace_dir: &std::path::Path,
        prompt: &str,
    ) -> Result<WorkflowTaskResult, String>;
}
336
/// A transformation from one JSON value to another, with read access to the
/// workflow context.
///
/// Declared with `#[async_trait(?Send)]`, so the future returned by `apply` is
/// not required to be `Send`. Implementors must be `'static`.
#[async_trait(?Send)]
pub trait WorkflowTransform: 'static {
    /// Applies the transform to `input`, with `context` available for
    /// reading.
    ///
    /// Returns the transformed value, or `Err` with a message on failure.
    async fn apply(&self, input: &Value, context: &WorkflowContext) -> Result<Value, String>;
}
341
/// Map from a string key to a shared [`WorkflowTransform`] implementation.
///
/// NOTE(review): the key is presumably the transform id that nodes and tasks
/// refer to — confirm against the code that performs the lookup.
pub type TransformRegistry = BTreeMap<String, Arc<dyn WorkflowTransform>>;