capsule_core/wasm/commands/
run.rs1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12 task_id: String,
13 policy: ExecutionPolicy,
14 store: Store<State>,
15 instance: CapsuleAgent,
16 args_json: String,
17}
18
19impl RunInstance {
20 pub fn new(
21 task_id: String,
22 policy: ExecutionPolicy,
23 store: Store<State>,
24 instance: CapsuleAgent,
25 args_json: String,
26 ) -> Self {
27 Self {
28 task_id,
29 policy,
30 store,
31 instance,
32 args_json,
33 }
34 }
35}
36
37impl RuntimeCommand for RunInstance {
38 type Output = String;
39
40 async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41 runtime
42 .log
43 .update_log(UpdateInstanceLog {
44 task_id: self.task_id.clone(),
45 state: InstanceState::Running,
46 fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
47 })
48 .await?;
49
50 let wasm_future = self
51 .instance
52 .capsule_host_task_runner()
53 .call_run(&mut self.store, &self.args_json);
54
55 let response = match self.policy.timeout_duration() {
56 Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
57 Ok(wasm_result) => match wasm_result {
58 Ok(inner_result) => match inner_result {
59 Ok(json_string) => {
60 let result_object = serde_json::from_str(&json_string)
61 .unwrap_or(serde_json::Value::String(json_string));
62
63 let result = result_object
64 .get("result")
65 .cloned()
66 .unwrap_or(result_object);
67
68 TaskResult {
69 success: true,
70 result: Some(result),
71 error: None,
72 execution: TaskExecution {
73 task_name: self.policy.name.clone(),
74 duration_ms: duration.as_millis() as u64,
75 retries: self.policy.max_retries,
76 fuel_consumed: self.policy.compute.as_fuel()
77 - self.store.get_fuel().unwrap_or(0),
78 },
79 }
80 }
81 Err(error_string) => TaskResult {
82 success: false,
83 result: None,
84 error: Some(TaskError {
85 error_type: "task_error".to_string(),
86 message: error_string,
87 }),
88 execution: TaskExecution {
89 task_name: self.policy.name.clone(),
90 duration_ms: duration.as_millis() as u64,
91 retries: self.policy.max_retries,
92 fuel_consumed: self.policy.compute.as_fuel()
93 - self.store.get_fuel().unwrap_or(0),
94 },
95 },
96 },
97 Err(e) => TaskResult {
98 success: false,
99 result: None,
100 error: Some(TaskError {
101 error_type: "Wasm_error".to_string(),
102 message: e.to_string(),
103 }),
104 execution: TaskExecution {
105 task_name: self.policy.name.clone(),
106 duration_ms: duration.as_millis() as u64,
107 retries: self.policy.max_retries,
108 fuel_consumed: self.policy.compute.as_fuel()
109 - self.store.get_fuel().unwrap_or(0),
110 },
111 },
112 },
113 Err(_elapsed) => TaskResult {
114 success: false,
115 result: None,
116 error: Some(TaskError {
117 error_type: "timeout".to_string(),
118 message: format!("timeout after {}ms", duration.as_millis()),
119 }),
120 execution: TaskExecution {
121 task_name: self.policy.name.clone(),
122 duration_ms: duration.as_millis() as u64,
123 retries: self.policy.max_retries,
124 fuel_consumed: self.policy.compute.as_fuel()
125 - self.store.get_fuel().unwrap_or(0),
126 },
127 },
128 },
129 None => match wasm_future.await {
130 Ok(inner_result) => match inner_result {
131 Ok(json_string) => {
132 let result_object = serde_json::from_str(&json_string)
133 .unwrap_or(serde_json::Value::String(json_string));
134
135 let result = result_object
136 .get("result")
137 .cloned()
138 .unwrap_or(result_object);
139
140 TaskResult {
141 success: true,
142 result: Some(result),
143 error: None,
144 execution: TaskExecution {
145 task_name: self.policy.name.clone(),
146 duration_ms: 0,
147 retries: self.policy.max_retries,
148 fuel_consumed: self.policy.compute.as_fuel()
149 - self.store.get_fuel().unwrap_or(0),
150 },
151 }
152 }
153 Err(error_string) => TaskResult {
154 success: false,
155 result: None,
156 error: Some(TaskError {
157 error_type: "task_error".to_string(),
158 message: error_string,
159 }),
160 execution: TaskExecution {
161 task_name: self.policy.name.clone(),
162 duration_ms: 0,
163 retries: self.policy.max_retries,
164 fuel_consumed: self.policy.compute.as_fuel()
165 - self.store.get_fuel().unwrap_or(0),
166 },
167 },
168 },
169 Err(e) => TaskResult {
170 success: false,
171 result: None,
172 error: Some(TaskError {
173 error_type: "wasm_error".to_string(),
174 message: e.to_string(),
175 }),
176 execution: TaskExecution {
177 task_name: self.policy.name.clone(),
178 duration_ms: 0,
179 retries: self.policy.max_retries,
180 fuel_consumed: self.policy.compute.as_fuel()
181 - self.store.get_fuel().unwrap_or(0),
182 },
183 },
184 },
185 };
186
187 let state = if response.success {
188 InstanceState::Completed
189 } else {
190 InstanceState::Failed
191 };
192
193 runtime
194 .log
195 .update_log(UpdateInstanceLog {
196 task_id: self.task_id.clone(),
197 state,
198 fuel_consumed: response.execution.fuel_consumed,
199 })
200 .await?;
201
202 if !response.success {
203 let error_message = response
204 .error
205 .as_ref()
206 .map(|e| e.message.as_str())
207 .unwrap_or("Unknown error");
208
209 runtime
210 .task_reporter
211 .lock()
212 .await
213 .task_failed(&self.policy.name, error_message);
214 }
215
216 let json_output = serde_json::to_string(&response)
217 .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
218
219 Ok(json_output)
220 }
221}