Skip to main content

praisonai/workflows/
mod.rs

//! Workflow system for PraisonAI.
//!
//! This module provides multi-agent workflow patterns:
//! - `AgentTeam`: coordinates multiple agents
//! - `AgentFlow`: defines workflow execution patterns
//! - `Route`, `Parallel`, `Loop`, `Repeat`: workflow step patterns
7
8use serde::{Deserialize, Serialize};
9use std::sync::Arc;
10
11use crate::agent::Agent;
12use crate::error::{Error, Result};
13
14/// Process type for workflow execution
15#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
16#[serde(rename_all = "lowercase")]
17pub enum Process {
18    /// Execute agents sequentially
19    #[default]
20    Sequential,
21    /// Execute agents in parallel
22    Parallel,
23    /// Use hierarchical manager-based execution
24    Hierarchical,
25}
26
27/// Step result from workflow execution
28#[derive(Debug, Clone, Serialize, Deserialize)]
29pub struct StepResult {
30    /// Agent name that produced this result
31    pub agent: String,
32    /// The output content
33    pub output: String,
34    /// Whether the step succeeded
35    pub success: bool,
36    /// Error message if failed
37    pub error: Option<String>,
38}
39
40impl StepResult {
41    /// Create a successful step result
42    pub fn success(agent: impl Into<String>, output: impl Into<String>) -> Self {
43        Self {
44            agent: agent.into(),
45            output: output.into(),
46            success: true,
47            error: None,
48        }
49    }
50
51    /// Create a failed step result
52    pub fn failure(agent: impl Into<String>, error: impl Into<String>) -> Self {
53        Self {
54            agent: agent.into(),
55            output: String::new(),
56            success: false,
57            error: Some(error.into()),
58        }
59    }
60}
61
62/// Workflow context passed between agents
63#[derive(Debug, Clone, Default, Serialize, Deserialize)]
64pub struct WorkflowContext {
65    /// Variables available to all agents
66    pub variables: std::collections::HashMap<String, String>,
67    /// Results from previous steps
68    pub results: Vec<StepResult>,
69}
70
71impl WorkflowContext {
72    /// Create a new empty context
73    pub fn new() -> Self {
74        Self::default()
75    }
76
77    /// Set a variable
78    pub fn set(&mut self, key: impl Into<String>, value: impl Into<String>) {
79        self.variables.insert(key.into(), value.into());
80    }
81
82    /// Get a variable
83    pub fn get(&self, key: &str) -> Option<&String> {
84        self.variables.get(key)
85    }
86
87    /// Add a step result
88    pub fn add_result(&mut self, result: StepResult) {
89        // Also store as variable for next agent
90        let var_name = format!("{}_output", result.agent);
91        self.variables.insert(var_name, result.output.clone());
92        self.results.push(result);
93    }
94
95    /// Get the last result
96    pub fn last_result(&self) -> Option<&StepResult> {
97        self.results.last()
98    }
99}
100
101/// Agent team for multi-agent workflows
102///
103/// Coordinates multiple agents to work together on tasks.
104///
105/// # Example
106///
107/// ```rust,ignore
108/// let team = AgentTeam::new()
109///     .agent(researcher)
110///     .agent(writer)
111///     .process(Process::Sequential)
112///     .build();
113///
114/// let result = team.start("Research and write about AI").await?;
115/// ```
116pub struct AgentTeam {
117    agents: Vec<Arc<Agent>>,
118    process: Process,
119    verbose: bool,
120}
121
122impl AgentTeam {
123    /// Check if verbose mode is enabled
124    pub fn is_verbose(&self) -> bool {
125        self.verbose
126    }
127}
128
129impl AgentTeam {
130    /// Create a new agent team builder
131    #[allow(clippy::new_ret_no_self)]
132    pub fn new() -> AgentTeamBuilder {
133        AgentTeamBuilder::new()
134    }
135
136    /// Run the team with a task
137    pub async fn start(&self, task: &str) -> Result<String> {
138        match self.process {
139            Process::Sequential => self.run_sequential(task).await,
140            Process::Parallel => self.run_parallel(task).await,
141            Process::Hierarchical => self.run_hierarchical(task).await,
142        }
143    }
144
145    /// Alias for start
146    pub async fn run(&self, task: &str) -> Result<String> {
147        self.start(task).await
148    }
149
150    async fn run_sequential(&self, task: &str) -> Result<String> {
151        let mut context = WorkflowContext::new();
152        context.set("task", task);
153
154        let mut current_input = task.to_string();
155
156        for agent in &self.agents {
157            // Build prompt with context
158            let prompt = if context.results.is_empty() {
159                current_input.clone()
160            } else {
161                let prev_output = context
162                    .last_result()
163                    .map(|r| r.output.as_str())
164                    .unwrap_or("");
165                format!("{}\n\nPrevious output:\n{}", current_input, prev_output)
166            };
167
168            match agent.chat(&prompt).await {
169                Ok(output) => {
170                    context.add_result(StepResult::success(agent.name(), &output));
171                    current_input = output;
172                }
173                Err(e) => {
174                    context.add_result(StepResult::failure(agent.name(), e.to_string()));
175                    return Err(Error::workflow(format!(
176                        "Agent {} failed: {}",
177                        agent.name(),
178                        e
179                    )));
180                }
181            }
182        }
183
184        // Return the last output
185        context
186            .last_result()
187            .map(|r| r.output.clone())
188            .ok_or_else(|| Error::workflow("No results from workflow"))
189    }
190
191    async fn run_parallel(&self, task: &str) -> Result<String> {
192        use futures::future::join_all;
193
194        let futures: Vec<_> = self
195            .agents
196            .iter()
197            .map(|agent| {
198                let agent = Arc::clone(agent);
199                let task = task.to_string();
200                async move {
201                    agent
202                        .chat(&task)
203                        .await
204                        .map(|output| StepResult::success(agent.name(), output))
205                        .unwrap_or_else(|e| StepResult::failure(agent.name(), e.to_string()))
206                }
207            })
208            .collect();
209
210        let results = join_all(futures).await;
211
212        // Combine results
213        let combined: Vec<String> = results
214            .iter()
215            .filter(|r| r.success)
216            .map(|r| format!("## {}\n{}", r.agent, r.output))
217            .collect();
218
219        if combined.is_empty() {
220            Err(Error::workflow("All agents failed"))
221        } else {
222            Ok(combined.join("\n\n"))
223        }
224    }
225
226    async fn run_hierarchical(&self, task: &str) -> Result<String> {
227        // For now, hierarchical is similar to sequential with validation
228        // A manager agent would validate each step
229        self.run_sequential(task).await
230    }
231
232    /// Get the number of agents
233    pub fn len(&self) -> usize {
234        self.agents.len()
235    }
236
237    /// Check if empty
238    pub fn is_empty(&self) -> bool {
239        self.agents.is_empty()
240    }
241}
242
243impl Default for AgentTeam {
244    fn default() -> Self {
245        Self {
246            agents: Vec::new(),
247            process: Process::Sequential,
248            verbose: false,
249        }
250    }
251}
252
253/// Builder for AgentTeam
254pub struct AgentTeamBuilder {
255    agents: Vec<Arc<Agent>>,
256    process: Process,
257    verbose: bool,
258}
259
260impl AgentTeamBuilder {
261    /// Create a new builder
262    pub fn new() -> Self {
263        Self {
264            agents: Vec::new(),
265            process: Process::Sequential,
266            verbose: false,
267        }
268    }
269
270    /// Add an agent
271    pub fn agent(mut self, agent: Agent) -> Self {
272        self.agents.push(Arc::new(agent));
273        self
274    }
275
276    /// Add an agent (Arc version)
277    pub fn agent_arc(mut self, agent: Arc<Agent>) -> Self {
278        self.agents.push(agent);
279        self
280    }
281
282    /// Set the process type
283    pub fn process(mut self, process: Process) -> Self {
284        self.process = process;
285        self
286    }
287
288    /// Enable verbose output
289    pub fn verbose(mut self, enabled: bool) -> Self {
290        self.verbose = enabled;
291        self
292    }
293
294    /// Build the team
295    pub fn build(self) -> AgentTeam {
296        AgentTeam {
297            agents: self.agents,
298            process: self.process,
299            verbose: self.verbose,
300        }
301    }
302}
303
304impl Default for AgentTeamBuilder {
305    fn default() -> Self {
306        Self::new()
307    }
308}
309
310/// AgentFlow - Workflow definition with patterns
311///
312/// Defines complex workflow patterns like Route, Parallel, Loop.
313pub struct AgentFlow {
314    steps: Vec<FlowStep>,
315}
316
317/// A step in a workflow
318pub enum FlowStep {
319    /// Execute a single agent
320    Agent(Arc<Agent>),
321    /// Route to different agents based on condition
322    Route(Route),
323    /// Execute agents in parallel
324    Parallel(Parallel),
325    /// Loop over items
326    Loop(Loop),
327    /// Repeat a step
328    Repeat(Repeat),
329}
330
331/// Route pattern - conditional branching
332pub struct Route {
333    /// Condition function
334    pub condition: Box<dyn Fn(&str) -> bool + Send + Sync>,
335    /// Agent to use if condition is true
336    pub if_true: Arc<Agent>,
337    /// Agent to use if condition is false
338    pub if_false: Option<Arc<Agent>>,
339}
340
341/// Parallel pattern - concurrent execution
342pub struct Parallel {
343    /// Agents to run in parallel
344    pub agents: Vec<Arc<Agent>>,
345}
346
347/// Loop pattern - iterate over items
348pub struct Loop {
349    /// Agent to execute for each item
350    pub agent: Arc<Agent>,
351    /// Items to iterate over
352    pub items: Vec<String>,
353}
354
355/// Repeat pattern - repeat execution
356pub struct Repeat {
357    /// Agent to repeat
358    pub agent: Arc<Agent>,
359    /// Number of times to repeat
360    pub times: usize,
361}
362
363impl AgentFlow {
364    /// Create a new workflow
365    pub fn new() -> Self {
366        Self { steps: Vec::new() }
367    }
368
369    /// Add a step
370    pub fn step(mut self, step: FlowStep) -> Self {
371        self.steps.push(step);
372        self
373    }
374
375    /// Add an agent step
376    pub fn agent(self, agent: Agent) -> Self {
377        self.step(FlowStep::Agent(Arc::new(agent)))
378    }
379
380    /// Execute the workflow
381    pub async fn run(&self, input: &str) -> Result<String> {
382        let mut current = input.to_string();
383
384        for step in &self.steps {
385            current = match step {
386                FlowStep::Agent(agent) => agent.chat(&current).await?,
387                FlowStep::Route(route) => {
388                    if (route.condition)(&current) {
389                        route.if_true.chat(&current).await?
390                    } else if let Some(agent) = &route.if_false {
391                        agent.chat(&current).await?
392                    } else {
393                        current
394                    }
395                }
396                FlowStep::Parallel(parallel) => {
397                    use futures::future::join_all;
398
399                    let futures: Vec<_> =
400                        parallel.agents.iter().map(|a| a.chat(&current)).collect();
401
402                    let results = join_all(futures).await;
403                    let outputs: Vec<String> = results.into_iter().filter_map(|r| r.ok()).collect();
404
405                    outputs.join("\n\n")
406                }
407                FlowStep::Loop(loop_step) => {
408                    let mut outputs = Vec::new();
409                    for item in &loop_step.items {
410                        let prompt = format!("{}\n\nItem: {}", current, item);
411                        outputs.push(loop_step.agent.chat(&prompt).await?);
412                    }
413                    outputs.join("\n\n")
414                }
415                FlowStep::Repeat(repeat) => {
416                    let mut output = current.clone();
417                    for _ in 0..repeat.times {
418                        output = repeat.agent.chat(&output).await?;
419                    }
420                    output
421                }
422            };
423        }
424
425        Ok(current)
426    }
427}
428
429impl Default for AgentFlow {
430    fn default() -> Self {
431        Self::new()
432    }
433}
434
#[cfg(test)]
mod tests {
    use super::*;

