use std::collections::{HashMap, HashSet};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::channels::wasm::{
LoadedChannel, RegisteredEndpoint, SharedWasmChannel, TELEGRAM_CHANNEL_NAME, WasmChannelLoader,
WasmChannelRouter, WasmChannelRuntime, bot_username_setting_key,
};
use crate::channels::{ChannelManager, OutgoingResponse};
use crate::extensions::discovery::OnlineDiscovery;
use crate::extensions::registry::ExtensionRegistry;
use crate::extensions::{
ActivateResult, AuthResult, ConfigureResult, ExtensionError, ExtensionKind, ExtensionSource,
InstallResult, InstalledExtension, RegistryEntry, ResultSource, SearchResult, ToolAuthState,
UpgradeOutcome, UpgradeResult, VerificationChallenge,
};
use crate::hooks::HookRegistry;
use crate::pairing::PairingStore;
use crate::secrets::{CreateSecretParams, SecretsStore};
use crate::tools::ToolRegistry;
use crate::tools::mcp::McpClient;
use crate::tools::mcp::auth::{
authorize_mcp_server, canonical_resource_uri, discover_full_oauth_metadata,
find_available_port, is_authenticated, register_client,
};
use crate::tools::mcp::config::McpServerConfig;
use crate::tools::mcp::session::McpSessionManager;
use crate::tools::wasm::{WasmToolLoader, WasmToolRuntime, discover_tools};
/// Bookkeeping entry for an extension auth flow that has been started but not
/// yet completed. Entries are stored in `ExtensionManager::pending_auth`,
/// keyed by extension name.
struct PendingAuth {
/// Extension name. Underscore-prefixed: stored but not read via this field
/// in the code visible here.
_name: String,
/// Extension kind. Likewise stored but not read in the code visible here.
_kind: ExtensionKind,
/// Moment the entry was created (`Instant::now()` at insertion).
created_at: std::time::Instant,
/// Background task associated with the flow, if any. It is aborted when the
/// pending auth for the extension is cleared.
task_handle: Option<tokio::task::JoinHandle<()>>,
}
/// Everything `start_gateway_oauth_flow` needs to register a gateway-hosted
/// OAuth flow for one extension.
struct HostedOAuthFlowStart {
/// Extension the flow belongs to.
name: String,
/// Kind of that extension.
kind: ExtensionKind,
/// Authorization URL as originally built; its `state` parameter is rewritten
/// before the URL is handed back to the caller.
auth_url: String,
/// The `state` value currently embedded in `auth_url`; also used as the key
/// under which `flow` is stored in the pending-flow registry.
expected_state: String,
/// Flow record inserted into the pending OAuth registry.
flow: crate::cli::oauth_defaults::PendingOAuthFlow,
}
/// Returns `path` normalized so that it ends in exactly one `/oauth/callback`.
///
/// Trailing slashes are removed first. An empty remainder becomes
/// `/oauth/callback`; a remainder that already ends with the suffix is
/// returned as-is; anything else gets the suffix appended.
fn normalize_oauth_callback_path(path: &str) -> String {
    const CALLBACK_SUFFIX: &str = "/oauth/callback";
    match path.trim_end_matches('/') {
        "" => CALLBACK_SUFFIX.to_string(),
        complete if complete.ends_with(CALLBACK_SUFFIX) => complete.to_string(),
        prefix => format!("{prefix}{CALLBACK_SUFFIX}"),
    }
}
/// Normalizes a hosted callback URL so its path ends in `/oauth/callback`.
///
/// When `callback_url` parses as a URL only the path component is rewritten.
/// Otherwise the raw string is treated like a path: trailing slashes are
/// stripped and the suffix is appended unless already present.
fn normalize_hosted_callback_url(callback_url: &str) -> String {
    match url::Url::parse(callback_url) {
        Ok(mut parsed) => {
            let path = normalize_oauth_callback_path(parsed.path());
            parsed.set_path(&path);
            parsed.to_string()
        }
        // The string fallback follows exactly the same rules as the path helper.
        Err(_) => normalize_oauth_callback_path(callback_url),
    }
}
/// Channel-related handles installed together by
/// `ExtensionManager::set_channel_runtime`.
struct ChannelRuntimeState {
channel_manager: Arc<ChannelManager>,
wasm_channel_runtime: Arc<WasmChannelRuntime>,
pairing_store: Arc<PairingStore>,
wasm_channel_router: Arc<WasmChannelRouter>,
/// In-memory owner ids keyed by channel name. Consulted before the
/// persisted setting when resolving a channel's owner.
wasm_channel_owner_ids: std::collections::HashMap<String, i64>,
}
/// Setup description for an extension: the secret fields and the regular
/// setup fields, expressed with the web channel's field types.
// NOTE(review): presumably consumed by the web setup UI — confirm at the call sites.
pub struct ExtensionSetupSchema {
/// Secret inputs the extension asks for.
pub secrets: Vec<crate::channels::web::types::SecretFieldInfo>,
/// Non-secret setup inputs.
pub fields: Vec<crate::channels::web::types::SetupFieldInfo>,
}
/// Allow-list of global setting paths.
// NOTE(review): judging by the name, these are the only global settings an
// extension setup flow may write; the consumer is outside this view — confirm.
const ALLOWED_GLOBAL_SETUP_SETTING_PATHS: &[&str] = &[
"llm_backend",
"selected_model",
"ollama_base_url",
"openai_compatible_base_url",
];
/// Test-only hook type: maps a `&str` argument to a `LoadedChannel` or an
/// error. Stored in `ExtensionManager::test_wasm_channel_loader`.
#[cfg(test)]
type TestWasmChannelLoader =
Arc<dyn Fn(&str) -> Result<LoadedChannel, ExtensionError> + Send + Sync>;
/// Test-only hook type: given a `&str` and an optional existing owner id,
/// produces a `TelegramBindingResult`. Stored in
/// `ExtensionManager::test_telegram_binding_resolver`.
#[cfg(test)]
type TestTelegramBindingResolver =
Arc<dyn Fn(&str, Option<i64>) -> Result<TelegramBindingResult, ExtensionError> + Send + Sync>;
// NOTE(review): presumably the overall time budget, in seconds, for waiting on
// an owner-binding message; usage is outside this view.
const TELEGRAM_OWNER_BIND_TIMEOUT_SECS: u64 = 120;
// Lifetime of an issued verification challenge, in seconds; added to the
// current Unix time to compute the challenge's expiry.
const TELEGRAM_OWNER_BIND_CHALLENGE_TTL_SECS: u64 = 300;
// NOTE(review): presumably the long-poll timeout passed to Telegram
// `getUpdates`; usage is outside this view.
const TELEGRAM_GET_UPDATES_TIMEOUT_SECS: u64 = 25;
// Number of characters in a generated verification code.
const TELEGRAM_OWNER_BIND_CODE_LEN: usize = 8;
/// Data describing a resolved Telegram owner binding.
#[derive(Debug, Clone, PartialEq, Eq)]
struct TelegramBindingData {
/// Numeric id of the bound owner.
owner_id: i64,
/// Bot username, when known.
bot_username: Option<String>,
/// Whether the binding already existed or was verified just now.
binding_state: TelegramOwnerBindingState,
}
/// How a Telegram owner binding came to be.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TelegramOwnerBindingState {
/// A binding was already in place.
Existing,
/// The binding was verified during the current operation.
VerifiedNow,
}
/// An issued-but-unconfirmed Telegram verification challenge, stored per
/// channel name in `ExtensionManager::pending_telegram_verification`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct PendingTelegramVerificationChallenge {
/// One-time code the user must send to the bot.
code: String,
/// Bot username used to build instructions and the deep link, when known.
bot_username: Option<String>,
/// Expiry as Unix seconds; the challenge is discarded once the current
/// time reaches this value.
expires_at_unix: u64,
}
/// Outcome of trying to resolve a Telegram owner binding.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TelegramBindingResult {
/// An owner is bound; carries the binding details.
Bound(TelegramBindingData),
/// No owner yet; carries the challenge the user must complete.
Pending(VerificationChallenge),
}
/// Logs a failed Telegram HTTP request and converts it into an
/// `ExtensionError::Other` naming the failed `action`.
///
/// Only the HTTP status and the timeout/connect flags are logged.
// NOTE(review): the error value itself is never formatted — presumably on
// purpose, since Telegram request URLs embed the bot token. Keep it that way.
fn telegram_request_error(action: &'static str, error: &reqwest::Error) -> ExtensionError {
    let status = error.status().map(|code| code.as_u16());
    let is_timeout = error.is_timeout();
    let is_connect = error.is_connect();
    tracing::warn!(
        action,
        status,
        is_timeout,
        is_connect,
        "Telegram API request failed"
    );
    let message = format!("Telegram {action} request failed");
    ExtensionError::Other(message)
}
/// Logs a failure to decode a Telegram response body and converts it into an
/// `ExtensionError::Other` naming the `action` whose response was unreadable.
///
/// Only the HTTP status and the timeout flag are logged; the error value
/// itself is not formatted into the log or the returned message.
fn telegram_response_parse_error(action: &'static str, error: &reqwest::Error) -> ExtensionError {
    let status = error.status().map(|code| code.as_u16());
    let is_timeout = error.is_timeout();
    tracing::warn!(
        action,
        status,
        is_timeout,
        "Telegram API response parse failed"
    );
    let message = format!("Failed to parse Telegram {action} response");
    ExtensionError::Other(message)
}
/// Deserialization target for a Telegram `getMe` response envelope.
#[derive(Debug, serde::Deserialize)]
struct TelegramGetMeResponse {
/// Success flag of the envelope.
ok: bool,
/// Bot user payload; defaults to `None` when the field is absent.
#[serde(default)]
result: Option<TelegramGetMeUser>,
/// Optional description text; defaults to `None` when absent.
#[serde(default)]
description: Option<String>,
}
/// The subset of the `getMe` user payload that is deserialized here.
#[derive(Debug, serde::Deserialize)]
struct TelegramGetMeUser {
/// Username of the bot; `None` when the field is absent.
#[serde(default)]
username: Option<String>,
}
/// Deserialization target for a Telegram `getUpdates` response envelope.
#[derive(Debug, serde::Deserialize)]
struct TelegramGetUpdatesResponse {
/// Success flag of the envelope.
ok: bool,
/// Received updates; an absent field deserializes to an empty list.
#[serde(default)]
result: Vec<TelegramUpdate>,
/// Optional description text; defaults to `None` when absent.
#[serde(default)]
description: Option<String>,
}
/// Minimal Telegram response envelope for calls where only success or failure
/// matters (used for `sendMessage`).
#[derive(Debug, serde::Deserialize)]
struct TelegramApiOkResponse {
/// Success flag of the envelope.
ok: bool,
/// Optional description text, surfaced as the error message when `ok` is false.
#[serde(default)]
description: Option<String>,
}
/// One entry of a `getUpdates` result, restricted to the fields read here.
#[derive(Debug, serde::Deserialize)]
struct TelegramUpdate {
/// Identifier of the update.
update_id: i64,
/// New message carried by the update, if any.
#[serde(default)]
message: Option<TelegramMessage>,
/// Edited message carried by the update, if any.
#[serde(default)]
edited_message: Option<TelegramMessage>,
}
/// A Telegram message, restricted to the fields read here.
#[derive(Debug, serde::Deserialize)]
struct TelegramMessage {
/// Chat the message belongs to (required).
chat: TelegramChat,
/// Sender of the message; `None` when the field is absent.
#[serde(default)]
from: Option<TelegramUser>,
/// Text content; `None` when the field is absent.
#[serde(default)]
text: Option<String>,
}
/// A Telegram chat, restricted to its type.
#[derive(Debug, serde::Deserialize)]
struct TelegramChat {
/// Value of the JSON `type` field (renamed because `type` is a Rust keyword).
#[serde(rename = "type")]
chat_type: String,
}
/// A Telegram user, restricted to the fields read here.
#[derive(Debug, serde::Deserialize)]
struct TelegramUser {
/// Numeric user id.
id: i64,
/// Whether the account is a bot.
is_bot: bool,
}
/// Builds the runtime config overrides for a WASM channel.
///
/// Each argument that is `Some` contributes one entry: `tunnel_url` and
/// `webhook_secret` as JSON strings, `owner_id` as a JSON number. Arguments
/// that are `None` are simply left out of the map.
fn build_wasm_channel_runtime_config_updates(
    tunnel_url: Option<&str>,
    webhook_secret: Option<&str>,
    owner_id: Option<i64>,
) -> HashMap<String, serde_json::Value> {
    let string_entries = [
        ("tunnel_url", tunnel_url),
        ("webhook_secret", webhook_secret),
    ]
    .into_iter()
    .filter_map(|(key, value)| {
        value.map(|text| (key.to_string(), serde_json::Value::String(text.to_string())))
    });
    let owner_entry = owner_id.map(|id| ("owner_id".to_string(), serde_json::json!(id)));
    string_entries.chain(owner_entry).collect()
}
/// Returns the prompt shown to the user for a channel secret.
///
/// For the Telegram channel's `telegram_bot_token` secret the prompt is
/// extended with a description of the verification-code step; every other
/// secret just gets its own prompt back.
fn channel_auth_instructions(
    channel_name: &str,
    secret: &crate::channels::wasm::SecretSetupSchema,
) -> String {
    let is_telegram_bot_token =
        channel_name == TELEGRAM_CHANNEL_NAME && secret.name == "telegram_bot_token";
    if !is_telegram_bot_token {
        return secret.prompt.clone();
    }
    format!(
        "{} After you submit it, IronClaw will show a one-time verification code. Send `/start CODE` to your bot in Telegram and IronClaw will finish setup automatically.",
        secret.prompt
    )
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_timestamp_secs() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Generates a random verification code of `TELEGRAM_OWNER_BIND_CODE_LEN`
/// characters drawn from the alphanumeric distribution and lowercased.
fn generate_telegram_verification_code() -> String {
    use rand::Rng;
    let mut code = String::with_capacity(TELEGRAM_OWNER_BIND_CODE_LEN);
    let samples = rand::thread_rng()
        .sample_iter(&rand::distributions::Alphanumeric)
        .take(TELEGRAM_OWNER_BIND_CODE_LEN);
    for sample in samples {
        // Alphanumeric samples are ASCII, so ASCII lowercasing is sufficient.
        code.push(char::from(sample).to_ascii_lowercase());
    }
    code
}
/// Builds the `https://t.me/<bot>?start=<code>` deep link for a verification
/// challenge.
///
/// Returns `None` when no bot username is known or it is blank. The username
/// is trimmed before being interpolated, so surrounding whitespace can no
/// longer leak into the link (previously only the blank check was trimmed).
fn telegram_verification_deep_link(bot_username: Option<&str>, code: &str) -> Option<String> {
    bot_username
        .map(str::trim)
        .filter(|username| !username.is_empty())
        .map(|username| format!("https://t.me/{username}?start={code}"))
}
/// Builds the human-readable instructions for completing a verification
/// challenge.
///
/// When a non-blank bot username is known the text names the bot as
/// `@username`; otherwise it refers to "your Telegram bot". The username is
/// trimmed before use, so padded input no longer produces text such as
/// `@ my_bot ` (previously only the blank check was trimmed).
fn telegram_verification_instructions(bot_username: Option<&str>, code: &str) -> String {
    let username = bot_username
        .map(str::trim)
        .filter(|username| !username.is_empty());
    match username {
        Some(username) => format!(
            "Send `/start {code}` to @{username} in Telegram. IronClaw will finish setup automatically."
        ),
        None => format!(
            "Send `/start {code}` to your Telegram bot. IronClaw will finish setup automatically."
        ),
    }
}
/// Decides whether an incoming Telegram message carries the verification code.
///
/// After trimming, a message matches when it is exactly the code, exactly
/// `/start <code>`, or contains the code as a whitespace-separated token once
/// surrounding punctuation is stripped (ASCII alphanumerics and `-` are kept).
///
/// An empty `code` never matches. Without that guard an empty code would be
/// "confirmed" by a blank message or by any punctuation-only token.
fn telegram_message_matches_verification_code(text: &str, code: &str) -> bool {
    if code.is_empty() {
        return false;
    }
    let trimmed = text.trim();
    if trimmed == code || trimmed.strip_prefix("/start ") == Some(code) {
        return true;
    }
    trimmed
        .split_whitespace()
        .map(|token| token.trim_matches(|c: char| !c.is_ascii_alphanumeric() && c != '-'))
        .any(|token| token == code)
}
/// Posts a plain text message to a Telegram chat.
///
/// `endpoint` is the full `sendMessage` URL. The request body is JSON with
/// `chat_id` and `text`.
///
/// # Errors
/// Returns `ExtensionError::Other` when the request fails, when the HTTP
/// status is not a success, when the body cannot be decoded, or when the
/// decoded envelope has `ok == false` (using Telegram's description if given).
async fn send_telegram_text_message(
    client: &reqwest::Client,
    endpoint: &str,
    chat_id: i64,
    text: &str,
) -> Result<(), ExtensionError> {
    let body = serde_json::json!({
        "chat_id": chat_id,
        "text": text,
    });
    let response = client
        .post(endpoint)
        .json(&body)
        .send()
        .await
        .map_err(|e| telegram_request_error("sendMessage", &e))?;

    let status = response.status();
    if !status.is_success() {
        return Err(ExtensionError::Other(format!(
            "Telegram sendMessage failed (HTTP {})",
            status
        )));
    }

    let payload = response
        .json::<TelegramApiOkResponse>()
        .await
        .map_err(|e| telegram_response_parse_error("sendMessage", &e))?;
    if payload.ok {
        return Ok(());
    }
    let reason = payload
        .description
        .unwrap_or_else(|| "Telegram sendMessage returned ok=false".to_string());
    Err(ExtensionError::Other(reason))
}
/// Central manager for extensions: MCP servers, WASM tools, WASM channels and
/// channel relays. Owns the registry, discovery, runtime handles and the
/// in-memory state for pending auth/OAuth flows and channel activation.
pub struct ExtensionManager {
/// Catalog of known extensions (built-in plus cached discoveries).
registry: ExtensionRegistry,
/// Online discovery used by `search` when the registry has no results.
discovery: OnlineDiscovery,
mcp_session_manager: Arc<McpSessionManager>,
mcp_process_manager: Arc<crate::tools::mcp::process::McpProcessManager>,
/// MCP clients keyed by server name.
mcp_clients: RwLock<HashMap<String, Arc<McpClient>>>,
wasm_tool_runtime: Option<Arc<WasmToolRuntime>>,
/// Directory scanned for installed WASM tools.
wasm_tools_dir: PathBuf,
/// Directory scanned for installed WASM channels.
wasm_channels_dir: PathBuf,
/// Channel runtime handles; `None` until `set_channel_runtime` is called.
channel_runtime: RwLock<Option<ChannelRuntimeState>>,
/// Channel manager used for relay channels; set by `set_channel_runtime`
/// or `set_relay_channel_manager`.
relay_channel_manager: RwLock<Option<Arc<ChannelManager>>>,
secrets: Arc<dyn SecretsStore + Send + Sync>,
tool_registry: Arc<ToolRegistry>,
hooks: Option<Arc<HookRegistry>>,
/// In-progress auth flows keyed by extension name.
pending_auth: RwLock<HashMap<String, PendingAuth>>,
/// Configured tunnel URL, if any; a non-loopback host implies gateway mode.
tunnel_url: Option<String>,
/// Owner user id this manager acts on behalf of by default.
user_id: String,
/// Optional settings database used for persisted state.
store: Option<Arc<dyn crate::db::Database>>,
/// Names of channels currently considered active.
active_channel_names: RwLock<HashSet<String>>,
/// Names of relay extensions installed in this process.
installed_relay_extensions: RwLock<HashSet<String>>,
/// Last activation error per extension name, surfaced in listings.
activation_errors: RwLock<HashMap<String, String>>,
/// SSE broadcaster for extension status events; `None` until set.
sse_manager: RwLock<Option<Arc<crate::channels::web::sse::SseManager>>>,
/// Registry of gateway-hosted OAuth flows awaiting their callback.
pending_oauth_flows: crate::cli::oauth_defaults::PendingOAuthRegistry,
/// Value of the `GATEWAY_AUTH_TOKEN` environment variable at construction.
gateway_token: Option<String>,
/// Relay configuration read from the environment at construction.
relay_config: Option<crate::config::RelayConfig>,
/// Shared slot for the relay event sender; cleared by
/// `clear_relay_webhook_state`.
relay_event_tx: Arc<
tokio::sync::Mutex<
Option<tokio::sync::mpsc::Sender<crate::channels::relay::client::ChannelEvent>>,
>,
>,
/// Cached relay signing secret; cleared by `clear_relay_webhook_state`.
relay_signing_secret_cache: Arc<std::sync::Mutex<Option<Vec<u8>>>>,
/// Set to `true` by `enable_gateway_mode`.
gateway_mode: std::sync::atomic::AtomicBool,
/// Base URL recorded by `enable_gateway_mode`.
gateway_base_url: RwLock<Option<String>>,
/// Outstanding Telegram verification challenges keyed by channel name.
pending_telegram_verification: RwLock<HashMap<String, PendingTelegramVerificationChallenge>>,
/// Test-only override hook for WASM channel loading.
#[cfg(test)]
test_wasm_channel_loader: RwLock<Option<TestWasmChannelLoader>>,
/// Test-only override hook for Telegram binding resolution.
#[cfg(test)]
test_telegram_binding_resolver: RwLock<Option<TestTelegramBindingResolver>>,
}
/// Produces a version of `url` that is safe to put in logs.
///
/// Strings shorter than 10 bytes or without `://` are returned unchanged.
/// Parseable URLs lose their query, fragment, username and password. Strings
/// that fail to parse are cut at the first `?` or `#`.
fn sanitize_url_for_logging(url: &str) -> String {
    let looks_like_url = url.len() >= 10 && url.contains("://");
    if !looks_like_url {
        return url.to_string();
    }
    match url::Url::parse(url) {
        Ok(mut parsed) => {
            parsed.set_query(None);
            parsed.set_fragment(None);
            // Credential setters can fail for some URL shapes; best effort only.
            let _ = parsed.set_username("");
            let _ = parsed.set_password(None);
            parsed.to_string()
        }
        Err(_) => {
            let cut = url.find(['?', '#']).unwrap_or(url.len());
            url[..cut].to_string()
        }
    }
}
impl ExtensionManager {
/// Returns the user id this manager was constructed with.
pub fn owner_id(&self) -> &str {
&self.user_id
}
/// Collects the tool names exposed by currently active extensions of the
/// manager's own user.
///
/// Active WASM tools contribute their extension name; active MCP servers
/// contribute every tool they list. If listing fails a warning is logged and
/// an empty set is returned.
pub async fn active_tool_names(&self) -> HashSet<String> {
    let extensions = match self.list(None, false, &self.user_id).await {
        Ok(extensions) => extensions,
        Err(err) => {
            tracing::warn!(
                owner_id = %self.user_id,
                "Failed to list active extensions while resolving autonomous tool scope: {}",
                err
            );
            return HashSet::new();
        }
    };

    let mut tool_names = HashSet::new();
    for extension in extensions.into_iter().filter(|ext| ext.active) {
        match extension.kind {
            ExtensionKind::WasmTool => {
                tool_names.insert(extension.name);
            }
            ExtensionKind::McpServer => tool_names.extend(extension.tools),
            _ => {}
        }
    }
    tool_names
}
/// Creates a manager with empty in-memory state.
///
/// The registry is seeded from `catalog_entries` when any are given and is
/// otherwise created with `ExtensionRegistry::new()`. The gateway token is
/// read from the `GATEWAY_AUTH_TOKEN` environment variable and the relay
/// configuration from `RelayConfig::from_env()`; both are captured once, here.
/// Gateway mode starts disabled and no channel runtime is attached.
// The constructor only wires dependencies through, hence the long argument list.
#[allow(clippy::too_many_arguments)]
pub fn new(
mcp_session_manager: Arc<McpSessionManager>,
mcp_process_manager: Arc<crate::tools::mcp::process::McpProcessManager>,
secrets: Arc<dyn SecretsStore + Send + Sync>,
tool_registry: Arc<ToolRegistry>,
hooks: Option<Arc<HookRegistry>>,
wasm_tool_runtime: Option<Arc<WasmToolRuntime>>,
wasm_tools_dir: PathBuf,
wasm_channels_dir: PathBuf,
tunnel_url: Option<String>,
user_id: String,
store: Option<Arc<dyn crate::db::Database>>,
catalog_entries: Vec<RegistryEntry>,
) -> Self {
// An empty catalog falls back to the registry's default construction.
let registry = if catalog_entries.is_empty() {
ExtensionRegistry::new()
} else {
ExtensionRegistry::new_with_catalog(catalog_entries)
};
Self {
registry,
discovery: OnlineDiscovery::new(),
mcp_session_manager,
mcp_process_manager,
mcp_clients: RwLock::new(HashMap::new()),
wasm_tool_runtime,
wasm_tools_dir,
wasm_channels_dir,
channel_runtime: RwLock::new(None),
relay_channel_manager: RwLock::new(None),
secrets,
tool_registry,
hooks,
pending_auth: RwLock::new(HashMap::new()),
tunnel_url,
user_id,
store,
active_channel_names: RwLock::new(HashSet::new()),
installed_relay_extensions: RwLock::new(HashSet::new()),
activation_errors: RwLock::new(HashMap::new()),
sse_manager: RwLock::new(None),
pending_oauth_flows: crate::cli::oauth_defaults::new_pending_oauth_registry(),
// Environment is sampled once at construction time.
gateway_token: std::env::var("GATEWAY_AUTH_TOKEN").ok(),
relay_config: crate::config::RelayConfig::from_env(),
relay_event_tx: Arc::new(tokio::sync::Mutex::new(None)),
relay_signing_secret_cache: Arc::new(std::sync::Mutex::new(None)),
gateway_mode: std::sync::atomic::AtomicBool::new(false),
gateway_base_url: RwLock::new(None),
pending_telegram_verification: RwLock::new(HashMap::new()),
#[cfg(test)]
test_wasm_channel_loader: RwLock::new(None),
#[cfg(test)]
test_telegram_binding_resolver: RwLock::new(None),
}
}
/// Test-only: installs the hook stored in `test_wasm_channel_loader`,
/// replacing any previous one.
#[cfg(test)]
async fn set_test_wasm_channel_loader(&self, loader: TestWasmChannelLoader) {
*self.test_wasm_channel_loader.write().await = Some(loader);
}
/// Test-only: installs the hook stored in `test_telegram_binding_resolver`,
/// replacing any previous one.
#[cfg(test)]
async fn set_test_telegram_binding_resolver(&self, resolver: TestTelegramBindingResolver) {
*self.test_telegram_binding_resolver.write().await = Some(resolver);
}
/// Test-only: installs a binding resolver that always answers with a pending
/// verification challenge built from `code` and `bot_username`.
///
/// The resolver returns an error if it is invoked with an existing owner id,
/// since that would mean the test did not start from an unbound channel.
#[cfg(test)]
pub(crate) async fn set_test_telegram_pending_verification(
    &self,
    code: &str,
    bot_username: Option<&str>,
) {
    let fixed_code = String::from(code);
    let fixed_username = bot_username.map(String::from);
    self.set_test_telegram_binding_resolver(Arc::new(
        move |_token, existing_owner_id| match existing_owner_id {
            Some(_) => Err(ExtensionError::Other(
                "unexpected existing owner binding".to_string(),
            )),
            None => {
                let username = fixed_username.as_deref();
                Ok(TelegramBindingResult::Pending(VerificationChallenge {
                    code: fixed_code.clone(),
                    instructions: telegram_verification_instructions(username, &fixed_code),
                    deep_link: telegram_verification_deep_link(username, &fixed_code),
                }))
            }
        },
    ))
    .await;
}
/// Switches the manager into gateway mode and records `base_url` as the
/// gateway base URL, replacing any previous value.
pub async fn enable_gateway_mode(&self, base_url: String) {
// Release store pairs with the Acquire load in `should_use_gateway_mode`.
self.gateway_mode
.store(true, std::sync::atomic::Ordering::Release);
*self.gateway_base_url.write().await = Some(base_url);
}
pub fn should_use_gateway_mode(&self) -> bool {
if self.gateway_mode.load(std::sync::atomic::Ordering::Acquire) {
return true;
}
if crate::cli::oauth_defaults::use_gateway_callback() {
return true;
}
self.tunnel_url
.as_ref()
.filter(|u| !u.is_empty())
.and_then(|raw| url::Url::parse(raw).ok())
.and_then(|u| u.host_str().map(String::from))
.map(|host| !crate::cli::oauth_defaults::is_loopback_host(&host))
.unwrap_or(false)
}
async fn gateway_callback_redirect_uri(&self) -> Option<String> {
use crate::cli::oauth_defaults;
if oauth_defaults::use_gateway_callback() {
return Some(normalize_hosted_callback_url(
&oauth_defaults::callback_url(),
));
}
if let Some(ref base) = *self.gateway_base_url.read().await {
let base = base.trim_end_matches('/');
return Some(format!("{}/oauth/callback", base));
}
self.tunnel_url
.as_ref()
.filter(|u| !u.is_empty())
.and_then(|raw| {
let url = url::Url::parse(raw).ok()?;
let host = url.host_str().map(String::from)?;
if oauth_defaults::is_loopback_host(&host) {
return None;
}
let base = raw.trim_end_matches('/');
Some(format!("{}/oauth/callback", base))
})
}
/// Borrows the relay configuration captured at construction.
///
/// # Errors
/// Returns `ExtensionError::Config` when no relay configuration is available.
fn relay_config(&self) -> Result<&crate::config::RelayConfig, ExtensionError> {
    match self.relay_config.as_ref() {
        Some(config) => Ok(config),
        None => Err(ExtensionError::Config(
            "CHANNEL_RELAY_URL and CHANNEL_RELAY_API_KEY must be set".to_string(),
        )),
    }
}
/// Returns another handle to the shared relay event sender slot.
///
/// The slot itself may hold `None`; callers must lock it to inspect or
/// replace the sender.
pub fn relay_event_tx(
&self,
) -> Arc<
tokio::sync::Mutex<
Option<tokio::sync::mpsc::Sender<crate::channels::relay::client::ChannelEvent>>,
>,
> {
Arc::clone(&self.relay_event_tx)
}
/// Returns a copy of the cached relay signing secret.
///
/// Yields `None` when nothing is cached or when the cache mutex is poisoned.
pub fn relay_signing_secret(&self) -> Option<Vec<u8>> {
    match self.relay_signing_secret_cache.lock() {
        Ok(cache) => (*cache).clone(),
        Err(_) => None,
    }
}
/// Drops the relay event sender and forgets the cached signing secret.
async fn clear_relay_webhook_state(&self) {
*self.relay_event_tx.lock().await = None;
// A poisoned mutex is skipped silently; the cache is then left as it was.
if let Ok(mut cache) = self.relay_signing_secret_cache.lock() {
*cache = None;
}
}
/// Adds a single entry to the registry through its discovered-entries cache.
pub async fn inject_registry_entry(&self, entry: crate::extensions::RegistryEntry) {
self.registry.cache_discovered(vec![entry]).await;
}
/// Attaches the channel runtime handles to the manager.
///
/// Besides storing the new `ChannelRuntimeState`, this also points the relay
/// channel manager at the same `channel_manager`, overwriting whatever was
/// set before.
pub async fn set_channel_runtime(
&self,
channel_manager: Arc<ChannelManager>,
wasm_channel_runtime: Arc<WasmChannelRuntime>,
pairing_store: Arc<PairingStore>,
wasm_channel_router: Arc<WasmChannelRouter>,
wasm_channel_owner_ids: std::collections::HashMap<String, i64>,
) {
*self.relay_channel_manager.write().await = Some(Arc::clone(&channel_manager));
*self.channel_runtime.write().await = Some(ChannelRuntimeState {
channel_manager,
wasm_channel_runtime,
pairing_store,
wasm_channel_router,
wasm_channel_owner_ids,
});
}
/// Looks up the owner id bound to the WASM channel `name`.
///
/// The in-memory runtime map is checked first. If it has no entry, the
/// persisted setting `channels.wasm_channel_owner_ids.<name>` is read for the
/// manager's user; it may be stored as a JSON number or as a numeric string.
/// Read failures are logged at debug level and treated as "no owner".
async fn current_channel_owner_id(&self, name: &str) -> Option<i64> {
    // The read guard is a temporary here, so it is released before the store lookup.
    let cached = self
        .channel_runtime
        .read()
        .await
        .as_ref()
        .and_then(|rt| rt.wasm_channel_owner_ids.get(name).copied());
    if cached.is_some() {
        return cached;
    }

    let store = self.store.as_ref()?;
    let key = format!("channels.wasm_channel_owner_ids.{name}");
    let stored = match store.get_setting(&self.user_id, &key).await {
        Ok(value) => value,
        Err(e) => {
            tracing::debug!(
                channel = %name,
                error = %e,
                "Failed to read persisted wasm channel owner id"
            );
            return None;
        }
    };
    match stored? {
        serde_json::Value::Number(number) => number.as_i64(),
        serde_json::Value::String(text) => text.parse::<i64>().ok(),
        _ => None,
    }
}
/// Records `owner_id` as the owner of the WASM channel `name`.
///
/// The value is first persisted under
/// `channels.wasm_channel_owner_ids.<name>` (when a store is configured) and
/// then mirrored into the in-memory runtime map (when a runtime is attached).
///
/// # Errors
/// Returns `ExtensionError::Config` if persisting fails; the in-memory map is
/// not touched in that case.
async fn set_channel_owner_id(&self, name: &str, owner_id: i64) -> Result<(), ExtensionError> {
    if let Some(store) = &self.store {
        let setting_key = format!("channels.wasm_channel_owner_ids.{name}");
        let setting_value = serde_json::json!(owner_id);
        store
            .set_setting(&self.user_id, &setting_key, &setting_value)
            .await
            .map_err(|e| ExtensionError::Config(e.to_string()))?;
    }

    let mut runtime = self.channel_runtime.write().await;
    if let Some(state) = runtime.as_mut() {
        state
            .wasm_channel_owner_ids
            .insert(name.to_string(), owner_id);
    }
    Ok(())
}
/// Loads persisted runtime config overrides for the channel `name`.
///
/// Only the Telegram channel has any: a stored, non-blank bot username is
/// returned under the `bot_username` key. Every other channel, a missing
/// store, a failed read, or a non-string/blank value gives an empty map.
async fn load_channel_runtime_config_overrides(
    &self,
    name: &str,
) -> HashMap<String, serde_json::Value> {
    let mut overrides = HashMap::new();
    if name != TELEGRAM_CHANNEL_NAME {
        return overrides;
    }
    let Some(store) = self.store.as_ref() else {
        return overrides;
    };

    let stored = store
        .get_setting(&self.user_id, &bot_username_setting_key(name))
        .await;
    match stored {
        Ok(Some(serde_json::Value::String(username))) if !username.trim().is_empty() => {
            overrides.insert("bot_username".to_string(), serde_json::json!(username));
        }
        _ => {}
    }
    overrides
}
/// Returns true when an owner id is known for the WASM channel `name`,
/// either in memory or in the persisted settings.
pub async fn has_wasm_channel_owner_binding(&self, name: &str) -> bool {
self.current_channel_owner_id(name).await.is_some()
}
/// Returns the channel's owner id rendered as a string, or `None` when the
/// channel has no known owner.
pub(crate) async fn notification_target_for_channel(&self, name: &str) -> Option<String> {
self.current_channel_owner_id(name)
.await
.map(|owner_id| owner_id.to_string())
}
/// Returns the outstanding verification challenge for `name`, if it is still
/// valid.
///
/// A challenge whose expiry is at or before the current time is removed from
/// the map and reported as absent.
async fn get_pending_telegram_verification(
    &self,
    name: &str,
) -> Option<PendingTelegramVerificationChallenge> {
    let now = unix_timestamp_secs();
    // Write access is taken up front because an expired entry is evicted on read.
    let mut pending = self.pending_telegram_verification.write().await;
    let challenge = pending.get(name)?.clone();
    if challenge.expires_at_unix > now {
        return Some(challenge);
    }
    pending.remove(name);
    None
}
/// Stores `challenge` as the outstanding verification challenge for `name`,
/// replacing any previous one.
async fn set_pending_telegram_verification(
&self,
name: &str,
challenge: PendingTelegramVerificationChallenge,
) {
self.pending_telegram_verification
.write()
.await
.insert(name.to_string(), challenge);
}
/// Forgets the outstanding verification challenge for `name`, if there is one.
async fn clear_pending_telegram_verification(&self, name: &str) {
self.pending_telegram_verification
.write()
.await
.remove(name);
}
/// Issues a fresh Telegram owner-verification challenge for the channel `name`.
///
/// First calls Telegram's `deleteWebhook` with `drop_pending_updates=true`.
/// Then a new code is generated, stored as the pending challenge for `name`
/// with a TTL of `TELEGRAM_OWNER_BIND_CHALLENGE_TTL_SECS`, and returned
/// together with instructions and an optional deep link.
///
/// # Errors
/// Returns `ExtensionError::Other` when the `deleteWebhook` request fails or
/// answers with a non-success HTTP status; no challenge is stored then.
async fn issue_telegram_verification_challenge(
    &self,
    client: &reqwest::Client,
    name: &str,
    bot_token: &str,
    bot_username: Option<&str>,
) -> Result<VerificationChallenge, ExtensionError> {
    let endpoint = format!("https://api.telegram.org/bot{bot_token}/deleteWebhook");
    let response = client
        .post(&endpoint)
        .query(&[("drop_pending_updates", "true")])
        .send()
        .await
        .map_err(|e| telegram_request_error("deleteWebhook", &e))?;
    let status = response.status();
    if !status.is_success() {
        return Err(ExtensionError::Other(format!(
            "Telegram deleteWebhook failed (HTTP {})",
            status
        )));
    }

    let code = generate_telegram_verification_code();
    let bot_username = bot_username.map(String::from);
    let expires_at_unix = unix_timestamp_secs() + TELEGRAM_OWNER_BIND_CHALLENGE_TTL_SECS;

    // Derive the user-facing strings before the values move into the stored record.
    let instructions = telegram_verification_instructions(bot_username.as_deref(), &code);
    let deep_link = telegram_verification_deep_link(bot_username.as_deref(), &code);

    let pending = PendingTelegramVerificationChallenge {
        code: code.clone(),
        bot_username,
        expires_at_unix,
    };
    self.set_pending_telegram_verification(name, pending).await;

    Ok(VerificationChallenge {
        code,
        instructions,
        deep_link,
    })
}
/// Sets the channel manager used for relay channels, replacing any previous one.
pub async fn set_relay_channel_manager(&self, channel_manager: Arc<ChannelManager>) {
*self.relay_channel_manager.write().await = Some(channel_manager);
}
pub async fn is_relay_channel(&self, name: &str, user_id: &str) -> bool {
if self.installed_relay_extensions.read().await.contains(name) {
return true;
}
if let Some(ref store) = self.store {
let key = format!("relay:{}:team_id", name);
if let Ok(Some(v)) = store.get_setting(user_id, &key).await {
return v.as_str().is_some_and(|s| !s.is_empty());
}
}
false
}
/// Re-activates relay channels that were persisted as active for `user_id`.
///
/// Channels that are already active, or that are not relay channels, are
/// skipped. Activation failures are logged as warnings and do not stop the
/// remaining channels from being restored.
pub async fn restore_relay_channels(&self, user_id: &str) {
    let persisted = self.load_persisted_active_channels(user_id).await;
    // Snapshot, so the lock is not held across the activations below.
    let already_active = self.active_channel_names.read().await.clone();

    let candidates = persisted
        .iter()
        .filter(|name| !already_active.contains(*name));
    for name in candidates {
        if !self.is_relay_channel(name, user_id).await {
            continue;
        }
        match self.activate_stored_relay(name, user_id).await {
            Err(e) => {
                tracing::warn!(
                    channel = %name,
                    error = %e,
                    "Failed to restore persisted relay channel"
                );
            }
            Ok(_) => {
                tracing::debug!(channel = %name, "Restored persisted relay channel");
            }
        }
    }
}
/// Borrows the shared secrets store handle.
pub fn secrets(&self) -> &Arc<dyn SecretsStore + Send + Sync> {
&self.secrets
}
/// Registers an already-built MCP client under `name`.
///
/// Empty names and names rejected by `validate_extension_name` are ignored
/// with a warning. An existing client with the same name is replaced.
pub(crate) async fn inject_mcp_client(
    &self,
    name: String,
    client: Arc<crate::tools::mcp::McpClient>,
) {
    if name.is_empty() {
        tracing::warn!("inject_mcp_client called with empty name; ignoring");
        return;
    }
    if let Some(e) = Self::validate_extension_name(&name).err() {
        tracing::warn!(
            error = %e,
            name = %name,
            "inject_mcp_client called with invalid name; ignoring"
        );
        return;
    }
    let mut clients = self.mcp_clients.write().await;
    clients.insert(name, client);
}
/// Marks the given channel names as active.
///
/// Despite the name this is additive: `names` are merged into the existing
/// active set, and nothing is removed.
pub async fn set_active_channels(&self, names: Vec<String>) {
let mut active = self.active_channel_names.write().await;
active.extend(names);
}
async fn persist_active_channels(&self, user_id: &str) {
let Some(ref store) = self.store else {
return;
};
let names: Vec<String> = self
.active_channel_names
.read()
.await
.iter()
.cloned()
.collect();
let value = serde_json::json!(names);
if let Err(e) = store
.set_setting(user_id, "activated_channels", &value)
.await
{
tracing::warn!(error = %e, "Failed to persist activated_channels setting");
}
}
pub async fn load_persisted_active_channels(&self, user_id: &str) -> Vec<String> {
let Some(ref store) = self.store else {
return Vec::new();
};
match store.get_setting(user_id, "activated_channels").await {
Ok(Some(value)) => match serde_json::from_value(value) {
Ok(names) => names,
Err(e) => {
tracing::warn!(error = %e, "Failed to deserialize activated_channels");
Vec::new()
}
},
Ok(None) => Vec::new(),
Err(e) => {
tracing::warn!(error = %e, "Failed to load activated_channels setting");
Vec::new()
}
}
}
/// Installs the SSE manager used to broadcast extension status events,
/// replacing any previous one.
pub async fn set_sse_sender(&self, sse: Arc<crate::channels::web::sse::SseManager>) {
*self.sse_manager.write().await = Some(sse);
}
/// Borrows the registry of gateway-hosted OAuth flows awaiting a callback.
pub fn pending_oauth_flows(&self) -> &crate::cli::oauth_defaults::PendingOAuthRegistry {
&self.pending_oauth_flows
}
async fn clear_pending_extension_auth(&self, name: &str) {
{
let mut pending = self.pending_auth.write().await;
if let Some(old) = pending.remove(name)
&& let Some(handle) = old.task_handle
{
handle.abort();
}
}
let mut flows = self.pending_oauth_flows.write().await;
flows.retain(|_, flow| flow.extension_name != name);
}
/// Replaces the `state` query parameter of `auth_url` with `hosted_state`.
///
/// * If both states are equal the URL is returned untouched.
/// * If the URL does not parse, a textual replacement of
///   `state=<encoded expected_state>` by `state=<encoded hosted_state>` is done.
/// * Otherwise the query is rebuilt pair by pair: every `state` pair gets the
///   new value, and a `state` pair is appended if none was present.
fn rewrite_oauth_state_param(
    auth_url: String,
    expected_state: &str,
    hosted_state: &str,
) -> String {
    if hosted_state == expected_state {
        return auth_url;
    }

    let mut parsed = match url::Url::parse(&auth_url) {
        Ok(parsed) => parsed,
        Err(_) => {
            let old_param = format!("state={}", urlencoding::encode(expected_state));
            let new_param = format!("state={}", urlencoding::encode(hosted_state));
            return auth_url.replace(&old_param, &new_param);
        }
    };

    // Snapshot the pairs as owned strings so the query can be cleared and rebuilt.
    let mut pairs: Vec<(String, String)> = parsed
        .query_pairs()
        .map(|(key, value)| (key.into_owned(), value.into_owned()))
        .collect();
    let mut saw_state = false;
    for (key, value) in pairs.iter_mut() {
        if key.as_str() == "state" {
            *value = hosted_state.to_string();
            saw_state = true;
        }
    }
    if !saw_state {
        pairs.push(("state".to_string(), hosted_state.to_string()));
    }

    {
        let mut query = parsed.query_pairs_mut();
        query.clear();
        for (key, value) in &pairs {
            query.append_pair(key, value);
        }
    }
    parsed.to_string()
}
async fn start_gateway_oauth_flow(&self, request: HostedOAuthFlowStart) -> AuthResult {
use crate::cli::oauth_defaults;
oauth_defaults::sweep_expired_flows(&self.pending_oauth_flows).await;
let hosted_state = oauth_defaults::build_platform_state(&request.expected_state);
let auth_url = Self::rewrite_oauth_state_param(
request.auth_url,
&request.expected_state,
&hosted_state,
);
self.pending_oauth_flows
.write()
.await
.insert(request.expected_state, request.flow);
self.pending_auth.write().await.insert(
request.name.clone(),
PendingAuth {
_name: request.name.clone(),
_kind: request.kind,
created_at: std::time::Instant::now(),
task_handle: None,
},
);
AuthResult::awaiting_authorization(
request.name,
request.kind,
auth_url,
"gateway".to_string(),
)
}
async fn broadcast_extension_status(&self, name: &str, status: &str, message: Option<&str>) {
if let Some(ref sse) = *self.sse_manager.read().await {
sse.broadcast(ironclaw_common::AppEvent::ExtensionStatus {
extension_name: name.to_string(),
status: status.to_string(),
message: message.map(|m| m.to_string()),
});
}
}
/// Searches for extensions matching `query`.
///
/// The registry is searched first. Only when it returns nothing and
/// `discover` is set is online discovery attempted; discovered entries are
/// cached in the registry and returned as validated `Discovered` results.
pub async fn search(
    &self,
    query: &str,
    discover: bool,
) -> Result<Vec<SearchResult>, ExtensionError> {
    let mut results = self.registry.search(query).await;
    if !discover || !results.is_empty() {
        return Ok(results);
    }

    tracing::info!("No built-in results for '{}', searching online...", query);
    let discovered = self.discovery.discover(query).await;
    if discovered.is_empty() {
        return Ok(results);
    }

    self.registry.cache_discovered(discovered.clone()).await;
    results.extend(discovered.into_iter().map(|entry| SearchResult {
        entry,
        source: ResultSource::Discovered,
        validated: true,
    }));
    Ok(results)
}
/// Installs the extension `name`.
///
/// Resolution order:
/// 1. a registry entry matching `name` (and `kind_hint`, if given),
/// 2. the explicit `url`, using `kind_hint` or a kind inferred from the URL,
/// 3. otherwise a `NotFound` error.
///
/// URLs are sanitized before being logged.
///
/// # Errors
/// Fails if the name is invalid, if the chosen install path fails, if a
/// channel relay is requested by URL, or if nothing matches `name`.
pub async fn install(
    &self,
    name: &str,
    url: Option<&str>,
    kind_hint: Option<ExtensionKind>,
    user_id: &str,
) -> Result<InstallResult, ExtensionError> {
    let sanitized_url = url.map(sanitize_url_for_logging);
    tracing::info!(extension = %name, url = ?sanitized_url, kind = ?kind_hint, "Installing extension");
    Self::validate_extension_name(name)?;

    if let Some(entry) = self.registry.get_with_kind(name, kind_hint).await {
        return self
            .install_from_entry(&entry, user_id)
            .await
            .inspect_err(|e| {
                tracing::error!(extension = %name, error = %e, "Extension install failed");
            });
    }

    let Some(url) = url else {
        let err = ExtensionError::NotFound(format!(
            "'{}' not found in registry. Try searching with discover:true or provide a URL.",
            name
        ));
        tracing::warn!(extension = %name, "Extension not found in registry");
        return Err(err);
    };

    let kind = kind_hint.unwrap_or_else(|| infer_kind_from_url(url));
    let outcome = match kind {
        ExtensionKind::McpServer => self.install_mcp_from_url(name, url, user_id).await,
        ExtensionKind::WasmTool => self.install_wasm_tool_from_url(name, url).await,
        ExtensionKind::WasmChannel => self.install_wasm_channel_from_url(name, url, None).await,
        ExtensionKind::ChannelRelay => Err(ExtensionError::InstallFailed(
            "Channel relay extensions cannot be installed by URL".to_string(),
        )),
    };
    outcome.inspect_err(|e| {
        let sanitized = sanitize_url_for_logging(url);
        tracing::error!(extension = %name, url = %sanitized, error = %e, "Extension install from URL failed");
    })
}
/// Starts or checks authentication for the installed extension `name`.
///
/// Expired pending auths are cleaned up first; the call is then dispatched
/// to the kind-specific auth routine.
///
/// # Errors
/// Propagates the error from determining the installed kind and from the
/// kind-specific auth routine.
pub async fn auth(&self, name: &str, user_id: &str) -> Result<AuthResult, ExtensionError> {
self.cleanup_expired_auths().await;
let kind = self.determine_installed_kind(name, user_id).await?;
match kind {
ExtensionKind::McpServer => self.auth_mcp(name, user_id).await,
ExtensionKind::WasmTool => self.auth_wasm_tool(name, user_id).await,
ExtensionKind::WasmChannel => self.auth_wasm_channel_status(name, user_id).await,
ExtensionKind::ChannelRelay => self.auth_channel_relay(name, user_id).await,
}
}
/// Activates the installed extension `name` for `user_id`.
///
/// The name is validated, the installed kind is determined, and the call is
/// dispatched to the kind-specific activation routine.
///
/// # Errors
/// Fails if the name is invalid, if the installed kind cannot be determined,
/// or if the kind-specific activation fails.
pub async fn activate(
&self,
name: &str,
user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
Self::validate_extension_name(name)?;
let kind = self.determine_installed_kind(name, user_id).await?;
match kind {
ExtensionKind::McpServer => self.activate_mcp(name, user_id).await,
ExtensionKind::WasmTool => self.activate_wasm_tool(name, user_id).await,
ExtensionKind::WasmChannel => self.activate_wasm_channel(name, user_id).await,
ExtensionKind::ChannelRelay => self.activate_channel_relay(name, user_id).await,
}
}
pub async fn list(
&self,
kind_filter: Option<ExtensionKind>,
include_available: bool,
user_id: &str,
) -> Result<Vec<InstalledExtension>, ExtensionError> {
let mut extensions = Vec::new();
if kind_filter.is_none() || kind_filter == Some(ExtensionKind::McpServer) {
match self.load_mcp_servers(user_id).await {
Ok(servers) => {
for server in &servers.servers {
let authenticated = is_authenticated(server, &self.secrets, user_id).await;
let clients = self.mcp_clients.read().await;
let active = clients.contains_key(&server.name);
let tools = if active {
self.tool_registry
.list()
.await
.into_iter()
.filter(|t| t.starts_with(&format!("{}_", server.name)))
.collect()
} else {
Vec::new()
};
let display_name = self
.registry
.get_with_kind(&server.name, Some(ExtensionKind::McpServer))
.await
.map(|e| e.display_name);
extensions.push(InstalledExtension {
name: server.name.clone(),
kind: ExtensionKind::McpServer,
display_name,
description: server.description.clone(),
url: Some(server.url.clone()),
authenticated,
active,
tools,
needs_setup: false,
has_auth: false,
installed: true,
activation_error: None,
version: None,
});
}
}
Err(e) => {
tracing::debug!("Failed to load MCP servers for listing: {}", e);
}
}
}
if (kind_filter.is_none() || kind_filter == Some(ExtensionKind::WasmTool))
&& self.wasm_tools_dir.exists()
{
match discover_tools(&self.wasm_tools_dir).await {
Ok(tools) => {
for (name, discovered) in tools {
let active = self.tool_registry.has(&name).await;
let registry_entry = self
.registry
.get_with_kind(&name, Some(ExtensionKind::WasmTool))
.await;
let display_name = registry_entry.as_ref().map(|e| e.display_name.clone());
let auth_state = self.check_tool_auth_status(&name, user_id).await;
let version = if let Some(ref cap_path) = discovered.capabilities_path {
tokio::fs::read(cap_path)
.await
.ok()
.and_then(|bytes| {
crate::tools::wasm::CapabilitiesFile::from_bytes(&bytes).ok()
})
.and_then(|cap| cap.version)
} else {
None
};
let version =
version.or_else(|| registry_entry.and_then(|e| e.version.clone()));
extensions.push(InstalledExtension {
name: name.clone(),
kind: ExtensionKind::WasmTool,
display_name,
description: None,
url: None,
authenticated: auth_state == ToolAuthState::Ready,
active,
tools: if active { vec![name] } else { Vec::new() },
needs_setup: auth_state == ToolAuthState::NeedsSetup,
has_auth: auth_state != ToolAuthState::NoAuth,
installed: true,
activation_error: None,
version,
});
}
}
Err(e) => {
tracing::debug!("Failed to discover WASM tools for listing: {}", e);
}
}
}
if (kind_filter.is_none() || kind_filter == Some(ExtensionKind::WasmChannel))
&& self.wasm_channels_dir.exists()
{
match crate::channels::wasm::discover_channels(&self.wasm_channels_dir).await {
Ok(channels) => {
let active_names = self.active_channel_names.read().await;
let errors = self.activation_errors.read().await;
for (name, discovered) in channels {
let active = active_names.contains(&name);
let auth_state = self.check_channel_auth_status(&name, user_id).await;
let activation_error = errors.get(&name).cloned();
let registry_entry = self
.registry
.get_with_kind(&name, Some(ExtensionKind::WasmChannel))
.await;
let display_name = registry_entry.as_ref().map(|e| e.display_name.clone());
let version = if let Some(ref cap_path) = discovered.capabilities_path {
tokio::fs::read(cap_path)
.await
.ok()
.and_then(|bytes| {
crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(
&bytes,
)
.ok()
})
.and_then(|cap| cap.version)
} else {
None
};
let version =
version.or_else(|| registry_entry.and_then(|e| e.version.clone()));
extensions.push(InstalledExtension {
name,
kind: ExtensionKind::WasmChannel,
display_name,
description: None,
url: None,
authenticated: auth_state == ToolAuthState::Ready,
active,
tools: Vec::new(),
needs_setup: auth_state == ToolAuthState::NeedsSetup,
has_auth: auth_state != ToolAuthState::NoAuth,
installed: true,
activation_error,
version,
});
}
}
Err(e) => {
tracing::debug!("Failed to discover WASM channels for listing: {}", e);
}
}
}
if kind_filter.is_none() || kind_filter == Some(ExtensionKind::ChannelRelay) {
let installed = self.installed_relay_extensions.read().await;
let active_names = self.active_channel_names.read().await;
let errors = self.activation_errors.read().await;
for name in installed.iter() {
let active = active_names.contains(name);
let authenticated = self.is_relay_channel(name, user_id).await;
let activation_error = errors.get(name).cloned();
let registry_entry = self
.registry
.get_with_kind(name, Some(ExtensionKind::ChannelRelay))
.await;
let display_name = registry_entry.as_ref().map(|e| e.display_name.clone());
let description = registry_entry.as_ref().map(|e| e.description.clone());
extensions.push(InstalledExtension {
name: name.clone(),
kind: ExtensionKind::ChannelRelay,
display_name,
description,
url: None,
authenticated,
active,
tools: Vec::new(),
needs_setup: false,
has_auth: true,
installed: true,
activation_error,
version: None,
});
}
}
if include_available {
let installed_names: std::collections::HashSet<(String, ExtensionKind)> = extensions
.iter()
.map(|e| (e.name.clone(), e.kind))
.collect();
for entry in self.registry.all_entries().await {
if let Some(filter) = kind_filter
&& entry.kind != filter
{
continue;
}
if installed_names.contains(&(entry.name.clone(), entry.kind)) {
continue;
}
extensions.push(InstalledExtension {
name: entry.name,
kind: entry.kind,
display_name: Some(entry.display_name),
description: Some(entry.description),
url: None,
authenticated: false,
active: false,
tools: Vec::new(),
needs_setup: false,
has_auth: false,
installed: false,
activation_error: None,
version: entry.version,
});
}
}
Ok(extensions)
}
/// Uninstalls the extension `name` for `user_id` and returns a human-readable summary.
///
/// The name is validated and the installed kind is resolved first. Any pending
/// auth entry for the extension is removed (aborting its task when one exists)
/// and pending OAuth flows belonging to the extension are dropped. The rest of
/// the cleanup depends on the extension kind; see the comments on each arm.
///
/// # Errors
///
/// Propagates errors from `validate_extension_name` and
/// `determine_installed_kind`. Returns `ExtensionError::Config` when the MCP
/// server entry cannot be removed and `ExtensionError::Other` when a `.wasm`
/// file cannot be deleted.
pub async fn remove(&self, name: &str, user_id: &str) -> Result<String, ExtensionError> {
Self::validate_extension_name(name)?;
let kind = self.determine_installed_kind(name, user_id).await?;
// Cancel any in-flight auth task tracked for this extension.
if let Some(pending) = self.pending_auth.write().await.remove(name)
&& let Some(handle) = pending.task_handle
{
handle.abort();
}
// Keep only the pending OAuth flows that belong to other extensions.
self.pending_oauth_flows
.write()
.await
.retain(|_, flow| flow.extension_name != name);
match kind {
ExtensionKind::McpServer => {
// Tools registered under the `<name>_` prefix are unregistered along with the server.
let tool_names: Vec<String> = self
.tool_registry
.list()
.await
.into_iter()
.filter(|t| t.starts_with(&format!("{}_", name)))
.collect();
for tool_name in &tool_names {
self.tool_registry.unregister(tool_name).await;
}
self.mcp_clients.write().await.remove(name);
self.remove_mcp_server(name, user_id)
.await
.map_err(|e| ExtensionError::Config(e.to_string()))?;
Ok(format!(
"Removed MCP server '{}' and {} tool(s)",
name,
tool_names.len()
))
}
ExtensionKind::WasmTool => {
self.tool_registry.unregister(name).await;
if let Some(ref rt) = self.wasm_tool_runtime {
rt.remove(name).await;
}
self.activation_errors.write().await.remove(name);
let cap_path = self
.wasm_tools_dir
.join(format!("{}.capabilities.json", name));
// Credential mappings are revoked while the capabilities file is still on disk;
// the file itself is deleted further below.
self.revoke_credential_mappings(&cap_path).await;
// Hooks are unregistered under both the `plugin.tool:` and `plugin.dev_tool:` prefixes.
let removed_hooks = self
.unregister_hook_prefix(&format!("plugin.tool:{}::", name))
.await
+ self
.unregister_hook_prefix(&format!("plugin.dev_tool:{}::", name))
.await;
if removed_hooks > 0 {
tracing::info!(
extension = name,
removed_hooks = removed_hooks,
"Removed plugin hooks for WASM tool"
);
}
let wasm_path = self.wasm_tools_dir.join(format!("{}.wasm", name));
// Failing to delete the binary is an error; deleting the capabilities file is best-effort.
if wasm_path.exists() {
tokio::fs::remove_file(&wasm_path)
.await
.map_err(|e| ExtensionError::Other(e.to_string()))?;
}
if cap_path.exists() {
let _ = tokio::fs::remove_file(&cap_path).await;
}
Ok(format!("Removed WASM tool '{}'", name))
}
ExtensionKind::WasmChannel => {
self.active_channel_names.write().await.remove(name);
self.persist_active_channels(user_id).await;
self.activation_errors.write().await.remove(name);
let wasm_path = self.wasm_channels_dir.join(format!("{}.wasm", name));
let cap_path = self
.wasm_channels_dir
.join(format!("{}.capabilities.json", name));
self.revoke_credential_mappings(&cap_path).await;
// Failing to delete the binary is an error; deleting the capabilities file is best-effort.
if wasm_path.exists() {
tokio::fs::remove_file(&wasm_path)
.await
.map_err(|e| ExtensionError::Other(e.to_string()))?;
}
if cap_path.exists() {
let _ = tokio::fs::remove_file(&cap_path).await;
}
// No channel shutdown happens in this arm; the returned message asks for a restart.
Ok(format!(
"Removed channel '{}'. Restart IronClaw for the change to take effect.",
name
))
}
ExtensionKind::ChannelRelay => {
self.installed_relay_extensions.write().await.remove(name);
self.active_channel_names.write().await.remove(name);
self.persist_active_channels(user_id).await;
self.activation_errors.write().await.remove(name);
// Relay settings and secrets are cleaned up best-effort: failures are logged or ignored.
if let Some(ref store) = self.store
&& let Err(e) = store
.delete_setting(user_id, &format!("relay:{}:team_id", name))
.await
{
tracing::warn!(error = %e, name, "Failed to delete relay team_id setting on removal");
}
if let Err(e) = self
.secrets
.delete(user_id, &format!("relay:{}:oauth_state", name))
.await
{
tracing::warn!(error = %e, name, "Failed to delete relay oauth_state secret on removal");
}
let _ = self
.secrets
.delete(user_id, &format!("relay:{}:stream_token", name))
.await;
self.clear_relay_webhook_state().await;
// Shut the channel down through the channel runtime's manager when it knows the
// channel; otherwise fall back to the relay channel manager.
let mut shut_down = false;
if let Some(ref rt) = *self.channel_runtime.read().await
&& let Some(channel) = rt.channel_manager.get_channel(name).await
{
let _ = channel.shutdown().await;
rt.channel_manager.remove(name).await;
shut_down = true;
}
if !shut_down
&& let Some(ref cm) = *self.relay_channel_manager.read().await
&& let Some(channel) = cm.get_channel(name).await
{
let _ = channel.shutdown().await;
cm.remove(name).await;
}
Ok(format!("Removed channel relay '{}'", name))
}
}
}
pub async fn upgrade(
&self,
name: Option<&str>,
user_id: &str,
) -> Result<UpgradeResult, ExtensionError> {
let mut candidates: Vec<(String, ExtensionKind)> = Vec::new();
if let Some(name) = name {
Self::validate_extension_name(name)?;
let kind = self.determine_installed_kind(name, user_id).await?;
if kind == ExtensionKind::McpServer {
return Err(ExtensionError::Other(
"MCP servers don't have WIT versions and cannot be upgraded this way"
.to_string(),
));
}
candidates.push((name.to_string(), kind));
} else {
if self.wasm_tools_dir.exists()
&& let Ok(tools) = discover_tools(&self.wasm_tools_dir).await
{
for (tool_name, _) in tools {
candidates.push((tool_name, ExtensionKind::WasmTool));
}
}
if self.wasm_channels_dir.exists()
&& let Ok(channels) =
crate::channels::wasm::discover_channels(&self.wasm_channels_dir).await
{
for (ch_name, _) in channels {
candidates.push((ch_name, ExtensionKind::WasmChannel));
}
}
}
if candidates.is_empty() {
return Ok(UpgradeResult {
results: Vec::new(),
message: "No WASM extensions installed.".to_string(),
});
}
let mut outcomes = Vec::new();
for (ext_name, kind) in &candidates {
let outcome = self.upgrade_one(ext_name, *kind, user_id).await;
outcomes.push(outcome);
}
let upgraded = outcomes.iter().filter(|o| o.status == "upgraded").count();
let up_to_date = outcomes
.iter()
.filter(|o| o.status == "already_up_to_date")
.count();
let failed = outcomes.iter().filter(|o| o.status == "failed").count();
let message = format!(
"{} extension(s) checked: {} upgraded, {} already up to date, {} failed",
outcomes.len(),
upgraded,
up_to_date,
failed
);
Ok(UpgradeResult {
results: outcomes,
message,
})
}
/// Upgrades one WASM tool or channel if its declared WIT version is not
/// compatible with the host's, reinstalling it from its registry entry.
///
/// Never returns an error: every result, including failures, is reported
/// through the `status`/`detail` fields of the returned `UpgradeOutcome`.
/// MCP servers and channel relays are reported as `failed`.
async fn upgrade_one(&self, name: &str, kind: ExtensionKind, user_id: &str) -> UpgradeOutcome {
    // Shared constructor for every outcome this method reports.
    let report = |status: &str, detail: String| UpgradeOutcome {
        name: name.to_string(),
        kind,
        status: status.to_string(),
        detail,
    };

    let (install_dir, host_wit) = match kind {
        ExtensionKind::WasmTool => (&self.wasm_tools_dir, crate::tools::wasm::WIT_TOOL_VERSION),
        ExtensionKind::WasmChannel => (
            &self.wasm_channels_dir,
            crate::tools::wasm::WIT_CHANNEL_VERSION,
        ),
        ExtensionKind::McpServer | ExtensionKind::ChannelRelay => {
            return report(
                "failed",
                "This extension type cannot be upgraded this way".to_string(),
            );
        }
    };

    // Read the WIT version the installed extension declares; a missing, unreadable
    // or unparsable capabilities file yields `None`.
    let cap_path = install_dir.join(format!("{}.capabilities.json", name));
    let declared_wit: Option<String> = if cap_path.exists() {
        tokio::fs::read(&cap_path)
            .await
            .ok()
            .and_then(|bytes| match kind {
                ExtensionKind::WasmTool => {
                    crate::tools::wasm::CapabilitiesFile::from_bytes(&bytes)
                        .ok()
                        .and_then(|c| c.wit_version)
                }
                ExtensionKind::WasmChannel => {
                    crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&bytes)
                        .ok()
                        .and_then(|c| c.wit_version)
                }
                ExtensionKind::McpServer | ExtensionKind::ChannelRelay => None,
            })
    } else {
        None
    };
    let declared_label = declared_wit.as_deref().unwrap_or("unknown");

    // A passing compatibility check means there is nothing to upgrade.
    if crate::tools::wasm::check_wit_version_compat(name, declared_wit.as_deref(), host_wit)
        .is_ok()
    {
        return report(
            "already_up_to_date",
            format!("WIT {} matches host WIT {}", declared_label, host_wit),
        );
    }

    let Some(entry) = self.registry.get_with_kind(name, Some(kind)).await else {
        return report(
            "not_in_registry",
            format!(
                "Extension '{}' has outdated WIT {} (host: {}), \
                 but is not in the registry. Reinstall manually with a URL.",
                name, declared_label, host_wit
            ),
        );
    };

    // The old files are removed before reinstalling: failing to delete the binary
    // aborts the upgrade, deleting the capabilities file is best-effort.
    let wasm_path = install_dir.join(format!("{}.wasm", name));
    if wasm_path.exists()
        && let Err(e) = tokio::fs::remove_file(&wasm_path).await
    {
        return report(
            "failed",
            format!("Failed to remove old WASM binary: {}", e),
        );
    }
    if cap_path.exists() {
        let _ = tokio::fs::remove_file(&cap_path).await;
    }

    match self.install_from_entry(&entry, user_id).await {
        Ok(_) => {
            tracing::info!(
                extension = %name,
                old_wit = ?declared_wit,
                new_host_wit = %host_wit,
                "Upgraded WASM extension"
            );
            report(
                "upgraded",
                format!(
                    "Upgraded from WIT {} to host WIT {}. Restart to activate.",
                    declared_label, host_wit
                ),
            )
        }
        Err(e) => report(
            "failed",
            format!("Reinstall failed: {}. Old files were removed.", e),
        ),
    }
}
/// Returns a JSON description of the installed extension `name`.
///
/// The fields depend on the extension kind: WASM tools and channels report
/// whether the binary exists on disk plus the versions found in their
/// capabilities file (when it can be read and parsed) and the host WIT
/// version; MCP servers report whether a client is connected; channels and
/// relays report whether they are active.
///
/// # Errors
///
/// Propagates errors from name validation and kind resolution.
pub async fn extension_info(
    &self,
    name: &str,
    user_id: &str,
) -> Result<serde_json::Value, ExtensionError> {
    Self::validate_extension_name(name)?;
    let details = match self.determine_installed_kind(name, user_id).await? {
        ExtensionKind::WasmTool => {
            let caps_file = self
                .wasm_tools_dir
                .join(format!("{}.capabilities.json", name));
            let binary = self.wasm_tools_dir.join(format!("{}.wasm", name));
            let installed = binary.exists();
            let mut details = serde_json::json!({
                "name": name,
                "kind": "wasm_tool",
                "installed": installed,
            });
            // Version fields are only added when the capabilities file parses.
            if caps_file.exists()
                && let Ok(raw) = tokio::fs::read(&caps_file).await
                && let Ok(caps) = crate::tools::wasm::CapabilitiesFile::from_bytes(&raw)
            {
                let version = caps.version.unwrap_or_else(|| "unknown".into());
                details["version"] = serde_json::json!(version);
                let wit_version = caps.wit_version.unwrap_or_else(|| "unknown".into());
                details["wit_version"] = serde_json::json!(wit_version);
            }
            details["host_wit_version"] =
                serde_json::json!(crate::tools::wasm::WIT_TOOL_VERSION);
            details
        }
        ExtensionKind::WasmChannel => {
            let caps_file = self
                .wasm_channels_dir
                .join(format!("{}.capabilities.json", name));
            let binary = self.wasm_channels_dir.join(format!("{}.wasm", name));
            let installed = binary.exists();
            let active = self.active_channel_names.read().await.contains(name);
            let mut details = serde_json::json!({
                "name": name,
                "kind": "wasm_channel",
                "installed": installed,
                "active": active,
            });
            // Version fields are only added when the capabilities file parses.
            if caps_file.exists()
                && let Ok(raw) = tokio::fs::read(&caps_file).await
                && let Ok(caps) =
                    crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&raw)
            {
                let version = caps.version.unwrap_or_else(|| "unknown".into());
                details["version"] = serde_json::json!(version);
                let wit_version = caps.wit_version.unwrap_or_else(|| "unknown".into());
                details["wit_version"] = serde_json::json!(wit_version);
            }
            details["host_wit_version"] =
                serde_json::json!(crate::tools::wasm::WIT_CHANNEL_VERSION);
            details
        }
        ExtensionKind::McpServer => {
            let connected = self.mcp_clients.read().await.contains_key(name);
            serde_json::json!({
                "name": name,
                "kind": "mcp_server",
                "connected": connected,
            })
        }
        ExtensionKind::ChannelRelay => {
            let active = self.active_channel_names.read().await.contains(name);
            serde_json::json!({
                "name": name,
                "kind": "channel_relay",
                "active": active,
            })
        }
    };
    Ok(details)
}
async fn load_mcp_servers(
&self,
user_id: &str,
) -> Result<crate::tools::mcp::config::McpServersFile, crate::tools::mcp::config::ConfigError>
{
if let Some(ref store) = self.store {
crate::tools::mcp::config::load_mcp_servers_from_db(store.as_ref(), user_id).await
} else {
crate::tools::mcp::config::load_mcp_servers().await
}
}
/// Looks up the configuration of the MCP server `name` for `user_id`.
///
/// # Errors
///
/// Propagates load failures and returns `ConfigError::ServerNotFound` when no
/// server with that name is configured.
async fn get_mcp_server(
    &self,
    name: &str,
    user_id: &str,
) -> Result<McpServerConfig, crate::tools::mcp::config::ConfigError> {
    let configured = self.load_mcp_servers(user_id).await?;
    match configured.get(name) {
        Some(server) => Ok(server.clone()),
        None => Err(crate::tools::mcp::config::ConfigError::ServerNotFound {
            name: name.to_string(),
        }),
    }
}
async fn add_mcp_server(
&self,
config: McpServerConfig,
user_id: &str,
) -> Result<(), crate::tools::mcp::config::ConfigError> {
config.validate()?;
if let Some(ref store) = self.store {
crate::tools::mcp::config::add_mcp_server_db(store.as_ref(), user_id, config).await
} else {
crate::tools::mcp::config::add_mcp_server(config).await
}
}
async fn remove_mcp_server(
&self,
name: &str,
user_id: &str,
) -> Result<(), crate::tools::mcp::config::ConfigError> {
if let Some(ref store) = self.store {
crate::tools::mcp::config::remove_mcp_server_db(store.as_ref(), user_id, name).await
} else {
crate::tools::mcp::config::remove_mcp_server(name).await
}
}
/// Installs a registry entry from its primary source, retrying with the
/// fallback source when `fallback_decision` asks for it.
///
/// # Errors
///
/// Returns the primary error when no fallback is attempted, or the result of
/// `combine_install_errors` when both attempts fail.
async fn install_from_entry(
    &self,
    entry: &RegistryEntry,
    user_id: &str,
) -> Result<InstallResult, ExtensionError> {
    let first_attempt = self
        .try_install_from_source(entry, &entry.source, user_id)
        .await;

    if matches!(
        fallback_decision(&first_attempt, &entry.fallback_source),
        FallbackDecision::Return
    ) {
        return first_attempt;
    }

    // A fallback only makes sense with both a primary error and a fallback source;
    // any other combination hands the primary result straight back.
    let (primary_err, fallback) = match (first_attempt, entry.fallback_source.as_ref()) {
        (Err(err), Some(source)) => (err, source),
        (other, _) => return other,
    };

    tracing::info!(
        extension = %entry.name,
        primary_error = %primary_err,
        "Primary install failed, trying fallback source"
    );

    self.try_install_from_source(entry, fallback, user_id)
        .await
        .map_err(|fallback_err| {
            tracing::error!(
                extension = %entry.name,
                fallback_error = %fallback_err,
                "Fallback install also failed"
            );
            combine_install_errors(primary_err, fallback_err)
        })
}
/// Installs `entry` from one specific `source`, dispatching on the entry's
/// kind and the source variant.
///
/// Channel relays are recorded as installed without consulting `source`.
///
/// # Errors
///
/// Returns `ExtensionError::InstallFailed` when the source variant cannot be
/// used for the entry's kind, and otherwise whatever the selected installer
/// reports.
async fn try_install_from_source(
    &self,
    entry: &RegistryEntry,
    source: &ExtensionSource,
    user_id: &str,
) -> Result<InstallResult, ExtensionError> {
    match (entry.kind, source) {
        (
            ExtensionKind::McpServer,
            ExtensionSource::McpUrl { url } | ExtensionSource::Discovered { url },
        ) => self.install_mcp_from_url(&entry.name, url, user_id).await,
        (ExtensionKind::McpServer, _) => Err(ExtensionError::InstallFailed(
            "Registry entry for MCP server has no URL".to_string(),
        )),

        (
            ExtensionKind::WasmTool,
            ExtensionSource::WasmDownload {
                wasm_url,
                capabilities_url,
            },
        ) => {
            self.install_wasm_tool_from_url_with_caps(
                &entry.name,
                wasm_url,
                capabilities_url.as_deref(),
            )
            .await
        }
        (
            ExtensionKind::WasmTool,
            ExtensionSource::WasmBuildable {
                build_dir,
                crate_name,
                ..
            },
        ) => {
            self.install_wasm_from_buildable(
                &entry.name,
                build_dir.as_deref(),
                crate_name.as_deref(),
                &self.wasm_tools_dir,
                ExtensionKind::WasmTool,
            )
            .await
        }
        (ExtensionKind::WasmTool, _) => Err(ExtensionError::InstallFailed(
            "WASM tool entry has no download URL or build info".to_string(),
        )),

        (
            ExtensionKind::WasmChannel,
            ExtensionSource::WasmDownload {
                wasm_url,
                capabilities_url,
            },
        ) => {
            self.install_wasm_channel_from_url(
                &entry.name,
                wasm_url,
                capabilities_url.as_deref(),
            )
            .await
        }
        (
            ExtensionKind::WasmChannel,
            ExtensionSource::WasmBuildable {
                build_dir,
                crate_name,
                ..
            },
        ) => {
            self.install_wasm_from_buildable(
                &entry.name,
                build_dir.as_deref(),
                crate_name.as_deref(),
                &self.wasm_channels_dir,
                ExtensionKind::WasmChannel,
            )
            .await
        }
        (ExtensionKind::WasmChannel, _) => Err(ExtensionError::InstallFailed(
            "WASM channel entry has no download URL or build info".to_string(),
        )),

        // Relays have nothing to download: installing only records the name.
        (ExtensionKind::ChannelRelay, _) => {
            self.installed_relay_extensions
                .write()
                .await
                .insert(entry.name.clone());
            let message = format!(
                "'{}' installed. Click Activate to connect your workspace.",
                entry.display_name
            );
            Ok(InstallResult {
                name: entry.name.clone(),
                kind: ExtensionKind::ChannelRelay,
                message,
            })
        }
    }
}
/// Registers a new MCP server called `name` at `url` for `user_id`.
///
/// # Errors
///
/// Returns `ExtensionError::AlreadyInstalled` when a server with that name is
/// already configured, `ExtensionError::InvalidUrl` when the new config fails
/// validation, and `ExtensionError::Config` when it cannot be persisted.
async fn install_mcp_from_url(
    &self,
    name: &str,
    url: &str,
    user_id: &str,
) -> Result<InstallResult, ExtensionError> {
    let already_configured = self.get_mcp_server(name, user_id).await.is_ok();
    if already_configured {
        return Err(ExtensionError::AlreadyInstalled(name.to_string()));
    }

    let new_server = McpServerConfig::new(name, url);
    if let Err(invalid) = new_server.validate() {
        return Err(ExtensionError::InvalidUrl(invalid.to_string()));
    }
    if let Err(save_err) = self.add_mcp_server(new_server, user_id).await {
        return Err(ExtensionError::Config(save_err.to_string()));
    }

    tracing::info!("Installed MCP server '{}' at {}", name, url);
    let message = format!(
        "MCP server '{}' installed. Run auth next to authenticate.",
        name
    );
    Ok(InstallResult {
        name: name.to_string(),
        kind: ExtensionKind::McpServer,
        message,
    })
}
/// Installs a WASM tool from `url` without a separate capabilities URL.
///
/// Thin wrapper around `install_wasm_tool_from_url_with_caps`.
async fn install_wasm_tool_from_url(
    &self,
    name: &str,
    url: &str,
) -> Result<InstallResult, ExtensionError> {
    let no_capabilities_url: Option<&str> = None;
    let outcome = self
        .install_wasm_tool_from_url_with_caps(name, url, no_capabilities_url)
        .await;
    outcome
}
/// Downloads a WASM tool (and optionally its capabilities file) into the
/// tools directory and reports the install.
///
/// # Errors
///
/// Propagates any error from `download_and_install_wasm`.
async fn install_wasm_tool_from_url_with_caps(
    &self,
    name: &str,
    url: &str,
    capabilities_url: Option<&str>,
) -> Result<InstallResult, ExtensionError> {
    let tools_dir = &self.wasm_tools_dir;
    self.download_and_install_wasm(name, url, capabilities_url, tools_dir)
        .await?;

    let message = format!("WASM tool '{}' installed. Run activate to load it.", name);
    Ok(InstallResult {
        name: name.to_string(),
        kind: ExtensionKind::WasmTool,
        message,
    })
}
/// Downloads a WASM channel (and optionally its capabilities file) into the
/// channels directory and reports the install.
///
/// # Errors
///
/// Propagates any error from `download_and_install_wasm`.
async fn install_wasm_channel_from_url(
    &self,
    name: &str,
    url: &str,
    capabilities_url: Option<&str>,
) -> Result<InstallResult, ExtensionError> {
    let channels_dir = &self.wasm_channels_dir;
    self.download_and_install_wasm(name, url, capabilities_url, channels_dir)
        .await?;

    let message = format!(
        "WASM channel '{}' installed. Run activate to start it.",
        name,
    );
    Ok(InstallResult {
        name: name.to_string(),
        kind: ExtensionKind::WasmChannel,
        message,
    })
}
/// Downloads a WASM extension from `url` and installs it into `target_dir`.
///
/// The payload may be either a gzip-compressed tar archive (detected by the
/// gzip magic bytes and unpacked via `extract_wasm_tar_gz`) or a bare WASM
/// binary (validated by its `\0asm` magic and written as `<name>.wasm`). For a
/// bare binary, the capabilities file is fetched from `capabilities_url` on a
/// best-effort basis: any failure there is logged and the install still
/// succeeds.
///
/// Only `https://` URLs are accepted, for the binary and for the capabilities
/// file alike, and every URL that reaches a log line is passed through
/// `sanitize_url_for_logging` first.
///
/// # Errors
///
/// Returns `ExtensionError::InstallFailed` for a non-HTTPS `url`, an oversized
/// payload, an invalid WASM binary or a filesystem failure, and
/// `ExtensionError::DownloadFailed` for HTTP client, transport or status
/// failures.
async fn download_and_install_wasm(
    &self,
    name: &str,
    url: &str,
    capabilities_url: Option<&str>,
    target_dir: &std::path::Path,
) -> Result<(), ExtensionError> {
    if !url.starts_with("https://") {
        return Err(ExtensionError::InstallFailed(
            "Only HTTPS URLs are allowed for extension downloads".to_string(),
        ));
    }
    const MAX_DOWNLOAD_SIZE: usize = 50 * 1024 * 1024;
    let client = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(60))
        .build()
        .map_err(|e| ExtensionError::DownloadFailed(e.to_string()))?;
    let sanitized_url = sanitize_url_for_logging(url);
    tracing::debug!(extension = %name, url = %sanitized_url, "Downloading WASM extension");
    let response = client.get(url).send().await.map_err(|e| {
        tracing::error!(extension = %name, url = %sanitized_url, error = %e, "Download request failed");
        ExtensionError::DownloadFailed(e.to_string())
    })?;
    if !response.status().is_success() {
        let status = response.status();
        tracing::error!(
            extension = %name,
            url = %sanitized_url,
            status = %status,
            "Download returned non-success HTTP status"
        );
        // NOTE(review): the returned error still embeds the raw URL, as before;
        // confirm whether callers expect the unsanitized form here.
        return Err(ExtensionError::DownloadFailed(format!(
            "HTTP {} from {}",
            status, url
        )));
    }
    // Compare in u64 so a huge Content-Length cannot be truncated into range
    // on targets where usize is narrower than 64 bits.
    if let Some(len) = response.content_length()
        && len > MAX_DOWNLOAD_SIZE as u64
    {
        return Err(ExtensionError::InstallFailed(format!(
            "Download too large ({} bytes, max {} bytes)",
            len, MAX_DOWNLOAD_SIZE
        )));
    }
    let bytes = response
        .bytes()
        .await
        .map_err(|e| ExtensionError::DownloadFailed(e.to_string()))?;
    // Content-Length may be absent or wrong, so the real size is checked too.
    if bytes.len() > MAX_DOWNLOAD_SIZE {
        return Err(ExtensionError::InstallFailed(format!(
            "Download too large ({} bytes, max {} bytes)",
            bytes.len(),
            MAX_DOWNLOAD_SIZE
        )));
    }
    tokio::fs::create_dir_all(target_dir)
        .await
        .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;
    let wasm_path = target_dir.join(format!("{}.wasm", name));
    let caps_path = target_dir.join(format!("{}.capabilities.json", name));
    // 0x1f 0x8b is the gzip magic: treat the payload as a tar.gz bundle.
    if bytes.len() >= 2 && bytes[0] == 0x1f && bytes[1] == 0x8b {
        self.extract_wasm_tar_gz(name, &bytes, &wasm_path, &caps_path)?;
    } else {
        if bytes.len() < 4 || &bytes[..4] != b"\0asm" {
            return Err(ExtensionError::InstallFailed(
                "Downloaded file is not a valid WASM binary (bad magic number)".to_string(),
            ));
        }
        tokio::fs::write(&wasm_path, &bytes)
            .await
            .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;
        if let Some(caps_url) = capabilities_url {
            const MAX_CAPS_SIZE: usize = 1024 * 1024;
            let sanitized_caps_url = sanitize_url_for_logging(caps_url);
            // The capabilities file is held to the same HTTPS-only rule as the
            // binary; a non-HTTPS URL is skipped like any other best-effort
            // capabilities failure.
            if caps_url.starts_with("https://") {
                match client.get(caps_url).send().await {
                    Ok(resp) if resp.status().is_success() => match resp.bytes().await {
                        Ok(caps_bytes) if caps_bytes.len() <= MAX_CAPS_SIZE => {
                            if let Err(e) = tokio::fs::write(&caps_path, &caps_bytes).await {
                                tracing::warn!(
                                    "Failed to write capabilities for '{}': {}",
                                    name,
                                    e
                                );
                            }
                        }
                        Ok(caps_bytes) => {
                            tracing::warn!(
                                "Capabilities file for '{}' too large ({} bytes, max {})",
                                name,
                                caps_bytes.len(),
                                MAX_CAPS_SIZE
                            );
                        }
                        Err(e) => {
                            tracing::warn!(
                                "Failed to download capabilities for '{}': {}",
                                name,
                                e
                            );
                        }
                    },
                    _ => {
                        tracing::warn!(
                            "Failed to download capabilities for '{}' from {}",
                            name,
                            sanitized_caps_url
                        );
                    }
                }
            } else {
                tracing::warn!(
                    "Skipping capabilities download for '{}': only HTTPS URLs are allowed ({})",
                    name,
                    sanitized_caps_url
                );
            }
        }
    }
    tracing::info!(
        "Installed WASM extension '{}' from {} to {}",
        name,
        sanitized_url,
        wasm_path.display()
    );
    Ok(())
}
/// Unpacks `<name>.wasm` (required) and `<name>.capabilities.json` (optional)
/// from a gzip-compressed tar archive held in `bytes`.
///
/// Entries are matched by file name only, so their directory inside the
/// archive does not matter; all other entries are ignored. The extracted
/// binary must start with the `\0asm` magic, the same check the bare-binary
/// download path applies, and is only written to `target_wasm` after it
/// passes. The capabilities file is written to `target_caps` as-is.
///
/// # Errors
///
/// Returns `ExtensionError::InstallFailed` when the archive or one of its
/// entries is malformed, any entry declares more than `MAX_ENTRY_SIZE` bytes,
/// the binary fails the magic check, a file cannot be written, or the archive
/// contains no `<name>.wasm`.
fn extract_wasm_tar_gz(
    &self,
    name: &str,
    bytes: &[u8],
    target_wasm: &std::path::Path,
    target_caps: &std::path::Path,
) -> Result<(), ExtensionError> {
    use flate2::read::GzDecoder;
    use tar::Archive;

    const MAX_ENTRY_SIZE: u64 = 100 * 1024 * 1024;
    const WASM_MAGIC: &[u8] = b"\0asm";

    /// Reads at most `MAX_ENTRY_SIZE` bytes from `entry`, preallocating for
    /// the size declared in the tar header.
    fn read_entry_capped<R: std::io::Read>(
        entry: &mut R,
        declared_size: u64,
    ) -> Result<Vec<u8>, ExtensionError> {
        let mut data = Vec::with_capacity(declared_size as usize);
        let mut limited = std::io::Read::take(entry, MAX_ENTRY_SIZE);
        std::io::Read::read_to_end(&mut limited, &mut data)
            .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;
        Ok(data)
    }

    let decoder = GzDecoder::new(bytes);
    let mut archive = Archive::new(decoder);
    archive.set_preserve_permissions(false);
    #[cfg(any(unix, target_os = "redox"))]
    archive.set_unpack_xattrs(false);

    let wasm_filename = format!("{}.wasm", name);
    let caps_filename = format!("{}.capabilities.json", name);
    let mut found_wasm = false;
    let entries = archive
        .entries()
        .map_err(|e| ExtensionError::InstallFailed(format!("Bad tar.gz archive: {}", e)))?;
    for entry in entries {
        let mut entry = entry
            .map_err(|e| ExtensionError::InstallFailed(format!("Bad tar.gz entry: {}", e)))?;
        // The declared size is checked for every entry, extracted or not.
        let declared_size = entry.size();
        if declared_size > MAX_ENTRY_SIZE {
            return Err(ExtensionError::InstallFailed(format!(
                "Archive entry too large ({} bytes, max {} bytes)",
                declared_size, MAX_ENTRY_SIZE
            )));
        }
        let entry_path = entry
            .path()
            .map_err(|e| {
                ExtensionError::InstallFailed(format!("Invalid path in tar.gz: {}", e))
            })?
            .to_path_buf();
        let filename = entry_path
            .file_name()
            .and_then(|n| n.to_str())
            .unwrap_or("");
        if filename == wasm_filename {
            let data = read_entry_capped(&mut entry, declared_size)?;
            // Reject non-WASM payloads before anything is written to disk.
            if !data.starts_with(WASM_MAGIC) {
                return Err(ExtensionError::InstallFailed(format!(
                    "'{}' in tar.gz archive is not a valid WASM binary (bad magic number)",
                    wasm_filename
                )));
            }
            std::fs::write(target_wasm, &data)
                .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;
            found_wasm = true;
        } else if filename == caps_filename {
            let data = read_entry_capped(&mut entry, declared_size)?;
            std::fs::write(target_caps, &data)
                .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;
        }
    }
    if !found_wasm {
        return Err(ExtensionError::InstallFailed(format!(
            "tar.gz archive does not contain '{}'",
            wasm_filename
        )));
    }
    Ok(())
}
/// Installs a WASM extension from locally built artifacts instead of a
/// download.
///
/// `build_dir` is used as-is when absolute and resolved against
/// `CARGO_MANIFEST_DIR` otherwise (or defaults to it when absent). The
/// release artifact for `crate_name` (falling back to `name`) is located and
/// handed to `install_wasm_files` for installation into `target_dir`.
///
/// # Errors
///
/// Returns `ExtensionError::InstallFailed` when no build artifact is found or
/// when installing the files fails.
async fn install_wasm_from_buildable(
    &self,
    name: &str,
    build_dir: Option<&str>,
    crate_name: Option<&str>,
    target_dir: &std::path::Path,
    kind: ExtensionKind,
) -> Result<InstallResult, ExtensionError> {
    let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
    let resolved_dir = build_dir.map(std::path::Path::new).map_or_else(
        || manifest_dir.to_path_buf(),
        |requested| {
            if requested.is_absolute() {
                requested.to_path_buf()
            } else {
                manifest_dir.join(requested)
            }
        },
    );

    let binary_name = crate_name.unwrap_or(name);
    let Some(wasm_src) =
        crate::registry::artifacts::find_wasm_artifact(&resolved_dir, binary_name, "release")
    else {
        return Err(ExtensionError::InstallFailed(format!(
            "'{}' requires building from source. Build artifact not found. \
             Run `cargo component build --release` in {} first, \
             or use `ironclaw registry install {}`.",
            name,
            resolved_dir.display(),
            name,
        )));
    };

    let wasm_dst = crate::registry::artifacts::install_wasm_files(
        &wasm_src,
        &resolved_dir,
        name,
        target_dir,
        true,
    )
    .await
    .map_err(|e| ExtensionError::InstallFailed(e.to_string()))?;

    // Human-readable label used in both the log line and the result message.
    let kind_label = match kind {
        ExtensionKind::WasmTool => "WASM tool",
        ExtensionKind::WasmChannel => "WASM channel",
        ExtensionKind::McpServer => "MCP server",
        ExtensionKind::ChannelRelay => "channel relay",
    };
    tracing::info!(
        "Installed {} '{}' from build artifacts at {}",
        kind_label,
        name,
        wasm_dst.display(),
    );

    let message = format!(
        "{} '{}' installed from local build artifacts. Run activate to load it.",
        kind_label, name,
    );
    Ok(InstallResult {
        name: name.to_string(),
        kind,
        message,
    })
}
/// Starts (or short-circuits) authentication for the MCP server `name`.
///
/// Already-authenticated servers return immediately. In gateway mode an
/// authorization URL is built, and only an `AuthNotSupported` failure is
/// turned into a manual-token prompt. Otherwise `authorize_mcp_server` is
/// tried; a `NotSupported` error falls back to building an authorization URL
/// and then to a manual-token prompt, and any other OAuth error becomes a
/// manual-token prompt carrying the error text.
///
/// # Errors
///
/// Returns `ExtensionError::NotInstalled` when the server is not configured,
/// and in gateway mode propagates every URL-building error other than
/// `AuthNotSupported`.
async fn auth_mcp(&self, name: &str, user_id: &str) -> Result<AuthResult, ExtensionError> {
    let server = match self.get_mcp_server(name, user_id).await {
        Ok(server) => server,
        Err(e) => return Err(ExtensionError::NotInstalled(e.to_string())),
    };

    if is_authenticated(&server, &self.secrets, user_id).await {
        return Ok(AuthResult::authenticated(name, ExtensionKind::McpServer));
    }

    // Both fallbacks end in the same "paste a token" prompt, differing only in text.
    let token_prompt = |instructions: String| {
        AuthResult::awaiting_token(name, ExtensionKind::McpServer, instructions, None)
    };
    let oauth_unsupported_prompt = || {
        token_prompt(format!(
            "Server '{}' does not support OAuth. \
             Please provide an API token/key for this server.",
            name
        ))
    };

    if self.should_use_gateway_mode() {
        return self
            .auth_mcp_build_url(name, &server, user_id)
            .await
            .or_else(|err| match err {
                ExtensionError::AuthNotSupported(_) => Ok(oauth_unsupported_prompt()),
                other => Err(other),
            });
    }

    match authorize_mcp_server(&server, &self.secrets, user_id).await {
        Ok(_token) => {
            tracing::info!("MCP server '{}' authenticated via OAuth", name);
            Ok(AuthResult::authenticated(name, ExtensionKind::McpServer))
        }
        Err(crate::tools::mcp::auth::AuthError::NotSupported) => {
            // Outside gateway mode every URL-building failure ends in the token prompt.
            let built = self.auth_mcp_build_url(name, &server, user_id).await;
            Ok(built.unwrap_or_else(|_| oauth_unsupported_prompt()))
        }
        Err(e) => Ok(token_prompt(format!(
            "OAuth failed for '{}': {}. \
             Please provide an API token/key manually.",
            name, e
        ))),
    }
}
/// Builds an OAuth authorization URL for the MCP server `name` and records the
/// pending authorization.
///
/// In gateway mode a `PendingOAuthFlow` is assembled and handed to
/// `start_gateway_oauth_flow`; otherwise a `PendingAuth` entry is stored and an
/// awaiting-authorization result carrying the URL is returned.
///
/// # Errors
///
/// Returns `ExtensionError::AuthNotSupported` when OAuth metadata discovery
/// reports `NotSupported`, or when the server has neither a configured OAuth
/// client nor a registration endpoint. Returns `ExtensionError::AuthFailed`
/// for other discovery errors, for port lookup failures and for client
/// registration failures.
async fn auth_mcp_build_url(
&self,
name: &str,
server: &McpServerConfig,
user_id: &str,
) -> Result<AuthResult, ExtensionError> {
let metadata = discover_full_oauth_metadata(&server.url)
.await
.map_err(|e| match e {
crate::tools::mcp::auth::AuthError::NotSupported => {
ExtensionError::AuthNotSupported(e.to_string())
}
_ => ExtensionError::AuthFailed(e.to_string()),
})?;
use crate::cli::oauth_defaults;
let is_gateway = self.should_use_gateway_mode();
// Any earlier pending auth for this extension is cleared before a new one starts.
self.clear_pending_extension_auth(name).await;
// Use the gateway callback URI when one is available, otherwise a localhost
// callback on a port reported by `find_available_port`.
let redirect_uri = if let Some(uri) = self.gateway_callback_redirect_uri().await {
uri
} else {
let port = find_available_port()
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()))?;
format!("http://localhost:{}/callback", port.1)
};
// Client id comes from the server's OAuth config when present, otherwise from
// registering a client at the advertised registration endpoint. The client
// secret is `None` on both paths.
let (client_id, client_secret) = if let Some(ref oauth) = server.oauth {
(oauth.client_id.clone(), None)
} else if let Some(ref reg_endpoint) = metadata.registration_endpoint {
let registration = register_client(reg_endpoint, &redirect_uri)
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()))?;
(registration.client_id, None)
} else {
return Err(ExtensionError::AuthNotSupported(
"Server doesn't support OAuth or Dynamic Client Registration".to_string(),
));
};
// The canonical resource URI is always sent as the `resource` parameter,
// overriding any value of that key in the configured extra params.
let resource = canonical_resource_uri(&server.url);
let mut extra_params = server
.oauth
.as_ref()
.map(|o| o.extra_params.clone())
.unwrap_or_default();
extra_params.insert("resource".to_string(), resource.clone());
// Configured scopes win; without an OAuth config the server's advertised scopes are used.
let scopes = server
.oauth
.as_ref()
.map(|o| o.scopes.clone())
.unwrap_or_else(|| metadata.scopes_supported.clone());
// NOTE(review): the bare `true` argument presumably enables PKCE — confirm
// against `build_oauth_url`.
let oauth_result = oauth_defaults::build_oauth_url(
&metadata.authorization_endpoint,
&client_id,
&redirect_uri,
&scopes,
true, &extra_params,
);
let expected_state = oauth_result.state;
let code_verifier = oauth_result.code_verifier;
if is_gateway {
// The token exchange carries the same `resource` parameter as the authorization request.
let mut token_exchange_extra_params = HashMap::new();
token_exchange_extra_params.insert("resource".to_string(), resource.clone());
let flow = oauth_defaults::PendingOAuthFlow {
extension_name: name.to_string(),
display_name: server.name.clone(),
token_url: metadata.token_endpoint,
client_id,
client_secret,
redirect_uri,
code_verifier,
access_token_field: "access_token".to_string(),
secret_name: server.token_secret_name(),
provider: Some(format!("mcp:{}", name)),
validation_endpoint: None,
scopes,
user_id: user_id.to_string(),
secrets: Arc::clone(&self.secrets),
sse_manager: self.sse_manager.read().await.clone(),
gateway_token: self.gateway_token.clone(),
token_exchange_extra_params,
// A client-id secret name is only supplied when the server has no OAuth config,
// i.e. when the client id came from registration above.
client_id_secret_name: if server.oauth.is_none() {
Some(server.client_id_secret_name())
} else {
None
},
created_at: std::time::Instant::now(),
};
Ok(self
.start_gateway_oauth_flow(HostedOAuthFlowStart {
name: name.to_string(),
kind: ExtensionKind::McpServer,
auth_url: oauth_result.url,
expected_state,
flow,
})
.await)
} else {
// Outside gateway mode only a pending-auth marker (with no task handle) is recorded;
// `expected_state` and `code_verifier` are not stored on this path.
self.pending_auth.write().await.insert(
name.to_string(),
PendingAuth {
_name: name.to_string(),
_kind: ExtensionKind::McpServer,
created_at: std::time::Instant::now(),
task_handle: None,
},
);
Ok(AuthResult::awaiting_authorization(
name,
ExtensionKind::McpServer,
oauth_result.url,
"local".to_string(),
))
}
}
/// Determines and, where possible, advances the auth state of the WASM tool `name`.
///
/// The decision follows the tool's capabilities file, in this order:
/// 1. no capabilities file, or no `auth` section: no auth required;
/// 2. the configured environment variable is set: its value is stored as the
///    secret and the tool is reported authenticated;
/// 3. the secret already exists and no OAuth scope expansion is needed:
///    authenticated;
/// 4. OAuth is declared: either setup credentials are requested or the OAuth
///    flow is started;
/// 5. otherwise the user is asked to provide a token manually.
///
/// # Errors
///
/// Returns `ExtensionError::Other` when the capabilities file cannot be read
/// or parsed, and `ExtensionError::AuthFailed` when storing the secret or
/// starting the OAuth flow fails.
async fn auth_wasm_tool(
&self,
name: &str,
user_id: &str,
) -> Result<AuthResult, ExtensionError> {
let cap_path = self
.wasm_tools_dir
.join(format!("{}.capabilities.json", name));
if !cap_path.exists() {
return Ok(AuthResult::no_auth_required(name, ExtensionKind::WasmTool));
}
let cap_bytes = tokio::fs::read(&cap_path)
.await
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let cap_file = crate::tools::wasm::CapabilitiesFile::from_bytes(&cap_bytes)
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let auth = match cap_file.auth {
Some(auth) => auth,
None => {
return Ok(AuthResult::no_auth_required(name, ExtensionKind::WasmTool));
}
};
// A value found in the configured environment variable is stored as the secret
// and takes precedence over every other path below.
if let Some(ref env_var) = auth.env_var
&& let Ok(value) = std::env::var(env_var)
{
let params =
CreateSecretParams::new(&auth.secret_name, &value).with_provider(name.to_string());
self.secrets
.create(user_id, params)
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()))?;
return Ok(AuthResult::authenticated(name, ExtensionKind::WasmTool));
}
// A failed existence lookup is treated the same as "no token stored".
let token_exists = self
.secrets
.exists(user_id, &auth.secret_name)
.await
.unwrap_or(false);
if token_exists {
// With OAuth, an existing token may still need re-authorization when the scopes
// merged across tools sharing this secret require expansion.
let needs_reauth = if let Some(ref oauth) = auth.oauth {
let merged = self
.collect_shared_scopes(&auth.secret_name, &oauth.scopes, user_id)
.await;
let needs = self
.needs_scope_expansion(&auth.secret_name, &merged, user_id)
.await;
tracing::debug!(
tool = name,
secret_name = %auth.secret_name,
merged_scopes = ?merged,
needs_reauth = needs,
"Scope expansion check"
);
needs
} else {
false
};
if !needs_reauth {
return Ok(AuthResult::authenticated(name, ExtensionKind::WasmTool));
}
}
if let Some(ref oauth) = auth.oauth {
// OAuth client credentials must be configured before a flow can start.
if self
.needs_setup_credentials(name, &auth, oauth, user_id)
.await
{
let display = auth.display_name.as_deref().unwrap_or(name);
return Ok(AuthResult::needs_setup(
name,
ExtensionKind::WasmTool,
format!(
"Configure OAuth credentials for {} in the Setup tab.",
display
),
auth.setup_url.clone(),
));
}
return self
.start_wasm_oauth(name, &auth, oauth, user_id)
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()));
}
// No OAuth declared: ask for a manually supplied token, using the tool's own
// instructions when it provides them.
let display = auth.display_name.unwrap_or_else(|| name.to_string());
let instructions = auth
.instructions
.unwrap_or_else(|| format!("Please provide your {} API token/key.", display));
Ok(AuthResult::awaiting_token(
name,
ExtensionKind::WasmTool,
instructions,
auth.setup_url,
))
}
/// Reports whether a WASM channel has all of its mandatory setup secrets.
///
/// Returns `NoAuth` when the capabilities file cannot be read or parsed, or
/// when it declares no non-optional secrets. Otherwise returns `Ready` if
/// every non-optional secret exists for `user_id`, and `NeedsSetup` if any is
/// missing. A failed existence lookup counts as "missing".
async fn check_channel_auth_status(&self, name: &str, user_id: &str) -> ToolAuthState {
    let capabilities_path = self
        .wasm_channels_dir
        .join(format!("{}.capabilities.json", name));
    let parsed = match tokio::fs::read(&capabilities_path).await {
        Ok(raw) => crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&raw).ok(),
        Err(_) => None,
    };
    let Some(capabilities) = parsed else {
        return ToolAuthState::NoAuth;
    };

    let mandatory: Vec<_> = capabilities
        .setup
        .required_secrets
        .iter()
        .filter(|secret| !secret.optional)
        .collect();
    if mandatory.is_empty() {
        return ToolAuthState::NoAuth;
    }

    // Query all secrets concurrently, then look for any that is absent.
    let lookups = mandatory
        .iter()
        .map(|secret| self.secrets.exists(user_id, &secret.name));
    let outcomes = futures::future::join_all(lookups).await;
    let any_missing = outcomes
        .into_iter()
        .any(|outcome| !outcome.unwrap_or(false));

    if any_missing {
        ToolAuthState::NeedsSetup
    } else {
        ToolAuthState::Ready
    }
}
/// Reads and parses `<name>.capabilities.json` from the WASM tools directory.
///
/// Returns `None` when the file cannot be read or does not parse.
async fn load_tool_capabilities(
    &self,
    name: &str,
) -> Option<crate::tools::wasm::CapabilitiesFile> {
    let file_name = format!("{}.capabilities.json", name);
    let location = self.wasm_tools_dir.join(file_name);
    match tokio::fs::read(&location).await {
        Ok(raw) => crate::tools::wasm::CapabilitiesFile::from_bytes(&raw).ok(),
        Err(_) => None,
    }
}
async fn collect_shared_scopes(
&self,
secret_name: &str,
base_scopes: &[String],
_user_id: &str,
) -> Vec<String> {
let mut all_scopes: std::collections::BTreeSet<String> =
base_scopes.iter().cloned().collect();
if let Ok(tools) = discover_tools(&self.wasm_tools_dir).await {
for tool_name in tools.keys() {
if let Some(cap) = self.load_tool_capabilities(tool_name).await
&& let Some(auth) = &cap.auth
&& auth.secret_name == secret_name
&& let Some(oauth) = &auth.oauth
{
all_scopes.extend(oauth.scopes.iter().cloned());
}
}
}
all_scopes.into_iter().collect()
}
/// Decides whether the stored token must be re-authorized to cover
/// `merged_scopes`.
///
/// Returns `false` when no scopes are requested. Returns `true` when the
/// `<secret_name>_scopes` record cannot be loaded, or when at least one
/// requested scope is absent from the whitespace-separated stored list.
async fn needs_scope_expansion(
    &self,
    secret_name: &str,
    merged_scopes: &[String],
    user_id: &str,
) -> bool {
    if merged_scopes.is_empty() {
        return false;
    }

    let scopes_key = format!("{}_scopes", secret_name);
    let Ok(record) = self.secrets.get_decrypted(user_id, &scopes_key).await else {
        tracing::debug!(
            secret_name,
            "No stored scopes record, forcing re-auth for legacy token"
        );
        return true;
    };

    let granted: std::collections::HashSet<String> = record
        .expose()
        .split_whitespace()
        .map(String::from)
        .collect();
    tracing::debug!(
        secret_name,
        stored_scopes = ?granted,
        "Loaded stored scopes for expansion check"
    );

    // Expansion is needed unless every requested scope was already granted.
    !merged_scopes.iter().all(|scope| granted.contains(scope))
}
/// Finds the setup secrets of a tool that carry the OAuth client id and
/// client secret.
///
/// A secret is matched by its lower-cased name ending in `client_id` or
/// `client_secret`. Each returned entry is `(secret name, optional flag)`.
/// When several secrets match the same suffix, the last one wins. Returns
/// `(None, None)` when the tool has no capabilities file or no setup section.
async fn find_setup_credential_names(
    &self,
    tool_name: &str,
) -> (Option<(String, bool)>, Option<(String, bool)>) {
    let capabilities = self.load_tool_capabilities(tool_name).await;
    let Some(setup) = capabilities.as_ref().and_then(|c| c.setup.as_ref()) else {
        return (None, None);
    };

    let mut id_slot: Option<(String, bool)> = None;
    let mut secret_slot: Option<(String, bool)> = None;
    for entry in setup.required_secrets.iter() {
        let normalized = entry.name.to_lowercase();
        let target = if normalized.ends_with("client_id") {
            &mut id_slot
        } else if normalized.ends_with("client_secret") {
            &mut secret_slot
        } else {
            continue;
        };
        *target = Some((entry.name.clone(), entry.optional));
    }

    (id_slot, secret_slot)
}
async fn needs_setup_credentials(
&self,
name: &str,
auth: &crate::tools::wasm::AuthCapabilitySchema,
oauth: &crate::tools::wasm::OAuthConfigSchema,
user_id: &str,
) -> bool {
let builtin = crate::cli::oauth_defaults::builtin_credentials(&auth.secret_name);
let (id_entry, secret_entry) = self.find_setup_credential_names(name).await;
for (entry, inline, env, fallback) in [
(
&id_entry,
&oauth.client_id,
&oauth.client_id_env,
builtin.as_ref().map(|c| c.client_id),
),
(
&secret_entry,
&oauth.client_secret,
&oauth.client_secret_env,
builtin.as_ref().map(|c| c.client_secret),
),
] {
let Some((ref setup_name, optional)) = *entry else {
continue;
};
if optional {
continue;
}
let resolved = self
.resolve_oauth_credential(inline, env, fallback, Some(setup_name), user_id)
.await
.is_some();
if !resolved {
return true;
}
}
false
}
async fn resolve_oauth_credential(
&self,
inline_value: &Option<String>,
env_var_name: &Option<String>,
builtin_value: Option<&str>,
setup_secret_name: Option<&str>,
user_id: &str,
) -> Option<String> {
if let Some(secret_name) = setup_secret_name
&& let Ok(secret) = self.secrets.get_decrypted(user_id, secret_name).await
{
let val = secret.expose();
if !val.is_empty() {
return Some(val.to_string());
}
}
if let Some(val) = inline_value {
return Some(val.clone());
}
if let Some(env) = env_var_name
&& let Ok(val) = std::env::var(env)
{
return Some(val);
}
builtin_value.map(String::from)
}
/// Starts an OAuth authorization flow for a WASM tool.
///
/// Resolves the client credentials, clears any pending auth for `name`,
/// builds the authorization URL over the merged scope set, and then either
/// hands the flow to the gateway (`should_use_gateway_mode()`), or binds a
/// local callback listener and spawns a task that completes the exchange.
///
/// Returns an error string when the client id cannot be resolved or the
/// local callback listener cannot be bound.
async fn start_wasm_oauth(
&self,
name: &str,
auth: &crate::tools::wasm::AuthCapabilitySchema,
oauth: &crate::tools::wasm::OAuthConfigSchema,
user_id: &str,
) -> Result<AuthResult, String> {
use crate::cli::oauth_defaults;
let builtin = oauth_defaults::builtin_credentials(&auth.secret_name);
let (setup_client_id_entry, setup_client_secret_entry) =
self.find_setup_credential_names(name).await;
// Only the secret names are needed here; the optional flags are dropped.
let setup_client_id_name = setup_client_id_entry.map(|(n, _)| n);
let setup_client_secret_name = setup_client_secret_entry.map(|(n, _)| n);
// The client id is mandatory: failing to resolve it aborts with a message
// that lists the ways it can be supplied.
let client_id = self
.resolve_oauth_credential(
&oauth.client_id,
&oauth.client_id_env,
builtin.as_ref().map(|c| c.client_id),
setup_client_id_name.as_deref(),
user_id,
)
.await
.ok_or_else(|| {
let env_name = oauth
.client_id_env
.as_deref()
.unwrap_or("the client_id env var");
let mut msg = format!(
"OAuth client_id not configured for '{}'. \
Enter it in the Setup tab or set {} env var",
name, env_name
);
if let Some(override_env) =
crate::cli::oauth_defaults::builtin_client_id_override_env(&auth.secret_name)
{
msg.push_str(&format!(", or build with {override_env}"));
}
msg.push('.');
msg
})?;
// The client secret stays an `Option`: the flow continues without one.
let client_secret = self
.resolve_oauth_credential(
&oauth.client_secret,
&oauth.client_secret_env,
builtin.as_ref().map(|c| c.client_secret),
setup_client_secret_name.as_deref(),
user_id,
)
.await;
self.clear_pending_extension_auth(name).await;
// Use the gateway callback redirect URI when one is available, otherwise
// fall back to `<callback_url>/callback`.
let redirect_uri = self
.gateway_callback_redirect_uri()
.await
.unwrap_or_else(|| format!("{}/callback", oauth_defaults::callback_url()));
// Request the union of scopes of all tools sharing this secret.
let merged_scopes = self
.collect_shared_scopes(&auth.secret_name, &oauth.scopes, user_id)
.await;
let oauth_result = oauth_defaults::build_oauth_url(
&oauth.authorization_url,
&client_id,
&redirect_uri,
&merged_scopes,
oauth.use_pkce,
&oauth.extra_params,
);
let auth_url = oauth_result.url.clone();
let code_verifier = oauth_result.code_verifier;
let expected_state = oauth_result.state;
let display_name = auth
.display_name
.clone()
.unwrap_or_else(|| name.to_string());
if self.should_use_gateway_mode() {
// Gateway mode: package everything the callback handler needs into a
// pending flow and register it via `start_gateway_oauth_flow`.
let proxy_client_secret = oauth_defaults::hosted_proxy_client_secret(
&client_secret,
builtin.as_ref(),
oauth_defaults::exchange_proxy_url().is_some(),
);
let flow = oauth_defaults::PendingOAuthFlow {
extension_name: name.to_string(),
display_name: display_name.clone(),
token_url: oauth.token_url.clone(),
client_id: client_id.clone(),
client_secret: proxy_client_secret,
redirect_uri: redirect_uri.clone(),
code_verifier,
access_token_field: oauth.access_token_field.clone(),
secret_name: auth.secret_name.clone(),
provider: auth.provider.clone(),
validation_endpoint: auth.validation_endpoint.clone(),
scopes: merged_scopes,
user_id: user_id.to_string(),
secrets: Arc::clone(&self.secrets),
sse_manager: self.sse_manager.read().await.clone(),
gateway_token: self.gateway_token.clone(),
token_exchange_extra_params: std::collections::HashMap::new(),
client_id_secret_name: None,
created_at: std::time::Instant::now(),
};
Ok(self
.start_gateway_oauth_flow(HostedOAuthFlowStart {
name: name.to_string(),
kind: ExtensionKind::WasmTool,
auth_url,
expected_state,
flow,
})
.await)
} else {
// Local mode: bind the callback listener first so a bind failure is
// reported to the caller instead of inside the background task.
let listener = oauth_defaults::bind_callback_listener()
.await
.map_err(|e| format!("Failed to start OAuth callback listener: {}", e))?;
// Owned copies of everything the spawned task needs (`async move`).
let token_url = oauth.token_url.clone();
let access_token_field = oauth.access_token_field.clone();
let secret_name = auth.secret_name.clone();
let provider = auth.provider.clone();
let validation_endpoint = auth.validation_endpoint.clone();
let user_id = user_id.to_string();
let secrets = Arc::clone(&self.secrets);
let sse_manager = self.sse_manager.read().await.clone();
let ext_name = name.to_string();
let task_handle = tokio::spawn(async move {
// Steps: wait for the callback, exchange the code, optionally validate
// the token, then store the tokens. The first failure short-circuits.
let result: Result<(), String> = async {
let code = oauth_defaults::wait_for_callback(
listener,
"/callback",
"code",
&display_name,
Some(&expected_state),
)
.await
.map_err(|e| e.to_string())?;
let token_response = oauth_defaults::exchange_oauth_code(
&token_url,
&client_id,
client_secret.as_deref(),
&code,
&redirect_uri,
code_verifier.as_deref(),
&access_token_field,
)
.await
.map_err(|e| e.to_string())?;
if let Some(ref validation) = validation_endpoint {
oauth_defaults::validate_oauth_token(
&token_response.access_token,
validation,
)
.await
.map_err(|e| e.to_string())?;
}
oauth_defaults::store_oauth_tokens(
secrets.as_ref(),
&user_id,
&secret_name,
provider.as_deref(),
&token_response.access_token,
token_response.refresh_token.as_deref(),
token_response.expires_in,
&merged_scopes,
)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
.await;
// Outcome message for the `AuthCompleted` event broadcast below.
let (success, message) = match result {
Ok(()) => (true, format!("{} authenticated successfully", display_name)),
Err(ref e) => (
false,
format!("{} authentication failed: {}", display_name, e),
),
};
match &result {
Ok(()) => {
tracing::info!(
tool = %ext_name,
"OAuth completed successfully"
);
}
Err(e) => {
tracing::warn!(
tool = %ext_name,
error = %e,
"WASM tool OAuth failed"
);
}
}
// Broadcast the outcome only when an SSE manager is configured.
if let Some(ref sse) = sse_manager {
sse.broadcast(ironclaw_common::AppEvent::AuthCompleted {
extension_name: ext_name,
success,
message,
});
}
});
// Record the background task as the pending auth entry for this extension.
self.pending_auth.write().await.insert(
name.to_string(),
PendingAuth {
_name: name.to_string(),
_kind: ExtensionKind::WasmTool,
created_at: std::time::Instant::now(),
task_handle: Some(task_handle),
},
);
Ok(AuthResult::awaiting_authorization(
name,
ExtensionKind::WasmTool,
auth_url,
"local".to_string(),
))
}
}
fn is_auto_resolved_oauth_field(
secret_name: &str,
cap_file: &crate::tools::wasm::CapabilitiesFile,
) -> bool {
let lower = secret_name.to_lowercase();
let is_client_id = lower.ends_with("client_id") || lower == "client_id";
let is_client_secret = lower.ends_with("client_secret") || lower == "client_secret";
if !is_client_id && !is_client_secret {
return false;
}
let Some(ref auth) = cap_file.auth else {
return false;
};
let Some(ref oauth) = auth.oauth else {
return false;
};
let builtin = crate::cli::oauth_defaults::builtin_credentials(&auth.secret_name);
if is_client_id {
oauth.client_id.is_some()
|| oauth
.client_id_env
.as_ref()
.is_some_and(|e| std::env::var(e).is_ok())
|| builtin.is_some()
} else {
oauth.client_secret.is_some()
|| oauth
.client_secret_env
.as_ref()
.is_some_and(|e| std::env::var(e).is_ok())
|| builtin.is_some()
}
}
/// Computes the auth/setup state of a WASM tool for `user_id`.
///
/// * `NoAuth`: no capabilities file, or neither a setup nor an auth section.
/// * `NeedsSetup`: a non-optional setup secret (that is not an auto-resolved
///   OAuth client credential) or a non-optional setup field is missing, or
///   the tool needs a manually supplied token that is absent.
/// * `NeedsAuth`: setup is complete but the OAuth token is missing.
/// * `Ready`: everything required is present.
///
/// All secret lookups are scoped to the `user_id` argument. A failed lookup
/// is treated as "secret missing".
async fn check_tool_auth_status(&self, name: &str, user_id: &str) -> ToolAuthState {
    let Some(cap_file) = self.load_tool_capabilities(name).await else {
        return ToolAuthState::NoAuth;
    };
    let saved_fields = self.load_tool_setup_fields(name).await.unwrap_or_default();

    if let Some(setup) = &cap_file.setup {
        // Check the required secrets for the requesting user, consistent with
        // every other lookup in this function.
        let secrets_ready = futures::future::join_all(
            setup
                .required_secrets
                .iter()
                .filter(|s| !s.optional)
                .filter(|s| !Self::is_auto_resolved_oauth_field(&s.name, &cap_file))
                .map(|s| self.secrets.exists(user_id, &s.name)),
        )
        .await
        .into_iter()
        .all(|r| r.unwrap_or(false));
        if !secrets_ready {
            return ToolAuthState::NeedsSetup;
        }

        for field in &setup.required_fields {
            if field.optional {
                continue;
            }
            if !self
                .is_tool_setup_field_provided(name, field, &saved_fields)
                .await
            {
                return ToolAuthState::NeedsSetup;
            }
        }
    }

    if let Some(ref auth) = cap_file.auth {
        // A token counts as present if stored as a secret or supplied via the
        // configured environment variable.
        let has_token = self
            .secrets
            .exists(user_id, &auth.secret_name)
            .await
            .unwrap_or(false)
            || auth
                .env_var
                .as_ref()
                .is_some_and(|v| std::env::var(v).is_ok());
        return if has_token {
            ToolAuthState::Ready
        } else if auth.oauth.is_some() {
            ToolAuthState::NeedsAuth
        } else {
            ToolAuthState::NeedsSetup
        };
    }

    // No auth section: the setup secrets were already verified above, so a
    // tool with a setup section is ready and one without needs no auth.
    if cap_file.setup.is_some() {
        ToolAuthState::Ready
    } else {
        ToolAuthState::NoAuth
    }
}
/// Reports the auth status of a WASM channel based on its setup secrets.
///
/// Returns `no_auth_required` when the capabilities file does not exist or
/// declares no secrets, `authenticated` when every non-optional secret exists
/// for `user_id`, and `awaiting_token` (with instructions for the first
/// missing secret) otherwise. A failed existence lookup counts as "missing".
///
/// # Errors
///
/// Returns `ExtensionError::Other` when the capabilities file exists but
/// cannot be read or parsed.
async fn auth_wasm_channel_status(
    &self,
    name: &str,
    user_id: &str,
) -> Result<AuthResult, ExtensionError> {
    let cap_path = self
        .wasm_channels_dir
        .join(format!("{}.capabilities.json", name));
    // Async existence probe; any probe error is treated as "does not exist",
    // matching `Path::exists` without blocking the executor thread.
    if !tokio::fs::try_exists(&cap_path).await.unwrap_or(false) {
        return Ok(AuthResult::no_auth_required(
            name,
            ExtensionKind::WasmChannel,
        ));
    }
    let cap_bytes = tokio::fs::read(&cap_path)
        .await
        .map_err(|e| ExtensionError::Other(e.to_string()))?;
    let cap_file = crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&cap_bytes)
        .map_err(|e| ExtensionError::Other(e.to_string()))?;

    let required_secrets = &cap_file.setup.required_secrets;
    if required_secrets.is_empty() {
        return Ok(AuthResult::no_auth_required(
            name,
            ExtensionKind::WasmChannel,
        ));
    }

    // Only the first missing secret is reported, so stop at the first miss
    // instead of probing every remaining secret.
    for secret in required_secrets.iter().filter(|s| !s.optional) {
        let provided = self
            .secrets
            .exists(user_id, &secret.name)
            .await
            .unwrap_or(false);
        if !provided {
            return Ok(AuthResult::awaiting_token(
                name,
                ExtensionKind::WasmChannel,
                channel_auth_instructions(name, secret),
                cap_file.setup.setup_url.clone(),
            ));
        }
    }

    Ok(AuthResult::authenticated(name, ExtensionKind::WasmChannel))
}
/// Activates an installed MCP server: connects a client, registers its tools
/// and records the client as active.
///
/// If a client for `name` is already registered, returns the currently
/// registered tools whose names start with `<name>_` without reconnecting.
///
/// # Errors
///
/// * `NotInstalled`: the server configuration cannot be loaded.
/// * `AuthRequired`: listing tools failed with an error that looks like an
///   authentication failure (see the classification below).
/// * `ActivationFailed`: client creation, tool listing or tool creation
///   failed for any other reason.
async fn activate_mcp(
    &self,
    name: &str,
    user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
    // Release the clients read guard before awaiting on the tool registry.
    let already_active = self.mcp_clients.read().await.contains_key(name);
    if already_active {
        // Build the prefix once rather than once per registered tool.
        let prefix = format!("{}_", name);
        let tools: Vec<String> = self
            .tool_registry
            .list()
            .await
            .into_iter()
            .filter(|t| t.starts_with(&prefix))
            .collect();
        return Ok(ActivateResult {
            name: name.to_string(),
            kind: ExtensionKind::McpServer,
            tools_loaded: tools,
            message: format!("MCP server '{}' already active", name),
        });
    }

    let server = self
        .get_mcp_server(name, user_id)
        .await
        .map_err(|e| ExtensionError::NotInstalled(e.to_string()))?;
    // `server` is not used again, so it is moved instead of cloned.
    let client = crate::tools::mcp::create_client_from_config(
        server,
        &self.mcp_session_manager,
        &self.mcp_process_manager,
        Some(Arc::clone(&self.secrets)),
        user_id,
    )
    .await
    .map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;

    let mcp_tools = client.list_tools().await.map_err(|e| {
        // Classify by message text: an explicit "requires authentication",
        // any "401", or a "400" that mentions authorization/authentication.
        let msg = e.to_string();
        let msg_lower = msg.to_ascii_lowercase();
        if msg_lower.contains("requires authentication")
            || msg.contains("401")
            || (msg.contains("400")
                && (msg_lower.contains("authorization") || msg_lower.contains("authenticate")))
        {
            ExtensionError::AuthRequired
        } else {
            ExtensionError::ActivationFailed(msg)
        }
    })?;
    let tool_impls = client
        .create_tools()
        .await
        .map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
    let tool_names: Vec<String> = mcp_tools
        .iter()
        .map(|t| format!("{}_{}", name, t.name))
        .collect();
    for tool in tool_impls {
        self.tool_registry.register(tool).await;
    }
    self.mcp_clients
        .write()
        .await
        .insert(name.to_string(), Arc::new(client));
    tracing::info!(
        "Activated MCP server '{}' with {} tools",
        name,
        tool_names.len()
    );
    Ok(ActivateResult {
        name: name.to_string(),
        kind: ExtensionKind::McpServer,
        tools_loaded: tool_names,
        message: format!("Connected to '{}' and loaded tools", name),
    })
}
/// Loads an installed WASM tool into the tool registry and, when a hook
/// registry and a capabilities file are both present, registers the plugin
/// hooks declared in that file.
///
/// Returns immediately with an "already active" result when the registry
/// already contains `name`.
///
/// # Errors
///
/// * `ActivationFailed`: the tool still needs setup, the WASM runtime is
///   unavailable, or loading the module failed.
/// * `NotInstalled`: `<name>.wasm` is absent from the tools directory.
async fn activate_wasm_tool(
    &self,
    name: &str,
    user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
    let already_loaded = self.tool_registry.has(name).await;
    if already_loaded {
        return Ok(ActivateResult {
            name: name.to_string(),
            kind: ExtensionKind::WasmTool,
            tools_loaded: vec![name.to_string()],
            message: format!("WASM tool '{}' already active", name),
        });
    }

    let needs_setup = matches!(
        self.check_tool_auth_status(name, user_id).await,
        ToolAuthState::NeedsSetup
    );
    if needs_setup {
        return Err(ExtensionError::ActivationFailed(format!(
            "Tool '{}' requires configuration. Use the setup form to provide credentials.",
            name
        )));
    }

    let Some(runtime) = self.wasm_tool_runtime.as_ref() else {
        return Err(ExtensionError::ActivationFailed(
            "WASM runtime not available".to_string(),
        ));
    };

    let module_path = self.wasm_tools_dir.join(format!("{}.wasm", name));
    if !module_path.exists() {
        return Err(ExtensionError::NotInstalled(format!(
            "WASM tool '{}' not found at {}",
            name,
            module_path.display()
        )));
    }

    // The capabilities file is optional; pass it along only when it exists.
    let capabilities_path = self
        .wasm_tools_dir
        .join(format!("{}.capabilities.json", name));
    let capabilities = capabilities_path
        .exists()
        .then_some(capabilities_path.as_path());

    let tool_loader = WasmToolLoader::new(Arc::clone(runtime), Arc::clone(&self.tool_registry))
        .with_secrets_store(Arc::clone(&self.secrets));
    if let Err(load_error) = tool_loader
        .load_from_files(name, &module_path, capabilities)
        .await
    {
        return Err(ExtensionError::ActivationFailed(load_error.to_string()));
    }

    if let (Some(hook_registry), Some(declared_in)) = (self.hooks.as_ref(), capabilities) {
        let source = format!("plugin.tool:{}", name);
        let registration = crate::hooks::bootstrap::register_plugin_bundle_from_capabilities_file(
            hook_registry,
            &source,
            declared_in,
        )
        .await;
        if registration.total_registered() > 0 {
            tracing::info!(
                extension = name,
                hooks = registration.hooks,
                outbound_webhooks = registration.outbound_webhooks,
                "Registered plugin hooks for activated WASM tool"
            );
        }
        if registration.errors > 0 {
            tracing::warn!(
                extension = name,
                errors = registration.errors,
                "Some plugin hooks failed to register"
            );
        }
    }

    tracing::info!("Activated WASM tool '{}'", name);
    Ok(ActivateResult {
        name: name.to_string(),
        kind: ExtensionKind::WasmTool,
        tools_loaded: vec![name.to_string()],
        message: format!("WASM tool '{}' loaded and ready", name),
    })
}
async fn activate_wasm_channel(
&self,
name: &str,
user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
{
let active = self.active_channel_names.read().await;
if active.contains(name) {
return self.refresh_active_channel(name, user_id).await;
}
}
let (
channel_runtime,
channel_manager,
pairing_store,
wasm_channel_router,
wasm_channel_owner_ids,
) = {
let rt_guard = self.channel_runtime.read().await;
let rt = rt_guard.as_ref().ok_or_else(|| {
ExtensionError::ActivationFailed("WASM channel runtime not configured".to_string())
})?;
(
Arc::clone(&rt.wasm_channel_runtime),
Arc::clone(&rt.channel_manager),
Arc::clone(&rt.pairing_store),
Arc::clone(&rt.wasm_channel_router),
rt.wasm_channel_owner_ids.clone(),
)
};
let auth_state = self.check_channel_auth_status(name, user_id).await;
if auth_state != ToolAuthState::Ready && auth_state != ToolAuthState::NoAuth {
return Err(ExtensionError::ActivationFailed(format!(
"Channel '{}' requires configuration. Use the setup form to provide credentials.",
name
)));
}
let wasm_path = self.wasm_channels_dir.join(format!("{}.wasm", name));
let cap_path = self
.wasm_channels_dir
.join(format!("{}.capabilities.json", name));
let cap_path_option = if cap_path.exists() {
Some(cap_path.as_path())
} else {
None
};
#[cfg(test)]
let loaded = if let Some(loader) = self.test_wasm_channel_loader.read().await.as_ref() {
loader(name)?
} else {
let settings_store: Option<Arc<dyn crate::db::SettingsStore>> =
self.store.as_ref().map(|db| Arc::clone(db) as _);
let loader = WasmChannelLoader::new(
Arc::clone(&channel_runtime),
Arc::clone(&pairing_store),
settings_store,
self.user_id.clone(),
)
.with_secrets_store(Arc::clone(&self.secrets));
loader
.load_from_files(name, &wasm_path, cap_path_option)
.await
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?
};
#[cfg(not(test))]
let loaded = {
let settings_store: Option<Arc<dyn crate::db::SettingsStore>> =
self.store.as_ref().map(|db| Arc::clone(db) as _);
let loader = WasmChannelLoader::new(
Arc::clone(&channel_runtime),
Arc::clone(&pairing_store),
settings_store,
self.user_id.clone(),
)
.with_secrets_store(Arc::clone(&self.secrets));
loader
.load_from_files(name, &wasm_path, cap_path_option)
.await
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?
};
self.complete_loaded_wasm_channel_activation(
name,
loaded,
&channel_manager,
&wasm_channel_router,
wasm_channel_owner_ids.get(name).copied(),
)
.await
}
/// Finishes activation of an already-loaded WASM channel.
///
/// Steps, in order:
/// 1. inject runtime config (tunnel URL, webhook secret, owner id, stored
///    overrides) into the channel;
/// 2. register the channel's `/webhook/<channel>` POST endpoint with the
///    router, plus the signature key and HMAC secret when configured;
/// 3. inject stored credentials into the channel (failures are logged only);
/// 4. hot-add the channel to the channel manager;
/// 5. record the channel as active and persist the active set.
///
/// Secrets are looked up under `self.user_id`. `requested_name` is used only
/// in the result message; the channel's own name is used everywhere else.
///
/// # Errors
///
/// Returns `ActivationFailed` when the channel manager rejects the hot-add.
async fn complete_loaded_wasm_channel_activation(
    &self,
    requested_name: &str,
    loaded: LoadedChannel,
    channel_manager: &Arc<ChannelManager>,
    wasm_channel_router: &Arc<WasmChannelRouter>,
    owner_id: Option<i64>,
) -> Result<ActivateResult, ExtensionError> {
    // Pull everything needed out of `loaded` before its channel is moved.
    let channel_name = loaded.name().to_string();
    let owner_actor_id = owner_id.map(|id| id.to_string());
    let webhook_secret_name = loaded.webhook_secret_name();
    let secret_header = loaded.webhook_secret_header().map(|s| s.to_string());
    let sig_key_secret_name = loaded.signature_key_secret_name();
    let hmac_secret_name = loaded.hmac_secret_name();
    let webhook_secret = self
        .secrets
        .get_decrypted(&self.user_id, &webhook_secret_name)
        .await
        .ok()
        .map(|s| s.expose().to_string());
    let channel_arc = Arc::new(loaded.channel.with_owner_actor_id(owner_actor_id));

    {
        // Only look up the current owner when the caller did not supply one;
        // `Option::or` would evaluate (and await) the fallback unconditionally.
        let resolved_owner_id = match owner_id {
            Some(id) => Some(id),
            None => self.current_channel_owner_id(&channel_name).await,
        };
        let mut config_updates = build_wasm_channel_runtime_config_updates(
            self.tunnel_url.as_deref(),
            webhook_secret.as_deref(),
            resolved_owner_id,
        );
        config_updates.extend(
            self.load_channel_runtime_config_overrides(&channel_name)
                .await,
        );
        if !config_updates.is_empty() {
            channel_arc.update_config(config_updates).await;
            tracing::info!(
                channel = %channel_name,
                has_tunnel = self.tunnel_url.is_some(),
                has_webhook_secret = webhook_secret.is_some(),
                "Injected runtime config into hot-activated channel"
            );
        }
    }

    {
        let webhook_path = format!("/webhook/{}", channel_name);
        let endpoints = vec![RegisteredEndpoint {
            channel_name: channel_name.clone(),
            path: webhook_path,
            methods: vec!["POST".to_string()],
            // A secret is required on the endpoint only when one is stored.
            require_secret: webhook_secret.is_some(),
        }];
        wasm_channel_router
            .register(
                Arc::clone(&channel_arc),
                endpoints,
                webhook_secret,
                secret_header,
            )
            .await;
        tracing::info!(channel = %channel_name, "Registered hot-activated channel with webhook router");

        // A missing signature key secret is skipped silently.
        if let Some(ref sig_key_name) = sig_key_secret_name
            && let Ok(key_secret) = self
                .secrets
                .get_decrypted(&self.user_id, sig_key_name)
                .await
        {
            match wasm_channel_router
                .register_signature_key(&channel_name, key_secret.expose())
                .await
            {
                Ok(()) => {
                    tracing::info!(channel = %channel_name, "Registered signature key for hot-activated channel")
                }
                Err(e) => {
                    tracing::error!(channel = %channel_name, error = %e, "Failed to register signature key")
                }
            }
        }

        // A missing HMAC secret is logged as a warning.
        if let Some(hmac_name) = &hmac_secret_name {
            match self.secrets.get_decrypted(&self.user_id, hmac_name).await {
                Ok(secret) => {
                    wasm_channel_router
                        .register_hmac_secret(&channel_name, secret.expose())
                        .await;
                    tracing::info!(channel = %channel_name, "Registered HMAC signing secret for hot-activated channel");
                }
                Err(e) => {
                    tracing::warn!(channel = %channel_name, error = %e, "HMAC secret not found");
                }
            }
        }
    }

    // Credential injection is best-effort: a failure is logged and activation
    // continues.
    match inject_channel_credentials_from_secrets(
        &channel_arc,
        Some(self.secrets.as_ref()),
        &channel_name,
        &self.user_id,
    )
    .await
    {
        Ok(count) => {
            if count > 0 {
                tracing::info!(
                    channel = %channel_name,
                    credentials_injected = count,
                    "Credentials injected into hot-activated channel"
                );
            }
        }
        Err(e) => {
            tracing::error!(
                channel = %channel_name,
                error = %e,
                "Failed to inject credentials into hot-activated channel"
            );
        }
    }

    channel_manager
        .hot_add(Box::new(SharedWasmChannel::new(channel_arc)))
        .await
        .map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
    self.active_channel_names
        .write()
        .await
        .insert(channel_name.clone());
    self.persist_active_channels(&self.user_id).await;
    tracing::info!(channel = %channel_name, "Hot-activated WASM channel");
    Ok(ActivateResult {
        name: channel_name,
        kind: ExtensionKind::WasmChannel,
        tools_loaded: Vec::new(),
        message: format!("Channel '{}' activated and running", requested_name),
    })
}
/// Refreshes credentials, secrets and runtime config on a channel that is
/// already active, without reloading it.
///
/// Returns a plain "already active" result when the channel runtime is not
/// configured or the router has no channel for `/webhook/<name>`. Otherwise
/// it re-injects credentials, updates the webhook secret, signature key and
/// HMAC secret on the router, pushes config updates to the channel, and
/// re-runs `on_start` when anything was refreshed. Failures along the way are
/// logged and do not turn the result into an error.
async fn refresh_active_channel(
&self,
name: &str,
user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
// Clone the router handle inside a block so the runtime read guard is
// dropped before any further awaits.
let router = {
let rt_guard = self.channel_runtime.read().await;
match rt_guard.as_ref() {
Some(rt) => Arc::clone(&rt.wasm_channel_router),
None => {
return Ok(ActivateResult {
name: name.to_string(),
kind: ExtensionKind::WasmChannel,
tools_loaded: Vec::new(),
message: format!("Channel '{}' is already active", name),
});
}
}
};
// The live channel instance is looked up by its webhook path.
let webhook_path = format!("/webhook/{}", name);
let existing_channel = match router.get_channel_for_path(&webhook_path).await {
Some(ch) => ch,
None => {
return Ok(ActivateResult {
name: name.to_string(),
kind: ExtensionKind::WasmChannel,
tools_loaded: Vec::new(),
message: format!("Channel '{}' is already active", name),
});
}
};
// Credential refresh is best-effort: on failure the count is reported as 0.
let cred_count = match inject_channel_credentials_from_secrets(
&existing_channel,
Some(self.secrets.as_ref()),
name,
user_id,
)
.await
{
Ok(count) => count,
Err(e) => {
tracing::warn!(
channel = %name,
error = %e,
"Failed to refresh credentials on already-active channel"
);
0
}
};
// Re-read the capabilities file to learn the secret names; an unreadable
// or unparsable file yields `None` and the defaults below apply.
let cap_path = self
.wasm_channels_dir
.join(format!("{}.capabilities.json", name));
let capabilities_file = match tokio::fs::read(&cap_path).await {
Ok(bytes) => crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&bytes).ok(),
Err(_) => None,
};
// Without a capabilities file the webhook secret name defaults to
// `<name>_webhook_secret`; the signature key and HMAC names have no default.
let webhook_secret_name = capabilities_file
.as_ref()
.map(|f| f.webhook_secret_name())
.unwrap_or_else(|| format!("{}_webhook_secret", name));
let sig_key_secret_name = capabilities_file
.as_ref()
.and_then(|f| f.signature_key_secret_name().map(|s| s.to_string()));
let hmac_secret_name = capabilities_file
.as_ref()
.and_then(|f| f.hmac_secret_name().map(|s| s.to_string()));
// The webhook secret is passed as `None` here and inserted into the config
// below only if it can be loaded.
let mut config_updates = build_wasm_channel_runtime_config_updates(
self.tunnel_url.as_deref(),
None,
self.current_channel_owner_id(name).await,
);
config_updates.extend(self.load_channel_runtime_config_overrides(name).await);
let mut should_rerun_on_start = false;
if let Ok(secret) = self
.secrets
.get_decrypted(user_id, &webhook_secret_name)
.await
{
// Update both the router's copy and the channel's config entry.
router
.update_secret(name, secret.expose().to_string())
.await;
config_updates.insert(
"webhook_secret".to_string(),
serde_json::Value::String(secret.expose().to_string()),
);
should_rerun_on_start = true;
}
// A missing signature key secret is skipped silently.
if let Some(ref sig_key_name) = sig_key_secret_name
&& let Ok(key_secret) = self.secrets.get_decrypted(user_id, sig_key_name).await
{
match router
.register_signature_key(name, key_secret.expose())
.await
{
Ok(()) => {
tracing::info!(channel = %name, "Refreshed signature verification key")
}
Err(e) => {
tracing::error!(channel = %name, error = %e, "Failed to refresh signature key")
}
}
}
// A missing HMAC secret is logged as a warning.
if let Some(ref hmac_secret_name_ref) = hmac_secret_name {
match self
.secrets
.get_decrypted(user_id, hmac_secret_name_ref)
.await
{
Ok(secret) => {
router.register_hmac_secret(name, secret.expose()).await;
tracing::info!(channel = %name, "Refreshed HMAC signing secret");
}
Err(e) => {
tracing::warn!(channel = %name, error = %e, "HMAC secret not found");
}
}
}
if !config_updates.is_empty() {
existing_channel.update_config(config_updates).await;
should_rerun_on_start = true;
}
// Re-run `on_start` only when credentials or config actually changed; a
// failure is logged and does not fail the refresh.
if cred_count > 0 || should_rerun_on_start {
match existing_channel.call_on_start().await {
Ok(_config) => {
tracing::info!(
channel = %name,
"Re-ran on_start after credential refresh (webhook re-registered)"
);
}
Err(e) => {
tracing::warn!(
channel = %name,
error = %e,
"on_start failed after credential refresh"
);
}
}
}
tracing::info!(
channel = %name,
credentials_refreshed = cred_count,
"Refreshed credentials and config on already-active channel"
);
Ok(ActivateResult {
name: name.to_string(),
kind: ExtensionKind::WasmChannel,
tools_loaded: Vec::new(),
message: format!(
"Channel '{}' is already active; refreshed {} credential(s)",
name, cred_count
),
})
}
/// Returns the relay instance id: the configured one when present, otherwise
/// a UUIDv5 derived from `user_id` in the DNS namespace (stable per user).
fn relay_instance_id(&self, config: &crate::config::RelayConfig, user_id: &str) -> String {
    match config.instance_id.as_ref() {
        Some(configured) => configured.clone(),
        None => {
            uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_DNS, user_id.as_bytes()).to_string()
        }
    }
}
/// Starts (or short-circuits) OAuth for a channel-relay extension.
///
/// Returns `authenticated` when `name` is already a relay channel for the
/// user. Otherwise stores a fresh random state nonce under
/// `relay:<name>:oauth_state` (replacing any previous one) and asks the relay
/// to initiate OAuth, returning the authorization URL.
///
/// # Errors
///
/// * `Config`: the relay client cannot be constructed.
/// * `AuthFailed`: the state nonce cannot be stored or the relay refuses to
///   initiate OAuth.
async fn auth_channel_relay(
    &self,
    name: &str,
    user_id: &str,
) -> Result<AuthResult, ExtensionError> {
    let already_connected = self.is_relay_channel(name, user_id).await;
    if already_connected {
        return Ok(AuthResult::authenticated(name, ExtensionKind::ChannelRelay));
    }

    let relay_config = self.relay_config()?;
    let relay_client = crate::channels::relay::RelayClient::new(
        relay_config.url.clone(),
        relay_config.api_key.clone(),
        relay_config.request_timeout_secs,
    )
    .map_err(|e| ExtensionError::Config(e.to_string()))?;

    let state_nonce = uuid::Uuid::new_v4().to_string();
    let state_key = format!("relay:{}:oauth_state", name);
    // Best-effort removal of a stale nonce; the outcome is ignored on purpose.
    self.secrets.delete(user_id, &state_key).await.ok();
    if let Err(store_error) = self
        .secrets
        .create(user_id, CreateSecretParams::new(&state_key, &state_nonce))
        .await
    {
        return Err(ExtensionError::AuthFailed(format!(
            "Failed to store OAuth state: {store_error}"
        )));
    }

    relay_client
        .initiate_oauth(Some(&state_nonce))
        .await
        .map(|auth_url| {
            AuthResult::awaiting_authorization(
                name,
                ExtensionKind::ChannelRelay,
                auth_url,
                "redirect".to_string(),
            )
        })
        .map_err(|e| ExtensionError::AuthFailed(e.to_string()))
}
async fn activate_channel_relay(
&self,
name: &str,
user_id: &str,
) -> Result<ActivateResult, ExtensionError> {
let team_id_key = format!("relay:{}:team_id", name);
let team_id = if let Some(ref store) = self.store {
store
.get_setting(user_id, &team_id_key)
.await
.ok()
.flatten()
.and_then(|v| v.as_str().map(|s| s.to_string()))
.unwrap_or_default()
} else {
String::new()
};
if team_id.is_empty() {
return Err(ExtensionError::AuthRequired);
}
let relay_config = self.relay_config()?;
let instance_id = self.relay_instance_id(relay_config, user_id);
let client = crate::channels::relay::RelayClient::new(
relay_config.url.clone(),
relay_config.api_key.clone(),
relay_config.request_timeout_secs,
)
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
let signing_secret = client.get_signing_secret(&team_id).await.map_err(|e| {
ExtensionError::Config(format!("Failed to fetch relay signing secret: {e}"))
})?;
let (event_tx, event_rx) = tokio::sync::mpsc::channel(64);
let channel = crate::channels::relay::RelayChannel::new_with_provider(
client.clone(),
crate::channels::relay::channel::RelayProvider::Slack,
team_id.clone(),
instance_id.clone(),
event_tx.clone(),
event_rx,
);
let cm_guard = self.relay_channel_manager.read().await;
let channel_mgr = cm_guard.as_ref().ok_or_else(|| {
ExtensionError::ActivationFailed("Channel manager not initialized".to_string())
})?;
channel_mgr
.hot_add(Box::new(channel))
.await
.map_err(|e| ExtensionError::ActivationFailed(e.to_string()))?;
if let Ok(mut cache) = self.relay_signing_secret_cache.lock() {
*cache = Some(signing_secret);
}
*self.relay_event_tx.lock().await = Some(event_tx);
self.active_channel_names
.write()
.await
.insert(name.to_string());
self.persist_active_channels(user_id).await;
let status_msg = "Slack connected via channel relay".to_string();
self.broadcast_extension_status(name, "active", Some(&status_msg))
.await;
Ok(ActivateResult {
name: name.to_string(),
kind: ExtensionKind::ChannelRelay,
tools_loaded: Vec::new(),
message: status_msg,
})
}
/// Marks `name` as an installed relay extension and then activates it.
///
/// The extension is recorded as installed before activation is attempted, so
/// it stays recorded even when activation returns an error.
pub async fn activate_stored_relay(
    &self,
    name: &str,
    user_id: &str,
) -> Result<(), ExtensionError> {
    {
        let mut installed = self.installed_relay_extensions.write().await;
        installed.insert(name.to_string());
    }
    self.activate_channel_relay(name, user_id)
        .await
        .map(|_activated| ())
}
/// Works out which kind of extension `name` is installed as.
///
/// Checks, in order: MCP server config, `<name>.wasm` in the tools
/// directory, `<name>.wasm` in the channels directory, the in-memory set of
/// installed relay extensions, and finally `is_relay_channel`.
///
/// # Errors
///
/// Returns `NotInstalled` when none of the checks match.
async fn determine_installed_kind(
    &self,
    name: &str,
    user_id: &str,
) -> Result<ExtensionKind, ExtensionError> {
    if self.get_mcp_server(name, user_id).await.is_ok() {
        return Ok(ExtensionKind::McpServer);
    }

    let module_file = format!("{}.wasm", name);
    if self.wasm_tools_dir.join(&module_file).exists() {
        return Ok(ExtensionKind::WasmTool);
    }
    if self.wasm_channels_dir.join(&module_file).exists() {
        return Ok(ExtensionKind::WasmChannel);
    }

    // Read the flag in its own statement so the guard is released before the
    // next await.
    let listed_as_relay = self.installed_relay_extensions.read().await.contains(name);
    if listed_as_relay || self.is_relay_channel(name, user_id).await {
        return Ok(ExtensionKind::ChannelRelay);
    }

    Err(ExtensionError::NotInstalled(format!(
        "'{}' is not installed as an MCP server, WASM tool, WASM channel, or channel relay",
        name
    )))
}
/// Rejects extension names that could escape the extension directories.
///
/// A name is refused when it contains `/`, `\`, a NUL byte, or the sequence `..`.
///
/// # Errors
/// Returns `ExtensionError::InstallFailed` describing the offending name.
fn validate_extension_name(name: &str) -> Result<(), ExtensionError> {
    let has_forbidden_char = name.chars().any(|c| matches!(c, '/' | '\\' | '\0'));
    let has_traversal = name.contains("..");
    match has_forbidden_char || has_traversal {
        false => Ok(()),
        true => Err(ExtensionError::InstallFailed(format!(
            "Invalid extension name '{}': contains path separator or traversal characters",
            name
        ))),
    }
}
/// Builds the settings key under which the setup fields of extension `name` are stored.
fn setup_fields_setting_key(name: &str) -> String {
    ["extensions.", name, ".setup_fields"].concat()
}
/// Reports whether an extension's setup may write to `setting_path`.
///
/// A path is allowed when it lives under the extension's own
/// `extensions.<name>.` namespace or is listed in
/// `ALLOWED_GLOBAL_SETUP_SETTING_PATHS`.
fn is_allowed_setup_setting_path(name: &str, setting_path: &str) -> bool {
    let in_own_namespace = setting_path
        .strip_prefix("extensions.")
        .and_then(|rest| rest.strip_prefix(name))
        .is_some_and(|rest| rest.starts_with('.'));
    if in_own_namespace {
        return true;
    }
    ALLOWED_GLOBAL_SETUP_SETTING_PATHS.contains(&setting_path)
}
/// Checks that `setting_path` may be written by the setup of extension `name`.
///
/// # Errors
/// Returns `ExtensionError::Other` when `is_allowed_setup_setting_path` rejects the path.
fn validate_setup_setting_path(name: &str, setting_path: &str) -> Result<(), ExtensionError> {
    if !Self::is_allowed_setup_setting_path(name, setting_path) {
        return Err(ExtensionError::Other(format!(
            "Invalid setting_path '{}' for extension '{}': only 'extensions.{}.*' or approved settings may be written",
            setting_path, name, name
        )));
    }
    Ok(())
}
fn setting_value_is_present(value: &serde_json::Value) -> bool {
match value {
serde_json::Value::Null => false,
serde_json::Value::String(s) => !s.trim().is_empty(),
serde_json::Value::Array(a) => !a.is_empty(),
serde_json::Value::Object(o) => !o.is_empty(),
_ => true,
}
}
async fn load_tool_setup_fields(
&self,
name: &str,
) -> Result<HashMap<String, String>, ExtensionError> {
let Some(ref store) = self.store else {
return Ok(HashMap::new());
};
let key = Self::setup_fields_setting_key(name);
match store.get_setting(&self.user_id, &key).await {
Ok(Some(value)) => serde_json::from_value::<HashMap<String, String>>(value)
.map_err(|e| ExtensionError::Other(format!("Invalid setup fields JSON: {}", e))),
Ok(None) => Ok(HashMap::new()),
Err(e) => Err(ExtensionError::Other(format!(
"Failed to read setup fields for '{}': {}",
name, e
))),
}
}
/// Persists the setup fields of tool `name` as one JSON value in the settings store.
///
/// # Errors
/// Returns `ExtensionError::Other` when no store is configured, when the map
/// cannot be encoded, or when the store write fails.
async fn save_tool_setup_fields(
    &self,
    name: &str,
    fields: &HashMap<String, String>,
) -> Result<(), ExtensionError> {
    let Some(store) = self.store.as_ref() else {
        return Err(ExtensionError::Other(
            "Settings store unavailable for setup field persistence".into(),
        ));
    };
    let key = Self::setup_fields_setting_key(name);
    let value = match serde_json::to_value(fields) {
        Ok(encoded) => encoded,
        Err(e) => {
            return Err(ExtensionError::Other(format!(
                "Failed to encode setup fields: {}",
                e
            )));
        }
    };
    if let Err(e) = store.set_setting(&self.user_id, &key, &value).await {
        return Err(ExtensionError::Other(format!(
            "Failed to persist setup fields for '{}': {}",
            name, e
        )));
    }
    Ok(())
}
async fn is_tool_setup_field_provided(
&self,
name: &str,
field: &crate::tools::wasm::ToolFieldSetupSchema,
saved_fields: &HashMap<String, String>,
) -> bool {
if saved_fields
.get(&field.name)
.is_some_and(|value| !value.trim().is_empty())
{
return true;
}
if let (Some(store), Some(setting_path)) = (&self.store, &field.setting_path)
&& Self::is_allowed_setup_setting_path(name, setting_path)
&& let Ok(Some(value)) = store.get_setting(&self.user_id, setting_path).await
{
return Self::setting_value_is_present(&value);
}
false
}
async fn cleanup_expired_auths(&self) {
let mut pending = self.pending_auth.write().await;
pending.retain(|_, auth| {
let expired = auth.created_at.elapsed() >= std::time::Duration::from_secs(300);
if expired {
if let Some(ref handle) = auth.task_handle {
handle.abort();
}
}
!expired
});
}
/// Describes the secrets and fields the setup of extension `name` asks for,
/// and whether each one has already been provided.
///
/// - WASM channels: secrets come from `<name>.capabilities.json` in the
///   channels directory; a missing file yields an empty schema.
/// - WASM tools: secrets and fields come from the tool capabilities `setup`
///   section; secrets for which `is_auto_resolved_oauth_field` returns true
///   are left out.
/// - MCP servers and channel relays: always an empty schema.
///
/// # Errors
/// Fails when the name is invalid, the extension is not installed, or a
/// channel capabilities file cannot be read or parsed.
pub async fn get_setup_schema(
    &self,
    name: &str,
    user_id: &str,
) -> Result<ExtensionSetupSchema, ExtensionError> {
    Self::validate_extension_name(name)?;

    // One "nothing to configure" answer used by every early exit below.
    let empty_schema = || ExtensionSetupSchema {
        secrets: Vec::new(),
        fields: Vec::new(),
    };

    match self.determine_installed_kind(name, user_id).await? {
        ExtensionKind::WasmChannel => {
            let cap_path = self
                .wasm_channels_dir
                .join(format!("{}.capabilities.json", name));
            if !cap_path.exists() {
                return Ok(empty_schema());
            }
            let raw = tokio::fs::read(&cap_path)
                .await
                .map_err(|e| ExtensionError::Other(e.to_string()))?;
            let cap_file = crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&raw)
                .map_err(|e| ExtensionError::Other(e.to_string()))?;

            let mut secrets = Vec::new();
            for secret in &cap_file.setup.required_secrets {
                // A failed lookup is reported as "not provided".
                let provided = matches!(
                    self.secrets.exists(user_id, &secret.name).await,
                    Ok(true)
                );
                secrets.push(crate::channels::web::types::SecretFieldInfo {
                    name: secret.name.clone(),
                    prompt: secret.prompt.clone(),
                    optional: secret.optional,
                    provided,
                    auto_generate: secret.auto_generate.is_some(),
                });
            }
            Ok(ExtensionSetupSchema {
                secrets,
                fields: Vec::new(),
            })
        }
        ExtensionKind::WasmTool => {
            let Some(cap_file) = self.load_tool_capabilities(name).await else {
                return Ok(empty_schema());
            };
            let Some(setup) = cap_file.setup.as_ref() else {
                return Ok(empty_schema());
            };
            let saved_fields = self.load_tool_setup_fields(name).await.unwrap_or_default();

            let mut secrets = Vec::new();
            for secret in &setup.required_secrets {
                if Self::is_auto_resolved_oauth_field(&secret.name, &cap_file) {
                    continue;
                }
                let provided = matches!(
                    self.secrets.exists(user_id, &secret.name).await,
                    Ok(true)
                );
                secrets.push(crate::channels::web::types::SecretFieldInfo {
                    name: secret.name.clone(),
                    prompt: secret.prompt.clone(),
                    optional: secret.optional,
                    provided,
                    auto_generate: false,
                });
            }

            let mut fields = Vec::new();
            for field in &setup.required_fields {
                let provided = self
                    .is_tool_setup_field_provided(name, field, &saved_fields)
                    .await;
                fields.push(crate::channels::web::types::SetupFieldInfo {
                    name: field.name.clone(),
                    prompt: field.prompt.clone(),
                    optional: field.optional,
                    provided,
                    input_type: field.input_type,
                });
            }
            Ok(ExtensionSetupSchema { secrets, fields })
        }
        ExtensionKind::McpServer | ExtensionKind::ChannelRelay => Ok(empty_schema()),
    }
}
/// Resolves the Telegram owner binding for channel `name` and records what was learned.
///
/// The bot token comes from `secrets["telegram_bot_token"]` when that entry
/// is non-blank, otherwise from the secrets store. After
/// `resolve_telegram_binding` returns:
/// - `Bound`: the owner id is stored, and the bot username if one was reported.
/// - `Pending`: the bot username is taken from the challenge's
///   `https://t.me/<username>` deep link, if present, and stored.
///
/// The username is only written when a settings store is configured.
///
/// # Errors
/// `ValidationFailed` when no usable token exists; `Config` when the stored
/// token cannot be read or a settings write fails; otherwise whatever
/// `resolve_telegram_binding` or `set_channel_owner_id` return.
async fn configure_telegram_binding(
    &self,
    name: &str,
    secrets: &std::collections::HashMap<String, String>,
) -> Result<TelegramBindingResult, ExtensionError> {
    let submitted_token = secrets
        .get("telegram_bot_token")
        .map(|raw| raw.trim())
        .filter(|token| !token.is_empty());

    let bot_token = match submitted_token {
        Some(token) => token.to_string(),
        None => {
            let lookup = self
                .secrets
                .get_decrypted(&self.user_id, "telegram_bot_token")
                .await;
            match lookup {
                Ok(secret) => {
                    let stored = secret.expose().trim().to_string();
                    if stored.is_empty() {
                        return Err(ExtensionError::ValidationFailed(
                            "Telegram bot token is required before owner verification".to_string(),
                        ));
                    }
                    stored
                }
                Err(crate::secrets::SecretError::NotFound(_)) => {
                    return Err(ExtensionError::ValidationFailed(
                        "Telegram bot token is required before owner verification".to_string(),
                    ));
                }
                Err(err) => {
                    return Err(ExtensionError::Config(format!(
                        "Failed to read stored Telegram bot token: {err}"
                    )));
                }
            }
        }
    };

    let existing_owner_id = self.current_channel_owner_id(name).await;
    let binding = self
        .resolve_telegram_binding(name, &bot_token, existing_owner_id)
        .await?;

    // Decide which bot username (if any) should be remembered for this channel.
    let username_to_store: Option<&str> = match &binding {
        TelegramBindingResult::Bound(data) => {
            self.set_channel_owner_id(name, data.owner_id).await?;
            data.bot_username.as_deref()
        }
        TelegramBindingResult::Pending(challenge) => challenge
            .deep_link
            .as_deref()
            .and_then(|link| link.strip_prefix("https://t.me/"))
            .and_then(|rest| rest.split('?').next())
            .filter(|value| !value.trim().is_empty()),
    };

    if let (Some(username), Some(store)) = (username_to_store, self.store.as_ref()) {
        store
            .set_setting(
                &self.user_id,
                &bot_username_setting_key(name),
                &serde_json::json!(username),
            )
            .await
            .map_err(|e| ExtensionError::Config(e.to_string()))?;
    }

    Ok(binding)
}
/// Validates a Telegram bot token and resolves who owns the bot for channel `name`.
///
/// Flow:
/// 1. The token is checked with the Bot API `getMe` call.
/// 2. If `existing_owner_id` is set, any pending verification is cleared and
///    the existing binding is returned.
/// 3. If there is no pending challenge, or the pending one has expired, a new
///    challenge is issued and returned as `Pending`.
/// 4. Otherwise `getUpdates` is polled until `TELEGRAM_OWNER_BIND_TIMEOUT_SECS`
///    elapses, looking for a private, non-bot message whose text matches the
///    challenge code. The sender becomes the owner.
///
/// # Errors
/// `ValidationFailed` when the token is rejected or verification times out;
/// `Other` for client construction or `getUpdates` failures; request and
/// parse failures are mapped by `telegram_request_error` and
/// `telegram_response_parse_error`.
async fn resolve_telegram_binding(
&self,
name: &str,
bot_token: &str,
existing_owner_id: Option<i64>,
) -> Result<TelegramBindingResult, ExtensionError> {
// Test builds can install a resolver that replaces all network traffic below.
#[cfg(test)]
if let Some(resolver) = self.test_telegram_binding_resolver.read().await.as_ref() {
return resolver(bot_token, existing_owner_id);
}
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.map_err(|e| ExtensionError::Other(e.to_string()))?;
// Step 1: confirm the token is accepted and learn the bot's username.
let get_me_url = format!("https://api.telegram.org/bot{bot_token}/getMe");
let get_me_resp = client
.get(&get_me_url)
.send()
.await
.map_err(|e| telegram_request_error("getMe", &e))?;
let get_me_status = get_me_resp.status();
if !get_me_status.is_success() {
return Err(ExtensionError::ValidationFailed(format!(
"Telegram token validation failed (HTTP {get_me_status})"
)));
}
let get_me: TelegramGetMeResponse = get_me_resp
.json()
.await
.map_err(|e| telegram_response_parse_error("getMe", &e))?;
if !get_me.ok {
return Err(ExtensionError::ValidationFailed(
get_me
.description
.unwrap_or_else(|| "Telegram getMe returned ok=false".to_string()),
));
}
// A blank username is treated the same as a missing one.
let bot_username = get_me
.result
.and_then(|result| result.username)
.filter(|username| !username.trim().is_empty());
// Step 2: an owner is already bound, so no verification is needed.
if let Some(owner_id) = existing_owner_id {
self.clear_pending_telegram_verification(name).await;
return Ok(TelegramBindingResult::Bound(TelegramBindingData {
owner_id,
bot_username: bot_username.clone(),
binding_state: TelegramOwnerBindingState::Existing,
}));
}
// Step 3: without a pending challenge, issue one and let the caller show it.
let pending_challenge = self.get_pending_telegram_verification(name).await;
let challenge = if let Some(challenge) = pending_challenge {
challenge
} else {
return Ok(TelegramBindingResult::Pending(
self.issue_telegram_verification_challenge(
&client,
name,
bot_token,
bot_username.as_deref(),
)
.await?,
));
};
// An expired challenge is discarded and replaced with a fresh one.
let now = unix_timestamp_secs();
if challenge.expires_at_unix <= now {
self.clear_pending_telegram_verification(name).await;
return Ok(TelegramBindingResult::Pending(
self.issue_telegram_verification_challenge(
&client,
name,
bot_token,
bot_username.as_deref(),
)
.await?,
));
}
// Step 4: poll for the verification message until the overall deadline.
let deadline = std::time::Instant::now()
+ std::time::Duration::from_secs(TELEGRAM_OWNER_BIND_TIMEOUT_SECS);
let mut offset = 0_i64;
while std::time::Instant::now() < deadline {
// Each poll waits at most the time left before the deadline, and at least 1 second.
let remaining_secs = deadline
.saturating_duration_since(std::time::Instant::now())
.as_secs()
.max(1);
let poll_timeout_secs = TELEGRAM_GET_UPDATES_TIMEOUT_SECS.min(remaining_secs);
let resp = client
.get(format!(
"https://api.telegram.org/bot{bot_token}/getUpdates"
))
.query(&[
("offset", offset.to_string()),
("timeout", poll_timeout_secs.to_string()),
(
"allowed_updates",
"[\"message\",\"edited_message\"]".to_string(),
),
])
.send()
.await
.map_err(|e| telegram_request_error("getUpdates", &e))?;
if !resp.status().is_success() {
return Err(ExtensionError::Other(format!(
"Telegram getUpdates failed (HTTP {})",
resp.status()
)));
}
let updates: TelegramGetUpdatesResponse = resp
.json()
.await
.map_err(|e| telegram_response_parse_error("getUpdates", &e))?;
if !updates.ok {
return Err(ExtensionError::Other(updates.description.unwrap_or_else(
|| "Telegram getUpdates returned ok=false".to_string(),
)));
}
// Every update in the batch is scanned; if several match, the last one wins.
let mut bound_owner_id = None;
for update in updates.result {
// Track the next offset so later polls skip updates already seen.
offset = offset.max(update.update_id + 1);
let message = update.message.or(update.edited_message);
// Only a private chat message from a non-bot sender carrying the code counts.
if let Some(message) = message
&& message.chat.chat_type == "private"
&& let Some(from) = message.from
&& !from.is_bot
&& let Some(text) = message.text.as_deref()
&& telegram_message_matches_verification_code(text, &challenge.code)
{
bound_owner_id = Some(from.id);
}
}
if let Some(owner_id) = bound_owner_id {
// The acknowledgment is best-effort: a send failure is logged, not returned.
if let Err(err) = send_telegram_text_message(
&client,
&format!("https://api.telegram.org/bot{bot_token}/sendMessage"),
owner_id,
"Verification received. Finishing setup...",
)
.await
{
tracing::warn!(
channel = name,
owner_id,
error = %err,
"Failed to send Telegram verification acknowledgment"
);
}
self.clear_pending_telegram_verification(name).await;
// NOTE(review): presumably this zero-timeout call confirms the consumed
// updates on Telegram's side; its result is deliberately ignored.
if offset > 0 {
let _ = client
.get(format!(
"https://api.telegram.org/bot{bot_token}/getUpdates"
))
.query(&[("offset", offset.to_string()), ("timeout", "0".to_string())])
.send()
.await;
}
return Ok(TelegramBindingResult::Bound(TelegramBindingData {
owner_id,
bot_username,
binding_state: TelegramOwnerBindingState::VerifiedNow,
}));
}
}
// Deadline passed without a matching message: drop the challenge so a new one is issued next time.
self.clear_pending_telegram_verification(name).await;
Err(ExtensionError::ValidationFailed(
"Telegram owner verification timed out. Request a new code and try again.".to_string(),
))
}
/// Sends a confirmation message to a Telegram owner who was verified just now.
///
/// Nothing is sent when `binding` is `None`, when the binding state is not
/// `VerifiedNow`, or when no channel runtime is available. A broadcast
/// failure is logged and otherwise ignored.
async fn notify_telegram_owner_verified(
    &self,
    channel_name: &str,
    binding: Option<&TelegramBindingData>,
) {
    let binding = match binding {
        Some(data) if data.binding_state == TelegramOwnerBindingState::VerifiedNow => data,
        _ => return,
    };

    // Clone the manager handle out of the runtime; the read guard is released
    // at the end of this statement.
    let channel_manager = self
        .channel_runtime
        .read()
        .await
        .as_ref()
        .map(|rt| Arc::clone(&rt.channel_manager));
    let Some(channel_manager) = channel_manager else {
        tracing::debug!(
            channel = channel_name,
            owner_id = binding.owner_id,
            "Skipping Telegram owner confirmation message because channel runtime is unavailable"
        );
        return;
    };

    let recipient = binding.owner_id.to_string();
    let confirmation = OutgoingResponse::text(
        "Telegram owner verified. This bot is now active and ready for you.",
    );
    let outcome = channel_manager
        .broadcast(channel_name, &recipient, confirmation)
        .await;
    if let Err(err) = outcome {
        tracing::warn!(
            channel = channel_name,
            owner_id = binding.owner_id,
            error = %err,
            "Failed to send Telegram owner verification confirmation"
        );
    }
}
/// Saves setup secrets and fields for extension `name`, then tries to activate it.
///
/// Steps, in order:
/// 1. Validate the name and determine the installed kind.
/// 2. Work out which secret names and setup fields this extension accepts.
/// 3. For WASM channels that declare a `validation_endpoint`, check the
///    submitted token against it.
/// 4. Store non-blank secrets; unknown secret names are rejected.
/// 5. Store non-blank fields; unknown field names are rejected.
/// 6. Ensure every non-optional setup field has a value.
/// 7. Auto-generate channel secrets that declare `auto_generate` and are
///    neither submitted nor stored.
/// 8. For the Telegram channel, resolve the owner binding; a pending
///    verification returns early with the challenge.
/// 9. Activate the extension and report the outcome.
///
/// An activation failure after a successful save is reported inside an `Ok`
/// result with `activated: false`, not as an `Err`.
///
/// NOTE(review): secrets are written under the `user_id` argument, while
/// setup fields and `setting_path` values are written under `self.user_id`;
/// confirm these are meant to differ.
///
/// # Errors
/// Returns an error for invalid or not-installed names, missing or
/// unparsable capabilities, rejected tokens, unknown secrets or fields,
/// persistence failures, and missing required fields.
pub async fn configure(
&self,
name: &str,
secrets: &std::collections::HashMap<String, String>,
fields: &std::collections::HashMap<String, String>,
user_id: &str,
) -> Result<ConfigureResult, ExtensionError> {
Self::validate_extension_name(name)?;
let kind = self.determine_installed_kind(name, user_id).await?;
// Kept only for WASM channels; later steps (validation, auto-generation) read it.
let mut channel_cap_file: Option<crate::channels::wasm::ChannelCapabilitiesFile> = None;
// Step 2: collect the secret names and setup fields this extension accepts.
let (allowed_secrets, setup_fields): (
std::collections::HashSet<String>,
Vec<crate::tools::wasm::ToolFieldSetupSchema>,
) = match kind {
ExtensionKind::WasmChannel => {
let cap_path = self
.wasm_channels_dir
.join(format!("{}.capabilities.json", name));
if !cap_path.exists() {
return Err(ExtensionError::Other(format!(
"Capabilities file not found for '{}'",
name
)));
}
let cap_bytes = tokio::fs::read(&cap_path)
.await
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let cap_file =
crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&cap_bytes)
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let names = cap_file
.setup
.required_secrets
.iter()
.map(|s| s.name.clone())
.collect();
channel_cap_file = Some(cap_file);
(names, Vec::new())
}
ExtensionKind::WasmTool => {
let cap_file = self.load_tool_capabilities(name).await.ok_or_else(|| {
ExtensionError::Other(format!("Capabilities file not found for '{}'", name))
})?;
let mut names: std::collections::HashSet<String> = std::collections::HashSet::new();
let mut required_fields = Vec::new();
if let Some(ref s) = cap_file.setup {
names.extend(s.required_secrets.iter().map(|s| s.name.clone()));
required_fields = s.required_fields.clone();
}
// The auth secret is accepted even when it is not listed under `setup`.
if let Some(ref auth) = cap_file.auth {
names.insert(auth.secret_name.clone());
}
if names.is_empty() && required_fields.is_empty() {
return Err(ExtensionError::Other(format!(
"Tool '{}' has no setup or auth schema — nothing to configure",
name
)));
}
(names, required_fields)
}
ExtensionKind::McpServer => {
let server = self
.get_mcp_server(name, user_id)
.await
.map_err(|e| ExtensionError::NotInstalled(e.to_string()))?;
let mut names = std::collections::HashSet::new();
names.insert(server.token_secret_name());
(names, Vec::new())
}
// Channel relays accept no secrets or fields here.
ExtensionKind::ChannelRelay => (std::collections::HashSet::new(), Vec::new()),
};
let allowed_fields: std::collections::HashSet<String> =
setup_fields.iter().map(|f| f.name.clone()).collect();
// Field definitions keyed by field name.
let setup_field_defs: std::collections::HashMap<
String,
crate::tools::wasm::ToolFieldSetupSchema,
> = setup_fields
.into_iter()
.map(|f| (f.name.clone(), f))
.collect();
// Step 3: validate the token remotely, using the first non-optional required
// secret that was actually submitted.
if let Some(ref cap_file) = channel_cap_file
&& let Some(ref endpoint_template) = cap_file.setup.validation_endpoint
&& let Some(secret_def) = cap_file
.setup
.required_secrets
.iter()
.find(|s| !s.optional && secrets.contains_key(&s.name))
&& let Some(token_value) = secrets.get(&secret_def.name)
{
let token = token_value.trim();
if !token.is_empty() {
// The `{secret_name}` placeholder is replaced with the token; for every
// name other than "telegram" the token is URL-encoded first.
let url = if name == "telegram" {
endpoint_template.replace(&format!("{{{}}}", secret_def.name), token)
} else {
let encoded =
url::form_urlencoded::byte_serialize(token.as_bytes()).collect::<String>();
endpoint_template.replace(&format!("{{{}}}", secret_def.name), &encoded)
};
// The final URL must pass the fetch-URL check before any request is made.
crate::tools::builtin::skill_tools::validate_fetch_url(&url)
.map_err(|e| ExtensionError::Other(format!("SSRF blocked: {}", e)))?;
let resp = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(10))
.build()
.map_err(|e| ExtensionError::Other(e.to_string()))?
.get(&url)
.send()
.await
.map_err(|e| {
ExtensionError::Other(format!("Token validation request failed: {}", e))
})?;
if !resp.status().is_success() {
return Err(ExtensionError::ValidationFailed(format!(
"Invalid token (API returned {})",
resp.status()
)));
}
}
}
// Step 4: store submitted secrets. Blank values are skipped, not stored.
for (secret_name, secret_value) in secrets {
if !allowed_secrets.contains(secret_name.as_str()) {
return Err(ExtensionError::Other(format!(
"Unknown secret '{}' for extension '{}'",
secret_name, name
)));
}
let trimmed_value = secret_value.trim();
if trimmed_value.is_empty() {
continue;
}
let params =
CreateSecretParams::new(secret_name, trimmed_value).with_provider(name.to_string());
self.secrets
.create(user_id, params)
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()))?;
}
// Step 5: merge submitted fields into the previously saved ones.
let mut restart_required = false;
let mut stored_fields = self.load_tool_setup_fields(name).await.unwrap_or_default();
for (field_name, field_value) in fields {
if !allowed_fields.contains(field_name.as_str()) {
return Err(ExtensionError::Other(format!(
"Unknown field '{}' for extension '{}'",
field_name, name
)));
}
let trimmed = field_value.trim();
if trimmed.is_empty() {
continue;
}
stored_fields.insert(field_name.clone(), trimmed.to_string());
if let Some(field_def) = setup_field_defs.get(field_name) {
if field_def.restart_required {
restart_required = true;
}
// A field with a `setting_path` is also written to that setting,
// after the path has been checked against the allowed paths.
if let Some(setting_path) = &field_def.setting_path {
Self::validate_setup_setting_path(name, setting_path)?;
let store = self.store.as_ref().ok_or_else(|| {
ExtensionError::Other(
"Settings store unavailable for setup field persistence".to_string(),
)
})?;
store
.set_setting(
&self.user_id,
setting_path,
&serde_json::Value::String(trimmed.to_string()),
)
.await
.map_err(|e| {
ExtensionError::Other(format!(
"Failed to set '{}' for extension '{}': {}",
setting_path, name, e
))
})?;
}
}
}
// The merged map is persisted only when the extension declares fields and some were submitted.
if !allowed_fields.is_empty() && !fields.is_empty() {
self.save_tool_setup_fields(name, &stored_fields).await?;
}
// Step 6: every non-optional field must now have a value.
for field_def in setup_field_defs.values() {
if field_def.optional {
continue;
}
if !self
.is_tool_setup_field_provided(name, field_def, &stored_fields)
.await
{
return Err(ExtensionError::Other(format!(
"Required field '{}' is missing for extension '{}'",
field_def.name, name
)));
}
}
// Step 7: generate channel secrets marked `auto_generate` that are neither
// submitted in this call nor already stored.
if let Some(ref cap_file) = channel_cap_file {
for secret_def in &cap_file.setup.required_secrets {
if let Some(ref auto_gen) = secret_def.auto_generate {
let already_provided = secrets
.get(&secret_def.name)
.is_some_and(|v| !v.trim().is_empty());
let already_stored = self
.secrets
.exists(user_id, &secret_def.name)
.await
.unwrap_or(false);
if !already_provided && !already_stored {
use rand::RngCore;
use rand::rngs::OsRng;
// `auto_gen.length` random bytes, stored as lowercase hex (two characters per byte).
let mut bytes = vec![0u8; auto_gen.length];
OsRng.fill_bytes(&mut bytes);
let hex_value: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
let params = CreateSecretParams::new(&secret_def.name, &hex_value)
.with_provider(name.to_string());
self.secrets
.create(user_id, params)
.await
.map_err(|e| ExtensionError::AuthFailed(e.to_string()))?;
tracing::info!(
"Auto-generated secret '{}' for channel '{}'",
secret_def.name,
name
);
}
}
}
}
// Step 8: the Telegram channel needs an owner binding before activation.
let mut telegram_binding = None;
if kind == ExtensionKind::WasmChannel && name == TELEGRAM_CHANNEL_NAME {
match self.configure_telegram_binding(name, secrets).await? {
TelegramBindingResult::Bound(binding) => {
telegram_binding = Some(binding);
}
// Verification is still pending: return the challenge without activating.
TelegramBindingResult::Pending(verification) => {
return Ok(ConfigureResult {
message: format!(
"Configuration saved for '{}'. {}",
name, verification.instructions
),
activated: false,
restart_required,
auth_url: None,
verification: Some(verification),
});
}
}
}
// Step 9a: WASM tools are activated here; both arms of this match return.
if kind == ExtensionKind::WasmTool {
match self.activate_wasm_tool(name, user_id).await {
Ok(result) => {
// For OAuth tools, delete the stored token, scopes and refresh token
// before calling `auth` below. Deletion failures are ignored.
if let Some(cap) = self.load_tool_capabilities(name).await
&& let Some(ref auth_cfg) = cap.auth
&& auth_cfg.oauth.is_some()
{
let _ = self.secrets.delete(user_id, &auth_cfg.secret_name).await;
let _ = self
.secrets
.delete(user_id, &format!("{}_scopes", auth_cfg.secret_name))
.await;
let _ = self
.secrets
.delete(user_id, &format!("{}_refresh_token", auth_cfg.secret_name))
.await;
}
// An `auth` failure is not fatal; the result then carries no auth URL.
let mut auth_url = None;
if let Ok(auth_result) = Box::pin(self.auth(name, user_id)).await {
auth_url = auth_result.auth_url().map(String::from);
}
let message = if auth_url.is_some() {
format!(
"Configuration saved and tool '{}' activated. Complete OAuth in your browser.",
name
)
} else {
format!(
"Configuration saved and tool '{}' activated. {}",
name, result.message
)
};
return Ok(ConfigureResult {
message,
activated: true,
restart_required,
auth_url,
verification: None,
});
}
// Activation failure is logged at debug level; the save is still reported as successful.
Err(e) => {
tracing::debug!(
"Auto-activation of tool '{}' after setup failed: {}",
name,
e
);
return Ok(ConfigureResult {
message: format!("Configuration saved for '{}'.", name),
activated: false,
restart_required,
auth_url: None,
verification: None,
});
}
}
}
// Step 9b: activate the remaining kinds.
let activate_result = match kind {
ExtensionKind::WasmChannel => self.activate_wasm_channel(name, user_id).await,
ExtensionKind::McpServer => self.activate_mcp(name, user_id).await,
ExtensionKind::ChannelRelay => self.activate_channel_relay(name, user_id).await,
// Not reached in practice: the WasmTool branch above returns in both arms.
// The arm keeps the match exhaustive.
ExtensionKind::WasmTool => {
return Ok(ConfigureResult {
message: format!("Configuration saved for '{}'.", name),
activated: false,
restart_required,
auth_url: None,
verification: None,
});
}
};
match activate_result {
Ok(result) => {
// Clear any previously recorded activation error and announce the new status.
self.activation_errors.write().await.remove(name);
self.broadcast_extension_status(name, "active", None).await;
if name == TELEGRAM_CHANNEL_NAME {
self.notify_telegram_owner_verified(name, telegram_binding.as_ref())
.await;
}
let message = if name == TELEGRAM_CHANNEL_NAME {
format!(
"Configuration saved, Telegram owner verified, and '{}' activated. {}",
name, result.message
)
} else {
format!(
"Configuration saved and '{}' activated. {}",
name, result.message
)
};
Ok(ConfigureResult {
message,
activated: true,
restart_required,
auth_url: None,
verification: None,
})
}
Err(e) => {
// Record and broadcast the failure, but still return `Ok`: the save succeeded.
let error_msg = e.to_string();
tracing::warn!(
extension = name,
error = %e,
"Saved configuration but activation failed"
);
self.activation_errors
.write()
.await
.insert(name.to_string(), error_msg.clone());
self.broadcast_extension_status(name, "failed", Some(&error_msg))
.await;
Ok(ConfigureResult {
message: format!(
"Configuration saved for '{}'. Activation failed: {}",
name, e
),
activated: false,
restart_required,
auth_url: None,
verification: None,
})
}
}
}
pub async fn configure_token(
&self,
name: &str,
token: &str,
user_id: &str,
) -> Result<ConfigureResult, ExtensionError> {
let kind = self.determine_installed_kind(name, user_id).await?;
let secret_name = match kind {
ExtensionKind::WasmChannel => {
let cap_path = self
.wasm_channels_dir
.join(format!("{}.capabilities.json", name));
let cap_bytes = tokio::fs::read(&cap_path)
.await
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let cap_file =
crate::channels::wasm::ChannelCapabilitiesFile::from_bytes(&cap_bytes)
.map_err(|e| ExtensionError::Other(e.to_string()))?;
let mut target = None;
for s in &cap_file.setup.required_secrets {
if s.optional {
continue;
}
if !self.secrets.exists(user_id, &s.name).await.unwrap_or(false) {
target = Some(s.name.clone());
break;
}
}
target
.or_else(|| {
cap_file
.setup
.required_secrets
.iter()
.find(|s| !s.optional)
.map(|s| s.name.clone())
})
.ok_or_else(|| {
ExtensionError::Other(format!("Channel '{}' has no required secrets", name))
})?
}
ExtensionKind::WasmTool => {
let cap = self.load_tool_capabilities(name).await.ok_or_else(|| {
ExtensionError::Other(format!("Capabilities not found for '{}'", name))
})?;
if let Some(ref auth) = cap.auth {
if !self
.secrets
.exists(user_id, &auth.secret_name)
.await
.unwrap_or(false)
{
auth.secret_name.clone()
} else if let Some(ref setup) = cap.setup {
let mut found = None;
for s in &setup.required_secrets {
if !self.secrets.exists(user_id, &s.name).await.unwrap_or(false) {
found = Some(s.name.clone());
break;
}
}
found.unwrap_or_else(|| auth.secret_name.clone())
} else {
auth.secret_name.clone()
}
} else {
cap.setup
.as_ref()
.and_then(|s| s.required_secrets.first())
.map(|s| s.name.clone())
.ok_or_else(|| {
ExtensionError::Other(format!(
"Tool '{}' has no auth or setup secrets",
name
))
})?
}
}
ExtensionKind::McpServer => {
let server = self
.get_mcp_server(name, user_id)
.await
.map_err(|e| ExtensionError::NotInstalled(e.to_string()))?;
server.token_secret_name()
}
ExtensionKind::ChannelRelay => {
return Err(ExtensionError::AuthRequired);
}
};
let mut secrets = std::collections::HashMap::new();
secrets.insert(secret_name, token.to_string());
self.configure(name, &secrets, &std::collections::HashMap::new(), user_id)
.await
}
/// Removes credential mappings for every secret named under
/// `http.credentials.*.secret_name` in the capabilities file at `cap_path`.
///
/// This is a best-effort cleanup: a missing, unreadable or unparsable file,
/// an empty secret list, or a missing credential registry all end the call
/// without doing anything.
async fn revoke_credential_mappings(&self, cap_path: &std::path::Path) {
    if !cap_path.exists() {
        return;
    }
    let json: serde_json::Value = match tokio::fs::read(cap_path).await {
        Ok(bytes) => match serde_json::from_slice(&bytes) {
            Ok(parsed) => parsed,
            Err(_) => return,
        },
        Err(_) => return,
    };

    let mut secret_names: Vec<String> = Vec::new();
    let credentials = json
        .get("http")
        .and_then(|h| h.get("credentials"))
        .and_then(|c| c.as_object());
    if let Some(creds) = credentials {
        for mapping in creds.values() {
            if let Some(secret) = mapping.get("secret_name").and_then(|s| s.as_str()) {
                secret_names.push(String::from(secret));
            }
        }
    }
    if secret_names.is_empty() {
        return;
    }

    let Some(cr) = self.tool_registry.credential_registry() else {
        return;
    };
    cr.remove_mappings_for_secrets(&secret_names);
    tracing::info!(
        secrets = ?secret_names,
        "Revoked credential mappings for removed extension"
    );
}
async fn unregister_hook_prefix(&self, prefix: &str) -> usize {
let Some(ref hooks) = self.hooks else {
return 0;
};
let names = hooks.list().await;
let mut removed = 0;
for hook_name in names {
if hook_name.starts_with(prefix) && hooks.unregister(&hook_name).await {
removed += 1;
}
}
removed
}
}
/// Injects credentials into a WASM channel, first from the secrets store and
/// then from environment variables.
///
/// Every stored secret whose name starts with `<channel_name>_`
/// (ASCII case-insensitive) is decrypted and set on the channel under its
/// upper-cased name. Secrets that fail to decrypt are logged and skipped.
/// `inject_env_credentials` then handles placeholders not already injected.
///
/// Returns the total number of credentials set.
///
/// # Errors
/// Returns a message when listing the user's secrets fails.
async fn inject_channel_credentials_from_secrets(
    channel: &Arc<crate::channels::wasm::WasmChannel>,
    secrets: Option<&dyn SecretsStore>,
    channel_name: &str,
    user_id: &str,
) -> Result<usize, String> {
    let mut injected_placeholders = std::collections::HashSet::new();
    let mut from_store = 0;

    if let Some(store) = secrets {
        let listed = store
            .list(user_id)
            .await
            .map_err(|e| format!("Failed to list secrets: {}", e))?;
        let prefix = format!("{}_", channel_name.to_ascii_lowercase());

        for secret_meta in listed {
            let belongs_to_channel = secret_meta
                .name
                .to_ascii_lowercase()
                .starts_with(&prefix);
            if !belongs_to_channel {
                continue;
            }
            match store.get_decrypted(user_id, &secret_meta.name).await {
                Ok(decrypted) => {
                    let placeholder = secret_meta.name.to_uppercase();
                    channel
                        .set_credential(&placeholder, decrypted.expose().to_string())
                        .await;
                    injected_placeholders.insert(placeholder);
                    from_store += 1;
                }
                Err(e) => {
                    tracing::warn!(
                        secret = %secret_meta.name,
                        error = %e,
                        "Failed to decrypt secret for channel credential injection"
                    );
                }
            }
        }
    }

    let from_env = inject_env_credentials(channel, channel_name, &injected_placeholders).await;
    Ok(from_store + from_env)
}
/// Sets channel credentials from environment variables for placeholders not
/// already injected, and returns how many were set.
///
/// Candidate placeholders are the upper-cased `secret_name`s of the
/// channel's HTTP credential mappings; `resolve_env_credentials` decides
/// which of them are taken from the environment. Returns 0 for a blank
/// channel name or a channel without an HTTP capability.
async fn inject_env_credentials(
    channel: &Arc<crate::channels::wasm::WasmChannel>,
    channel_name: &str,
    already_injected: &std::collections::HashSet<String>,
) -> usize {
    if channel_name.trim().is_empty() {
        return 0;
    }
    let caps = channel.capabilities();
    let placeholders: Vec<String> = match caps.tool_capabilities.http.as_ref() {
        Some(http_cap) => http_cap
            .credentials
            .values()
            .map(|mapping| mapping.secret_name.to_uppercase())
            .collect(),
        None => return 0,
    };

    let resolved = resolve_env_credentials(&placeholders, channel_name, already_injected);
    let mut injected = 0;
    for (placeholder, value) in resolved {
        channel.set_credential(&placeholder, value).await;
        injected += 1;
    }
    injected
}
pub(crate) fn resolve_env_credentials(
placeholders: &[String],
channel_name: &str,
already_injected: &std::collections::HashSet<String>,
) -> Vec<(String, String)> {
if channel_name.trim().is_empty() {
return Vec::new();
}
let prefix = format!("{}_", channel_name.to_ascii_uppercase());
let mut out = Vec::new();
for placeholder in placeholders {
if already_injected.contains(placeholder) {
continue;
}
if !placeholder.starts_with(&prefix) {
tracing::warn!(
channel = %channel_name,
placeholder = %placeholder,
"Ignoring non-prefixed credential placeholder in environment fallback"
);
continue;
}
if let Ok(value) = std::env::var(placeholder)
&& !value.is_empty()
{
out.push((placeholder.clone(), value));
}
}
out
}
/// Guesses the extension kind from an install URL.
///
/// URLs whose path ends in `.wasm` or `.tar.gz` are treated as WASM tools;
/// everything else is treated as an MCP server. Any `?query` or `#fragment`
/// is ignored, so a signed download link such as
/// `https://host/tool.wasm?sig=...` is still recognised as a WASM tool.
fn infer_kind_from_url(url: &str) -> ExtensionKind {
    // Only the part before the first `?` or `#` names the artifact.
    let path = url
        .split(|c: char| matches!(c, '?' | '#'))
        .next()
        .unwrap_or(url);
    if path.ends_with(".wasm") || path.ends_with(".tar.gz") {
        ExtensionKind::WasmTool
    } else {
        ExtensionKind::McpServer
    }
}
/// What to do after the primary install attempt, as decided by `fallback_decision`.
enum FallbackDecision {
/// Do not attempt the fallback source.
Return,
/// The primary attempt failed and a fallback source exists: try it.
TryFallback,
}
/// Decides whether a fallback install source should be tried.
///
/// The fallback is tried only when the primary attempt failed with an error
/// other than `AlreadyInstalled` and a fallback source is available.
fn fallback_decision(
    primary_result: &Result<InstallResult, ExtensionError>,
    fallback_source: &Option<Box<ExtensionSource>>,
) -> FallbackDecision {
    let Err(primary_err) = primary_result else {
        return FallbackDecision::Return;
    };
    // An "already installed" failure would not be helped by another source.
    let retryable = !matches!(primary_err, ExtensionError::AlreadyInstalled(_));
    if retryable && fallback_source.is_some() {
        FallbackDecision::TryFallback
    } else {
        FallbackDecision::Return
    }
}
/// Merges the errors of a failed primary install and a failed fallback install.
///
/// An `AlreadyInstalled` fallback error is returned as-is; any other pair is
/// wrapped in `ExtensionError::FallbackFailed`.
fn combine_install_errors(
    primary_err: ExtensionError,
    fallback_err: ExtensionError,
) -> ExtensionError {
    match fallback_err {
        already_installed @ ExtensionError::AlreadyInstalled(_) => already_installed,
        other => ExtensionError::FallbackFailed {
            primary: Box::new(primary_err),
            fallback: Box::new(other),
        },
    }
}
#[cfg(test)]
mod tests {
use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use futures::stream;
use crate::channels::wasm::{
ChannelCapabilities, LoadedChannel, PreparedChannelModule, WasmChannel, WasmChannelRouter,
WasmChannelRuntime, WasmChannelRuntimeConfig, bot_username_setting_key,
};
use crate::channels::{
Channel, ChannelManager, IncomingMessage, MessageStream, OutgoingResponse, StatusUpdate,
};
use crate::extensions::ExtensionManager;
use crate::extensions::manager::{
ChannelRuntimeState, FallbackDecision, TelegramBindingData, TelegramBindingResult,
TelegramOwnerBindingState, build_wasm_channel_runtime_config_updates,
combine_install_errors, fallback_decision, infer_kind_from_url,
normalize_hosted_callback_url, send_telegram_text_message,
telegram_message_matches_verification_code,
};
use crate::extensions::{
ExtensionError, ExtensionKind, ExtensionSource, InstallResult, VerificationChallenge,
};
use crate::pairing::PairingStore;
/// Turns a boolean check into a `Result`, using `message` as the error when
/// `condition` is false.
fn require(condition: bool, message: impl Into<String>) -> Result<(), String> {
    condition.then_some(()).ok_or_else(|| message.into())
}
/// Compares two values and returns a descriptive error naming `label` when
/// they differ.
fn require_eq<T>(actual: T, expected: T, label: &str) -> Result<(), String>
where
    T: PartialEq + Debug,
{
    match actual == expected {
        true => Ok(()),
        false => Err(format!(
            "{label} mismatch: expected {:?}, got {:?}",
            expected, actual
        )),
    }
}
/// Test channel that records every `broadcast` call instead of sending anything.
#[derive(Clone)]
struct RecordingChannel {
/// Name reported by `Channel::name`.
name: String,
/// `(user_id, response)` pairs captured by `broadcast`, in call order.
broadcasts: Arc<tokio::sync::Mutex<Vec<(String, OutgoingResponse)>>>,
}
/// `Channel` implementation for the recording test double: `broadcast`
/// appends to the shared log, `start` yields an empty stream, and every
/// other operation succeeds without doing anything.
#[async_trait]
impl Channel for RecordingChannel {
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Produces a stream that never yields a message.
    async fn start(&self) -> Result<MessageStream, crate::error::ChannelError> {
        let no_messages: MessageStream = Box::pin(stream::empty());
        Ok(no_messages)
    }

