capsule_core/wasm/commands/
run.rs1use std::sync::Arc;
2
3use wasmtime::Store;
4
5use crate::config::log::{InstanceState, UpdateInstanceLog};
6use crate::wasm::execution_policy::ExecutionPolicy;
7use crate::wasm::runtime::{Runtime, RuntimeCommand, WasmRuntimeError};
8use crate::wasm::state::{CapsuleAgent, State};
9
10pub struct RunInstance {
11 task_id: String,
12 policy: ExecutionPolicy,
13 store: Store<State>,
14 instance: CapsuleAgent,
15 args_json: String,
16}
17
18impl RunInstance {
19 pub fn new(
20 task_id: String,
21 policy: ExecutionPolicy,
22 store: Store<State>,
23 instance: CapsuleAgent,
24 args_json: String,
25 ) -> Self {
26 Self {
27 task_id,
28 policy,
29 store,
30 instance,
31 args_json,
32 }
33 }
34}
35
36impl RuntimeCommand for RunInstance {
37 type Output = String;
38
39 async fn execute(mut self, runtime: Arc<Runtime>) -> Result<Self::Output, WasmRuntimeError> {
40 runtime
41 .log
42 .update_log(UpdateInstanceLog {
43 task_id: self.task_id.clone(),
44 state: InstanceState::Running,
45 fuel_consumed: self.policy.compute.as_fuel() - self.store.get_fuel().unwrap_or(0),
46 })
47 .await?;
48
49 let wasm_future = self
50 .instance
51 .capsule_host_task_runner()
52 .call_run(&mut self.store, &self.args_json);
53
54 let result = match self.policy.timeout_duration() {
55 Some(duration) => match tokio::time::timeout(duration, wasm_future).await {
56 Ok(inner_result) => inner_result,
57 Err(_elapsed) => {
58 runtime
59 .log
60 .update_log(UpdateInstanceLog {
61 task_id: self.task_id.clone(),
62 state: InstanceState::TimedOut,
63 fuel_consumed: self.policy.compute.as_fuel()
64 - self.store.get_fuel().unwrap_or(0),
65 })
66 .await?;
67
68 runtime
69 .task_reporter
70 .lock()
71 .await
72 .task_failed(&self.policy.name, "Timed out");
73
74 return Ok(String::new());
75 }
76 },
77 None => wasm_future.await,
78 };
79
80 match result {
81 Ok(Ok(value)) => {
82 runtime
83 .log
84 .update_log(UpdateInstanceLog {
85 task_id: self.task_id,
86 state: InstanceState::Completed,
87 fuel_consumed: self.policy.compute.as_fuel()
88 - self.store.get_fuel().unwrap_or(0),
89 })
90 .await?;
91 Ok(value)
92 }
93 Ok(Err(error_msg)) => {
94 runtime
95 .log
96 .update_log(UpdateInstanceLog {
97 task_id: self.task_id,
98 state: InstanceState::Failed,
99 fuel_consumed: self.policy.compute.as_fuel()
100 - self.store.get_fuel().unwrap_or(0),
101 })
102 .await?;
103
104 runtime
105 .task_reporter
106 .lock()
107 .await
108 .task_failed(&self.policy.name, &error_msg);
109
110 Ok(String::new())
111 }
112 Err(e) => {
113 runtime
114 .log
115 .update_log(UpdateInstanceLog {
116 task_id: self.task_id,
117 state: InstanceState::Failed,
118 fuel_consumed: self.policy.compute.as_fuel()
119 - self.store.get_fuel().unwrap_or(0),
120 })
121 .await?;
122
123 runtime
124 .task_reporter
125 .lock()
126 .await
127 .task_failed(&self.policy.name, &e.to_string());
128
129 Ok(String::new())
130 }
131 }
132 }
133}