// capsule_core/wasm/commands/run.rs

1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9use crate::wasm::utilities::task_config::{TaskError, TaskExecution, TaskResult};
10
11pub struct RunInstance {
12    task_id: String,
13    policy: ExecutionPolicy,
14    store: Store<State>,
15    instance: CapsuleAgent,
16    args_json: String,
17}
18
19impl RunInstance {
20    pub fn new(
21        task_id: String,
22        policy: ExecutionPolicy,
23        store: Store<State>,
24        instance: CapsuleAgent,
25        args_json: String,
26    ) -> Self {
27        Self {
28            task_id,
29            policy,
30            store,
31            instance,
32            args_json,
33        }
34    }
35}
36
37impl RuntimeCommand for RunInstance {
38    type Output = String;
39
40    async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
41        runtime
42            .log
43            .update_log(UpdateInstanceLog {
44                task_id: self.task_id.clone(),
45                state: InstanceState::Running,
46                fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
47            })
48            .await?;
49
50        let wasm_future = self
51            .instance
52            .capsule_host_task_runner()
53            .call_run(&mut self.store, &self.args_json);
54
55        let response = match self.policy.timeout_duration() {
56            Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
57                Ok(wasm_result) => match wasm_result {
58                    Ok(inner_result) => match inner_result {
59                        Ok(json_string) => {
60                            let result_object = serde_json::from_str(&json_string)
61                                .unwrap_or(serde_json::Value::String(json_string));
62
63                            let result = result_object
64                                .get("result")
65                                .cloned()
66                                .unwrap_or(result_object);
67
68                            TaskResult {
69                                success: true,
70                                result: Some(result),
71                                error: None,
72                                execution: TaskExecution {
73                                    task_name: self.policy.name.clone(),
74                                    duration_ms: duration.as_millis() as u64,
75                                    retries: self.policy.max_retries,
76                                    fuel_consumed: self.policy.compute.as_fuel()
77                                        - self.store.get_fuel().unwrap_or(0),
78                                },
79                            }
80                        }
81                        Err(error_string) => TaskResult {
82                            success: false,
83                            result: None,
84                            error: Some(TaskError {
85                                error_type: "task_error".to_string(),
86                                message: error_string,
87                            }),
88                            execution: TaskExecution {
89                                task_name: self.policy.name.clone(),
90                                duration_ms: duration.as_millis() as u64,
91                                retries: self.policy.max_retries,
92                                fuel_consumed: self.policy.compute.as_fuel()
93                                    - self.store.get_fuel().unwrap_or(0),
94                            },
95                        },
96                    },
97                    Err(e) => TaskResult {
98                        success: false,
99                        result: None,
100                        error: Some(TaskError {
101                            error_type: "Wasm_error".to_string(),
102                            message: e.to_string(),
103                        }),
104                        execution: TaskExecution {
105                            task_name: self.policy.name.clone(),
106                            duration_ms: duration.as_millis() as u64,
107                            retries: self.policy.max_retries,
108                            fuel_consumed: self.policy.compute.as_fuel()
109                                - self.store.get_fuel().unwrap_or(0),
110                        },
111                    },
112                },
113                Err(_elapsed) => TaskResult {
114                    success: false,
115                    result: None,
116                    error: Some(TaskError {
117                        error_type: "timeout".to_string(),
118                        message: format!("timeout after {}ms", duration.as_millis()),
119                    }),
120                    execution: TaskExecution {
121                        task_name: self.policy.name.clone(),
122                        duration_ms: duration.as_millis() as u64,
123                        retries: self.policy.max_retries,
124                        fuel_consumed: self.policy.compute.as_fuel()
125                            - self.store.get_fuel().unwrap_or(0),
126                    },
127                },
128            },
129            None => match wasm_future.await {
130                Ok(inner_result) => match inner_result {
131                    Ok(json_string) => {
132                        let result_object = serde_json::from_str(&json_string)
133                            .unwrap_or(serde_json::Value::String(json_string));
134
135                        let result = result_object
136                            .get("result")
137                            .cloned()
138                            .unwrap_or(result_object);
139
140                        TaskResult {
141                            success: true,
142                            result: Some(result),
143                            error: None,
144                            execution: TaskExecution {
145                                task_name: self.policy.name.clone(),
146                                duration_ms: 0,
147                                retries: self.policy.max_retries,
148                                fuel_consumed: self.policy.compute.as_fuel()
149                                    - self.store.get_fuel().unwrap_or(0),
150                            },
151                        }
152                    }
153                    Err(error_string) => TaskResult {
154                        success: false,
155                        result: None,
156                        error: Some(TaskError {
157                            error_type: "task_error".to_string(),
158                            message: error_string,
159                        }),
160                        execution: TaskExecution {
161                            task_name: self.policy.name.clone(),
162                            duration_ms: 0,
163                            retries: self.policy.max_retries,
164                            fuel_consumed: self.policy.compute.as_fuel()
165                                - self.store.get_fuel().unwrap_or(0),
166                        },
167                    },
168                },
169                Err(e) => TaskResult {
170                    success: false,
171                    result: None,
172                    error: Some(TaskError {
173                        error_type: "wasm_error".to_string(),
174                        message: e.to_string(),
175                    }),
176                    execution: TaskExecution {
177                        task_name: self.policy.name.clone(),
178                        duration_ms: 0,
179                        retries: self.policy.max_retries,
180                        fuel_consumed: self.policy.compute.as_fuel()
181                            - self.store.get_fuel().unwrap_or(0),
182                    },
183                },
184            },
185        };
186
187        let state = if response.success {
188            InstanceState::Completed
189        } else {
190            InstanceState::Failed
191        };
192
193        runtime
194            .log
195            .update_log(UpdateInstanceLog {
196                task_id: self.task_id.clone(),
197                state,
198                fuel_consumed: response.execution.fuel_consumed,
199            })
200            .await?;
201
202        if !response.success {
203            let error_message = response
204                .error
205                .as_ref()
206                .map(|e| e.message.as_str())
207                .unwrap_or("Unknown error");
208
209            runtime
210                .task_reporter
211                .lock()
212                .await
213                .task_failed(&self.policy.name, error_message);
214        }
215
216        let json_output = serde_json::to_string(&response)
217            .map_err(|e| WasmRuntimeError::SerializationError(e.to_string()))?;
218
219        Ok(json_output)
220    }
221}