Skip to main content

agent_runtime/
runtime.rs

1use crate::{
2    event::{Event, EventStream},
3    step::{StepInput, StepInputMetadata, StepType},
4    step_impls::SubWorkflowStep,
5    workflow::{Workflow, WorkflowRun, WorkflowState, WorkflowStepRecord},
6};
7
8/// Runtime for executing workflows
9pub struct Runtime {
10    event_stream: EventStream,
11}
12
13impl Runtime {
14    pub fn new() -> Self {
15        Self {
16            event_stream: EventStream::new(),
17        }
18    }
19
20    /// Get a reference to the event stream for subscribing to events
21    pub fn event_stream(&self) -> &EventStream {
22        &self.event_stream
23    }
24
25    /// Execute a workflow and return the run with complete history
26    pub async fn execute(&self, workflow: Workflow) -> WorkflowRun {
27        self.execute_with_parent(workflow, None).await
28    }
29
30    /// Execute a workflow with optional parent workflow context
31    pub async fn execute_with_parent(
32        &self,
33        mut workflow: Workflow,
34        parent_workflow_id: Option<String>,
35    ) -> WorkflowRun {
36        let workflow_id = workflow.id.clone();
37
38        // Emit Workflow::Started event
39        self.event_stream.workflow_started(
40            &workflow_id,
41            serde_json::json!({
42                "step_count": workflow.steps.len(),
43                "parent_workflow_id": parent_workflow_id,
44            }),
45        );
46
47        workflow.state = WorkflowState::Running;
48
49        let mut run = WorkflowRun {
50            workflow_id: workflow_id.clone(),
51            state: WorkflowState::Running,
52            steps: Vec::new(),
53            final_output: None,
54            parent_workflow_id: parent_workflow_id.clone(),
55        };
56
57        let mut current_data = workflow.initial_input.clone();
58
59        // Execute each step in sequence
60        for (step_index, step) in workflow.steps.iter().enumerate() {
61            let step_name = step.name().to_string();
62            let step_type_enum = step.step_type();
63            let step_type = format!("{:?}", step_type_enum);
64
65            // Emit WorkflowStep::Started event
66            self.event_stream.step_started(
67                &workflow_id,
68                step_index,
69                serde_json::json!({
70                    "step_name": &step_name,
71                    "step_type": &step_type,
72                }),
73            );
74
75            // Create step input
76            let input = StepInput {
77                data: current_data.clone(),
78                metadata: StepInputMetadata {
79                    step_index,
80                    previous_step: if step_index > 0 {
81                        Some(workflow.steps[step_index - 1].name().to_string())
82                    } else {
83                        None
84                    },
85                    workflow_id: workflow_id.clone(),
86                },
87            };
88
89            // Execute step - special handling for SubWorkflowStep
90            let result = if step_type_enum == StepType::SubWorkflow {
91                // Cast to SubWorkflowStep and execute with this runtime
92                // to share the event stream
93                let sub_step = unsafe {
94                    // SAFETY: We just checked step_type is SubWorkflow
95                    let ptr =
96                        step.as_ref() as *const dyn crate::step::Step as *const SubWorkflowStep;
97                    &*ptr
98                };
99                sub_step.execute_with_runtime(input.clone(), self).await
100            } else {
101                // Execute with event stream context
102                let ctx = crate::step::ExecutionContext::with_event_stream(&self.event_stream);
103                step.execute_with_context(input.clone(), ctx).await
104            };
105
106            match result {
107                Ok(output) => {
108                    // Emit WorkflowStep::Completed event
109                    self.event_stream.step_completed(
110                        &workflow_id,
111                        step_index,
112                        serde_json::json!({
113                            "step_name": &step_name,
114                            "execution_time_ms": output.metadata.execution_time_ms,
115                        }),
116                    );
117
118                    // Record step
119                    run.steps.push(WorkflowStepRecord {
120                        step_index,
121                        step_name: step_name.clone(),
122                        step_type: step_type.clone(),
123                        input: input.data,
124                        output: Some(output.data.clone()),
125                        execution_time_ms: Some(output.metadata.execution_time_ms),
126                    });
127
128                    // Pass output to next step
129                    current_data = output.data;
130                }
131                Err(e) => {
132                    // Emit WorkflowStep::Failed event
133                    self.event_stream.step_failed(
134                        &workflow_id,
135                        step_index,
136                        &e.to_string(),
137                        serde_json::json!({
138                            "step_name": &step_name,
139                        }),
140                    );
141
142                    // Emit Workflow::Failed event
143                    self.event_stream.workflow_failed(
144                        &workflow_id,
145                        &e.to_string(),
146                        serde_json::json!({
147                            "failed_step": step_index,
148                            "failed_step_name": &step_name,
149                        }),
150                    );
151
152                    workflow.state = WorkflowState::Failed;
153                    run.state = WorkflowState::Failed;
154                    return run;
155                }
156            }
157        }
158
159        // Workflow completed successfully
160        run.final_output = Some(current_data);
161        run.state = WorkflowState::Completed;
162        workflow.state = WorkflowState::Completed;
163
164        self.event_stream.workflow_completed(
165            &workflow_id,
166            serde_json::json!({
167                "steps_completed": run.steps.len(),
168            }),
169        );
170
171        run
172    }
173
174    /// Get events from a specific offset (for replay)
175    pub fn events_from_offset(&self, offset: u64) -> Vec<Event> {
176        self.event_stream.from_offset(offset)
177    }
178}
179
180impl Default for Runtime {
181    fn default() -> Self {
182        Self::new()
183    }
184}