    async fn respond(
        &self,
        _msg: &IncomingMessage,
        _response: OutgoingResponse,
    ) -> Result<(), crate::error::ChannelError> {
        Ok(())
    }

    async fn send_status(
        &self,
        _status: StatusUpdate,
        _metadata: &serde_json::Value,
    ) -> Result<(), crate::error::ChannelError> {
        Ok(())
    }

    /// Records the recipient and response so tests can inspect them later.
    async fn broadcast(
        &self,
        user_id: &str,
        response: OutgoingResponse,
    ) -> Result<(), crate::error::ChannelError> {
        let mut recorded = self.broadcasts.lock().await;
        recorded.push((user_id.to_string(), response));
        Ok(())
    }

    async fn health_check(&self) -> Result<(), crate::error::ChannelError> {
        Ok(())
    }
}
/// Verifies the kind inferred for representative `.wasm`, `.tar.gz` and plain
/// HTTPS URLs.
#[test]
fn test_infer_kind_from_url() {
    // (input URL, kind expected from `infer_kind_from_url`)
    let cases = [
        ("https://example.com/tool.wasm", ExtensionKind::WasmTool),
        (
            "https://example.com/tool-wasm32-wasip2.tar.gz",
            ExtensionKind::WasmTool,
        ),
        ("https://mcp.notion.com", ExtensionKind::McpServer),
        ("https://example.com/mcp", ExtensionKind::McpServer),
    ];
    for (url, expected) in cases {
        assert_eq!(infer_kind_from_url(url), expected, "url: {url}");
    }
}
/// Builds a successful install outcome for a WASM tool named `test`.
fn make_ok_result() -> Result<InstallResult, ExtensionError> {
    let installed = InstallResult {
        name: String::from("test"),
        kind: ExtensionKind::WasmTool,
        message: String::from("Installed"),
    };
    Ok(installed)
}
/// Builds a boxed `WasmBuildable` source to act as the fallback in tests.
fn make_fallback_source() -> Option<Box<ExtensionSource>> {
    let buildable = ExtensionSource::WasmBuildable {
        source_dir: String::from("tools-src/test"),
        build_dir: Some(String::from("tools-src/test")),
        crate_name: Some(String::from("test-tool")),
    };
    Some(Box::new(buildable))
}
/// A successful primary result yields `Return` even though a fallback exists.
#[test]
fn test_fallback_decision_success_returns_directly() {
    let primary = make_ok_result();
    let fallback_source = make_fallback_source();
    let decision = fallback_decision(&primary, &fallback_source);
    assert!(matches!(decision, FallbackDecision::Return));
}
/// An `AlreadyInstalled` error yields `Return` even though a fallback exists.
#[test]
fn test_fallback_decision_already_installed_skips_fallback() {
    let primary: Result<InstallResult, ExtensionError> =
        Err(ExtensionError::AlreadyInstalled("test".to_string()));
    let fallback_source = make_fallback_source();
    let decision = fallback_decision(&primary, &fallback_source);
    assert!(matches!(decision, FallbackDecision::Return));
}
/// A `DownloadFailed` error with a fallback available yields `TryFallback`.
#[test]
fn test_fallback_decision_download_failed_triggers_fallback() {
    let primary: Result<InstallResult, ExtensionError> =
        Err(ExtensionError::DownloadFailed("404 Not Found".to_string()));
    let fallback_source = make_fallback_source();
    let decision = fallback_decision(&primary, &fallback_source);
    assert!(matches!(decision, FallbackDecision::TryFallback));
}
/// A `DownloadFailed` error without any fallback source yields `Return`.
#[test]
fn test_fallback_decision_error_without_fallback_returns() {
    let primary: Result<InstallResult, ExtensionError> =
        Err(ExtensionError::DownloadFailed("404 Not Found".to_string()));
    let no_fallback: Option<Box<ExtensionSource>> = None;
    let decision = fallback_decision(&primary, &no_fallback);
    assert!(matches!(decision, FallbackDecision::Return));
}
/// Combining two unrelated failures produces `FallbackFailed`, and its
/// rendered message mentions both underlying errors.
#[test]
fn test_combine_errors_includes_both_messages() {
    let combined = combine_install_errors(
        ExtensionError::DownloadFailed("404 Not Found".to_string()),
        ExtensionError::InstallFailed("cargo not found".to_string()),
    );
    assert!(
        matches!(combined, ExtensionError::FallbackFailed { .. }),
        "Expected FallbackFailed, got: {combined:?}"
    );
    let rendered = combined.to_string();
    assert!(
        rendered.contains("404 Not Found"),
        "missing primary: {rendered}"
    );
    assert!(
        rendered.contains("cargo not found"),
        "missing fallback: {rendered}"
    );
}
/// When the fallback reports `AlreadyInstalled`, that error is returned
/// unchanged instead of being wrapped.
#[test]
fn test_combine_errors_forwards_already_installed_from_fallback() {
    let combined = combine_install_errors(
        ExtensionError::DownloadFailed("404".to_string()),
        ExtensionError::AlreadyInstalled("test".to_string()),
    );
    match &combined {
        ExtensionError::AlreadyInstalled(name) if name == "test" => {}
        _ => panic!("Expected AlreadyInstalled, got: {combined:?}"),
    }
}
/// Same-named `.wasm` files under `tools/` and `channels/` are independent on
/// disk: they hold different contents and removing one leaves the other.
#[test]
fn test_tool_and_channel_paths_are_separate() {
    let dir = tempfile::tempdir().expect("temp dir");
    let root = dir.path();
    let (tools_dir, channels_dir) = (root.join("tools"), root.join("channels"));
    std::fs::create_dir_all(&tools_dir).unwrap();
    std::fs::create_dir_all(&channels_dir).unwrap();

    let file_name = format!("{}.wasm", "telegram");
    let tool_wasm = tools_dir.join(&file_name);
    let channel_wasm = channels_dir.join(&file_name);
    std::fs::write(&tool_wasm, b"tool-payload").unwrap();
    std::fs::write(&channel_wasm, b"channel-payload").unwrap();
    assert!(tool_wasm.exists());
    assert!(channel_wasm.exists());

    let tool_bytes = std::fs::read(&tool_wasm).unwrap();
    let channel_bytes = std::fs::read(&channel_wasm).unwrap();
    assert_ne!(
        tool_bytes, channel_bytes,
        "Tool and channel files must be independent"
    );

    std::fs::remove_file(&tool_wasm).unwrap();
    assert!(!tool_wasm.exists());
    assert!(
        channel_wasm.exists(),
        "Removing tool must not affect channel"
    );
}
/// Walks a same-named extension through "channel only", "both" and
/// "tool only" states on disk.
///
/// NOTE(review): this only checks file existence; no manager method is called
/// here, so the priority named in the test title is not asserted — confirm
/// whether a call to the kind-detection logic was intended.
#[test]
fn test_determine_kind_priority_tools_before_channels() {
    let dir = tempfile::tempdir().expect("temp dir");
    let root = dir.path();
    let (tools_dir, channels_dir) = (root.join("tools"), root.join("channels"));
    std::fs::create_dir_all(&tools_dir).unwrap();
    std::fs::create_dir_all(&channels_dir).unwrap();

    let file_name = format!("{}.wasm", "ambiguous");
    let tool_wasm = tools_dir.join(&file_name);
    let channel_wasm = channels_dir.join(&file_name);

    // State 1: only the channel file exists.
    std::fs::write(&channel_wasm, b"channel").unwrap();
    assert!(!tool_wasm.exists());
    assert!(channel_wasm.exists());

    // State 2: both files exist.
    std::fs::write(&tool_wasm, b"tool").unwrap();
    assert!(tool_wasm.exists());
    assert!(channel_wasm.exists());

    // State 3: only the tool file remains.
    std::fs::remove_file(&channel_wasm).unwrap();
    assert!(tool_wasm.exists());
    assert!(!channel_wasm.exists());
}
/// Returns the shared test database together with the temp-dir guard that
/// `crate::testing::test_db` hands back alongside it.
async fn make_test_store() -> (Arc<dyn crate::db::Database>, tempfile::TempDir) {
    let (database, temp_dir) = crate::testing::test_db().await;
    (database, temp_dir)
}
/// Builds an `ExtensionManager` for tests with explicit tool/channel
/// directories, an optional WASM tool runtime and an optional database.
///
/// Secrets live in an in-memory store keyed by a freshly generated master key;
/// the owner/user id passed to the manager is `"test"`.
fn make_test_manager_with_dirs(
    wasm_runtime: Option<Arc<crate::tools::wasm::WasmToolRuntime>>,
    tools_dir: std::path::PathBuf,
    channels_dir: std::path::PathBuf,
    store: Option<Arc<dyn crate::db::Database>>,
) -> crate::extensions::manager::ExtensionManager {
    use crate::secrets::{InMemorySecretsStore, SecretsCrypto};
    use crate::tools::mcp::process::McpProcessManager;
    use crate::tools::mcp::session::McpSessionManager;

    // Directory creation is best-effort; failures are deliberately ignored.
    let _ = std::fs::create_dir_all(&tools_dir);
    let _ = std::fs::create_dir_all(&channels_dir);

    let master_key =
        secrecy::SecretString::from(crate::secrets::keychain::generate_master_key_hex());
    let crypto = Arc::new(SecretsCrypto::new(master_key).expect("crypto"));
    let secrets: Arc<dyn crate::secrets::SecretsStore + Send + Sync> =
        Arc::new(InMemorySecretsStore::new(crypto));

    crate::extensions::manager::ExtensionManager::new(
        Arc::new(McpSessionManager::new()),
        Arc::new(McpProcessManager::new()),
        secrets,
        Arc::new(crate::tools::ToolRegistry::new()),
        None,
        wasm_runtime,
        tools_dir,
        channels_dir,
        None,
        "test".to_string(),
        store,
        Vec::new(),
    )
}
/// Convenience wrapper around `make_test_manager_with_dirs` that uses the
/// same directory for tools and channels and no database.
fn make_test_manager(
    wasm_runtime: Option<Arc<crate::tools::wasm::WasmToolRuntime>>,
    tools_dir: std::path::PathBuf,
) -> crate::extensions::manager::ExtensionManager {
    let channels_dir = tools_dir.clone();
    make_test_manager_with_dirs(wasm_runtime, tools_dir, channels_dir, None)
}
/// Creates `<dir>/tools` containing a placeholder `<name>.wasm` and a
/// `<name>.capabilities.json` holding `capabilities_json`.
///
/// Returns the path of the created `tools` directory. Panics if any
/// filesystem operation fails.
fn write_test_tool(
    dir: &std::path::Path,
    name: &str,
    capabilities_json: &str,
) -> std::path::PathBuf {
    let tools_dir = dir.join("tools");
    let wasm_path = tools_dir.join(format!("{name}.wasm"));
    let capabilities_path = tools_dir.join(format!("{name}.capabilities.json"));

    std::fs::create_dir_all(&tools_dir).expect("tools dir");
    // The bytes are not valid WASM; the file only has to exist.
    std::fs::write(wasm_path, b"not-a-real-wasm").expect("wasm");
    std::fs::write(capabilities_path, capabilities_json).expect("capabilities");
    tools_dir
}
/// `setting_value_is_present` rejects `null` and a whitespace-only string and
/// accepts a non-empty string and a non-empty array.
#[test]
fn test_setting_value_is_present() {
    use crate::extensions::manager::ExtensionManager as Manager;

    // (candidate setting value, whether it should count as present)
    let cases = [
        (serde_json::Value::Null, false),
        (serde_json::json!(" "), false),
        (serde_json::json!("openai"), true),
        (serde_json::json!(["x"]), true),
    ];
    for (value, expected) in &cases {
        assert_eq!(
            Manager::setting_value_is_present(value),
            *expected,
            "presence of {value}"
        );
    }
}
/// A setup field pointing at `nearai.session_token` must not be reported as
/// provided, even though the store holds a value under that key.
#[tokio::test]
async fn test_is_tool_setup_field_provided_ignores_disallowed_setting_path() {
    const DISALLOWED_PATH: &str = "nearai.session_token";

    let dir = tempfile::tempdir().expect("temp dir");
    let (store, _db_dir) = make_test_store().await;

    // Seed a value under the path so a plain settings lookup would find one.
    let seeded = serde_json::json!({"token":"secret"});
    store
        .set_setting("test", DISALLOWED_PATH, &seeded)
        .await
        .expect("set disallowed setting");

    let mgr = make_test_manager_with_dirs(
        None,
        dir.path().join("tools"),
        dir.path().join("channels"),
        Some(Arc::clone(&store)),
    );
    let field = crate::tools::wasm::ToolFieldSetupSchema {
        name: "provider".to_string(),
        prompt: "Provider".to_string(),
        optional: false,
        input_type: crate::tools::wasm::ToolSetupFieldInputType::Text,
        setting_path: Some(DISALLOWED_PATH.to_string()),
        restart_required: false,
    };

    let no_secrets = std::collections::HashMap::new();
    let provided = mgr
        .is_tool_setup_field_provided("switch-llm", &field, &no_secrets)
        .await;
    assert!(
        !provided,
        "disallowed setting paths must not be treated as readable setup fields"
    );
}
/// Configuring a tool whose setup field maps to `llm_backend` stores the value
/// in settings, flags a restart, and does not activate the tool (the manager
/// has no WASM runtime).
#[tokio::test]
async fn test_configure_writes_allowlisted_setting_path() {
    let dir = tempfile::tempdir().expect("temp dir");
    let (store, _db_dir) = make_test_store().await;

    // Capabilities declaring one required field backed by a setting path.
    let capabilities = r#"{
"setup": {
"required_fields": [
{
"name": "llm_backend",
"prompt": "Provider",
"setting_path": "llm_backend",
"restart_required": true
}
]
}
}"#;
    let tools_dir = write_test_tool(dir.path(), "switch-llm", capabilities);
    let channels_dir = dir.path().join("channels");
    let mgr =
        make_test_manager_with_dirs(None, tools_dir, channels_dir, Some(Arc::clone(&store)));

