kapsl-rag 0.1.0

Retrieval-augmented generation implementation with vector store and WASM plugins for Kapsl
Documentation
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

use kapsl_rag_sdk::manifest::{ConnectorManifest, ConnectorRuntime as ManifestRuntime};
use kapsl_rag_sdk::types::ConnectorConfig;
use serde::Deserialize;
use serde_json::Value;

use crate::runtime::{
    ConnectorClient, ConnectorRuntime as RuntimeTrait, SidecarConnectorRuntime, WasiPermissions,
    WasmConnectorRuntime,
};

#[derive(thiserror::Error, Debug)]
pub enum ExtensionError {
    #[error("io error: {0}")]
    Io(String),
    #[error("manifest not found in {0}")]
    ManifestMissing(String),
    #[error("invalid manifest: {0}")]
    InvalidManifest(String),
    #[error("invalid config: {0}")]
    InvalidConfig(String),
    #[error("extension not installed: {0}")]
    NotInstalled(String),
    #[error("runtime error: {0}")]
    Runtime(String),
}

impl From<io::Error> for ExtensionError {
    fn from(err: io::Error) -> Self {
        ExtensionError::Io(err.to_string())
    }
}

#[derive(Debug, Clone)]
pub struct InstalledExtension {
    pub manifest: ConnectorManifest,
    pub path: PathBuf,
}

#[derive(Debug, Clone)]
pub struct ExtensionRegistry {
    pub root: PathBuf,
}

impl ExtensionRegistry {
    pub fn new(root: impl Into<PathBuf>) -> Self {
        Self { root: root.into() }
    }

    pub fn discover(&self) -> Result<Vec<InstalledExtension>, ExtensionError> {
        let mut extensions = Vec::new();
        if !self.root.exists() {
            return Ok(extensions);
        }
        for entry in fs::read_dir(&self.root)? {
            let entry = entry?;
            let path = entry.path();
            if !path.is_dir() {
                continue;
            }
            if let Ok(manifest) = load_manifest(&path) {
                extensions.push(InstalledExtension { manifest, path });
            }
        }
        Ok(extensions)
    }

    pub fn install_from_dir(&self, source: &Path) -> Result<InstalledExtension, ExtensionError> {
        let manifest = load_manifest(source)?;
        let target = self.root.join(&manifest.id);
        if target.exists() {
            fs::remove_dir_all(&target)?;
        }
        copy_dir_all(source, &target)?;
        Ok(InstalledExtension {
            manifest,
            path: target,
        })
    }

    pub fn uninstall(&self, extension_id: &str) -> Result<(), ExtensionError> {
        let target = self.root.join(extension_id);
        if !target.exists() {
            return Err(ExtensionError::NotInstalled(extension_id.to_string()));
        }
        fs::remove_dir_all(target)?;
        Ok(())
    }
}

#[derive(Debug, Clone)]
pub struct ExtensionManager {
    pub registry: ExtensionRegistry,
    pub config_root: PathBuf,
}

impl ExtensionManager {
    pub fn new(registry: ExtensionRegistry, config_root: impl Into<PathBuf>) -> Self {
        Self {
            registry,
            config_root: config_root.into(),
        }
    }

    pub fn set_workspace_config(
        &self,
        workspace_id: &str,
        extension_id: &str,
        config: &ConnectorConfig,
    ) -> Result<(), ExtensionError> {
        let dir = self.config_root.join(workspace_id);
        fs::create_dir_all(&dir)?;
        let path = dir.join(format!("{extension_id}.json"));
        let data = serde_json::to_vec_pretty(config)
            .map_err(|e| ExtensionError::InvalidManifest(e.to_string()))?;
        fs::write(path, data)?;
        Ok(())
    }

    pub fn get_workspace_config(
        &self,
        workspace_id: &str,
        extension_id: &str,
    ) -> Result<Option<ConnectorConfig>, ExtensionError> {
        let path = self
            .config_root
            .join(workspace_id)
            .join(format!("{extension_id}.json"));
        if !path.exists() {
            return Ok(None);
        }
        let data = fs::read_to_string(path)?;
        let config = serde_json::from_str(&data)
            .map_err(|e| ExtensionError::InvalidManifest(e.to_string()))?;
        Ok(Some(config))
    }

