1use crate::{
2 event::{Event, EventStream, EventType},
3 step::{StepInput, StepInputMetadata, StepType},
4 step_impls::SubWorkflowStep,
5 workflow::{Workflow, WorkflowRun, WorkflowState, WorkflowStepRecord},
6};
7
8pub struct Runtime {
10 event_stream: EventStream,
11}
12
13impl Runtime {
14 pub fn new() -> Self {
15 Self {
16 event_stream: EventStream::new(),
17 }
18 }
19
20 pub fn event_stream(&self) -> &EventStream {
22 &self.event_stream
23 }
24
25 pub async fn execute(&self, workflow: Workflow) -> WorkflowRun {
27 self.execute_with_parent(workflow, None).await
28 }
29
30 pub async fn execute_with_parent(
32 &self,
33 mut workflow: Workflow,
34 parent_workflow_id: Option<String>,
35 ) -> WorkflowRun {
36 let workflow_id = workflow.id.clone();
37
38 self.event_stream.append_with_parent(
40 EventType::WorkflowStarted,
41 workflow_id.clone(),
42 parent_workflow_id.clone(),
43 serde_json::json!({
44 "step_count": workflow.steps.len(),
45 "parent_workflow_id": parent_workflow_id,
46 }),
47 );
48
49 workflow.state = WorkflowState::Running;
50
51 let mut run = WorkflowRun {
52 workflow_id: workflow_id.clone(),
53 state: WorkflowState::Running,
54 steps: Vec::new(),
55 final_output: None,
56 parent_workflow_id: parent_workflow_id.clone(),
57 };
58
59 let mut current_data = workflow.initial_input.clone();
60
61 for (step_index, step) in workflow.steps.iter().enumerate() {
63 let step_name = step.name().to_string();
64 let step_type_enum = step.step_type();
65 let step_type = format!("{:?}", step_type_enum);
66
67 self.event_stream.append_with_parent(
69 EventType::WorkflowStepStarted,
70 workflow_id.clone(),
71 parent_workflow_id.clone(),
72 serde_json::json!({
73 "step_index": step_index,
74 "step_name": &step_name,
75 "step_type": &step_type,
76 }),
77 );
78
79 let input = StepInput {
81 data: current_data.clone(),
82 metadata: StepInputMetadata {
83 step_index,
84 previous_step: if step_index > 0 {
85 Some(workflow.steps[step_index - 1].name().to_string())
86 } else {
87 None
88 },
89 workflow_id: workflow_id.clone(),
90 },
91 };
92
93 let result = if step_type_enum == StepType::SubWorkflow {
95 let sub_step = unsafe {
98 let ptr =
100 step.as_ref() as *const dyn crate::step::Step as *const SubWorkflowStep;
101 &*ptr
102 };
103 sub_step.execute_with_runtime(input.clone(), self).await
104 } else {
105 let ctx = crate::step::ExecutionContext::with_event_stream(&self.event_stream);
107 step.execute_with_context(input.clone(), ctx).await
108 };
109
110 match result {
111 Ok(output) => {
112 self.event_stream.append_with_parent(
114 EventType::WorkflowStepCompleted,
115 workflow_id.clone(),
116 parent_workflow_id.clone(),
117 serde_json::json!({
118 "step_index": step_index,
119 "step_name": &step_name,
120 "execution_time_ms": output.metadata.execution_time_ms,
121 }),
122 );
123
124 run.steps.push(WorkflowStepRecord {
126 step_index,
127 step_name: step_name.clone(),
128 step_type: step_type.clone(),
129 input: input.data,
130 output: Some(output.data.clone()),
131 execution_time_ms: Some(output.metadata.execution_time_ms),
132 });
133
134 current_data = output.data;
136 }
137 Err(e) => {
138 self.event_stream.append_with_parent(
140 EventType::AgentFailed, workflow_id.clone(),
142 parent_workflow_id.clone(),
143 serde_json::json!({
144 "step_name": &step_name,
145 "error": e.to_string(),
146 }),
147 );
148
149 self.event_stream.append_with_parent(
151 EventType::WorkflowFailed,
152 workflow_id.clone(),
153 parent_workflow_id.clone(),
154 serde_json::json!({
155 "error": e.to_string(),
156 "failed_step": step_index,
157 }),
158 );
159
160 workflow.state = WorkflowState::Failed;
161 run.state = WorkflowState::Failed;
162 return run;
163 }
164 }
165 }
166
167 run.final_output = Some(current_data);
169 run.state = WorkflowState::Completed;
170 workflow.state = WorkflowState::Completed;
171
172 self.event_stream.append_with_parent(
173 EventType::WorkflowCompleted,
174 workflow_id.clone(),
175 parent_workflow_id.clone(),
176 serde_json::json!({
177 "steps_completed": run.steps.len(),
178 }),
179 );
180
181 run
182 }
183
184 pub fn events_from_offset(&self, offset: u64) -> Vec<Event> {
186 self.event_stream.from_offset(offset)
187 }
188}
189
190impl Default for Runtime {
191 fn default() -> Self {
192 Self::new()
193 }
194}