1use crate::{
2 event::{Event, EventStream},
3 step::{StepInput, StepInputMetadata, StepType},
4 step_impls::SubWorkflowStep,
5 workflow::{Workflow, WorkflowRun, WorkflowState, WorkflowStepRecord},
6};
7
8pub struct Runtime {
10 event_stream: EventStream,
11}
12
13impl Runtime {
14 pub fn new() -> Self {
15 Self {
16 event_stream: EventStream::new(),
17 }
18 }
19
20 pub fn event_stream(&self) -> &EventStream {
22 &self.event_stream
23 }
24
25 pub async fn execute(&self, workflow: Workflow) -> WorkflowRun {
27 self.execute_with_parent(workflow, None).await
28 }
29
30 pub async fn execute_with_parent(
32 &self,
33 mut workflow: Workflow,
34 parent_workflow_id: Option<String>,
35 ) -> WorkflowRun {
36 let workflow_id = workflow.id.clone();
37
38 self.event_stream.workflow_started(
40 &workflow_id,
41 serde_json::json!({
42 "step_count": workflow.steps.len(),
43 "parent_workflow_id": parent_workflow_id,
44 }),
45 );
46
47 workflow.state = WorkflowState::Running;
48
49 let mut run = WorkflowRun {
50 workflow_id: workflow_id.clone(),
51 state: WorkflowState::Running,
52 steps: Vec::new(),
53 final_output: None,
54 parent_workflow_id: parent_workflow_id.clone(),
55 };
56
57 let mut current_data = workflow.initial_input.clone();
58
59 for (step_index, step) in workflow.steps.iter().enumerate() {
61 let step_name = step.name().to_string();
62 let step_type_enum = step.step_type();
63 let step_type = format!("{:?}", step_type_enum);
64
65 self.event_stream.step_started(
67 &workflow_id,
68 step_index,
69 serde_json::json!({
70 "step_name": &step_name,
71 "step_type": &step_type,
72 }),
73 );
74
75 let input = StepInput {
77 data: current_data.clone(),
78 metadata: StepInputMetadata {
79 step_index,
80 previous_step: if step_index > 0 {
81 Some(workflow.steps[step_index - 1].name().to_string())
82 } else {
83 None
84 },
85 workflow_id: workflow_id.clone(),
86 },
87 };
88
89 let result = if step_type_enum == StepType::SubWorkflow {
91 let sub_step = unsafe {
94 let ptr =
96 step.as_ref() as *const dyn crate::step::Step as *const SubWorkflowStep;
97 &*ptr
98 };
99 sub_step.execute_with_runtime(input.clone(), self).await
100 } else {
101 let ctx = crate::step::ExecutionContext::with_event_stream(&self.event_stream);
103 step.execute_with_context(input.clone(), ctx).await
104 };
105
106 match result {
107 Ok(output) => {
108 self.event_stream.step_completed(
110 &workflow_id,
111 step_index,
112 serde_json::json!({
113 "step_name": &step_name,
114 "execution_time_ms": output.metadata.execution_time_ms,
115 }),
116 );
117
118 run.steps.push(WorkflowStepRecord {
120 step_index,
121 step_name: step_name.clone(),
122 step_type: step_type.clone(),
123 input: input.data,
124 output: Some(output.data.clone()),
125 execution_time_ms: Some(output.metadata.execution_time_ms),
126 });
127
128 current_data = output.data;
130 }
131 Err(e) => {
132 self.event_stream.step_failed(
134 &workflow_id,
135 step_index,
136 &e.to_string(),
137 serde_json::json!({
138 "step_name": &step_name,
139 }),
140 );
141
142 self.event_stream.workflow_failed(
144 &workflow_id,
145 &e.to_string(),
146 serde_json::json!({
147 "failed_step": step_index,
148 "failed_step_name": &step_name,
149 }),
150 );
151
152 workflow.state = WorkflowState::Failed;
153 run.state = WorkflowState::Failed;
154 return run;
155 }
156 }
157 }
158
159 run.final_output = Some(current_data);
161 run.state = WorkflowState::Completed;
162 workflow.state = WorkflowState::Completed;
163
164 self.event_stream.workflow_completed(
165 &workflow_id,
166 serde_json::json!({
167 "steps_completed": run.steps.len(),
168 }),
169 );
170
171 run
172 }
173
174 pub fn events_from_offset(&self, offset: u64) -> Vec<Event> {
176 self.event_stream.from_offset(offset)
177 }
178}
179
180impl Default for Runtime {
181 fn default() -> Self {
182 Self::new()
183 }
184}