Skip to main content

capsule_core/wasm/commands/
run.rs

1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12    task_id: String,
13    policy: ExecutionPolicy,
14    store: Store<State>,
15    instance: CapsuleAgent,
16    args_json: String,
17}
18
19impl RunInstance {
20    pub fn new(
21        task_id: String,
22        policy: ExecutionPolicy,
23        store: Store<State>,
24        instance: CapsuleAgent,
25        args_json: String,
26    ) -> Self {
27        Self {
28            task_id,
29            policy,
30            store,
31            instance,
32            args_json,
33        }
34    }
35}
36
37impl RuntimeCommand for RunInstance {
38    type Output = String;
39
40    async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41        let start_time = std::time::Instant::now();
42
43        runtime
44            .log
45            .update_log(UpdateInstanceLog {
46                task_id: self.task_id.clone(),
47                state: InstanceState::Running,
48                fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
49            })
50            .await?;
51
52        let wasm_future = self
53            .instance
54            .capsule_host_task_runner()
55            .call_run(&mut self.store, &self.args_json);
56
57        let response = match self.policy.timeout_duration() {
58            Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
59                Ok(wasm_result) => match wasm_result {
60                    Ok(inner_result) => match inner_result {
61                        Ok(json_string) => {
62                            let result_object = serde_json::from_str(&json_string)
63                                .unwrap_or(serde_json::Value::String(json_string));
64
65                            let result = result_object
66                                .get("result")
67                                .cloned()
68                                .unwrap_or(result_object);
69
70                            TaskResult {
71                                success: true,
72                                result: Some(result),
73                                error: None,
74                                execution: TaskExecution {
75                                    task_name: self.policy.name.clone(),
76                                    duration_ms: start_time.elapsed().as_millis() as u64,
77                                    retries: self.policy.max_retries,
78                                    fuel_consumed: self.policy.compute.as_fuel()
79                                        - self.store.get_fuel().unwrap_or(0),
80                                },
81                            }
82                        }
83                        Err(error_string) => TaskResult {
84                            success: false,
85                            result: None,
86                            error: Some(TaskError {
87                                error_type: "task_error".to_string(),
88                                message: error_string,
89                            }),
90                            execution: TaskExecution {
91                                task_name: self.policy.name.clone(),
92                                duration_ms: start_time.elapsed().as_millis() as u64,
93                                retries: self.policy.max_retries,
94                                fuel_consumed: self.policy.compute.as_fuel()
95                                    - self.store.get_fuel().unwrap_or(0),
96                            },
97                        },
98                    },
99                    Err(e) => TaskResult {
100                        success: false,
101                        result: None,
102                        error: Some(TaskError {
103                            error_type: "Wasm_error".to_string(),
104                            message: e.to_string(),
105                        }),
106                        execution: TaskExecution {
107                            task_name: self.policy.name.clone(),
108                            duration_ms: start_time.elapsed().as_millis() as u64,
109                            retries: self.policy.max_retries,
110                            fuel_consumed: self.policy.compute.as_fuel()
111                                - self.store.get_fuel().unwrap_or(0),
112                        },
113                    },
114                },
115                Err(_elapsed) => TaskResult {
116                    success: false,
117                    result: None,
118                    error: Some(TaskError {
119                        error_type: "timeout".to_string(),
120                        message: format!("timeout after {}ms", duration.as_millis()),
121                    }),
122                    execution: TaskExecution {
123                        task_name: self.policy.name.clone(),
124                        duration_ms: start_time.elapsed().as_millis() as u64,
125                        retries: self.policy.max_retries,
126                        fuel_consumed: self.policy.compute.as_fuel()
127                            - self.store.get_fuel().unwrap_or(0),
128                    },
129                },
130            },
131            None => match wasm_future.await {
132                Ok(inner_result) => match inner_result {
133                    Ok(json_string) => {
134                        let result_object = serde_json::from_str(&json_string)
135                            .unwrap_or(serde_json::Value::String(json_string));
136
137                        let result = result_object
138                            .get("result")
139                            .cloned()
140                            .unwrap_or(result_object);
141
142                        if result.get("error_type").is_some() && result.get("message").is_some() {
143                            TaskResult {
144                                success: false,
145                                result: None,
146                                error: Some(TaskError {
147                                    error_type: result
148                                        .get("error_type")
149                                        .and_then(|v| v.as_str())
150                                        .unwrap_or_default()
151                                        .to_string(),
152                                    message: result
153                                        .get("message")
154                                        .and_then(|v| v.as_str())
155                                        .unwrap_or_default()
156                                        .to_string(),
157                                }),
158                                execution: TaskExecution {
159                                    task_name: self.policy.name.clone(),
160                                    duration_ms: start_time.elapsed().as_millis() as u64,
161                                    retries: self.policy.max_retries,
162                                    fuel_consumed: self.policy.compute.as_fuel()
163                                        - self.store.get_fuel().unwrap_or(0),
164                                },
165                            }
166                        } else {
167                            TaskResult {
168                                success: true,
169                                result: Some(result),
170                                error: None,
171                                execution: TaskExecution {
172                                    task_name: self.policy.name.clone(),
173                                    duration_ms: start_time.elapsed().as_millis() as u64,
174                                    retries: self.policy.max_retries,
175                                    fuel_consumed: self.policy.compute.as_fuel()
176                                        - self.store.get_fuel().unwrap_or(0),
177                                },
178                            }
179                        }
180                    }
181                    Err(error_string) => TaskResult {
182                        success: false,
183                        result: None,
184                        error: Some(TaskError {
185                            error_type: "task_error".to_string(),
186                            message: error_string,
187                        }),
188                        execution: TaskExecution {
189                            task_name: self.policy.name.clone(),
190                            duration_ms: start_time.elapsed().as_millis() as u64,
191                            retries: self.policy.max_retries,
192                            fuel_consumed: self.policy.compute.as_fuel()
193                                - self.store.get_fuel().unwrap_or(0),
194                        },
195                    },
196                },
197                Err(e) => TaskResult {
198                    success: false,
199                    result: None,
200                    error: Some(TaskError {
201                        error_type: "wasm_error".to_string(),
202                        message: e.to_string(),
203                    }),
204                    execution: TaskExecution {
205                        task_name: self.policy.name.clone(),
206                        duration_ms: start_time.elapsed().as_millis() as u64,
207                        retries: self.policy.max_retries,
208                        fuel_consumed: self.policy.compute.as_fuel()
209                            - self.store.get_fuel().unwrap_or(0),
210                    },
211                },
212            },
213        };
214
215        let state = if response.success {
216            InstanceState::Completed
217        } else {
218            InstanceState::Failed
219        };
220
221        runtime
222            .log
223            .update_log(UpdateInstanceLog {
224                task_id: self.task_id.clone(),
225                state,
226                fuel_consumed: response.execution.fuel_consumed,
227            })
228            .await?;
229
230        if !response.success {
231            let error_message = response
232                .error
233                .as_ref()
234                .map(|e| e.message.as_str())
235                .unwrap_or("Unknown error");
236
237            runtime
238                .task_reporter
239                .lock()
240                .await
241                .task_failed(&self.policy.name, error_message);
242        }
243
244        let json_output = serde_json::to_string(&response)
245            .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
246
247        Ok(json_output)
248    }
249}