1use std::collections::HashMap;
2use std::io::{self, BufRead, BufReader, Write};
3use std::path::{Path, PathBuf};
4use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
5
6use bytes::Bytes;
7use kapsl_rag_sdk::protocol::{ConnectorRequest, ConnectorRequestKind, ConnectorResponse};
8use wasmtime::{Engine, Linker, Module, Store};
9use wasmtime_wasi::preview2::pipe::{MemoryInputPipe, MemoryOutputPipe};
10use wasmtime_wasi::preview2::preview1::{
11 add_to_linker_sync, WasiPreview1Adapter, WasiPreview1View,
12};
13use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView};
14
15#[derive(thiserror::Error, Debug)]
16pub enum RuntimeError {
17 #[error("io error: {0}")]
18 Io(String),
19 #[error("serialization error: {0}")]
20 Serialization(String),
21 #[error("connector exited")]
22 ConnectorExited,
23 #[error("wasm error: {0}")]
24 Wasm(String),
25}
26
27impl From<io::Error> for RuntimeError {
28 fn from(err: io::Error) -> Self {
29 RuntimeError::Io(err.to_string())
30 }
31}
32
33impl From<serde_json::Error> for RuntimeError {
34 fn from(err: serde_json::Error) -> Self {
35 RuntimeError::Serialization(err.to_string())
36 }
37}
38
39pub trait ConnectorRuntime {
40 fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError>;
41 fn close(&mut self) -> Result<(), RuntimeError>;
42}
43
44struct WasiState {
45 table: Table,
46 ctx: WasiCtx,
47 adapter: WasiPreview1Adapter,
48}
49
50impl WasiState {
51 fn new(ctx: WasiCtx) -> Self {
52 Self {
53 table: Table::new(),
54 ctx,
55 adapter: WasiPreview1Adapter::new(),
56 }
57 }
58}
59
60impl WasiView for WasiState {
61 fn table(&self) -> &Table {
62 &self.table
63 }
64
65 fn table_mut(&mut self) -> &mut Table {
66 &mut self.table
67 }
68
69 fn ctx(&self) -> &WasiCtx {
70 &self.ctx
71 }
72
73 fn ctx_mut(&mut self) -> &mut WasiCtx {
74 &mut self.ctx
75 }
76}
77
78impl WasiPreview1View for WasiState {
79 fn adapter(&self) -> &WasiPreview1Adapter {
80 &self.adapter
81 }
82
83 fn adapter_mut(&mut self) -> &mut WasiPreview1Adapter {
84 &mut self.adapter
85 }
86}
87
88#[derive(Debug, Clone, Default)]
89pub struct WasiPermissions {
90 pub preopen_dirs: Vec<PreopenDir>,
91 pub env: HashMap<String, String>,
92}
93
94impl WasiPermissions {
95 pub fn allow_dir(
96 mut self,
97 host_path: impl Into<PathBuf>,
98 guest_path: impl Into<String>,
99 read_only: bool,
100 ) -> Self {
101 self.preopen_dirs.push(PreopenDir {
102 host_path: host_path.into(),
103 guest_path: guest_path.into(),
104 read_only,
105 });
106 self
107 }
108
109 pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
110 self.env.insert(key.into(), value.into());
111 self
112 }
113}
114
/// A single directory grant for a WebAssembly connector.
#[derive(Clone, Debug)]
pub struct PreopenDir {
    /// Directory on the host that is opened for the guest.
    pub host_path: PathBuf,
    /// Path under which the directory is registered for the guest.
    pub guest_path: String,
    /// When `true`, the grant uses the read-only permission set.
    pub read_only: bool,
}
121
122pub struct ConnectorClient<R: ConnectorRuntime> {
123 runtime: R,
124 next_id: u64,
125}
126
127impl<R: ConnectorRuntime> ConnectorClient<R> {
128 pub fn new(runtime: R) -> Self {
129 Self {
130 runtime,
131 next_id: 1,
132 }
133 }
134
135 pub fn request(
136 &mut self,
137 kind: ConnectorRequestKind,
138 ) -> Result<ConnectorResponse, RuntimeError> {
139 let id = format!("req-{}", self.next_id);
140 self.next_id += 1;
141 let request = ConnectorRequest { id, kind };
142 self.runtime.send(request)
143 }
144
145 pub fn shutdown(&mut self) -> Result<(), RuntimeError> {
146 self.runtime.close()
147 }
148}
149
/// Connector hosted in a child process, exchanging one JSON document per line
/// over the child's stdin and stdout.
pub struct SidecarConnectorRuntime {
    /// Handle to the spawned process; used to kill and reap it on close.
    child: Child,
    /// Write end of the child's stdin, where requests are sent.
    stdin: ChildStdin,
    /// Buffered read end of the child's stdout, where responses arrive.
    stdout: BufReader<ChildStdout>,
}
155
156impl SidecarConnectorRuntime {
157 pub fn spawn(path: &Path) -> Result<Self, RuntimeError> {
158 let mut child = Command::new(path)
159 .stdin(Stdio::piped())
160 .stdout(Stdio::piped())
161 .stderr(Stdio::inherit())
162 .spawn()?;
163
164 let stdin = child.stdin.take().ok_or(RuntimeError::ConnectorExited)?;
165 let stdout = child.stdout.take().ok_or(RuntimeError::ConnectorExited)?;
166 Ok(Self {
167 child,
168 stdin,
169 stdout: BufReader::new(stdout),
170 })
171 }
172}
173
174impl ConnectorRuntime for SidecarConnectorRuntime {
175 fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError> {
176 let json = serde_json::to_string(&request)?;
177 self.stdin.write_all(json.as_bytes())?;
178 self.stdin.write_all(b"\n")?;
179 self.stdin.flush()?;
180
181 let mut line = String::new();
182 let bytes = self.stdout.read_line(&mut line)?;
183 if bytes == 0 {
184 return Err(RuntimeError::ConnectorExited);
185 }
186 let response = serde_json::from_str(&line)?;
187 Ok(response)
188 }
189
190 fn close(&mut self) -> Result<(), RuntimeError> {
191 let _ = self.child.id();
192 let _ = self.child.kill();
193 let _ = self.child.wait();
194 Ok(())
195 }
196}
197
198pub struct WasmConnectorRuntime {
199 engine: Engine,
200 module: Module,
201 permissions: WasiPermissions,
202}
203
204impl WasmConnectorRuntime {
205 pub fn spawn(path: &Path) -> Result<Self, RuntimeError> {
206 Self::spawn_with_permissions(path, WasiPermissions::default())
207 }
208
209 pub fn spawn_with_permissions(
210 path: &Path,
211 permissions: WasiPermissions,
212 ) -> Result<Self, RuntimeError> {
213 let engine = Engine::default();
214 let module =
215 Module::from_file(&engine, path).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
216 Ok(Self {
217 engine,
218 module,
219 permissions,
220 })
221 }
222
223 fn run_once(&self, input: &str) -> Result<String, RuntimeError> {
224 let mut linker = Linker::new(&self.engine);
227 add_to_linker_sync(&mut linker).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
228
229 let stdin = MemoryInputPipe::new(Bytes::from(input.as_bytes().to_vec()));
230 let stdout = MemoryOutputPipe::new(4 * 1024 * 1024);
231 let stderr = MemoryOutputPipe::new(256 * 1024);
232
233 let mut builder = WasiCtxBuilder::new();
234 let _ = builder.stdin(stdin);
235 let _ = builder.stdout(stdout.clone());
236 let _ = builder.stderr(stderr.clone());
237
238 for (key, value) in &self.permissions.env {
239 validate_env_kv(key, value)?;
240 let _ = builder.env(key, value);
241 }
242
243 for dir in &self.permissions.preopen_dirs {
244 validate_guest_path(&dir.guest_path)?;
245 let cap_dir =
246 cap_std::fs::Dir::open_ambient_dir(&dir.host_path, cap_std::ambient_authority())
247 .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
248 let (dir_perms, file_perms) = perms_for(dir.read_only);
249 let _ = builder.preopened_dir(cap_dir, dir_perms, file_perms, &dir.guest_path);
250 }
251
252 let wasi = builder.build();
253 let mut store = Store::new(&self.engine, WasiState::new(wasi));
254 let instance = linker
255 .instantiate(&mut store, &self.module)
256 .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
257
258 let start = instance
259 .get_typed_func::<(), ()>(&mut store, "_start")
260 .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
261 start
262 .call(&mut store, ())
263 .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
264
265 let output = stdout.contents();
266 let output =
267 String::from_utf8(output.to_vec()).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
268 Ok(output)
269 }
270}
271
272impl ConnectorRuntime for WasmConnectorRuntime {
273 fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError> {
274 let payload = serde_json::to_string(&request)?;
275 let output = self.run_once(&format!("{payload}\n"))?;
276 let mut last_line = None;
277 for line in output.lines() {
278 let trimmed = line.trim();
279 if !trimmed.is_empty() {
280 last_line = Some(trimmed.to_string());
281 }
282 }
283 let last_line = last_line.ok_or(RuntimeError::ConnectorExited)?;
284 let response = serde_json::from_str(&last_line)?;
285 Ok(response)
286 }
287
288 fn close(&mut self) -> Result<(), RuntimeError> {
289 Ok(())
290 }
291}
292
293fn validate_env_kv(key: &str, value: &str) -> Result<(), RuntimeError> {
294 if key.is_empty() {
295 return Err(RuntimeError::Wasm("env key cannot be empty".to_string()));
296 }
297 if key.contains('\0') || value.contains('\0') {
298 return Err(RuntimeError::Wasm(
299 "env key/value cannot contain NUL".to_string(),
300 ));
301 }
302 Ok(())
303}
304
305fn validate_guest_path(path: &str) -> Result<(), RuntimeError> {
306 if path.is_empty() || !path.starts_with('/') {
307 return Err(RuntimeError::Wasm(
308 "preopened guest path must be absolute".to_string(),
309 ));
310 }
311 if path.contains('\0') {
312 return Err(RuntimeError::Wasm(
313 "preopened guest path cannot contain NUL".to_string(),
314 ));
315 }
316 Ok(())
317}
318
319fn perms_for(read_only: bool) -> (DirPerms, FilePerms) {
320 let dir_perms = if read_only {
321 DirPerms::READ
322 } else {
323 DirPerms::READ | DirPerms::MUTATE
324 };
325 let file_perms = if read_only {
326 FilePerms::READ
327 } else {
328 FilePerms::READ | FilePerms::WRITE
329 };
330 (dir_perms, file_perms)
331}