pub mod host_methods;
pub mod js_runtime;
pub mod manifest;
pub mod slots;
pub mod wasm_runtime;
use std::{
collections::HashMap,
sync::{Arc, OnceLock},
};
use anyhow::Result;
use futures::future::BoxFuture;
pub use js_runtime::Plugin;
pub use manifest::{
LEGACY_MANIFEST_FILE, MANIFEST_FILE, PluginManifest, PluginSlashCommand, PluginToolDef,
load_manifest, scan_plugins,
};
pub use slots::{ContextEngineSlot, MemoryItem, MemorySlot, MemoryStoreSlot, SlotRegistry};
use tracing::{info, warn};
#[derive(Debug, Clone, Default)]
pub struct PluginInvocationContext {
pub target_id: String,
pub channel: String,
pub agent_id: String,
pub peer_id: String,
pub chat_id: String,
pub session_key: String,
pub is_group: bool,
}
pub trait PluginBackgroundHost: Send + Sync {
fn cron_register(
&self,
plugin: String,
name: String,
schedule_json: String,
ctx: Option<PluginInvocationContext>,
) -> BoxFuture<'static, std::result::Result<String, String>>;
fn sse_subscribe(
&self,
plugin: String,
name: String,
url: String,
headers_json: String,
resume_key: String,
ctx: Option<PluginInvocationContext>,
) -> BoxFuture<'static, std::result::Result<String, String>>;
fn sse_status(
&self,
plugin: String,
name: String,
ctx: Option<PluginInvocationContext>,
) -> BoxFuture<'static, std::result::Result<String, String>>;
fn push_outbound(
&self,
channel: String,
peer_id: String,
message_json: String,
ctx: Option<PluginInvocationContext>,
) -> BoxFuture<'static, std::result::Result<String, String>>;
fn submit_agent_turn(
&self,
session_key: String,
prompt: String,
route_json: String,
ctx: Option<PluginInvocationContext>,
) -> BoxFuture<'static, std::result::Result<String, String>>;
}
static PLUGIN_BACKGROUND_HOST: OnceLock<Arc<dyn PluginBackgroundHost>> = OnceLock::new();
pub fn set_plugin_background_host(host: Arc<dyn PluginBackgroundHost>) {
if PLUGIN_BACKGROUND_HOST.set(host).is_err() {
warn!("plugin background host already installed, ignoring duplicate install");
}
}
pub(crate) async fn cron_register(
plugin: String,
name: String,
schedule_json: String,
ctx: Option<PluginInvocationContext>,
) -> std::result::Result<String, String> {
let Some(host) = PLUGIN_BACKGROUND_HOST.get().cloned() else {
return Err("plugin background host is not installed".to_owned());
};
host.cron_register(plugin, name, schedule_json, ctx).await
}
pub(crate) async fn sse_subscribe(
plugin: String,
name: String,
url: String,
headers_json: String,
resume_key: String,
ctx: Option<PluginInvocationContext>,
) -> std::result::Result<String, String> {
let Some(host) = PLUGIN_BACKGROUND_HOST.get().cloned() else {
return Err("plugin background host is not installed".to_owned());
};
host.sse_subscribe(plugin, name, url, headers_json, resume_key, ctx)
.await
}
pub(crate) async fn sse_status(
plugin: String,
name: String,
ctx: Option<PluginInvocationContext>,
) -> std::result::Result<String, String> {
let Some(host) = PLUGIN_BACKGROUND_HOST.get().cloned() else {
return Err("plugin background host is not installed".to_owned());
};
host.sse_status(plugin, name, ctx).await
}
pub(crate) async fn push_outbound(
channel: String,
peer_id: String,
message_json: String,
ctx: Option<PluginInvocationContext>,
) -> std::result::Result<String, String> {
let Some(host) = PLUGIN_BACKGROUND_HOST.get().cloned() else {
return Err("plugin background host is not installed".to_owned());
};
host.push_outbound(channel, peer_id, message_json, ctx)
.await
}
pub(crate) async fn submit_agent_turn(
session_key: String,
prompt: String,
route_json: String,
ctx: Option<PluginInvocationContext>,
) -> std::result::Result<String, String> {
let Some(host) = PLUGIN_BACKGROUND_HOST.get().cloned() else {
return Err("plugin background host is not installed".to_owned());
};
host.submit_agent_turn(session_key, prompt, route_json, ctx)
.await
}
use rsclaw_config::schema::PluginsConfig;
pub use wasm_runtime::{WasmPlugin, WasmToolDef, load_wasm_plugin};
pub struct PluginRegistry {
plugins: HashMap<String, Plugin>,
wasm_plugins: Vec<WasmPlugin>,
pub slots: SlotRegistry,
}
impl PluginRegistry {
pub fn new() -> Self {
Self {
plugins: HashMap::new(),
wasm_plugins: Vec::new(),
slots: SlotRegistry::new(),
}
}
pub fn get_js(&self, name: &str) -> Option<&Plugin> {
self.plugins.get(name)
}
pub fn js_plugins_iter(&self) -> impl Iterator<Item = (&String, &Plugin)> {
self.plugins.iter()
}
pub fn all(&self) -> impl Iterator<Item = &Plugin> {
self.plugins.values()
}
pub fn wasm_all(&self) -> &[WasmPlugin] {
&self.wasm_plugins
}
pub fn len(&self) -> usize {
self.plugins.len() + self.wasm_plugins.len()
}
pub fn is_empty(&self) -> bool {
self.plugins.is_empty() && self.wasm_plugins.is_empty()
}
pub fn js_count(&self) -> usize {
self.plugins.len()
}
pub fn wasm_count(&self) -> usize {
self.wasm_plugins.len()
}
pub fn take_wasm_plugins(&mut self) -> Vec<WasmPlugin> {
std::mem::take(&mut self.wasm_plugins)
}
}
impl Default for PluginRegistry {
fn default() -> Self {
Self::new()
}
}
pub async fn load_all_plugins(
plugins_dir: &std::path::Path,
config: Option<&PluginsConfig>,
wasm_browser: Arc<tokio::sync::Mutex<Option<rsclaw_browser::BrowserSession>>>,
notify_tx: Option<tokio::sync::broadcast::Sender<rsclaw_channel::OutboundMessage>>,
providers: Option<Arc<rsclaw_provider::registry::ProviderRegistry>>,
vision_model: Option<String>,
) -> Result<PluginRegistry> {
let manifests = scan_plugins(plugins_dir)?;
let mut registry = PluginRegistry::new();
let host_dispatch = Arc::new(host_methods::HostMethodRegistry::new(
notify_tx,
Arc::clone(&wasm_browser),
));
let wasm_engine = if manifests.iter().any(|m| m.is_wasm()) {
let mut wasm_config = wasmtime::Config::new();
wasm_config.async_support(true);
wasm_config.epoch_interruption(true);
let engine = wasmtime::Engine::new(&wasm_config)
.map_err(|e| anyhow::anyhow!("create wasmtime engine: {e}"))?;
let tick_engine = engine.clone();
tokio::spawn(async move {
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(100));
loop {
ticker.tick().await;
tick_engine.increment_epoch();
}
});
Some(engine)
} else {
None
};
for manifest in manifests {
let enabled = config
.and_then(|c| c.entries.as_ref())
.and_then(|e| e.get(&manifest.name))
.and_then(|e| e.enabled)
.unwrap_or(true);
if !enabled {
info!(plugin = %manifest.name, "plugin disabled via config");
continue;
}
if manifest.is_wasm() {
let engine = wasm_engine.as_ref().expect("wasm engine initialized");
match load_wasm_plugin(
&manifest,
engine,
Arc::clone(&wasm_browser),
providers.clone(),
vision_model.clone(),
)
.await
{
Ok(plugin) => {
info!(
plugin = %plugin.name,
tools = plugin.tools.len(),
version = ?manifest.version,
"WASM plugin loaded"
);
registry.wasm_plugins.push(plugin);
}
Err(e) => {
warn!(plugin = %manifest.name, "failed to load WASM plugin: {e:#}");
}
}
} else {
match Plugin::spawn(manifest, host_dispatch.clone()).await {
Ok(plugin) => {
info!(plugin = %plugin.manifest.name, "JS plugin started");
registry
.plugins
.insert(plugin.manifest.name.clone(), plugin);
}
Err(e) => {
warn!("failed to start plugin: {e:#}");
}
}
}
}
info!(
total = registry.len(),
js = registry.js_count(),
wasm = registry.wasm_count(),
"plugins loaded"
);
Ok(registry)
}