// capsule_core/wasm/commands/run.rs

1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9
10pub struct RunInstance {
11    task_id: String,
12    policy: ExecutionPolicy,
13    store: Store<State>,
14    instance: CapsuleAgent,
15    args_json: String,
16}
17
18impl RunInstance {
19    pub fn new(
20        task_id: String,
21        policy: ExecutionPolicy,
22        store: Store<State>,
23        instance: CapsuleAgent,
24        args_json: String,
25    ) -> Self {
26        Self {
27            task_id,
28            policy,
29            store,
30            instance,
31            args_json,
32        }
33    }
34}
35
36impl RuntimeCommand for RunInstance {
37    type Output = String;
38
39    async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
40        runtime
41            .log
42            .update_log(UpdateInstanceLog {
43                task_id: self.task_id.clone(),
44                state: InstanceState::Running,
45                fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
46            })
47            .await?;
48
49        let wasm_future = self
50            .instance
51            .capsule_host_task_runner()
52            .call_run(&mut self.store, &self.args_json);
53
54        let result = match self.policy.timeout_duration() {
55            Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
56                Ok(inner_result) => inner_result,
57                Err(_elapsed) => {
58                    runtime
59                        .log
60                        .update_log(UpdateInstanceLog {
61                            task_id: self.task_id.clone(),
62                            state: InstanceState::TimedOut,
63                            fuel_consumed: self.policy.compute.as_fuel()
64                                - self.store.get_fuel().unwrap_or(0),
65                        })
66                        .await?;
67
68                    runtime
69                        .task_reporter
70                        .lock()
71                        .await
72                        .task_failed(&self.policy.name, "Timed out");
73
74                    return Ok(String::new());
75                }
76            },
77            None => wasm_future.await,
78        };
79
80        match result {
81            Ok(Ok(value)) => {
82                runtime
83                    .log
84                    .update_log(UpdateInstanceLog {
85                        task_id: self.task_id,
86                        state: InstanceState::Completed,
87                        fuel_consumed: self.policy.compute.as_fuel()
88                            - self.store.get_fuel().unwrap_or(0),
89                    })
90                    .await?;
91                Ok(value)
92            }
93            Ok(Err(error_msg)) => {
94                runtime
95                    .log
96                    .update_log(UpdateInstanceLog {
97                        task_id: self.task_id,
98                        state: InstanceState::Failed,
99                        fuel_consumed: self.policy.compute.as_fuel()
100                            - self.store.get_fuel().unwrap_or(0),
101                    })
102                    .await?;
103
104                runtime
105                    .task_reporter
106                    .lock()
107                    .await
108                    .task_failed(&self.policy.name, &error_msg);
109
110                Ok(String::new())
111            }
112            Err(e) => {
113                runtime
114                    .log
115                    .update_log(UpdateInstanceLog {
116                        task_id: self.task_id,
117                        state: InstanceState::Failed,
118                        fuel_consumed: self.policy.compute.as_fuel()
119                            - self.store.get_fuel().unwrap_or(0),
120                    })
121                    .await?;
122
123                runtime
124                    .task_reporter
125                    .lock()
126                    .await
127                    .task_failed(&self.policy.name, &e.to_string());
128
129                Ok(String::new())
130            }
131        }
132    }
133}