Skip to main content

capsule_core/wasm/commands/
run.rs

1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12    task_id: String,
13    policy: ExecutionPolicy,
14    store: Store<State>,
15    instance: CapsuleAgent,
16    args_json: String,
17}
18
19impl RunInstance {
20    pub fn new(
21        task_id: String,
22        policy: ExecutionPolicy,
23        store: Store<State>,
24        instance: CapsuleAgent,
25        args_json: String,
26    ) -> Self {
27        Self {
28            task_id,
29            policy,
30            store,
31            instance,
32            args_json,
33        }
34    }
35}
36
37impl RuntimeCommand for RunInstance {
38    type Output = String;
39
40    async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41        let start_time = std::time::Instant::now();
42
43        runtime
44            .log
45            .update_log(UpdateInstanceLog {
46                task_id: self.task_id.clone(),
47                state: InstanceState::Running,
48                fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
49            })
50            .await?;
51
52        let wasm_future = self
53            .instance
54            .capsule_host_task_runner()
55            .call_run(&mut self.store, &self.args_json);
56
57        let response = match self.policy.timeout_duration() {
58            Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
59                Ok(wasm_result) => match wasm_result {
60                    Ok(inner_result) => match inner_result {
61                        Ok(json_string) => {
62                            let result_object = serde_json::from_str(&json_string)
63                                .unwrap_or(serde_json::Value::String(json_string));
64
65                            let result = result_object
66                                .get("result")
67                                .cloned()
68                                .unwrap_or(result_object);
69
70                            TaskResult {
71                                success: true,
72                                result: Some(result),
73                                error: None,
74                                execution: TaskExecution {
75                                    task_name: self.policy.name.clone(),
76                                    duration_ms: start_time.elapsed().as_millis() as u64,
77                                    retries: self.policy.max_retries,
78                                    fuel_consumed: self.policy.compute.as_fuel()
79                                        - self.store.get_fuel().unwrap_or(0),
80                                },
81                            }
82                        }
83                        Err(error_string) => TaskResult {
84                            success: false,
85                            result: None,
86                            error: Some(TaskError {
87                                error_type: "task_error".to_string(),
88                                message: error_string,
89                            }),
90                            execution: TaskExecution {
91                                task_name: self.policy.name.clone(),
92                                duration_ms: start_time.elapsed().as_millis() as u64,
93                                retries: self.policy.max_retries,
94                                fuel_consumed: self.policy.compute.as_fuel()
95                                    - self.store.get_fuel().unwrap_or(0),
96                            },
97                        },
98                    },
99                    Err(e) => TaskResult {
100                        success: false,
101                        result: None,
102                        error: Some(TaskError {
103                            error_type: "Wasm_error".to_string(),
104                            message: e.to_string(),
105                        }),
106                        execution: TaskExecution {
107                            task_name: self.policy.name.clone(),
108                            duration_ms: start_time.elapsed().as_millis() as u64,
109                            retries: self.policy.max_retries,
110                            fuel_consumed: self.policy.compute.as_fuel()
111                                - self.store.get_fuel().unwrap_or(0),
112                        },
113                    },
114                },
115                Err(_elapsed) => TaskResult {
116                    success: false,
117                    result: None,
118                    error: Some(TaskError {
119                        error_type: "timeout".to_string(),
120                        message: format!("timeout after {}ms", duration.as_millis()),
121                    }),
122                    execution: TaskExecution {
123                        task_name: self.policy.name.clone(),
124                        duration_ms: start_time.elapsed().as_millis() as u64,
125                        retries: self.policy.max_retries,
126                        fuel_consumed: self.policy.compute.as_fuel()
127                            - self.store.get_fuel().unwrap_or(0),
128                    },
129                },
130            },
131            None => match wasm_future.await {
132                Ok(inner_result) => match inner_result {
133                    Ok(json_string) => {
134                        let result_object = serde_json::from_str(&json_string)
135                            .unwrap_or(serde_json::Value::String(json_string));
136
137                        let result = result_object
138                            .get("result")
139                            .cloned()
140                            .unwrap_or(result_object);
141
142                        TaskResult {
143                            success: true,
144                            result: Some(result),
145                            error: None,
146                            execution: TaskExecution {
147                                task_name: self.policy.name.clone(),
148                                duration_ms: start_time.elapsed().as_millis() as u64,
149                                retries: self.policy.max_retries,
150                                fuel_consumed: self.policy.compute.as_fuel()
151                                    - self.store.get_fuel().unwrap_or(0),
152                            },
153                        }
154                    }
155                    Err(error_string) => TaskResult {
156                        success: false,
157                        result: None,
158                        error: Some(TaskError {
159                            error_type: "task_error".to_string(),
160                            message: error_string,
161                        }),
162                        execution: TaskExecution {
163                            task_name: self.policy.name.clone(),
164                            duration_ms: start_time.elapsed().as_millis() as u64,
165                            retries: self.policy.max_retries,
166                            fuel_consumed: self.policy.compute.as_fuel()
167                                - self.store.get_fuel().unwrap_or(0),
168                        },
169                    },
170                },
171                Err(e) => TaskResult {
172                    success: false,
173                    result: None,
174                    error: Some(TaskError {
175                        error_type: "wasm_error".to_string(),
176                        message: e.to_string(),
177                    }),
178                    execution: TaskExecution {
179                        task_name: self.policy.name.clone(),
180                        duration_ms: start_time.elapsed().as_millis() as u64,
181                        retries: self.policy.max_retries,
182                        fuel_consumed: self.policy.compute.as_fuel()
183                            - self.store.get_fuel().unwrap_or(0),
184                    },
185                },
186            },
187        };
188
189        let state = if response.success {
190            InstanceState::Completed
191        } else {
192            InstanceState::Failed
193        };
194
195        runtime
196            .log
197            .update_log(UpdateInstanceLog {
198                task_id: self.task_id.clone(),
199                state,
200                fuel_consumed: response.execution.fuel_consumed,
201            })
202            .await?;
203
204        if !response.success {
205            let error_message = response
206                .error
207                .as_ref()
208                .map(|e| e.message.as_str())
209                .unwrap_or("Unknown error");
210
211            runtime
212                .task_reporter
213                .lock()
214                .await
215                .task_failed(&self.policy.name, error_message);
216        }
217
218        let json_output = serde_json::to_string(&response)
219            .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
220
221        Ok(json_output)
222    }
223}