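//! Connector runtimes for `kapsl_rag` (`kapsl_rag/runtime/mod.rs`).
//!
//! Provides a native sidecar-process runtime and a sandboxed WASM/WASI runtime,
//! both exchanging JSON requests and responses over the connector's stdin/stdout.
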
use std::collections::HashMap;
use std::io::{self, BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};

use bytes::Bytes;
use kapsl_rag_sdk::protocol::{ConnectorRequest, ConnectorRequestKind, ConnectorResponse};
use wasmtime::{Engine, Linker, Module, Store};
use wasmtime_wasi::preview2::pipe::{MemoryInputPipe, MemoryOutputPipe};
use wasmtime_wasi::preview2::preview1::{
    add_to_linker_sync, WasiPreview1Adapter, WasiPreview1View,
};
use wasmtime_wasi::preview2::{DirPerms, FilePerms, Table, WasiCtx, WasiCtxBuilder, WasiView};

15#[derive(thiserror::Error, Debug)]
16pub enum RuntimeError {
17    #[error("io error: {0}")]
18    Io(String),
19    #[error("serialization error: {0}")]
20    Serialization(String),
21    #[error("connector exited")]
22    ConnectorExited,
23    #[error("wasm error: {0}")]
24    Wasm(String),
25}
26
27impl From<io::Error> for RuntimeError {
28    fn from(err: io::Error) -> Self {
29        RuntimeError::Io(err.to_string())
30    }
31}
32
33impl From<serde_json::Error> for RuntimeError {
34    fn from(err: serde_json::Error) -> Self {
35        RuntimeError::Serialization(err.to_string())
36    }
37}
38
39pub trait ConnectorRuntime {
40    fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError>;
41    fn close(&mut self) -> Result<(), RuntimeError>;
42}
43
44struct WasiState {
45    table: Table,
46    ctx: WasiCtx,
47    adapter: WasiPreview1Adapter,
48}
49
50impl WasiState {
51    fn new(ctx: WasiCtx) -> Self {
52        Self {
53            table: Table::new(),
54            ctx,
55            adapter: WasiPreview1Adapter::new(),
56        }
57    }
58}
59
60impl WasiView for WasiState {
61    fn table(&self) -> &Table {
62        &self.table
63    }
64
65    fn table_mut(&mut self) -> &mut Table {
66        &mut self.table
67    }
68
69    fn ctx(&self) -> &WasiCtx {
70        &self.ctx
71    }
72
73    fn ctx_mut(&mut self) -> &mut WasiCtx {
74        &mut self.ctx
75    }
76}
77
78impl WasiPreview1View for WasiState {
79    fn adapter(&self) -> &WasiPreview1Adapter {
80        &self.adapter
81    }
82
83    fn adapter_mut(&mut self) -> &mut WasiPreview1Adapter {
84        &mut self.adapter
85    }
86}
87
/// Capabilities granted to a WASM connector's WASI sandbox.
///
/// The default value grants nothing: no preopened directories and an empty
/// environment. Use [`WasiPermissions::allow_dir`] and
/// [`WasiPermissions::with_env`] to widen it.
#[derive(Debug, Clone, Default)]
pub struct WasiPermissions {
    /// Host directories mounted into the guest, in the order they were added.
    pub preopen_dirs: Vec<PreopenDir>,
    /// Environment variables visible to the guest.
    pub env: HashMap<String, String>,
}

impl WasiPermissions {
    /// Mounts `host_path` in the guest at `guest_path`; `read_only` withholds
    /// write/mutate access.
    pub fn allow_dir(
        mut self,
        host_path: impl Into<PathBuf>,
        guest_path: impl Into<String>,
        read_only: bool,
    ) -> Self {
        let host_path = host_path.into();
        let guest_path = guest_path.into();
        self.preopen_dirs.push(PreopenDir {
            host_path,
            guest_path,
            read_only,
        });
        self
    }

    /// Sets `key` to `value` in the guest environment, replacing any earlier
    /// value for the same key.
    pub fn with_env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (key, value) = (key.into(), value.into());
        self.env.insert(key, value);
        self
    }
}

/// A single host directory exposed to the guest.
#[derive(Debug, Clone)]
pub struct PreopenDir {
    /// Directory on the host; it is opened each time a request runs.
    pub host_path: PathBuf,
    /// Absolute path at which the guest sees the directory.
    pub guest_path: String,
    /// When true the guest may only read from the directory.
    pub read_only: bool,
}

122pub struct ConnectorClient<R: ConnectorRuntime> {
123    runtime: R,
124    next_id: u64,
125}
126
127impl<R: ConnectorRuntime> ConnectorClient<R> {
128    pub fn new(runtime: R) -> Self {
129        Self {
130            runtime,
131            next_id: 1,
132        }
133    }
134
135    pub fn request(
136        &mut self,
137        kind: ConnectorRequestKind,
138    ) -> Result<ConnectorResponse, RuntimeError> {
139        let id = format!("req-{}", self.next_id);
140        self.next_id += 1;
141        let request = ConnectorRequest { id, kind };
142        self.runtime.send(request)
143    }
144
145    pub fn shutdown(&mut self) -> Result<(), RuntimeError> {
146        self.runtime.close()
147    }
148}
149
150pub struct SidecarConnectorRuntime {
151    child: Child,
152    stdin: ChildStdin,
153    stdout: BufReader<ChildStdout>,
154}
155
156impl SidecarConnectorRuntime {
157    pub fn spawn(path: &Path) -> Result<Self, RuntimeError> {
158        let mut child = Command::new(path)
159            .stdin(Stdio::piped())
160            .stdout(Stdio::piped())
161            .stderr(Stdio::inherit())
162            .spawn()?;
163
164        let stdin = child.stdin.take().ok_or(RuntimeError::ConnectorExited)?;
165        let stdout = child.stdout.take().ok_or(RuntimeError::ConnectorExited)?;
166        Ok(Self {
167            child,
168            stdin,
169            stdout: BufReader::new(stdout),
170        })
171    }
172}
173
174impl ConnectorRuntime for SidecarConnectorRuntime {
175    fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError> {
176        let json = serde_json::to_string(&request)?;
177        self.stdin.write_all(json.as_bytes())?;
178        self.stdin.write_all(b"\n")?;
179        self.stdin.flush()?;
180
181        let mut line = String::new();
182        let bytes = self.stdout.read_line(&mut line)?;
183        if bytes == 0 {
184            return Err(RuntimeError::ConnectorExited);
185        }
186        let response = serde_json::from_str(&line)?;
187        Ok(response)
188    }
189
190    fn close(&mut self) -> Result<(), RuntimeError> {
191        let _ = self.child.id();
192        let _ = self.child.kill();
193        let _ = self.child.wait();
194        Ok(())
195    }
196}
197
198pub struct WasmConnectorRuntime {
199    engine: Engine,
200    module: Module,
201    permissions: WasiPermissions,
202}
203
204impl WasmConnectorRuntime {
205    pub fn spawn(path: &Path) -> Result<Self, RuntimeError> {
206        Self::spawn_with_permissions(path, WasiPermissions::default())
207    }
208
209    pub fn spawn_with_permissions(
210        path: &Path,
211        permissions: WasiPermissions,
212    ) -> Result<Self, RuntimeError> {
213        let engine = Engine::default();
214        let module =
215            Module::from_file(&engine, path).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
216        Ok(Self {
217            engine,
218            module,
219            permissions,
220        })
221    }
222
223    fn run_once(&self, input: &str) -> Result<String, RuntimeError> {
224        // Run the WASM connector to completion for this request. This keeps the
225        // implementation simple and sandboxed, at the cost of per-request startup.
226        let mut linker = Linker::new(&self.engine);
227        add_to_linker_sync(&mut linker).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
228
229        let stdin = MemoryInputPipe::new(Bytes::from(input.as_bytes().to_vec()));
230        let stdout = MemoryOutputPipe::new(4 * 1024 * 1024);
231        let stderr = MemoryOutputPipe::new(256 * 1024);
232
233        let mut builder = WasiCtxBuilder::new();
234        let _ = builder.stdin(stdin);
235        let _ = builder.stdout(stdout.clone());
236        let _ = builder.stderr(stderr.clone());
237
238        for (key, value) in &self.permissions.env {
239            validate_env_kv(key, value)?;
240            let _ = builder.env(key, value);
241        }
242
243        for dir in &self.permissions.preopen_dirs {
244            validate_guest_path(&dir.guest_path)?;
245            let cap_dir =
246                cap_std::fs::Dir::open_ambient_dir(&dir.host_path, cap_std::ambient_authority())
247                    .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
248            let (dir_perms, file_perms) = perms_for(dir.read_only);
249            let _ = builder.preopened_dir(cap_dir, dir_perms, file_perms, &dir.guest_path);
250        }
251
252        let wasi = builder.build();
253        let mut store = Store::new(&self.engine, WasiState::new(wasi));
254        let instance = linker
255            .instantiate(&mut store, &self.module)
256            .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
257
258        let start = instance
259            .get_typed_func::<(), ()>(&mut store, "_start")
260            .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
261        start
262            .call(&mut store, ())
263            .map_err(|e| RuntimeError::Wasm(e.to_string()))?;
264
265        let output = stdout.contents();
266        let output =
267            String::from_utf8(output.to_vec()).map_err(|e| RuntimeError::Wasm(e.to_string()))?;
268        Ok(output)
269    }
270}
271
272impl ConnectorRuntime for WasmConnectorRuntime {
273    fn send(&mut self, request: ConnectorRequest) -> Result<ConnectorResponse, RuntimeError> {
274        let payload = serde_json::to_string(&request)?;
275        let output = self.run_once(&format!("{payload}\n"))?;
276        let mut last_line = None;
277        for line in output.lines() {
278            let trimmed = line.trim();
279            if !trimmed.is_empty() {
280                last_line = Some(trimmed.to_string());
281            }
282        }
283        let last_line = last_line.ok_or(RuntimeError::ConnectorExited)?;
284        let response = serde_json::from_str(&last_line)?;
285        Ok(response)
286    }
287
288    fn close(&mut self) -> Result<(), RuntimeError> {
289        Ok(())
290    }
291}
292
293fn validate_env_kv(key: &str, value: &str) -> Result<(), RuntimeError> {
294    if key.is_empty() {
295        return Err(RuntimeError::Wasm("env key cannot be empty".to_string()));
296    }
297    if key.contains('\0') || value.contains('\0') {
298        return Err(RuntimeError::Wasm(
299            "env key/value cannot contain NUL".to_string(),
300        ));
301    }
302    Ok(())
303}
304
305fn validate_guest_path(path: &str) -> Result<(), RuntimeError> {
306    if path.is_empty() || !path.starts_with('/') {
307        return Err(RuntimeError::Wasm(
308            "preopened guest path must be absolute".to_string(),
309        ));
310    }
311    if path.contains('\0') {
312        return Err(RuntimeError::Wasm(
313            "preopened guest path cannot contain NUL".to_string(),
314        ));
315    }
316    Ok(())
317}
318
319fn perms_for(read_only: bool) -> (DirPerms, FilePerms) {
320    let dir_perms = if read_only {
321        DirPerms::READ
322    } else {
323        DirPerms::READ | DirPerms::MUTATE
324    };
325    let file_perms = if read_only {
326        FilePerms::READ
327    } else {
328        FilePerms::READ | FilePerms::WRITE
329    };
330    (dir_perms, file_perms)
331}