    pub fn get_workspace_connector_config(
        &self,
        workspace_id: &str,
        extension_id: &str,
    ) -> Result<Option<ConnectorConfig>, ExtensionError> {
        let config = self.get_workspace_config(workspace_id, extension_id)?;
        Ok(config.map(strip_wasi_block))
    }

    pub fn list_configs(
        &self,
        workspace_id: &str,
    ) -> Result<HashMap<String, ConnectorConfig>, ExtensionError> {
        let mut configs = HashMap::new();
        let dir = self.config_root.join(workspace_id);
        if !dir.exists() {
            return Ok(configs);
        }
        for entry in fs::read_dir(&dir)? {
            let entry = entry?;
            let path = entry.path();
            if path.extension().and_then(|s| s.to_str()) != Some("json") {
                continue;
            }
            if let Some(stem) = path.file_stem().and_then(|s| s.to_str()) {
                let data = fs::read_to_string(&path)?;
                let config = serde_json::from_str(&data)
                    .map_err(|e| ExtensionError::InvalidManifest(e.to_string()))?;
                configs.insert(stem.to_string(), config);
            }
        }
        Ok(configs)
    }

    pub fn get_workspace_wasi_permissions(
        &self,
        workspace_id: &str,
        extension_id: &str,
    ) -> Result<WasiPermissions, ExtensionError> {
        let config = self.get_workspace_config(workspace_id, extension_id)?;
        wasi_permissions_from_config(config.as_ref())
    }

    pub fn launch_connector(
        &self,
        workspace_id: &str,
        extension: &InstalledExtension,
    ) -> Result<ConnectorClient<ConnectorRuntimeHandle>, ExtensionError> {
        let entrypoint = resolve_entrypoint(extension)?;
        let runtime = match extension.manifest.runtime {
            ManifestRuntime::Wasm => {
                let permissions =
                    self.get_workspace_wasi_permissions(workspace_id, &extension.manifest.id)?;
                ConnectorRuntimeHandle::Wasm(
                    WasmConnectorRuntime::spawn_with_permissions(&entrypoint, permissions)
                        .map_err(|e| ExtensionError::Runtime(e.to_string()))?,
                )
            }
            ManifestRuntime::Sidecar => {
                let runtime = SidecarConnectorRuntime::spawn(&entrypoint)
                    .map_err(|e| ExtensionError::Runtime(e.to_string()))?;
                ConnectorRuntimeHandle::Sidecar(runtime)
            }
        };
        Ok(ConnectorClient::new(runtime))
    }
}

pub enum ConnectorRuntimeHandle {
    Wasm(WasmConnectorRuntime),
    Sidecar(SidecarConnectorRuntime),
}

impl RuntimeTrait for ConnectorRuntimeHandle {
    fn send(
        &mut self,
        request: kapsl_rag_sdk::protocol::ConnectorRequest,
    ) -> Result<kapsl_rag_sdk::protocol::ConnectorResponse, crate::runtime::RuntimeError> {
        match self {
            ConnectorRuntimeHandle::Wasm(runtime) => runtime.send(request),
            ConnectorRuntimeHandle::Sidecar(runtime) => runtime.send(request),
        }
    }

    fn close(&mut self) -> Result<(), crate::runtime::RuntimeError> {
        match self {
            ConnectorRuntimeHandle::Wasm(runtime) => runtime.close(),
            ConnectorRuntimeHandle::Sidecar(runtime) => runtime.close(),
        }
    }
}

#[derive(Debug, Deserialize, Default)]
struct WasiConfig {
    #[serde(default)]
    env: HashMap<String, String>,
    #[serde(default)]
    preopen_dirs: Vec<WasiDirConfig>,
}

#[derive(Debug, Deserialize)]
struct WasiDirConfig {
    host_path: String,
    guest_path: String,
    #[serde(default)]
    read_only: bool,
}

