capsule_core/wasm/commands/
run.rs1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12 task_id: String,
13 policy: ExecutionPolicy,
14 store: Store<State>,
15 instance: CapsuleAgent,
16 args_json: String,
17}
18
19impl RunInstance {
20 pub fn new(
21 task_id: String,
22 policy: ExecutionPolicy,
23 store: Store<State>,
24 instance: CapsuleAgent,
25 args_json: String,
26 ) -> Self {
27 Self {
28 task_id,
29 policy,
30 store,
31 instance,
32 args_json,
33 }
34 }
35}
36
37impl RuntimeCommand for RunInstance {
38 type Output = String;
39
40 async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41 let start_time = std::time::Instant::now();
42
43 runtime
44 .log
45 .update_log(UpdateInstanceLog {
46 task_id: self.task_id.clone(),
47 state: InstanceState::Running,
48 fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
49 })
50 .await?;
51
52 let wasm_future = self
53 .instance
54 .capsule_host_task_runner()
55 .call_run(&mut self.store, &self.args_json);
56
57 let response = match self.policy.timeout_duration() {
58 Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
59 Ok(wasm_result) => match wasm_result {
60 Ok(inner_result) => match inner_result {
61 Ok(json_string) => {
62 let result_object = serde_json::from_str(&json_string)
63 .unwrap_or(serde_json::Value::String(json_string));
64
65 let result = result_object
66 .get("result")
67 .cloned()
68 .unwrap_or(result_object);
69
70 TaskResult {
71 success: true,
72 result: Some(result),
73 error: None,
74 execution: TaskExecution {
75 task_name: self.policy.name.clone(),
76 duration_ms: start_time.elapsed().as_millis() as u64,
77 retries: self.policy.max_retries,
78 fuel_consumed: self.policy.compute.as_fuel()
79 - self.store.get_fuel().unwrap_or(0),
80 },
81 }
82 }
83 Err(error_string) => TaskResult {
84 success: false,
85 result: None,
86 error: Some(TaskError {
87 error_type: "task_error".to_string(),
88 message: error_string,
89 }),
90 execution: TaskExecution {
91 task_name: self.policy.name.clone(),
92 duration_ms: start_time.elapsed().as_millis() as u64,
93 retries: self.policy.max_retries,
94 fuel_consumed: self.policy.compute.as_fuel()
95 - self.store.get_fuel().unwrap_or(0),
96 },
97 },
98 },
99 Err(e) => TaskResult {
100 success: false,
101 result: None,
102 error: Some(TaskError {
103 error_type: "Wasm_error".to_string(),
104 message: e.to_string(),
105 }),
106 execution: TaskExecution {
107 task_name: self.policy.name.clone(),
108 duration_ms: start_time.elapsed().as_millis() as u64,
109 retries: self.policy.max_retries,
110 fuel_consumed: self.policy.compute.as_fuel()
111 - self.store.get_fuel().unwrap_or(0),
112 },
113 },
114 },
115 Err(_elapsed) => TaskResult {
116 success: false,
117 result: None,
118 error: Some(TaskError {
119 error_type: "timeout".to_string(),
120 message: format!("timeout after {}ms", duration.as_millis()),
121 }),
122 execution: TaskExecution {
123 task_name: self.policy.name.clone(),
124 duration_ms: start_time.elapsed().as_millis() as u64,
125 retries: self.policy.max_retries,
126 fuel_consumed: self.policy.compute.as_fuel()
127 - self.store.get_fuel().unwrap_or(0),
128 },
129 },
130 },
131 None => match wasm_future.await {
132 Ok(inner_result) => match inner_result {
133 Ok(json_string) => {
134 let result_object = serde_json::from_str(&json_string)
135 .unwrap_or(serde_json::Value::String(json_string));
136
137 let result = result_object
138 .get("result")
139 .cloned()
140 .unwrap_or(result_object);
141
142 if result.get("error_type").is_some() && result.get("message").is_some() {
143 TaskResult {
144 success: false,
145 result: None,
146 error: Some(TaskError {
147 error_type: result
148 .get("error_type")
149 .and_then(|v| v.as_str())
150 .unwrap_or_default()
151 .to_string(),
152 message: result
153 .get("message")
154 .and_then(|v| v.as_str())
155 .unwrap_or_default()
156 .to_string(),
157 }),
158 execution: TaskExecution {
159 task_name: self.policy.name.clone(),
160 duration_ms: start_time.elapsed().as_millis() as u64,
161 retries: self.policy.max_retries,
162 fuel_consumed: self.policy.compute.as_fuel()
163 - self.store.get_fuel().unwrap_or(0),
164 },
165 }
166 } else {
167 TaskResult {
168 success: true,
169 result: Some(result),
170 error: None,
171 execution: TaskExecution {
172 task_name: self.policy.name.clone(),
173 duration_ms: start_time.elapsed().as_millis() as u64,
174 retries: self.policy.max_retries,
175 fuel_consumed: self.policy.compute.as_fuel()
176 - self.store.get_fuel().unwrap_or(0),
177 },
178 }
179 }
180 }
181 Err(error_string) => TaskResult {
182 success: false,
183 result: None,
184 error: Some(TaskError {
185 error_type: "task_error".to_string(),
186 message: error_string,
187 }),
188 execution: TaskExecution {
189 task_name: self.policy.name.clone(),
190 duration_ms: start_time.elapsed().as_millis() as u64,
191 retries: self.policy.max_retries,
192 fuel_consumed: self.policy.compute.as_fuel()
193 - self.store.get_fuel().unwrap_or(0),
194 },
195 },
196 },
197 Err(e) => TaskResult {
198 success: false,
199 result: None,
200 error: Some(TaskError {
201 error_type: "wasm_error".to_string(),
202 message: e.to_string(),
203 }),
204 execution: TaskExecution {
205 task_name: self.policy.name.clone(),
206 duration_ms: start_time.elapsed().as_millis() as u64,
207 retries: self.policy.max_retries,
208 fuel_consumed: self.policy.compute.as_fuel()
209 - self.store.get_fuel().unwrap_or(0),
210 },
211 },
212 },
213 };
214
215 let state = if response.success {
216 InstanceState::Completed
217 } else {
218 InstanceState::Failed
219 };
220
221 runtime
222 .log
223 .update_log(UpdateInstanceLog {
224 task_id: self.task_id.clone(),
225 state,
226 fuel_consumed: response.execution.fuel_consumed,
227 })
228 .await?;
229
230 if !response.success {
231 let error_message = response
232 .error
233 .as_ref()
234 .map(|e| e.message.as_str())
235 .unwrap_or("Unknown error");
236
237 runtime
238 .task_reporter
239 .lock()
240 .await
241 .task_failed(&self.policy.name, error_message);
242 }
243
244 let json_output = serde_json::to_string(&response)
245 .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
246
247 Ok(json_output)
248 }
249}