capsule_core/wasm/commands/
run.rs1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12 task_id: String,
13 policy: ExecutionPolicy,
14 store: Store<State>,
15 instance: CapsuleAgent,
16 args_json: String,
17}
18
19impl RunInstance {
20 pub fn new(
21 task_id: String,
22 policy: ExecutionPolicy,
23 store: Store<State>,
24 instance: CapsuleAgent,
25 args_json: String,
26 ) -> Self {
27 Self {
28 task_id,
29 policy,
30 store,
31 instance,
32 args_json,
33 }
34 }
35}
36
37impl RuntimeCommand for RunInstance {
38 type Output = String;
39
40 async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41 let start_time = std::time::Instant::now();
42
43 runtime
44 .log
45 .update_log(UpdateInstanceLog {
46 task_id: self.task_id.clone(),
47 state: InstanceState::Running,
48 fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
49 })
50 .await?;
51
52 let wasm_future = self
53 .instance
54 .capsule_host_task_runner()
55 .call_run(&mut self.store, &self.args_json);
56
57 let response = match self.policy.timeout_duration() {
58 Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
59 Ok(wasm_result) => match wasm_result {
60 Ok(inner_result) => match inner_result {
61 Ok(json_string) => {
62 let result_object = serde_json::from_str(&json_string)
63 .unwrap_or(serde_json::Value::String(json_string));
64
65 let result = result_object
66 .get("result")
67 .cloned()
68 .unwrap_or(result_object);
69
70 TaskResult {
71 success: true,
72 result: Some(result),
73 error: None,
74 execution: TaskExecution {
75 task_name: self.policy.name.clone(),
76 duration_ms: start_time.elapsed().as_millis() as u64,
77 retries: self.policy.max_retries,
78 fuel_consumed: self.policy.compute.as_fuel()
79 - self.store.get_fuel().unwrap_or(0),
80 },
81 }
82 }
83 Err(error_string) => TaskResult {
84 success: false,
85 result: None,
86 error: Some(TaskError {
87 error_type: "task_error".to_string(),
88 message: error_string,
89 }),
90 execution: TaskExecution {
91 task_name: self.policy.name.clone(),
92 duration_ms: start_time.elapsed().as_millis() as u64,
93 retries: self.policy.max_retries,
94 fuel_consumed: self.policy.compute.as_fuel()
95 - self.store.get_fuel().unwrap_or(0),
96 },
97 },
98 },
99 Err(e) => TaskResult {
100 success: false,
101 result: None,
102 error: Some(TaskError {
103 error_type: "Wasm_error".to_string(),
104 message: e.to_string(),
105 }),
106 execution: TaskExecution {
107 task_name: self.policy.name.clone(),
108 duration_ms: start_time.elapsed().as_millis() as u64,
109 retries: self.policy.max_retries,
110 fuel_consumed: self.policy.compute.as_fuel()
111 - self.store.get_fuel().unwrap_or(0),
112 },
113 },
114 },
115 Err(_elapsed) => TaskResult {
116 success: false,
117 result: None,
118 error: Some(TaskError {
119 error_type: "timeout".to_string(),
120 message: format!("timeout after {}ms", duration.as_millis()),
121 }),
122 execution: TaskExecution {
123 task_name: self.policy.name.clone(),
124 duration_ms: start_time.elapsed().as_millis() as u64,
125 retries: self.policy.max_retries,
126 fuel_consumed: self.policy.compute.as_fuel()
127 - self.store.get_fuel().unwrap_or(0),
128 },
129 },
130 },
131 None => match wasm_future.await {
132 Ok(inner_result) => match inner_result {
133 Ok(json_string) => {
134 let result_object = serde_json::from_str(&json_string)
135 .unwrap_or(serde_json::Value::String(json_string));
136
137 let result = result_object
138 .get("result")
139 .cloned()
140 .unwrap_or(result_object);
141
142 TaskResult {
143 success: true,
144 result: Some(result),
145 error: None,
146 execution: TaskExecution {
147 task_name: self.policy.name.clone(),
148 duration_ms: start_time.elapsed().as_millis() as u64,
149 retries: self.policy.max_retries,
150 fuel_consumed: self.policy.compute.as_fuel()
151 - self.store.get_fuel().unwrap_or(0),
152 },
153 }
154 }
155 Err(error_string) => TaskResult {
156 success: false,
157 result: None,
158 error: Some(TaskError {
159 error_type: "task_error".to_string(),
160 message: error_string,
161 }),
162 execution: TaskExecution {
163 task_name: self.policy.name.clone(),
164 duration_ms: start_time.elapsed().as_millis() as u64,
165 retries: self.policy.max_retries,
166 fuel_consumed: self.policy.compute.as_fuel()
167 - self.store.get_fuel().unwrap_or(0),
168 },
169 },
170 },
171 Err(e) => TaskResult {
172 success: false,
173 result: None,
174 error: Some(TaskError {
175 error_type: "wasm_error".to_string(),
176 message: e.to_string(),
177 }),
178 execution: TaskExecution {
179 task_name: self.policy.name.clone(),
180 duration_ms: start_time.elapsed().as_millis() as u64,
181 retries: self.policy.max_retries,
182 fuel_consumed: self.policy.compute.as_fuel()
183 - self.store.get_fuel().unwrap_or(0),
184 },
185 },
186 },
187 };
188
189 let state = if response.success {
190 InstanceState::Completed
191 } else {
192 InstanceState::Failed
193 };
194
195 runtime
196 .log
197 .update_log(UpdateInstanceLog {
198 task_id: self.task_id.clone(),
199 state,
200 fuel_consumed: response.execution.fuel_consumed,
201 })
202 .await?;
203
204 if !response.success {
205 let error_message = response
206 .error
207 .as_ref()
208 .map(|e| e.message.as_str())
209 .unwrap_or("Unknown error");
210
211 runtime
212 .task_reporter
213 .lock()
214 .await
215 .task_failed(&self.policy.name, error_message);
216 }
217
218 let json_output = serde_json::to_string(&response)
219 .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
220
221 Ok(json_output)
222 }
223}