fn wasi_permissions_from_config(
    config: Option<&ConnectorConfig>,
) -> Result<WasiPermissions, ExtensionError> {
    let Some(config) = config else {
        return Ok(WasiPermissions::default());
    };
    let obj = match config {
        serde_json::Value::Object(_) => config,
        _ => return Ok(WasiPermissions::default()),
    };

    let wasi_value = obj.get("wasi");
    if wasi_value.is_none() {
        return Ok(WasiPermissions::default());
    }
    let wasi_value = wasi_value.unwrap();
    let parsed: WasiConfig = serde_json::from_value(wasi_value.clone())
        .map_err(|e| ExtensionError::InvalidConfig(e.to_string()))?;

    let mut permissions = WasiPermissions::default();
    for (key, value) in parsed.env {
        validate_env_kv(&key, &value)?;
        permissions = permissions.with_env(key, value);
    }

    for dir in parsed.preopen_dirs {
        validate_host_path(&dir.host_path)?;
        validate_guest_path(&dir.guest_path)?;
        permissions =
            permissions.allow_dir(PathBuf::from(dir.host_path), dir.guest_path, dir.read_only);
    }

    Ok(permissions)
}

fn strip_wasi_block(config: ConnectorConfig) -> ConnectorConfig {
    match config {
        Value::Object(mut map) => {
            map.remove("wasi");
            Value::Object(map)
        }
        other => other,
    }
}

fn validate_env_kv(key: &str, value: &str) -> Result<(), ExtensionError> {
    if key.is_empty() {
        return Err(ExtensionError::InvalidConfig(
            "env key cannot be empty".to_string(),
        ));
    }
    if key.contains('\0') || value.contains('\0') {
        return Err(ExtensionError::InvalidConfig(
            "env key/value cannot contain NUL".to_string(),
        ));
    }
    Ok(())
}

fn validate_guest_path(path: &str) -> Result<(), ExtensionError> {
    if path.is_empty() || !path.starts_with('/') {
        return Err(ExtensionError::InvalidConfig(
            "preopened guest path must be absolute".to_string(),
        ));
    }
    if path.contains('\0') {
        return Err(ExtensionError::InvalidConfig(
            "preopened guest path cannot contain NUL".to_string(),
        ));
    }
    Ok(())
}

fn validate_host_path(path: &str) -> Result<(), ExtensionError> {
    let host_path = Path::new(path);
    if !host_path.is_absolute() {
        return Err(ExtensionError::InvalidConfig(
            "preopened host path must be absolute".to_string(),
        ));
    }
    Ok(())
}

fn load_manifest(dir: &Path) -> Result<ConnectorManifest, ExtensionError> {
    let toml_path = dir.join("rag-extension.toml");
    let json_path = dir.join("rag-extension.json");

    if toml_path.exists() {
        let data = fs::read_to_string(&toml_path)?;
        let manifest =
            toml::from_str(&data).map_err(|e| ExtensionError::InvalidManifest(e.to_string()))?;
        return Ok(manifest);
    }

    if json_path.exists() {
        let data = fs::read_to_string(&json_path)?;
        let manifest = serde_json::from_str(&data)
            .map_err(|e| ExtensionError::InvalidManifest(e.to_string()))?;
        return Ok(manifest);
    }

    Err(ExtensionError::ManifestMissing(dir.display().to_string()))
}

fn resolve_entrypoint(extension: &InstalledExtension) -> Result<PathBuf, ExtensionError> {
    let entry = extension.manifest.entrypoint.as_deref();
    let runtime = &extension.manifest.runtime;
    let default_entry = match runtime {
        ManifestRuntime::Wasm => "connector.wasm",
        ManifestRuntime::Sidecar => "connector",
    };
    let entry = entry.unwrap_or(default_entry);
    let path = Path::new(entry);
    let resolved = if path.is_absolute() {
        path.to_path_buf()
    } else {
        extension.path.join(entry)
    };
    if !resolved.exists() {
        return Err(ExtensionError::InvalidConfig(format!(
            "entrypoint not found: {}",
            resolved.display()
        )));
    }
    Ok(resolved)
}

fn copy_dir_all(src: &Path, dst: &Path) -> Result<(), ExtensionError> {
    fs::create_dir_all(dst)?;
    for entry in fs::read_dir(src)? {
        let entry = entry?;
        let ty = entry.file_type()?;
        let src_path = entry.path();
        let dst_path = dst.join(entry.file_name());
        if ty.is_dir() {
            copy_dir_all(&src_path, &dst_path)?;
        } else {
            fs::copy(&src_path, &dst_path)?;
        }
    }
    Ok(())
}