    /// Variables are absent until set and readable afterwards.
    #[test]
    fn test_workflow_context() {
        let mut ctx = WorkflowContext::new();
        assert_eq!(ctx.get("key"), None);

        ctx.set("key", "value");
        assert_eq!(ctx.get("key"), Some(&"value".to_string()));
    }

    /// Results are kept in order and mirrored into `<agent>_output` variables.
    #[test]
    fn test_workflow_context_results() {
        let mut ctx = WorkflowContext::new();
        assert!(ctx.last_result().is_none());

        ctx.add_result(StepResult::success("agent1", "first"));
        ctx.add_result(StepResult::success("agent2", "second"));

        assert_eq!(ctx.results.len(), 2);
        assert_eq!(ctx.get("agent1_output"), Some(&"first".to_string()));
        assert_eq!(ctx.get("agent2_output"), Some(&"second".to_string()));
        assert_eq!(
            ctx.last_result().map(|r| r.output.as_str()),
            Some("second")
        );
    }

    /// Both constructors fill every field as documented.
    #[test]
    fn test_step_result() {
        let success = StepResult::success("agent1", "output");
        assert!(success.success);
        assert_eq!(success.agent, "agent1");
        assert_eq!(success.output, "output");
        assert!(success.error.is_none());

        let failure = StepResult::failure("agent1", "error");
        assert!(!failure.success);
        assert_eq!(failure.agent, "agent1");
        assert!(failure.output.is_empty());
        assert_eq!(failure.error.as_deref(), Some("error"));
    }

    /// The builder carries its settings over into the built team.
    #[test]
    fn test_agent_team_builder() {
        let team = AgentTeam::new()
            .process(Process::Parallel)
            .verbose(true)
            .build();

        assert!(team.is_empty());
        assert_eq!(team.len(), 0);
        assert!(team.is_verbose());
        assert_eq!(team.process, Process::Parallel);
    }
}