capsule_core/wasm/
state.rs

1use std::sync::Arc;
2
3use anyhow::Result;
4use wasmtime::component::{ResourceTable, bindgen};
5use wasmtime::{ResourceLimiter, StoreLimits};
6use wasmtime_wasi::{WasiCtx, WasiView};
7use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView};
8
9use crate::wasm::commands::create::CreateInstance;
10use crate::wasm::commands::run::RunInstance;
11use crate::wasm::runtime::Runtime;
12use crate::wasm::utilities::task_config::TaskConfig;
13
14use capsule::host::api::{Host, HttpError, HttpResponse, TaskError};
15
16bindgen!({
17    path: "./capsule-wit",
18    world: "capsule-agent",
19    async: true,
20});
21
22pub use capsule::host::api as host_api;
23
24pub struct State {
25    pub ctx: WasiCtx,
26    pub http_ctx: WasiHttpCtx,
27    pub table: ResourceTable,
28    pub limits: StoreLimits,
29    pub runtime: Option<Arc<Runtime>>,
30}
31
32impl WasiView for State {
33    fn ctx(&mut self) -> &mut WasiCtx {
34        &mut self.ctx
35    }
36    fn table(&mut self) -> &mut ResourceTable {
37        &mut self.table
38    }
39}
40
41impl WasiHttpView for State {
42    fn ctx(&mut self) -> &mut WasiHttpCtx {
43        &mut self.http_ctx
44    }
45    fn table(&mut self) -> &mut ResourceTable {
46        &mut self.table
47    }
48}
49
50impl Host for State {
51    async fn schedule_task(
52        &mut self,
53        name: String,
54        args: String,
55        config: String,
56    ) -> Result<String, TaskError> {
57        let runtime = match &self.runtime {
58            Some(r) => Arc::clone(r),
59            None => {
60                return Err(TaskError::InternalError(
61                    "No runtime available for recursive task execution".to_string(),
62                ));
63            }
64        };
65
66        let task_config: TaskConfig = serde_json::from_str(&config).unwrap_or_default();
67        let policy = task_config.to_execution_policy();
68        let max_retries = policy.max_retries;
69
70        let mut last_error: Option<String> = None;
71
72        for attempt in 0..=max_retries {
73            let create_cmd = CreateInstance::new(policy.clone(), vec![]).task_name(&name);
74
75            let (store, instance, task_id) = match runtime.execute(create_cmd).await {
76                Ok(result) => result,
77                Err(e) => {
78                    runtime
79                        .task_reporter
80                        .lock()
81                        .await
82                        .task_failed(&name, &e.to_string());
83                    last_error = Some(format!("Failed to create instance: {}", e));
84                    continue;
85                }
86            };
87
88            let args_json = format!(
89                r#"{{"task_name": "{}", "args": {}, "kwargs": {{}}}}"#,
90                name, args
91            );
92
93            runtime
94                .task_reporter
95                .lock()
96                .await
97                .task_running(&name, &task_id);
98
99            let start_time = std::time::Instant::now();
100
101            let run_cmd = RunInstance::new(task_id, policy.clone(), store, instance, args_json);
102
103            match runtime.execute(run_cmd).await {
104                Ok(result) => {
105                    if result.is_empty() {
106                        last_error = Some("Task failed".to_string());
107                        if attempt < max_retries {
108                            continue;
109                        }
110                    } else {
111                        let elapsed = start_time.elapsed();
112                        runtime
113                            .task_reporter
114                            .lock()
115                            .await
116                            .task_completed_with_time(&name, elapsed);
117                        return Ok(result);
118                    }
119                }
120                Err(e) => {
121                    runtime
122                        .task_reporter
123                        .lock()
124                        .await
125                        .task_failed(&name, &e.to_string());
126                    last_error = Some(format!("Failed to run instance: {}", e));
127                    if attempt < max_retries {
128                        continue;
129                    }
130                }
131            }
132        }
133
134        Ok(last_error.unwrap_or_else(|| "Unknown error".to_string()))
135    }
136
137    async fn http_request(
138        &mut self,
139        method: String,
140        url: String,
141        headers: Vec<(String, String)>,
142        body: Option<String>,
143    ) -> Result<HttpResponse, HttpError> {
144        let client = reqwest::Client::new();
145
146        let mut request_builder = match method.to_uppercase().as_str() {
147            "GET" => client.get(&url),
148            "POST" => client.post(&url),
149            "PUT" => client.put(&url),
150            "DELETE" => client.delete(&url),
151            "PATCH" => client.patch(&url),
152            "HEAD" => client.head(&url),
153            _ => {
154                return Err(HttpError::InvalidUrl(format!(
155                    "Unsupported method: {}",
156                    method
157                )));
158            }
159        };
160
161        for (key, value) in headers {
162            request_builder = request_builder.header(key, value);
163        }
164
165        if let Some(body_content) = body {
166            request_builder = request_builder.body(body_content);
167        }
168
169        let response = request_builder
170            .send()
171            .await
172            .map_err(|e| HttpError::NetworkError(e.to_string()))?;
173
174        let status = response.status().as_u16();
175        let response_headers: Vec<(String, String)> = response
176            .headers()
177            .iter()
178            .map(|(k, v)| (k.to_string(), v.to_str().unwrap_or("").to_string()))
179            .collect();
180
181        let body_text = response
182            .text()
183            .await
184            .map_err(|e| HttpError::NetworkError(e.to_string()))?;
185
186        Ok(HttpResponse {
187            status,
188            headers: response_headers,
189            body: body_text,
190        })
191    }
192}
193
194impl ResourceLimiter for State {
195    fn memory_growing(
196        &mut self,
197        current: usize,
198        desired: usize,
199        maximum: Option<usize>,
200    ) -> Result<bool> {
201        self.limits.memory_growing(current, desired, maximum)
202    }
203
204    fn table_growing(
205        &mut self,
206        current: usize,
207        desired: usize,
208        maximum: Option<usize>,
209    ) -> Result<bool> {
210        self.limits.table_growing(current, desired, maximum)
211    }
212}