Skip to main content

agent_runtime/
runtime.rs

1use crate::{
2    event::{Event, EventStream, EventType},
3    step::{StepInput, StepInputMetadata, StepType},
4    step_impls::SubWorkflowStep,
5    workflow::{Workflow, WorkflowRun, WorkflowState, WorkflowStepRecord},
6};
7
8/// Runtime for executing workflows
9pub struct Runtime {
10    event_stream: EventStream,
11}
12
13impl Runtime {
14    pub fn new() -> Self {
15        Self {
16            event_stream: EventStream::new(),
17        }
18    }
19
20    /// Get a reference to the event stream for subscribing to events
21    pub fn event_stream(&self) -> &EventStream {
22        &self.event_stream
23    }
24
25    /// Execute a workflow and return the run with complete history
26    pub async fn execute(&self, workflow: Workflow) -> WorkflowRun {
27        self.execute_with_parent(workflow, None).await
28    }
29
30    /// Execute a workflow with optional parent workflow context
31    pub async fn execute_with_parent(
32        &self,
33        mut workflow: Workflow,
34        parent_workflow_id: Option<String>,
35    ) -> WorkflowRun {
36        let workflow_id = workflow.id.clone();
37
38        // Emit workflow started event
39        self.event_stream.append_with_parent(
40            EventType::WorkflowStarted,
41            workflow_id.clone(),
42            parent_workflow_id.clone(),
43            serde_json::json!({
44                "step_count": workflow.steps.len(),
45                "parent_workflow_id": parent_workflow_id,
46            }),
47        );
48
49        workflow.state = WorkflowState::Running;
50
51        let mut run = WorkflowRun {
52            workflow_id: workflow_id.clone(),
53            state: WorkflowState::Running,
54            steps: Vec::new(),
55            final_output: None,
56            parent_workflow_id: parent_workflow_id.clone(),
57        };
58
59        let mut current_data = workflow.initial_input.clone();
60
61        // Execute each step in sequence
62        for (step_index, step) in workflow.steps.iter().enumerate() {
63            let step_name = step.name().to_string();
64            let step_type_enum = step.step_type();
65            let step_type = format!("{:?}", step_type_enum);
66
67            // Emit step started event
68            self.event_stream.append_with_parent(
69                EventType::WorkflowStepStarted,
70                workflow_id.clone(),
71                parent_workflow_id.clone(),
72                serde_json::json!({
73                    "step_index": step_index,
74                    "step_name": &step_name,
75                    "step_type": &step_type,
76                }),
77            );
78
79            // Create step input
80            let input = StepInput {
81                data: current_data.clone(),
82                metadata: StepInputMetadata {
83                    step_index,
84                    previous_step: if step_index > 0 {
85                        Some(workflow.steps[step_index - 1].name().to_string())
86                    } else {
87                        None
88                    },
89                    workflow_id: workflow_id.clone(),
90                },
91            };
92
93            // Execute step - special handling for SubWorkflowStep
94            let result = if step_type_enum == StepType::SubWorkflow {
95                // Cast to SubWorkflowStep and execute with this runtime
96                // to share the event stream
97                let sub_step = unsafe {
98                    // SAFETY: We just checked step_type is SubWorkflow
99                    let ptr =
100                        step.as_ref() as *const dyn crate::step::Step as *const SubWorkflowStep;
101                    &*ptr
102                };
103                sub_step.execute_with_runtime(input.clone(), self).await
104            } else {
105                // Execute with event stream context
106                let ctx = crate::step::ExecutionContext::with_event_stream(&self.event_stream);
107                step.execute_with_context(input.clone(), ctx).await
108            };
109
110            match result {
111                Ok(output) => {
112                    // Emit step completed
113                    self.event_stream.append_with_parent(
114                        EventType::WorkflowStepCompleted,
115                        workflow_id.clone(),
116                        parent_workflow_id.clone(),
117                        serde_json::json!({
118                            "step_index": step_index,
119                            "step_name": &step_name,
120                            "execution_time_ms": output.metadata.execution_time_ms,
121                        }),
122                    );
123
124                    // Record step
125                    run.steps.push(WorkflowStepRecord {
126                        step_index,
127                        step_name: step_name.clone(),
128                        step_type: step_type.clone(),
129                        input: input.data,
130                        output: Some(output.data.clone()),
131                        execution_time_ms: Some(output.metadata.execution_time_ms),
132                    });
133
134                    // Pass output to next step
135                    current_data = output.data;
136                }
137                Err(e) => {
138                    // Emit step failed
139                    self.event_stream.append_with_parent(
140                        EventType::AgentFailed, // TODO: Add StepFailed event type
141                        workflow_id.clone(),
142                        parent_workflow_id.clone(),
143                        serde_json::json!({
144                            "step_name": &step_name,
145                            "error": e.to_string(),
146                        }),
147                    );
148
149                    // Emit workflow failed
150                    self.event_stream.append_with_parent(
151                        EventType::WorkflowFailed,
152                        workflow_id.clone(),
153                        parent_workflow_id.clone(),
154                        serde_json::json!({
155                            "error": e.to_string(),
156                            "failed_step": step_index,
157                        }),
158                    );
159
160                    workflow.state = WorkflowState::Failed;
161                    run.state = WorkflowState::Failed;
162                    return run;
163                }
164            }
165        }
166
167        // Workflow completed successfully
168        run.final_output = Some(current_data);
169        run.state = WorkflowState::Completed;
170        workflow.state = WorkflowState::Completed;
171
172        self.event_stream.append_with_parent(
173            EventType::WorkflowCompleted,
174            workflow_id.clone(),
175            parent_workflow_id.clone(),
176            serde_json::json!({
177                "steps_completed": run.steps.len(),
178            }),
179        );
180
181        run
182    }
183
184    /// Get events from a specific offset (for replay)
185    pub fn events_from_offset(&self, offset: u64) -> Vec<Event> {
186        self.event_stream.from_offset(offset)
187    }
188}
189
190impl Default for Runtime {
191    fn default() -> Self {
192        Self::new()
193    }
194}