capsule_core/wasm/
state.rs1use std::sync::Arc;
2
3use anyhow::Result;
4use wasmtime::component::{ResourceTable, bindgen};
5use wasmtime::{ResourceLimiter, StoreLimits};
6use wasmtime_wasi::{WasiCtx, WasiView};
7use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
8
9use crate::wasm::commands::create::CreateInstance;
10use crate::wasm::commands::run::RunInstance;
11use crate::wasm::runtime::Runtime;
12use crate::wasm::utilities::task_config::TaskConfig;
13
14use capsule::host::api::{Host, HttpError, HttpResponse, TaskError};
15
// Generate the host-side component bindings for the `capsule-agent` world from the WIT
// package under `./capsule-wit`. With `async: true` the host traits are implemented with
// `async fn`s (see `impl Host for State` in this file).
// NOTE(review): presumably this expansion is what provides the `capsule::host::api` module
// used in this file — confirm against the WIT package.
bindgen!({
    path: "./capsule-wit",
    world: "capsule-agent",
    async: true,
});
21
22pub use capsule::host::api as host_api;
23
/// Per-store host state: the WASI contexts, resource table, store limits and optional
/// runtime handle that the trait implementations in this file operate on.
pub struct State {
    /// WASI context handed out by the `WasiView` implementation.
    pub ctx: WasiCtx,
    /// WASI HTTP context handed out by the `WasiHttpView` implementation.
    pub http_ctx: WasiHttpCtx,
    /// Resource table shared by the `WasiView` and `WasiHttpView` implementations.
    pub table: ResourceTable,
    /// Store limits that the `ResourceLimiter` implementation delegates to.
    pub limits: StoreLimits,
    /// Shared runtime used by `schedule_task` to create and run task instances; when this
    /// is `None`, `schedule_task` fails with `TaskError::InternalError`.
    pub runtime: Option<Arc<Runtime>>,
}
31
32impl WasiView for State {
33 fn ctx(&mut self) -> &mut WasiCtx {
34 &mut self.ctx
35 }
36 fn table(&mut self) -> &mut ResourceTable {
37 &mut self.table
38 }
39}
40
41impl WasiHttpView for State {
42 fn ctx(&mut self) -> &mut WasiHttpCtx {
43 &mut self.http_ctx
44 }
45 fn table(&mut self) -> &mut ResourceTable {
46 &mut self.table
47 }
48}
49
50impl Host for State {
51 async fn schedule_task(
52 &mut self,
53 name: String,
54 args: String,
55 config: String,
56 ) -> Result<String, TaskError> {
57 let runtime = match &self.runtime {
58 Some(r) => Arc::clone(r),
59 None => {
60 return Err(TaskError::InternalError(
61 "No runtime available for recursive task execution".to_string(),
62 ));
63 }
64 };
65
66 let task_config: TaskConfig = serde_json::from_str(&config).unwrap_or_default();
67 let policy = task_config.to_execution_policy();
68 let max_retries = policy.max_retries;
69
70 let mut last_error: Option<String> = None;
71
72 for attempt in 0..=max_retries {
73 let create_cmd = CreateInstance::new(policy.clone(), vec![]).task_name(&name);
74
75 let (store, instance, task_id) = match runtime.execute(create_cmd).await {
76 Ok(result) => result,
77 Err(e) => {
78 runtime
79 .task_reporter
80 .lock()
81 .await
82 .task_failed(&name, &e.to_string());
83 last_error = Some(format!("Failed to create instance: {}", e));
84 continue;
85 }
86 };
87
88 let args_json = format!(
89 r#"{{"task_name": "{}", "args": {}, "kwargs": {{}}}}"#,
90 name, args
91 );
92
93 runtime
94 .task_reporter
95 .lock()
96 .await
97 .task_running(&name, &task_id);
98
99 let start_time = std::time::Instant::now();
100
101 let run_cmd = RunInstance::new(task_id, policy.clone(), store, instance, args_json);
102
103 match runtime.execute(run_cmd).await {
104 Ok(result) => {
105 if result.is_empty() {
106 last_error = Some("Task failed".to_string());
107 if attempt < max_retries {
108 continue;
109 }
110 } else {
111 let elapsed = start_time.elapsed();
112 runtime
113 .task_reporter
114 .lock()
115 .await
116 .task_completed_with_time(&name, elapsed);
117 return Ok(result);
118 }
119 }
120 Err(e) => {
121 runtime
122 .task_reporter
123 .lock()
124 .await
125 .task_failed(&name, &e.to_string());
126 last_error = Some(format!("Failed to run instance: {}", e));
127 if attempt < max_retries {
128 continue;
129 }
130 }
131 }
132 }
133
134 Ok(last_error.unwrap_or_else(|| "Unknown error".to_string()))
135 }
136
137 async fn http_request(
138 &mut self,
139 method: String,
140 url: String,
141 headers: Vec<(String, String)>,
142 body: Option<String>,
143 ) -> Result<HttpResponse, HttpError> {
144 let client = reqwest::Client::new();
145
146 let mut request_builder = match method.to_uppercase().as_str() {
147 "GET" => client.get(&url),
148 "POST" => client.post(&url),
149 "PUT" => client.put(&url),
150 "DELETE" => client.delete(&url),
151 "PATCH" => client.patch(&url),
152 "HEAD" => client.head(&url),
153 _ => {
154 return Err(HttpError::InvalidUrl(format!(
155 "Unsupported method: {}",
156 method
157 )));
158 }
159 };
160
161 for (key, value) in headers {
162 request_builder = request_builder.header(key, value);
163 }
164
165 if let Some(body_content) = body {
166 request_builder = request_builder.body(body_content);
167 }
168
169 let response = request_builder
170 .send()
171 .await
172 .map_err(|e| HttpError::NetworkError(e.to_string()))?;
173
174 let status = response.status().as_u16();
175 let response_headers: Vec<(String, String)> = response
176 .headers()
177 .iter()
178 .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
179 .collect();
180
181 let body_text = response
182 .text()
183 .await
184 .map_err(|e| HttpError::NetworkError(e.to_string()))?;
185
186 Ok(HttpResponse {
187 status,
188 headers: response_headers,
189 body: body_text,
190 })
191 }
192}
193
194impl ResourceLimiter for State {
195 fn memory_growing(
196 &mut self,
197 current: usize,
198 desired: usize,
199 maximum: Option<usize>,
200 ) -> Result<bool> {
201 self.limits.memory_growing(current, desired, maximum)
202 }
203
204 fn table_growing(
205 &mut self,
206 current: usize,
207 desired: usize,
208 maximum: Option<usize>,
209 ) -> Result<bool> {
210 self.limits.table_growing(current, desired, maximum)
211 }
212}