    let fields =
        std::collections::HashMap::from([("llm_backend".to_string(), "openai".to_string())]);
    let outcome = mgr
        .configure(
            "switch-llm",
            &std::collections::HashMap::new(),
            &fields,
            "test-user",
        )
        .await
        .expect("save configuration");

    assert!(
        !outcome.activated,
        "tool should not auto-activate without runtime"
    );
    assert!(
        outcome.restart_required,
        "backend switch should require restart"
    );

    let stored = store
        .get_setting("test", "llm_backend")
        .await
        .expect("get setting");
    assert_eq!(stored, Some(serde_json::json!("openai")));
}
/// Configuring a tool whose setup field maps to `nearai.session_token` fails
/// with an "Invalid setting_path" error and writes nothing to the store.
#[tokio::test]
async fn test_configure_rejects_disallowed_setting_path() {
    let dir = tempfile::tempdir().expect("temp dir");
    let (store, _db_dir) = make_test_store().await;

    // Capabilities declaring a field that targets a disallowed setting path.
    let capabilities = r#"{
"setup": {
"required_fields": [
{
"name": "session",
"prompt": "Session",
"setting_path": "nearai.session_token"
}
]
}
}"#;
    let tools_dir = write_test_tool(dir.path(), "evil-tool", capabilities);
    let channels_dir = dir.path().join("channels");
    let mgr =
        make_test_manager_with_dirs(None, tools_dir, channels_dir, Some(Arc::clone(&store)));

    let fields =
        std::collections::HashMap::from([("session".to_string(), "overwrite".to_string())]);
    let outcome = mgr
        .configure(
            "evil-tool",
            &std::collections::HashMap::new(),
            &fields,
            "test-user",
        )
        .await;
    let Err(err) = outcome else {
        panic!("disallowed setting_path should fail");
    };

    let msg = err.to_string();
    assert!(
        msg.contains("Invalid setting_path"),
        "unexpected error message: {msg}"
    );

    let stored = store
        .get_setting("test", "nearai.session_token")
        .await
        .expect("get disallowed setting");
    assert_eq!(stored, None);
}
/// With a WASM runtime present, activating an unknown extension fails because
/// it is missing, not because the runtime is unavailable.
#[tokio::test]
async fn test_activate_wasm_tool_with_runtime_passes_runtime_check() {
    let dir = tempfile::tempdir().expect("temp dir");
    let runtime = crate::tools::wasm::WasmToolRuntime::new(
        crate::tools::wasm::WasmRuntimeConfig::for_testing(),
    )
    .expect("runtime");
    let mgr = make_test_manager(Some(Arc::new(runtime)), dir.path().to_path_buf());

    let msg = mgr
        .activate("nonexistent", "test")
        .await
        .unwrap_err()
        .to_string();
    assert!(
        !msg.contains("WASM runtime not available"),
        "Should not fail on runtime check, got: {msg}"
    );

    // Any of these phrasings counts as a "missing extension" failure.
    let missing_markers = ["not found", "not installed", "Not installed"];
    assert!(
        missing_markers.iter().any(|marker| msg.contains(*marker)),
        "Should fail on missing file, got: {msg}"
    );
}
/// Without a WASM runtime, activating an installed tool fails with the
/// "WASM runtime not available" error.
#[tokio::test]
async fn test_activate_wasm_tool_without_runtime_fails_with_runtime_error() {
    let dir = tempfile::tempdir().expect("temp dir");
    let fake_wasm = dir.path().join("fake.wasm");
    std::fs::write(fake_wasm, b"not-a-real-wasm").unwrap();

    let mgr = make_test_manager(None, dir.path().to_path_buf());
    let msg = mgr.activate("fake", "test").await.unwrap_err().to_string();
    assert!(
        msg.contains("WASM runtime not available"),
        "Expected runtime not available error, got: {msg}"
    );
}
/// Same-named capabilities files under `tools/` and `channels/` keep their own
/// contents when both are written.
#[test]
fn test_capabilities_files_also_separate() {
    let dir = tempfile::tempdir().expect("temp dir");
    let root = dir.path();
    let (tools_dir, channels_dir) = (root.join("tools"), root.join("channels"));
    std::fs::create_dir_all(&tools_dir).unwrap();
    std::fs::create_dir_all(&channels_dir).unwrap();

    let file_name = format!("{}.capabilities.json", "telegram");
    let tool_cap = tools_dir.join(&file_name);
    let channel_cap = channels_dir.join(&file_name);

    let tool_caps = r#"{"required_secrets":["TELEGRAM_API_KEY"]}"#;
    let channel_caps = r#"{"required_secrets":["TELEGRAM_BOT_TOKEN"]}"#;
    std::fs::write(&tool_cap, tool_caps).unwrap();
    std::fs::write(&channel_cap, channel_caps).unwrap();

    let tool_read_back = std::fs::read_to_string(&tool_cap).unwrap();
    let channel_read_back = std::fs::read_to_string(&channel_cap).unwrap();
    assert_eq!(tool_read_back, tool_caps);
    assert_eq!(channel_read_back, channel_caps);
}
/// Upgrading everything with nothing installed yields no per-extension results
/// and a "No WASM extensions installed" message.
#[tokio::test]
async fn test_upgrade_no_installed_extensions() {
    let manager = make_manager_with_temp_dirs();
    let report = manager.upgrade(None, "test").await.unwrap();
    assert!(report.results.is_empty());
    assert!(report.message.contains("No WASM extensions installed"));
}
/// Upgrading a name that is not an installed WASM extension returns an error.
#[tokio::test]
async fn test_upgrade_mcp_server_rejected() {
    let manager = make_manager_with_temp_dirs();
    let outcome = manager.upgrade(Some("some-mcp"), "test").await;
    assert!(outcome.is_err());
}
/// A channel whose capabilities declare the current `WIT_CHANNEL_VERSION` is
/// reported as `already_up_to_date`.
#[tokio::test]
async fn test_upgrade_up_to_date_extension() {
    let dir = tempfile::tempdir().expect("temp dir");
    let channels_dir = dir.path().join("channels");
    std::fs::create_dir_all(&channels_dir).unwrap();
    std::fs::write(channels_dir.join("test-channel.wasm"), b"\0asm fake").unwrap();

    let manifest = serde_json::json!({
        "type": "channel",
        "name": "test-channel",
        "wit_version": crate::tools::wasm::WIT_CHANNEL_VERSION,
    });
    std::fs::write(
        channels_dir.join("test-channel.capabilities.json"),
        serde_json::to_string(&manifest).unwrap(),
    )
    .unwrap();

    let manager = make_manager_custom_dirs(dir.path().join("tools"), channels_dir);
    let report = manager.upgrade(Some("test-channel"), "test").await.unwrap();
    assert_eq!(report.results.len(), 1);
    assert_eq!(report.results[0].status, "already_up_to_date");
}
/// A channel declaring WIT version `0.1.0` that the registry does not know is
/// reported as `not_in_registry`.
#[tokio::test]
async fn test_upgrade_outdated_not_in_registry() {
    let dir = tempfile::tempdir().expect("temp dir");
    let channels_dir = dir.path().join("channels");
    std::fs::create_dir_all(&channels_dir).unwrap();
    std::fs::write(channels_dir.join("custom-channel.wasm"), b"\0asm fake").unwrap();

    let manifest = serde_json::json!({
        "type": "channel",
        "name": "custom-channel",
        "wit_version": "0.1.0",
    });
    std::fs::write(
        channels_dir.join("custom-channel.capabilities.json"),
        serde_json::to_string(&manifest).unwrap(),
    )
    .unwrap();

    let manager = make_manager_custom_dirs(dir.path().join("tools"), channels_dir);
    let report = manager
        .upgrade(Some("custom-channel"), "test")
        .await
        .unwrap();
    assert_eq!(report.results.len(), 1);
    assert_eq!(report.results[0].status, "not_in_registry");
}
/// Builds a manager whose tool and channel directories live under a fresh
/// temporary directory.
///
/// NOTE(review): the `TempDir` guard is dropped when this function returns,
/// which removes the directory, so the returned manager holds paths that no
/// longer exist — confirm callers do not rely on the directories existing.
fn make_manager_with_temp_dirs() -> ExtensionManager {
    let dir = tempfile::tempdir().expect("temp dir");
    let root = dir.path();
    let (tools_dir, channels_dir) = (root.join("tools"), root.join("channels"));
    make_manager_custom_dirs(tools_dir, channels_dir)
}
/// Builds an `ExtensionManager` for tests with the given tool and channel
/// directories, an in-memory secrets store keyed by `TEST_CRYPTO_KEY`, no WASM
/// runtime and no database.
fn make_manager_custom_dirs(
    tools_dir: std::path::PathBuf,
    channels_dir: std::path::PathBuf,
) -> ExtensionManager {
    use crate::secrets::{InMemorySecretsStore, SecretsCrypto};
    use crate::testing::credentials::TEST_CRYPTO_KEY;
    use crate::tools::ToolRegistry;
    use crate::tools::mcp::process::McpProcessManager;
    use crate::tools::mcp::session::McpSessionManager;

    // Directory creation is best-effort; failures are deliberately ignored.
    let _ = std::fs::create_dir_all(&tools_dir);
    let _ = std::fs::create_dir_all(&channels_dir);

    let master_key = secrecy::SecretString::from(TEST_CRYPTO_KEY.to_string());
    let crypto = match SecretsCrypto::new(master_key) {
        Ok(crypto) => Arc::new(crypto),
        Err(err) => panic!("failed to construct test crypto: {err}"),
    };
    let secrets = Arc::new(InMemorySecretsStore::new(crypto));

    ExtensionManager::new(
        Arc::new(McpSessionManager::new()),
        Arc::new(McpProcessManager::new()),
        secrets,
        Arc::new(ToolRegistry::new()),
        None,
        None,
        tools_dir,
        channels_dir,
        None,
        "test".to_string(),
        None,
        Vec::new(),
    )
}
/// Builds a `LoadedChannel` around a test-only prepared module, with a webhook
/// path of `/webhook/<name>` and no capabilities file.
fn make_test_loaded_channel(
    runtime: Arc<WasmChannelRuntime>,
    name: &str,
    pairing_store: Arc<PairingStore>,
) -> LoadedChannel {
    let description = format!("Mock channel: {name}");
    let prepared = Arc::new(PreparedChannelModule::for_testing(name, description));

    let webhook_path = format!("/webhook/{name}");
    let capabilities = ChannelCapabilities::for_channel(name).with_path(webhook_path);

    let channel = WasmChannel::new(
        runtime,
        prepared,
        capabilities,
        "default",
        "{}".to_string(),
        pairing_store,
        None,
    );
    LoadedChannel {
        channel,
        capabilities_file: None,
    }
}
/// The runtime config updates built from a tunnel URL, webhook secret and
/// owner id contain all three values under their expected keys.
#[test]
fn test_telegram_hot_activation_runtime_config_includes_owner_id() -> Result<(), String> {
    let updates = build_wasm_channel_runtime_config_updates(
        Some("https://example.test"),
        Some("secret-123"),
        Some(424242),
    );

    // (config key, value expected under that key)
    let expectations = [
        ("tunnel_url", serde_json::json!("https://example.test")),
        ("webhook_secret", serde_json::json!("secret-123")),
        ("owner_id", serde_json::json!(424242)),
    ];
    for (key, expected) in expectations.iter() {
        require_eq(updates.get(*key), Some(expected), *key)?;
    }
    Ok(())
}
#[tokio::test]
async fn test_current_channel_owner_id_uses_runtime_state() -> Result<(), String> {
let manager = make_manager_with_temp_dirs();
if manager.current_channel_owner_id("telegram").await.is_some() {
return Err("expected no owner id for telegram before runtime setup".to_string());
}
let channels = Arc::new(crate::channels::ChannelManager::new());
let runtime = Arc::new(
crate::channels::wasm::WasmChannelRuntime::new(
crate::channels::wasm::WasmChannelRuntimeConfig::default(),
)
.map_err(|e| format!("runtime init failed: {e}"))?,
);
let pairing_store = Arc::new(crate::pairing::PairingStore::new());
let router = Arc::new(crate::channels::wasm::WasmChannelRouter::new());
let mut owner_ids = std::collections::HashMap::new();
owner_ids.insert("telegram".to_string(), 12345_i64);
manager
.set_channel_runtime(channels, runtime, pairing_store, router, owner_ids)
.await;
if manager.current_channel_owner_id("telegram").await != Some(12345_i64) {
return Err("expected runtime owner id fast-path for telegram".to_string());
}
if manager.current_channel_owner_id("slack").await.is_some() {
return Err("expected no owner id for slack".to_string());
}
Ok(())
}
/// End-to-end check of hot-activating the `telegram` channel through
/// `configure` with a mock channel loader and a stubbed owner-binding resolver.
///
/// Verifies that configure reports activation, clears a stale activation
/// error, registers the channel with the running `ChannelManager`, and
/// persists the active-channel list, owner id and bot username.
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_telegram_hot_activation_configure_uses_mock_loader_and_persists_state()
-> Result<(), String> {
let dir = tempfile::tempdir().map_err(|err| format!("temp dir: {err}"))?;
let channels_dir = dir.path().join("channels");
std::fs::create_dir_all(&channels_dir).map_err(|err| format!("channels dir: {err}"))?;
// Placeholder module bytes; the mock loader installed below is used instead
// of loading this file.
std::fs::write(channels_dir.join("telegram.wasm"), b"mock")
.map_err(|err| format!("write wasm: {err}"))?;
// Capabilities declaring a single required secret and a null owner id.
std::fs::write(
channels_dir.join("telegram.capabilities.json"),
serde_json::to_vec(&serde_json::json!({
"type": "channel",
"name": "telegram",
"setup": {
"required_secrets": [
{
"name": "telegram_bot_token",
"prompt": "Enter your Telegram Bot API token (from @BotFather)",
"optional": false
}
]
},
"capabilities": {
"channel": {
"allowed_paths": ["/webhook/telegram"]
}
},
"config": {
"owner_id": null
}
}))
.map_err(|err| format!("serialize capabilities: {err}"))?,
)
.map_err(|err| format!("write capabilities: {err}"))?;
let (db, _db_tmp) = crate::testing::test_db().await;
// Manager backed by the test database so persisted settings can be read back.
let manager = {
use crate::secrets::{InMemorySecretsStore, SecretsCrypto};
use crate::testing::credentials::TEST_CRYPTO_KEY;
use crate::tools::ToolRegistry;
use crate::tools::mcp::process::McpProcessManager;
use crate::tools::mcp::session::McpSessionManager;
let master_key = secrecy::SecretString::from(TEST_CRYPTO_KEY.to_string());
let crypto = Arc::new(
SecretsCrypto::new(master_key)
.unwrap_or_else(|err| panic!("failed to construct test crypto: {err}")),
);
ExtensionManager::new(
Arc::new(McpSessionManager::new()),
Arc::new(McpProcessManager::new()),
Arc::new(InMemorySecretsStore::new(crypto)),
Arc::new(ToolRegistry::new()),
None,
None,
dir.path().join("tools"),
channels_dir.clone(),
None,
"test".to_string(),
Some(db),
Vec::new(),
)
};
let channel_manager = Arc::new(ChannelManager::new());
let runtime = Arc::new(
WasmChannelRuntime::new(WasmChannelRuntimeConfig::for_testing())
.map_err(|err| format!("runtime: {err}"))?,
);
let pairing_store = Arc::new(PairingStore::with_base_dir(
dir.path().join("pairing-state"),
));
let router = Arc::new(WasmChannelRouter::new());
// Attach the channel runtime with no pre-existing owner ids.
manager
.set_channel_runtime(
Arc::clone(&channel_manager),
Arc::clone(&runtime),
Arc::clone(&pairing_store),
Arc::clone(&router),
std::collections::HashMap::new(),
)
.await;
// Install a loader that returns a mock `LoadedChannel` for any requested name.
manager
.set_test_wasm_channel_loader(Arc::new({
let runtime = Arc::clone(&runtime);
let pairing_store = Arc::clone(&pairing_store);
move |name| {
Ok(make_test_loaded_channel(
Arc::clone(&runtime),
name,
Arc::clone(&pairing_store),
))
}
}))
.await;
// Resolver stub: errors if an owner id is already supplied, otherwise
// reports a freshly verified binding.
manager
.set_test_telegram_binding_resolver(Arc::new(|_token, existing_owner_id| {
if existing_owner_id.is_some() {
return Err(ExtensionError::Other(
"owner binding should be derived during setup".to_string(),
));
}
Ok(TelegramBindingResult::Bound(TelegramBindingData {
owner_id: 424242,
bot_username: Some("test_hot_bot".to_string()),
binding_state: TelegramOwnerBindingState::VerifiedNow,
}))
}))
.await;
// Seed a stale activation error; a successful configure is expected to clear it.
manager
.activation_errors
.write()
.await
.insert("telegram".to_string(), "stale failure".to_string());
let result = manager
.configure(
"telegram",
&std::collections::HashMap::from([(
"telegram_bot_token".to_string(),
"123456789:ABCdefGhI".to_string(),
)]),
&std::collections::HashMap::new(),
"test",
)
.await
.map_err(|err| format!("configure succeeds: {err}"))?;
require(result.activated, "expected hot activation to succeed")?;
require(
result.message.contains("activated"),
format!("unexpected message: {}", result.message),
)?;
require(
!manager
.activation_errors
.read()
.await
.contains_key("telegram"),
"successful configure should clear stale activation errors",
)?;
require(
manager
.active_channel_names
.read()
.await
.contains("telegram"),
"telegram should be marked active after hot activation",
)?;
require(
channel_manager.get_channel("telegram").await.is_some(),
"telegram should be hot-added to the running channel manager",
)?;
require_eq(
manager.load_persisted_active_channels("test").await,
vec!["telegram".to_string()],
"persisted active channels",
)?;
require_eq(
manager.current_channel_owner_id("telegram").await,
Some(424242),
"current owner id",
)?;
require(
manager.has_wasm_channel_owner_binding("telegram").await,
"telegram should report an explicit owner binding after setup".to_string(),
)?;
// Read the persisted owner id straight from the settings store.
let owner_setting = manager
.store
.as_ref()
.ok_or_else(|| "db-backed manager missing".to_string())?
.get_setting("test", "channels.wasm_channel_owner_ids.telegram")
.await
.map_err(|err| format!("owner_id setting query: {err}"))?;
require_eq(
owner_setting,
Some(serde_json::json!(424242)),
"owner setting",
)?;
// Read the persisted bot username straight from the settings store.
let bot_username_setting = manager
.store
.as_ref()
.ok_or_else(|| "db-backed manager missing".to_string())?
.get_setting("test", &bot_username_setting_key("telegram"))
.await
.map_err(|err| format!("bot username setting query: {err}"))?;
require_eq(
bot_username_setting,
Some(serde_json::json!("test_hot_bot")),
"bot username setting",
)
}
/// When the owner-binding resolver reports a pending verification, `configure`
/// returns the challenge, does not report activation, and leaves `telegram`
/// out of the active channel set.
#[tokio::test]
async fn test_telegram_hot_activation_returns_verification_challenge_before_binding()
-> Result<(), String> {
    let dir = tempfile::tempdir().map_err(|err| format!("temp dir: {err}"))?;
    let channels_dir = dir.path().join("channels");
    std::fs::create_dir_all(&channels_dir).map_err(|err| format!("channels dir: {err}"))?;
    std::fs::write(channels_dir.join("telegram.wasm"), b"mock")
        .map_err(|err| format!("write wasm: {err}"))?;

    // Capabilities declaring a single required secret for the channel.
    let capabilities = serde_json::json!({
        "type": "channel",
        "name": "telegram",
        "setup": {
            "required_secrets": [
                {
                    "name": "telegram_bot_token",
                    "prompt": "Enter your Telegram Bot API token (from @BotFather)",
                    "optional": false
                }
            ]
        },
        "capabilities": {
            "channel": {
                "allowed_paths": ["/webhook/telegram"]
            }
        }
    });
    let capabilities_bytes = serde_json::to_vec(&capabilities)
        .map_err(|err| format!("serialize capabilities: {err}"))?;
    std::fs::write(
        channels_dir.join("telegram.capabilities.json"),
        capabilities_bytes,
    )
    .map_err(|err| format!("write capabilities: {err}"))?;

    let manager = make_manager_custom_dirs(dir.path().join("tools"), channels_dir);

    // Resolver stub: errors if an owner id already exists, otherwise hands
    // back a pending verification challenge.
    manager
        .set_test_telegram_binding_resolver(Arc::new(|_token, existing_owner_id| {
            if existing_owner_id.is_some() {
                return Err(ExtensionError::Other(
                    "owner binding should not exist before verification".to_string(),
                ));
            }
            let challenge = VerificationChallenge {
                code: "iclaw-7qk2m9".to_string(),
                instructions:
                    "Send `/start iclaw-7qk2m9` to @test_hot_bot in Telegram. IronClaw will finish setup automatically."
                        .to_string(),
                deep_link: Some("https://t.me/test_hot_bot?start=iclaw-7qk2m9".to_string()),
            };
            Ok(TelegramBindingResult::Pending(challenge))
        }))
        .await;

    let secrets = std::collections::HashMap::from([(
        "telegram_bot_token".to_string(),
        "123456789:ABCdefGhI".to_string(),
    )]);
    let result = manager
        .configure(
            "telegram",
            &secrets,
            &std::collections::HashMap::new(),
            "test",
        )
        .await
        .map_err(|err| format!("configure returned challenge: {err}"))?;

    require(
        !result.activated,
        "expected setup to pause for verification",
    )?;
    let issued_code = result
        .verification
        .as_ref()
        .map(|challenge| challenge.code.as_str());
    require(
        issued_code == Some("iclaw-7qk2m9"),
        "expected verification code in configure result",
    )?;
    let is_active = manager
        .active_channel_names
        .read()
        .await
        .contains("telegram");
    require(
        !is_active,
        "telegram should not activate until owner verification completes",
    )
}
/// `current_channel_owner_id` falls back to the persisted setting when no
/// channel runtime is set, and prefers the runtime-supplied owner id once
/// `set_channel_runtime` has been called.
#[cfg(feature = "libsql")]
#[tokio::test]
async fn test_current_channel_owner_id_uses_store_fallback() -> Result<(), String> {
use crate::db::{Database, SettingsStore};
let dir = tempfile::tempdir().map_err(|e| format!("tempdir failed: {e}"))?;
// Local libSQL database file inside the temp dir, migrated before use.
let db_path = dir.path().join("owner-id.db");
let db = Arc::new(
crate::db::libsql::LibSqlBackend::new_local(&db_path)
.await
.map_err(|e| format!("create local libsql backend failed: {e}"))?,
);
db.run_migrations()
.await
.map_err(|e| format!("run libsql migrations failed: {e}"))?;
let tools_dir = dir.path().join("tools");
let channels_dir = dir.path().join("channels");
// Directory creation is best-effort; errors are ignored.
std::fs::create_dir_all(&tools_dir).ok();
std::fs::create_dir_all(&channels_dir).ok();
use crate::secrets::{InMemorySecretsStore, SecretsCrypto};
use crate::testing::credentials::TEST_CRYPTO_KEY;
use crate::tools::ToolRegistry;
use crate::tools::mcp::process::McpProcessManager;
use crate::tools::mcp::session::McpSessionManager;
let master_key = secrecy::SecretString::from(TEST_CRYPTO_KEY.to_string());
let crypto = Arc::new(
SecretsCrypto::new(master_key)
.map_err(|e| format!("create secrets crypto failed: {e}"))?,
);
let manager = ExtensionManager::new(
Arc::new(McpSessionManager::new()),
Arc::new(McpProcessManager::new()),
Arc::new(InMemorySecretsStore::new(crypto)),
Arc::new(ToolRegistry::new()),
None,
None,
tools_dir,
channels_dir,
None,
"test".to_string(),
Some(db.clone() as Arc<dyn crate::db::Database>),
Vec::new(),
);
// Step 1: nothing persisted and no runtime, so no owner id is expected.
if manager.current_channel_owner_id("telegram").await.is_some() {
return Err("expected no owner id before settings seed".to_string());
}
// Step 2: persist an owner id and expect it to be returned from the store.
db.set_setting(
"test",
"channels.wasm_channel_owner_ids.telegram",
&serde_json::json!(54321_i64),
)
.await
.map_err(|e| format!("persist owner id in settings failed: {e}"))?;
if manager.current_channel_owner_id("telegram").await != Some(54321_i64) {
return Err("expected store fallback owner id for telegram".to_string());
}
// Step 3: attach a runtime carrying a different owner id; it must win over
// the persisted value.
let channels = Arc::new(crate::channels::ChannelManager::new());
let runtime = Arc::new(
crate::channels::wasm::WasmChannelRuntime::new(
crate::channels::wasm::WasmChannelRuntimeConfig::default(),
)
.map_err(|e| format!("runtime init failed: {e}"))?,
);
let pairing_store = Arc::new(crate::pairing::PairingStore::new());
let router = Arc::new(crate::channels::wasm::WasmChannelRouter::new());
let mut owner_ids = std::collections::HashMap::new();
owner_ids.insert("telegram".to_string(), 12345_i64);
manager
.set_channel_runtime(channels, runtime, pairing_store, router, owner_ids)
.await;
if manager.current_channel_owner_id("telegram").await != Some(12345_i64) {
return Err("expected runtime fast-path owner id precedence".to_string());
}
Ok(())
}
/// A binding in the `VerifiedNow` state triggers exactly one broadcast to the
/// owner id, containing the "Telegram owner verified" confirmation.
#[tokio::test]
async fn test_notify_telegram_owner_verified_sends_confirmation_for_new_binding()
-> Result<(), String> {
    let dir = tempfile::tempdir().map_err(|err| format!("temp dir: {err}"))?;
    let manager =
        make_manager_custom_dirs(dir.path().join("tools"), dir.path().join("channels"));

    // Register a recording channel so broadcasts can be inspected afterwards.
    let channel_manager = Arc::new(ChannelManager::new());
    let broadcasts = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let recorder = RecordingChannel {
        name: "telegram".to_string(),
        broadcasts: Arc::clone(&broadcasts),
    };
    channel_manager.add(Box::new(recorder)).await;

    let wasm_channel_runtime = Arc::new(
        WasmChannelRuntime::new(WasmChannelRuntimeConfig::for_testing())
            .map_err(|err| format!("runtime: {err}"))?,
    );
    let runtime_state = ChannelRuntimeState {
        channel_manager,
        wasm_channel_runtime,
        pairing_store: Arc::new(PairingStore::with_base_dir(dir.path().join("pairing"))),
        wasm_channel_router: Arc::new(WasmChannelRouter::new()),
        wasm_channel_owner_ids: std::collections::HashMap::new(),
    };
    manager.channel_runtime.write().await.replace(runtime_state);

    let binding = TelegramBindingData {
        owner_id: 424242,
        bot_username: Some("test_hot_bot".to_string()),
        binding_state: TelegramOwnerBindingState::VerifiedNow,
    };
    manager
        .notify_telegram_owner_verified("telegram", Some(&binding))
        .await;

    let sent = broadcasts.lock().await;
    require_eq(sent.len(), 1, "broadcast count")?;
    let (recipient, response) = &sent[0];
    require_eq(recipient.clone(), "424242".to_string(), "broadcast user_id")?;
    require(
        response.content.contains("Telegram owner verified"),
        "confirmation DM should acknowledge owner verification",
    )
}
/// A binding in the `Existing` state does not trigger any broadcast.
#[tokio::test]
async fn test_notify_telegram_owner_verified_skips_existing_binding() -> Result<(), String> {
    let dir = tempfile::tempdir().map_err(|err| format!("temp dir: {err}"))?;
    let manager =
        make_manager_custom_dirs(dir.path().join("tools"), dir.path().join("channels"));

    // Register a recording channel so broadcasts can be inspected afterwards.
    let channel_manager = Arc::new(ChannelManager::new());
    let broadcasts = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let recorder = RecordingChannel {
        name: "telegram".to_string(),
        broadcasts: Arc::clone(&broadcasts),
    };
    channel_manager.add(Box::new(recorder)).await;

    let wasm_channel_runtime = Arc::new(
        WasmChannelRuntime::new(WasmChannelRuntimeConfig::for_testing())
            .map_err(|err| format!("runtime: {err}"))?,
    );
    let runtime_state = ChannelRuntimeState {
        channel_manager,
        wasm_channel_runtime,
        pairing_store: Arc::new(PairingStore::with_base_dir(dir.path().join("pairing"))),
        wasm_channel_router: Arc::new(WasmChannelRouter::new()),
        wasm_channel_owner_ids: std::collections::HashMap::new(),
    };
    manager.channel_runtime.write().await.replace(runtime_state);

    let binding = TelegramBindingData {
        owner_id: 424242,
        bot_username: Some("test_hot_bot".to_string()),
        binding_state: TelegramOwnerBindingState::Existing,
    };
    manager
        .notify_telegram_owner_verified("telegram", Some(&binding))
        .await;

    let nothing_sent = broadcasts.lock().await.is_empty();
    require(
        nothing_sent,
        "existing owner bindings should not trigger another confirmation DM",
    )
}
/// For channel `ictest1`, only `ICTEST1_BOT_TOKEN` is expected to be resolved:
/// `ICTEST2_TOKEN` is set but carries a different prefix, and
/// `ICTEST1_UNRELATED_OTHER` is never set by this test.
#[test]
fn test_security_prefix_check() {
    let placeholders = vec![
        "ICTEST1_BOT_TOKEN".to_string(),
        "ICTEST2_TOKEN".to_string(),
        "ICTEST1_UNRELATED_OTHER".to_string(),
    ];
    let already_injected = std::collections::HashSet::new();

    // SAFETY: mutating the environment is only sound while no other thread
    // accesses it. The variable names are specific to this test.
    // NOTE(review): the default test harness runs tests on several threads, so
    // exclusive access is not strictly guaranteed — confirm or serialize the
    // env-touching tests.
    unsafe { std::env::set_var("ICTEST1_BOT_TOKEN", "good-secret") };
    unsafe { std::env::set_var("ICTEST2_TOKEN", "bad-secret") };

    let resolved = super::resolve_env_credentials(&placeholders, "ictest1", &already_injected);

    // Clean up before asserting so a failing assertion cannot leak the
    // variables into the rest of the test process.
    // SAFETY: same reasoning as for the `set_var` calls above.
    unsafe { std::env::remove_var("ICTEST1_BOT_TOKEN") };
    unsafe { std::env::remove_var("ICTEST2_TOKEN") };

    assert_eq!(resolved.len(), 1);
    assert_eq!(resolved[0].0, "ICTEST1_BOT_TOKEN");
    assert_eq!(resolved[0].1, "good-secret");
}
/// A placeholder already listed in `already_injected` is not resolved again,
/// even though the matching environment variable is set.
#[test]
fn test_already_injected_skipped() {
    let placeholders = vec!["ICTEST3_TOKEN".to_string()];
    let mut already_injected = std::collections::HashSet::new();
    already_injected.insert("ICTEST3_TOKEN".to_string());

    // SAFETY: mutating the environment is only sound while no other thread
    // accesses it. The variable name is specific to this test.
    // NOTE(review): the default test harness runs tests on several threads, so
    // exclusive access is not strictly guaranteed — confirm or serialize the
    // env-touching tests.
    unsafe { std::env::set_var("ICTEST3_TOKEN", "secret") };

    let resolved = super::resolve_env_credentials(&placeholders, "ictest3", &already_injected);

    // Clean up before asserting so a failing assertion cannot leak the variable.
    // SAFETY: same reasoning as for the `set_var` call above.
    unsafe { std::env::remove_var("ICTEST3_TOKEN") };

    assert!(resolved.is_empty());
}
/// A placeholder whose environment variable is absent resolves to nothing.
#[test]
fn test_missing_env_var_not_injected() {
    let wanted = vec!["ICTEST4_TOKEN".to_string()];
    let injected_so_far = std::collections::HashSet::new();

    // Make sure the variable is absent regardless of the ambient environment.
    // SAFETY: mutating the environment is only sound while no other thread
    // accesses it. NOTE(review): the test harness is multi-threaded by
    // default — confirm this is acceptable.
    unsafe { std::env::remove_var("ICTEST4_TOKEN") };

    let resolved = super::resolve_env_credentials(&wanted, "ictest4", &injected_so_far);
    assert!(resolved.is_empty());
}
/// A placeholder whose environment variable is set to the empty string
/// resolves to nothing.
#[test]
fn test_empty_env_var_not_injected() {
    let placeholders = vec!["ICTEST5_TOKEN".to_string()];
    let already_injected = std::collections::HashSet::new();

    // SAFETY: mutating the environment is only sound while no other thread
    // accesses it. The variable name is specific to this test.
    // NOTE(review): the default test harness runs tests on several threads, so
    // exclusive access is not strictly guaranteed — confirm or serialize the
    // env-touching tests.
    unsafe { std::env::set_var("ICTEST5_TOKEN", "") };

    let resolved = super::resolve_env_credentials(&placeholders, "ictest5", &already_injected);

    // Clean up before asserting so a failing assertion cannot leak the variable.
    // SAFETY: same reasoning as for the `set_var` call above.
    unsafe { std::env::remove_var("ICTEST5_TOKEN") };

    assert!(resolved.is_empty());
}
/// An empty channel name must not resolve any placeholder, including the
/// degenerate `_TOKEN` name, even when both variables are set.
#[test]
fn test_empty_channel_name_returns_nothing() {
    // Serialise with the other tests in this module that mutate the process
    // environment; `_TOKEN` in particular is a generic name.
    let _env_lock = crate::config::helpers::lock_env();

    let placeholders = vec!["_TOKEN".to_string(), "ICTEST6_TOKEN".to_string()];
    let already_injected = std::collections::HashSet::new();

    // SAFETY: the env lock is held for the whole test.
    unsafe {
        std::env::set_var("_TOKEN", "bad");
        std::env::set_var("ICTEST6_TOKEN", "bad");
    }

    let resolved = super::resolve_env_credentials(&placeholders, "", &already_injected);

    // Clean up before asserting so a failing assertion cannot leak the variables.
    // SAFETY: the env lock is still held.
    unsafe {
        std::env::remove_var("_TOKEN");
        std::env::remove_var("ICTEST6_TOKEN");
    }

    assert!(resolved.is_empty(), "empty channel name must match nothing");
}
/// Looking up the kind of an extension that is not installed must fail and
/// must leave the set of installed relay extensions untouched.
#[tokio::test]
async fn test_determine_installed_kind_does_not_auto_install_relay() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let manager = make_test_manager(None, workspace.path().to_path_buf());

    let empty_before = manager.installed_relay_extensions.read().await.is_empty();
    assert!(empty_before, "Should start with no installed relay extensions");

    let lookup = manager.determine_installed_kind("slack-relay", "test").await;
    assert!(lookup.is_err(), "Should return NotInstalled");

    let empty_after = manager.installed_relay_extensions.read().await.is_empty();
    assert!(
        empty_after,
        "determine_installed_kind must not modify installed_relay_extensions"
    );
}
/// A manager built without a store reports `slack-relay` as not being a relay
/// channel.
#[tokio::test]
async fn test_is_relay_channel_returns_false_without_store() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let manager = make_test_manager(None, workspace.path().to_path_buf());
    let is_relay = manager.is_relay_channel("slack-relay", "test").await;
    assert!(!is_relay);
}
/// Activating a relay channel on a manager built without a store fails with
/// `ExtensionError::AuthRequired`.
#[tokio::test]
async fn test_activate_channel_relay_without_store_returns_auth_required() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let manager = make_test_manager(None, workspace.path().to_path_buf());

    let activation = manager.activate_channel_relay("slack-relay", "test").await;
    let err = activation.unwrap_err();

    let is_auth_required = matches!(err, ExtensionError::AuthRequired);
    assert!(is_auth_required, "expected AuthRequired, got: {err:?}");
}
#[tokio::test]
async fn test_remove_relay_shuts_down_via_relay_channel_manager() {
let dir = tempfile::tempdir().expect("temp dir");
let mgr = make_test_manager(None, dir.path().to_path_buf());
let cm = Arc::new(crate::channels::ChannelManager::new());
let (stub, _tx) = crate::testing::StubChannel::new("slack-relay");
cm.add(Box::new(stub)).await;
mgr.set_relay_channel_manager(Arc::clone(&cm)).await;
mgr.installed_relay_extensions
.write()
.await
.insert("slack-relay".to_string());
*mgr.relay_event_tx.lock().await = Some(tokio::sync::mpsc::channel(1).0);
if let Ok(mut cache) = mgr.relay_signing_secret_cache.lock() {
*cache = Some(vec![9u8; 32]);
}
if let Some(ref store) = mgr.store {
store
.set_setting(
"test",
"relay:slack-relay:team_id",
&serde_json::json!("T123"),
)
.await
.expect("store team_id");
}
assert!(cm.get_channel("slack-relay").await.is_some());
let result = mgr.remove("slack-relay", "test").await;
assert!(result.is_ok(), "remove should succeed: {:?}", result.err());
assert!(
!mgr.installed_relay_extensions
.read()
.await
.contains("slack-relay"),
"Should be removed from installed set"
);
assert!(
mgr.relay_event_tx.lock().await.is_none(),
"relay event sender should be cleared on remove"
);
assert!(
mgr.relay_signing_secret().is_none(),
"relay signing secret cache should be cleared on remove"
);
assert!(
cm.get_channel("slack-relay").await.is_none(),
"relay channel should be removed from the channel manager"
);
}
/// Removing the `gmail` extension must clear everything keyed to it: the
/// pending-auth entry (aborting its listener task), the cached activation
/// error, and the pending OAuth flow whose `extension_name` is `gmail`.
/// A pending OAuth flow belonging to a different extension must survive.
#[tokio::test]
async fn test_remove_wasm_tool_clears_pending_oauth_state_and_activation_error() {
let dir = tempfile::tempdir().expect("temp dir");
let mgr = make_test_manager(None, dir.path().to_path_buf());
// Fake tool file in the directory handed to the manager; presumably this is
// what lets `remove("gmail", ..)` find something to remove — verify against
// `make_test_manager`.
std::fs::write(dir.path().join("gmail.wasm"), b"fake-tool").expect("write tool");
// A task that never completes on its own, standing in for an auth listener.
let listener = tokio::spawn(async {
std::future::pending::<()>().await;
});
// Keep an abort handle so the test can observe the task after its JoinHandle
// has been moved into `PendingAuth`.
let abort_handle = listener.abort_handle();
mgr.pending_auth.write().await.insert(
"gmail".to_string(),
super::PendingAuth {
_name: "gmail".to_string(),
_kind: ExtensionKind::WasmTool,
created_at: std::time::Instant::now(),
task_handle: Some(listener),
},
);
// Stale activation error that removal is expected to drop.
mgr.activation_errors
.write()
.await
.insert("gmail".to_string(), "cached failure".to_string());
let secrets = Arc::clone(&mgr.secrets);
// Pending OAuth flow owned by the extension being removed.
mgr.pending_oauth_flows().write().await.insert(
"gmail-state".to_string(),
crate::cli::oauth_defaults::PendingOAuthFlow {
extension_name: "gmail".to_string(),
display_name: "Gmail".to_string(),
token_url: "https://example.com/token".to_string(),
client_id: "client123".to_string(),
client_secret: None,
redirect_uri: "https://example.com/oauth/callback".to_string(),
code_verifier: None,
access_token_field: "access_token".to_string(),
secret_name: "google_oauth_token".to_string(),
provider: None,
validation_endpoint: None,
scopes: vec![],
user_id: "test".to_string(),
secrets: Arc::clone(&secrets),
sse_manager: None,
gateway_token: None,
token_exchange_extra_params: std::collections::HashMap::new(),
client_id_secret_name: None,
created_at: std::time::Instant::now(),
},
);
// Pending OAuth flow owned by a different extension; it acts as the control
// that must not be touched by removing `gmail`.
mgr.pending_oauth_flows().write().await.insert(
"other-state".to_string(),
crate::cli::oauth_defaults::PendingOAuthFlow {
extension_name: "web-search".to_string(),
display_name: "Web Search".to_string(),
token_url: "https://example.com/token".to_string(),
client_id: "client456".to_string(),
client_secret: None,
redirect_uri: "https://example.com/oauth/callback".to_string(),
code_verifier: None,
access_token_field: "access_token".to_string(),
secret_name: "other_token".to_string(),
provider: None,
validation_endpoint: None,
scopes: vec![],
user_id: "test".to_string(),
secrets,
sse_manager: None,
gateway_token: None,
token_exchange_extra_params: std::collections::HashMap::new(),
client_id_secret_name: None,
created_at: std::time::Instant::now(),
},
);
let result = mgr.remove("gmail", "test").await;
assert!(result.is_ok(), "remove should succeed: {:?}", result.err());
// Yield once so the runtime gets a chance to process the aborted listener
// before `is_finished()` is checked below.
tokio::task::yield_now().await;
assert!(
mgr.pending_auth.read().await.get("gmail").is_none(),
"pending auth entry should be removed"
);
assert!(
abort_handle.is_finished(),
"pending auth listener should be aborted"
);
assert!(
!mgr.activation_errors.read().await.contains_key("gmail"),
"stale activation error should be cleared"
);
let flows = mgr.pending_oauth_flows().read().await;
assert!(
!flows.contains_key("gmail-state"),
"gateway OAuth flow for removed extension should be cleared"
);
assert!(
flows.contains_key("other-state"),
"unrelated pending OAuth flows should be retained"
);
}
/// Removing the `telegram` channel must clear its cached activation error and
/// delete both its `.wasm` file and its capabilities file.
#[tokio::test]
async fn test_remove_wasm_channel_clears_activation_error_and_deletes_files() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let tools_dir = workspace.path().join("tools");
    let channels_dir = workspace.path().join("channels");
    let manager = make_test_manager_with_dirs(None, tools_dir, channels_dir.clone(), None);

    // Lay down the two on-disk artefacts of the channel.
    let wasm_file = channels_dir.join("telegram.wasm");
    let capabilities_file = channels_dir.join("telegram.capabilities.json");
    std::fs::write(&wasm_file, b"fake-channel").expect("write channel");
    std::fs::write(&capabilities_file, b"{}").expect("write capabilities");

    manager
        .activation_errors
        .write()
        .await
        .insert("telegram".to_string(), "channel failed".to_string());

    let removal = manager.remove("telegram", "test").await;
    assert!(
        removal.is_ok(),
        "remove should succeed: {:?}",
        removal.as_ref().err()
    );

    let error_still_cached = manager
        .activation_errors
        .read()
        .await
        .contains_key("telegram");
    assert!(
        !error_still_cached,
        "channel activation error should be cleared on remove"
    );
    assert!(
        !wasm_file.exists(),
        "channel wasm file should be deleted on remove"
    );
    assert!(
        !capabilities_file.exists(),
        "channel capabilities file should be deleted on remove"
    );
}
/// The whole query string — keys and values — is dropped from a logged URL.
#[test]
fn test_sanitize_url_with_query_params() {
    let sanitized = super::sanitize_url_for_logging(
        "https://api.example.com/path?api_key=secret123&token=abc",
    );
    assert_eq!(sanitized, "https://api.example.com/path");
    for leaked in ["api_key", "secret123", "token"] {
        assert!(!sanitized.contains(leaked));
    }
}
/// Userinfo (`user:password@`) is stripped while host and port are kept.
#[test]
fn test_sanitize_url_with_credentials() {
    let sanitized =
        super::sanitize_url_for_logging("https://user:password@api.example.com:8080/path");
    for leaked in ["user", "password", "@"] {
        assert!(!sanitized.contains(leaked));
    }
    for kept in ["api.example.com", ":8080"] {
        assert!(sanitized.contains(kept));
    }
}
/// The `#fragment` part is dropped from a logged URL.
#[test]
fn test_sanitize_url_with_fragment() {
    let sanitized = super::sanitize_url_for_logging("https://api.example.com/path#section");
    assert_eq!(sanitized, "https://api.example.com/path");
    for leaked in ["#", "section"] {
        assert!(!sanitized.contains(leaked));
    }
}
/// An explicit port survives sanitisation while the query string does not.
#[test]
fn test_sanitize_url_with_port() {
    let sanitized =
        super::sanitize_url_for_logging("https://api.example.com:9443/path?key=value");
    assert_eq!(sanitized, "https://api.example.com:9443/path");
    let keeps_port = sanitized.contains(":9443");
    let leaks_key = sanitized.contains("key");
    assert!(keeps_port);
    assert!(!leaks_key);
}
/// Credentials, query string, and fragment are all removed at once; host,
/// port, and path are preserved.
#[test]
fn test_sanitize_url_with_all_components() {
    let sanitized = super::sanitize_url_for_logging(
        "https://admin:secret@api.example.com:8080/v1/data?api_key=xyz#results",
    );
    let must_be_absent = ["admin", "secret", "@", "api_key", "xyz", "#", "results"];
    for leaked in must_be_absent {
        assert!(!sanitized.contains(leaked));
    }
    let must_be_present = ["api.example.com:8080", "/v1/data"];
    for kept in must_be_present {
        assert!(sanitized.contains(kept));
    }
}
/// Input that does not parse as a URL is returned as-is when it has no query,
/// and is still cut at the `?` when it has one.
#[test]
fn test_sanitize_url_malformed() {
    // No query: nothing to strip.
    let malformed = "https://[invalid-url";
    let sanitized_plain = super::sanitize_url_for_logging(malformed);
    assert_eq!(sanitized_plain, malformed);

    // With a query: everything from `?` onwards must go.
    let sanitized_query = super::sanitize_url_for_logging("https://[invalid-url?key=secret");
    assert_eq!(sanitized_query, "https://[invalid-url");
    for leaked in ["?", "secret"] {
        assert!(!sanitized_query.contains(leaked));
    }
}
/// A very short, non-URL input passes through unchanged.
#[test]
fn test_sanitize_url_short_string() {
    let sanitized = super::sanitize_url_for_logging("short");
    assert_eq!(sanitized, "short");
}
/// Free-form text that is not a URL passes through unchanged.
#[test]
fn test_sanitize_url_not_url_like() {
    let free_text = "this is not a url";
    let sanitized = super::sanitize_url_for_logging(free_text);
    assert_eq!(sanitized, free_text);
}
/// A URL with nothing sensitive in it comes back unchanged, path included.
#[test]
fn test_sanitize_url_preserves_path() {
    let clean_url = "https://api.example.com/v1/users/123/profile";
    let sanitized = super::sanitize_url_for_logging(clean_url);
    assert_eq!(sanitized, clean_url);
    let keeps_path = sanitized.contains("/v1/users/123/profile");
    assert!(keeps_path);
}
/// Builds an `ExtensionManager` for the gateway-mode tests.
///
/// The manager is backed by an in-memory secrets store encrypted with a
/// freshly generated master key, uses `"test"` as its string argument, and
/// receives the same temp-dir path for both of its directory arguments.
/// `tunnel_url` is forwarded unchanged.
fn make_manager_with_tunnel(tunnel_url: Option<String>) -> ExtensionManager {
    use crate::secrets::{InMemorySecretsStore, SecretsCrypto};
    use crate::tools::mcp::process::McpProcessManager;
    use crate::tools::mcp::session::McpSessionManager;

    // Secrets: in-memory store on top of a throwaway master key.
    let master_key =
        secrecy::SecretString::from(crate::secrets::keychain::generate_master_key_hex());
    let crypto = Arc::new(SecretsCrypto::new(master_key).expect("crypto"));
    let secrets: Arc<dyn crate::secrets::SecretsStore + Send + Sync> =
        Arc::new(InMemorySecretsStore::new(crypto));

    let tool_registry = Arc::new(crate::tools::ToolRegistry::new());
    let session_manager = Arc::new(McpSessionManager::new());
    let process_manager = Arc::new(McpProcessManager::new());

    // One shared path serves as both directory arguments.
    let shared_dir = std::env::temp_dir().join("ironclaw-test-gateway-mode");

    ExtensionManager::new(
        session_manager,
        process_manager,
        secrets,
        tool_registry,
        None,
        None,
        shared_dir.clone(),
        shared_dir,
        tunnel_url,
        "test".to_string(),
        None,
        vec![],
    )
}
/// A non-loopback `tunnel_url` on its own is enough to turn gateway mode on.
#[test]
fn should_use_gateway_mode_true_for_tunnel_url() {
    // `EnvGuard` takes the env lock, clears `IRONCLAW_OAUTH_CALLBACK_URL`, and
    // restores the previous value when dropped — so the variable is put back
    // even if the assertion below panics.
    let _env = EnvGuard::new();

    let mgr = make_manager_with_tunnel(Some("https://my-gateway.example.com".into()));
    assert!(
        mgr.should_use_gateway_mode(),
        "should detect gateway mode from tunnel_url"
    );
}
/// With neither a `tunnel_url` nor `IRONCLAW_OAUTH_CALLBACK_URL`, gateway mode
/// stays off.
#[test]
fn should_use_gateway_mode_false_without_tunnel() {
    // `EnvGuard` takes the env lock, clears `IRONCLAW_OAUTH_CALLBACK_URL`, and
    // restores the previous value when dropped — so the variable is put back
    // even if the assertion below panics.
    let _env = EnvGuard::new();

    let mgr = make_manager_with_tunnel(None);
    assert!(
        !mgr.should_use_gateway_mode(),
        "should not detect gateway mode without tunnel_url or env var"
    );
}
/// A loopback `tunnel_url` (`http://127.0.0.1:3001`) does not enable gateway
/// mode.
#[test]
fn should_use_gateway_mode_false_for_loopback_tunnel() {
    // `EnvGuard` takes the env lock, clears `IRONCLAW_OAUTH_CALLBACK_URL`, and
    // restores the previous value when dropped — so the variable is put back
    // even if the assertion below panics.
    let _env = EnvGuard::new();

    let mgr = make_manager_with_tunnel(Some("http://127.0.0.1:3001".into()));
    assert!(
        !mgr.should_use_gateway_mode(),
        "should not detect gateway mode for loopback tunnel_url"
    );
}
/// Test guard around the `IRONCLAW_OAUTH_CALLBACK_URL` environment variable.
///
/// `EnvGuard::new` clears the variable while holding the env lock; dropping
/// the guard writes the saved value back (or removes the variable if it was
/// not readable at construction time).
struct EnvGuard {
// Value read from the variable when the guard was created; `None` when
// `std::env::var` returned an error (unset or not valid Unicode).
original: Option<String>,
// Guard returned by `crate::config::helpers::lock_env()`, held for as long
// as this struct lives. As a field it is released only after `Drop::drop`
// has run, i.e. after the variable has been restored.
_mutex: std::sync::MutexGuard<'static, ()>,
}
impl EnvGuard {
    /// Takes the env lock, remembers the current value of
    /// `IRONCLAW_OAUTH_CALLBACK_URL`, and removes the variable.
    ///
    /// The returned guard keeps the lock until it is dropped.
    fn new() -> Self {
        let _mutex = crate::config::helpers::lock_env();
        let original = std::env::var("IRONCLAW_OAUTH_CALLBACK_URL").ok();
        // SAFETY: the guard returned by `lock_env()` is held at this point.
        unsafe { std::env::remove_var("IRONCLAW_OAUTH_CALLBACK_URL") };
        Self { original, _mutex }
    }
}
impl Drop for EnvGuard {
    /// Puts `IRONCLAW_OAUTH_CALLBACK_URL` back to the value saved at
    /// construction, or removes it when there was none.
    fn drop(&mut self) {
        // SAFETY: `self._mutex` (the env lock) is still alive while `drop` runs.
        match &self.original {
            Some(saved) => unsafe {
                std::env::set_var("IRONCLAW_OAUTH_CALLBACK_URL", saved);
            },
            None => unsafe {
                std::env::remove_var("IRONCLAW_OAUTH_CALLBACK_URL");
            },
        }
    }
}
/// The redirect URI is the tunnel URL with `/oauth/callback` appended.
#[tokio::test]
async fn gateway_callback_redirect_uri_from_tunnel_url() {
    let _env = EnvGuard::new();
    let manager = make_manager_with_tunnel(Some("https://my-gateway.example.com".into()));

    let redirect_uri = manager.gateway_callback_redirect_uri().await;
    let expected = Some("https://my-gateway.example.com/oauth/callback".to_string());
    assert_eq!(redirect_uri, expected);
}
/// Without a tunnel URL (and with the env override cleared) there is no
/// redirect URI.
#[tokio::test]
async fn gateway_callback_redirect_uri_none_without_tunnel() {
    let _env = EnvGuard::new();
    let manager = make_manager_with_tunnel(None);

    let redirect_uri = manager.gateway_callback_redirect_uri().await;
    assert_eq!(redirect_uri, None);
}
/// A trailing slash on the tunnel URL does not produce a double slash in the
/// redirect URI.
#[tokio::test]
async fn gateway_callback_redirect_uri_trims_trailing_slash() {
    let _env = EnvGuard::new();
    let manager = make_manager_with_tunnel(Some("https://my-gateway.example.com/".into()));

    let redirect_uri = manager.gateway_callback_redirect_uri().await;
    let expected = Some("https://my-gateway.example.com/oauth/callback".to_string());
    assert_eq!(redirect_uri, expected);
}
/// When `IRONCLAW_OAUTH_CALLBACK_URL` already ends in `/oauth/callback`, the
/// redirect URI must not get the path appended a second time.
#[test]
fn gateway_callback_redirect_uri_does_not_duplicate_callback_path_from_env() {
    // `EnvGuard` holds the env lock and restores the variable's previous value
    // on drop, so the override below is undone even if the assertion panics.
    let _env = EnvGuard::new();
    // SAFETY: the env lock is held through `_env` for the rest of the test.
    unsafe {
        std::env::set_var(
            "IRONCLAW_OAUTH_CALLBACK_URL",
            "https://oauth.test.example/oauth/callback",
        );
    }

    let mgr = make_manager_with_tunnel(None);
    assert_eq!(
        tokio_test::block_on(mgr.gateway_callback_redirect_uri()),
        Some("https://oauth.test.example/oauth/callback".to_string()),
    );
}
/// A trailing slash on the callback URL supplied through
/// `IRONCLAW_OAUTH_CALLBACK_URL` is trimmed from the redirect URI.
#[test]
fn gateway_callback_redirect_uri_trims_trailing_slash_from_env_callback() {
    // `EnvGuard` holds the env lock and restores the variable's previous value
    // on drop, so the override below is undone even if the assertion panics.
    let _env = EnvGuard::new();
    // SAFETY: the env lock is held through `_env` for the rest of the test.
    unsafe {
        std::env::set_var(
            "IRONCLAW_OAUTH_CALLBACK_URL",
            "https://oauth.test.example/oauth/callback/",
        );
    }

    let mgr = make_manager_with_tunnel(None);
    assert_eq!(
        tokio_test::block_on(mgr.gateway_callback_redirect_uri()),
        Some("https://oauth.test.example/oauth/callback".to_string()),
    );
}
/// Normalising a hosted callback URL keeps its query string, whether or not
/// the input already carries the `/oauth/callback` path.
#[test]
fn normalize_hosted_callback_url_preserves_query_params() {
    let expected = "https://oauth.test.example/oauth/callback?source=hosted";
    let inputs = [
        "https://oauth.test.example?source=hosted",
        "https://oauth.test.example/oauth/callback?source=hosted",
    ];
    for input in inputs {
        assert_eq!(normalize_hosted_callback_url(input), expected);
    }
}
/// Only the `state` query parameter is rewritten; an encoded `state%3D…`
/// occurring inside another parameter's value is left alone.
#[test]
fn rewrite_oauth_state_param_updates_only_state_query_param() {
    let original_url =
        "https://auth.example.com/authorize?client_id=abc&state=old-state&hint=state%3Dkeep";
    let rewritten = ExtensionManager::rewrite_oauth_state_param(
        original_url.to_string(),
        "old-state",
        "new-hosted-state",
    );
    assert_eq!(
        rewritten,
        "https://auth.example.com/authorize?client_id=abc&state=new-hosted-state&hint=state%3Dkeep"
    );
}
/// `enable_gateway_mode` switches a manager that started without a tunnel
/// into gateway mode and makes the redirect URI derive from the given URL.
#[tokio::test]
async fn gateway_mode_enabled_explicitly() {
    let _env = EnvGuard::new();
    let manager = make_manager_with_tunnel(None);

    let enabled_before = manager.should_use_gateway_mode();
    assert!(!enabled_before);

    manager
        .enable_gateway_mode("https://my-gateway.example.com".into())
        .await;

    let enabled_after = manager.should_use_gateway_mode();
    assert!(enabled_after);

    let redirect_uri = manager.gateway_callback_redirect_uri().await;
    let expected = Some("https://my-gateway.example.com/oauth/callback".to_string());
    assert_eq!(redirect_uri, expected);
}
/// With two required secrets and `SECRET_A` already stored, `configure_token`
/// must store the supplied token under `SECRET_B`.
#[tokio::test]
async fn test_configure_token_picks_first_missing_secret() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let channels_dir = workspace.path().join("channels");
    std::fs::create_dir_all(&channels_dir).unwrap();

    // Fake channel binary plus a capabilities file declaring two secrets.
    std::fs::write(channels_dir.join("multi.wasm"), b"\0asm fake").unwrap();
    let capabilities = serde_json::json!({
        "type": "channel",
        "name": "multi",
        "setup": {
            "required_secrets": [
                {"name": "SECRET_A", "prompt": "Enter secret A (at least 30 chars for validation)"},
                {"name": "SECRET_B", "prompt": "Enter secret B (at least 30 chars for validation)"}
            ]
        }
    });
    let capabilities_json = serde_json::to_string(&capabilities).unwrap();
    std::fs::write(
        channels_dir.join("multi.capabilities.json"),
        capabilities_json,
    )
    .unwrap();

    let manager = make_manager_custom_dirs(workspace.path().join("tools"), channels_dir);

    // Pre-store the first secret so that only SECRET_B is missing.
    let secret_a = crate::secrets::CreateSecretParams::new("SECRET_A", "value-a");
    manager
        .secrets
        .create("test", secret_a)
        .await
        .expect("store SECRET_A");

    // The outcome is deliberately not inspected; only the stored secret is.
    let _outcome = manager.configure_token("multi", "value-b", "test").await;

    let secret_b_stored = manager
        .secrets
        .exists("test", "SECRET_B")
        .await
        .unwrap_or(false);
    assert!(
        secret_b_stored,
        "configure_token should have stored SECRET_B (the first missing secret)"
    );
}
/// Calling `auth()` for a WASM channel with an unset required secret must
/// succeed without creating that secret.
#[tokio::test]
async fn test_auth_is_read_only_for_wasm_channel() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let channels_dir = workspace.path().join("channels");
    std::fs::create_dir_all(&channels_dir).unwrap();

    // Fake channel binary plus a capabilities file declaring one secret.
    std::fs::write(channels_dir.join("test-ch.wasm"), b"\0asm fake").unwrap();
    let capabilities = serde_json::json!({
        "type": "channel",
        "name": "test-ch",
        "setup": {
            "required_secrets": [
                {"name": "BOT_TOKEN", "prompt": "Enter bot token (at least 30 chars for prompt validation)"}
            ]
        }
    });
    let capabilities_json = serde_json::to_string(&capabilities).unwrap();
    std::fs::write(
        channels_dir.join("test-ch.capabilities.json"),
        capabilities_json,
    )
    .unwrap();

    let manager = make_manager_custom_dirs(workspace.path().join("tools"), channels_dir);

    let auth_outcome = manager.auth("test-ch", "test").await;
    assert!(
        auth_outcome.is_ok(),
        "auth should succeed: {:?}",
        auth_outcome.as_ref().err()
    );

    // A lookup error counts as "exists" so that it fails the test rather than
    // passing silently.
    let token_exists = manager
        .secrets
        .exists("test", "BOT_TOKEN")
        .await
        .unwrap_or(true);
    assert!(
        !token_exists,
        "auth() must not create any secrets — it should be read-only"
    );
}
/// The auth instructions for the `telegram` channel must ask for the bot
/// token and also describe the owner verification step.
#[tokio::test]
async fn test_telegram_auth_instructions_include_owner_verification_guidance()
-> Result<(), String> {
    let workspace = tempfile::tempdir().map_err(|err| format!("temp dir: {err}"))?;
    let channels_dir = workspace.path().join("channels");
    std::fs::create_dir_all(&channels_dir).map_err(|err| format!("channels dir: {err}"))?;

    // Fake channel binary plus a capabilities file declaring the bot token.
    std::fs::write(channels_dir.join("telegram.wasm"), b"\0asm fake")
        .map_err(|err| format!("write wasm: {err}"))?;
    let capabilities = serde_json::json!({
        "type": "channel",
        "name": "telegram",
        "setup": {
            "required_secrets": [
                {
                    "name": "telegram_bot_token",
                    "prompt": "Enter your Telegram Bot API token (from @BotFather)"
                }
            ]
        }
    });
    let capabilities_json =
        serde_json::to_string(&capabilities).map_err(|err| format!("serialize caps: {err}"))?;
    std::fs::write(
        channels_dir.join("telegram.capabilities.json"),
        capabilities_json,
    )
    .map_err(|err| format!("write caps: {err}"))?;

    let manager = make_manager_custom_dirs(workspace.path().join("tools"), channels_dir);
    let auth_status = manager
        .auth("telegram", "test")
        .await
        .map_err(|err| format!("telegram auth status: {err}"))?;
    let instructions = auth_status
        .instructions()
        .ok_or_else(|| "awaiting token instructions missing".to_string())?;

    let asks_for_token = instructions.contains("Telegram Bot API token");
    require(
        asks_for_token,
        "telegram auth instructions should still ask for the bot token",
    )?;

    let explains_verification = [
        "one-time verification code",
        "/start CODE",
        "finish setup automatically",
    ]
    .into_iter()
    .all(|phrase| instructions.contains(phrase));
    require(
        explains_verification,
        "telegram auth instructions should explain the owner verification step",
    )
}
/// `send_telegram_text_message` must POST a JSON body whose `chat_id` and
/// `text` match the arguments. A local axum server captures the request.
#[tokio::test]
async fn test_send_telegram_text_message_posts_expected_payload() -> Result<(), String> {
    use axum::{Json, Router, extract::State, routing::post};

    type CapturedPayloads = Arc<tokio::sync::Mutex<Vec<serde_json::Value>>>;

    // Records every JSON body it receives and answers with a minimal OK reply.
    async fn handler(
        State(captured): State<CapturedPayloads>,
        Json(body): Json<serde_json::Value>,
    ) -> Json<serde_json::Value> {
        captured.lock().await.push(body);
        Json(serde_json::json!({ "ok": true, "result": {} }))
    }

    let payloads: CapturedPayloads = Arc::new(tokio::sync::Mutex::new(Vec::new()));
    let app = Router::new()
        .route("/sendMessage", post(handler))
        .with_state(Arc::clone(&payloads));

    // Port 0 lets the OS pick a free port.
    let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
        .await
        .map_err(|err| format!("bind listener: {err}"))?;
    let addr = listener
        .local_addr()
        .map_err(|err| format!("listener addr: {err}"))?;
    let server = tokio::spawn(async move {
        let _ = axum::serve(listener, app).await;
    });

    let client = reqwest::Client::new();
    let endpoint = format!("http://{addr}/sendMessage");
    send_telegram_text_message(
        &client,
        &endpoint,
        424242,
        "Verification received. Finishing setup...",
    )
    .await
    .map_err(|err| format!("send message: {err}"))?;

    // Poll until the handler has recorded a payload, giving up after 1s.
    let wait_for_payload = async {
        loop {
            // The lock guard is released at the end of this statement, before
            // the sleep below.
            let first = payloads.lock().await.first().cloned();
            match first {
                Some(payload) => break payload,
                None => tokio::time::sleep(std::time::Duration::from_millis(10)).await,
            }
        }
    };
    let captured = tokio::time::timeout(std::time::Duration::from_secs(1), wait_for_payload)
        .await
        .map_err(|_| "timed out waiting for sendMessage payload".to_string())?;
    server.abort();

    require_eq(
        captured["chat_id"].clone(),
        serde_json::json!(424242),
        "chat_id",
    )?;
    require_eq(
        captured["text"].clone(),
        serde_json::json!("Verification received. Finishing setup..."),
        "text",
    )
}
/// The verification code is recognised on its own, as a `/start` payload, and
/// inside a conversational message; a different payload is rejected.
#[test]
fn test_telegram_message_matches_verification_code_variants() -> Result<(), String> {
    let code = "iclaw-7qk2m9";
    // (incoming message, expected match result, failure description)
    let cases = [
        (
            "iclaw-7qk2m9",
            true,
            "plain verification code should match",
        ),
        ("/start iclaw-7qk2m9", true, "/start payload should match"),
        (
            "Hi! My code is: iclaw-7qk2m9",
            true,
            "conversational message containing the code should match",
        ),
        (
            "/start something-else",
            false,
            "wrong verification code should not match",
        ),
    ];
    for (message, should_match, failure) in cases {
        let matched = telegram_message_matches_verification_code(message, code);
        require(matched == should_match, failure)?;
    }
    Ok(())
}
/// Configuring an installed relay extension returns `Ok` with
/// `activated == false`, and the resulting message does not mention WASM.
#[tokio::test]
async fn test_configure_dispatches_activation_by_kind() {
    let workspace = tempfile::tempdir().expect("temp dir");
    let channels_dir = workspace.path().join("channels");
    std::fs::create_dir_all(&channels_dir).unwrap();
    let manager = make_manager_custom_dirs(workspace.path().join("tools"), channels_dir);

    // Mark the extension as an installed relay without any relay config.
    manager
        .installed_relay_extensions
        .write()
        .await
        .insert("test-relay".to_string());

    let configure_result = manager
        .configure(
            "test-relay",
            &std::collections::HashMap::new(),
            &std::collections::HashMap::new(),
            "test",
        )
        .await;
    assert!(
        configure_result.is_ok(),
        "configure should return Ok: {:?}",
        configure_result.as_ref().err()
    );

    let outcome = configure_result.unwrap();
    assert!(
        !outcome.activated,
        "activation should fail without relay config"
    );
    let mentions_wasm = outcome.message.contains("WASM");
    assert!(
        !mentions_wasm,
        "error should not mention WASM — got: {}",
        outcome.message
    );
}
/// `ExtensionError::ValidationFailed` is its own variant (not `Other`, not
/// `AuthFailed`) and its `Display` output contains "validation failed".
#[test]
fn test_validation_failed_is_distinct_error_variant() {
    let err = ExtensionError::ValidationFailed("Invalid token".to_string());

    let is_validation_failed = matches!(err, ExtensionError::ValidationFailed(_));
    let is_other = matches!(err, ExtensionError::Other(_));
    let is_auth_failed = matches!(err, ExtensionError::AuthFailed(_));

    assert!(is_validation_failed, "Should match ValidationFailed variant");
    assert!(!is_other, "Must NOT match Other variant");
    assert!(!is_auth_failed, "Must NOT match AuthFailed variant");

    let msg = err.to_string();
    let mentions_validation = msg.contains("validation failed");
    assert!(
        mentions_validation,
        "Display should contain 'validation failed', got: {msg}"
    );
}
/// Substituting a Telegram bot token into the validation endpoint template
/// with `str::replace` keeps the token's colon literal (no `%3A` encoding).
#[test]
fn test_telegram_token_colon_preserved_in_validation_url() {
    let endpoint_template = "https://api.telegram.org/bot{telegram_bot_token}/getMe";
    let secret_name = "telegram_bot_token";
    let token = "123456789:AABBccDDeeFFgg_Test-Token";

    // `{{` / `}}` are escaped braces, so the placeholder is `{telegram_bot_token}`.
    let placeholder = format!("{{{secret_name}}}");
    let url = endpoint_template.replace(&placeholder, token);
    let expected = "https://api.telegram.org/bot123456789:AABBccDDeeFFgg_Test-Token/getMe";

    assert!(
        url == expected,
        "URL mismatch: expected {expected}, got {url}"
    );
    assert!(
        !url.contains("%3A"),
        "URL contains URL-encoded colon (%3A): {url}"
    );
    assert!(
        url.contains("123456789:AABBccDDeeFFgg_Test-Token"),
        "URL missing token: {url}"
    );
}
/// When the supplied secret equals the built-in one and the exchange proxy
/// flag is `true`, the secret is suppressed (`None`).
#[test]
fn test_proxy_client_secret_suppressed_when_builtin_matches_with_exchange_proxy() {
    use crate::cli::oauth_defaults::{builtin_credentials, hosted_proxy_client_secret};

    let credentials = builtin_credentials("google_oauth_token");
    let credentials_ref = credentials.as_ref();
    let builtin_secret = Some(credentials_ref.unwrap().client_secret.to_string());

    let forwarded = hosted_proxy_client_secret(&builtin_secret, credentials_ref, true);
    assert_eq!(
        forwarded, None,
        "built-in desktop secret must be suppressed when the exchange proxy is configured"
    );
}
/// A secret that differs from the built-in one is passed through even when
/// the exchange proxy flag is `true`.
#[test]
fn test_proxy_client_secret_kept_when_not_builtin_with_exchange_proxy() {
    use crate::cli::oauth_defaults::{builtin_credentials, hosted_proxy_client_secret};

    let credentials = builtin_credentials("google_oauth_token");
    let custom_secret = Some("user-entered-custom-secret".to_string());

    let forwarded = hosted_proxy_client_secret(&custom_secret, credentials.as_ref(), true);
    let expected = Some("user-entered-custom-secret".to_string());
    assert_eq!(
        forwarded, expected,
        "non-builtin secret must be kept even when the exchange proxy is configured"
    );
}
/// With the exchange proxy flag `false`, even the built-in secret is passed
/// through unchanged.
#[test]
fn test_proxy_client_secret_kept_without_exchange_proxy_even_for_builtin_secret() {
    use crate::cli::oauth_defaults::{builtin_credentials, hosted_proxy_client_secret};

    let credentials = builtin_credentials("google_oauth_token");
    let credentials_ref = credentials.as_ref();
    let builtin_secret = Some(credentials_ref.unwrap().client_secret.to_string());

    let forwarded = hosted_proxy_client_secret(&builtin_secret, credentials_ref, false);
    assert_eq!(
        forwarded, builtin_secret,
        "built-in secret must be kept when the callback will exchange directly"
    );
}
/// A missing secret (`None`) stays `None` with the exchange proxy flag `true`.
#[test]
fn test_proxy_client_secret_none_stays_none() {
    use crate::cli::oauth_defaults::{builtin_credentials, hosted_proxy_client_secret};

    let credentials = builtin_credentials("google_oauth_token");
    let forwarded = hosted_proxy_client_secret(&None, credentials.as_ref(), true);
    assert_eq!(
        forwarded, None,
        "None secret stays None even when the exchange proxy is configured"
    );
}
/// For a secret name with no built-in credentials, the supplied secret is
/// passed through with the exchange proxy flag `true`.
#[test]
fn test_proxy_client_secret_no_builtin_provider() {
    use crate::cli::oauth_defaults::{builtin_credentials, hosted_proxy_client_secret};

    let credentials = builtin_credentials("mcp_notion_access_token");
    assert!(credentials.is_none());

    let dcr_secret = Some("dcr-secret".to_string());
    let forwarded = hosted_proxy_client_secret(&dcr_secret, credentials.as_ref(), true);
    let expected = Some("dcr-secret".to_string());
    assert_eq!(
        forwarded, expected,
        "non-builtin provider secret must be kept"
    );
}
}