use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use crate::agent::approval::{
ApprovalGate, ApprovalRequest, ApprovalResponse, ApproveMode, SessionApprovals,
SharedApproveMode,
};
use crate::agent::context::ConversationContext;
use crate::agent::r#loop::AgentEvent;
use crate::agent::prompt;
use crate::agent::session::SessionStore;
use crate::api::provider::OpenAiCompatibleProvider;
use crate::config::Config;
use super::adapter::*;
use super::auth::AuthConfig;
use super::channel_map::ChannelMap;
use super::commands::RemoteCommand;
use super::formatter;
use super::session_pool::{AgentSession, PendingPlan, SessionPool};
type SessionEntry = (String, String, bool);
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum RemoteApprovalMode {
Yolo,
PlanOnly,
Cautious,
}
impl RemoteApprovalMode {
fn parse(s: &str) -> Self {
match s.to_lowercase().as_str() {
"plan-only" | "plan_only" => Self::PlanOnly,
"cautious" => Self::Cautious,
_ => Self::Yolo,
}
}
}
#[derive(Debug, Clone)]
enum PendingAction {
NewProjectDir,
TakeoverConfirm {
snapshot: Box<crate::agent::session::SessionSnapshot>,
project_dir: String,
},
}
pub struct RemoteGateway {
config: Config,
channel_map: ChannelMap,
session_pool: Arc<Mutex<SessionPool>>,
auth: AuthConfig,
adapters: Vec<Arc<dyn PlatformAdapter>>,
default_streaming: StreamingLevel,
default_workspace: WorkspaceScope,
default_workspace_dir: Option<String>,
pending: Mutex<HashMap<ChannelId, PendingAction>>,
skill_registry: crate::skills::SkillRegistry,
approval_mode: RemoteApprovalMode,
permissions: crate::config::types::RemotePermissionsSection,
pending_tool_approvals: Arc<Mutex<HashMap<String, oneshot::Sender<ApprovalResponse>>>>,
project_cache: Arc<crate::project_cache::ProjectCacheManager>,
cmd_tx: std::sync::Mutex<Option<mpsc::UnboundedSender<IncomingCommand>>>,
}
impl RemoteGateway {
#[allow(clippy::too_many_arguments)]
pub fn new(
config: Config,
channel_map: ChannelMap,
auth: AuthConfig,
adapters: Vec<Arc<dyn PlatformAdapter>>,
default_streaming: StreamingLevel,
default_workspace: WorkspaceScope,
default_workspace_dir: Option<String>,
approval_mode: Option<String>,
permissions: crate::config::types::RemotePermissionsSection,
) -> Self {
let session_timeout = 300u64;
let skill_registry = {
let workspace = default_workspace_dir.as_deref().unwrap_or(".");
crate::skills::SkillRegistry::discover(std::path::Path::new(workspace))
};
let approval_mode = approval_mode
.as_deref()
.map(RemoteApprovalMode::parse)
.unwrap_or(RemoteApprovalMode::Cautious);
Self {
config,
channel_map,
session_pool: Arc::new(Mutex::new(SessionPool::new(session_timeout))),
auth,
adapters,
default_streaming,
default_workspace,
default_workspace_dir,
pending: Mutex::new(HashMap::new()),
skill_registry,
approval_mode,
permissions,
pending_tool_approvals: Arc::new(Mutex::new(HashMap::new())),
project_cache: Arc::clone(crate::project_cache::global()),
cmd_tx: std::sync::Mutex::new(None),
}
}
fn load_config(&self) -> Config {
crate::config::Config::load().unwrap_or_else(|_| self.config.clone())
}
pub async fn run(&self) -> Result<()> {
self.project_cache.ensure_background_tasks();
let (cmd_tx, mut cmd_rx) = mpsc::unbounded_channel::<IncomingCommand>();
*self.cmd_tx.lock().unwrap() = Some(cmd_tx.clone());
for adapter in &self.adapters {
let adapter = Arc::clone(adapter);
let tx = cmd_tx.clone();
tokio::spawn(async move {
if let Err(e) = adapter.run(tx).await {
tracing::error!(
"[remote] {} adapter exited with error: {}",
adapter.platform_name(),
e
);
}
});
}
let pool_clone = Arc::clone(&self.session_pool);
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(300)).await;
let mut pool = pool_clone.lock().await;
let evicted = pool.evict_stale();
if evicted > 0 {
tracing::info!(
"[remote] evicted {evicted} stale session(s); pool empty={}",
pool.is_empty()
);
}
}
});
{
const SCAN_INTERVAL_SECS: u64 = 60;
const NOTIFY_COOLDOWN_SECS: u64 = 300;
let pool_clone = Arc::clone(&self.session_pool);
let adapters_clone = self.adapters.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(SCAN_INTERVAL_SECS)).await;
let pending_channels = {
let pool = pool_clone.lock().await;
pool.pending_sessions()
};
for channel in pending_channels {
let should = {
let mut pool = pool_clone.lock().await;
pool.get_mut(&channel)
.map(|s| s.should_notify(NOTIFY_COOLDOWN_SECS))
.unwrap_or(false)
};
if should
&& let Some(adapter) = adapters_clone
.iter()
.find(|a| a.platform_name() == channel.platform)
{
let res = adapter
.send_buttons(
&channel,
"⏳ A plan is waiting for your approval.",
&[
("/approve".to_string(), "✅ Approve".to_string()),
("/reject".to_string(), "❌ Reject".to_string()),
],
)
.await;
if let Err(e) = res {
tracing::warn!(
"[remote] pending-plan notify failed for {channel}: {e}"
);
}
}
}
}
});
}
if self.channel_map.is_empty() {
tracing::warn!(
"[remote] no channel mappings configured — all messages will use the default project"
);
}
for adapter in &self.adapters {
let platform = adapter.platform_name();
let platform_channels = self.channel_map.for_platform(platform);
tracing::info!(
"[remote] {} adapter: {} channel mapping(s)",
platform,
platform_channels.len(),
);
}
tracing::info!(
"[remote] gateway started with {} adapter(s), {} channel mapping(s)",
self.adapters.len(),
self.channel_map.all().len(),
);
{
const BUILTIN: &[(&str, &str)] = &[
("help", "Show available commands"),
("status", "Current session info"),
("cancel", "Stop the running agent"),
("approve", "Approve a pending plan"),
("reject", "Reject a pending plan"),
("projects", "List available projects"),
("sessions", "List recent sessions"),
("resume", "Resume a previous session"),
("new", "Start a new session"),
("models", "List available models"),
("agents", "List available agents"),
("stream", "Set streaming level (compact/full)"),
("switch", "Switch to a different project"),
("workspace", "Set workspace scope"),
];
let mut seen: std::collections::HashSet<String> =
BUILTIN.iter().map(|(n, _)| (*n).to_string()).collect();
let mut skill_pairs: Vec<(String, String)> = Vec::new();
for s in self.skill_registry.all() {
let Some(norm) = super::commands::normalize_command_name(&s.name) else {
tracing::warn!(
"[remote] skipping skill '{}' — name has no valid command chars",
s.name
);
continue;
};
if !seen.insert(norm.clone()) {
tracing::warn!(
"[remote] skipping skill '{}' — normalized name '{}' already taken",
s.name,
norm
);
continue;
}
let mut desc = s.description.trim().to_string();
if desc.len() < 3 {
desc = format!("{} skill", s.name);
}
if desc.chars().count() > 256 {
desc = desc.chars().take(256).collect();
}
skill_pairs.push((norm, desc));
}
let mut all: Vec<(&str, &str)> = BUILTIN.to_vec();
for (n, d) in &skill_pairs {
all.push((n.as_str(), d.as_str()));
}
let reg_futs: Vec<_> = self
.adapters
.iter()
.map(|adapter| {
let all = all.clone();
let adapter = adapter.clone();
async move {
if let Err(e) = adapter.register_commands(&all).await {
tracing::warn!(
"[remote] {} register_commands failed: {e}",
adapter.platform_name()
);
} else {
tracing::info!(
"[remote] {} registered {} command(s)",
adapter.platform_name(),
all.len(),
);
}
}
})
.collect();
futures::future::join_all(reg_futs).await;
}
while let Some(cmd) = cmd_rx.recv().await {
if !self.auth.is_authorized(&cmd.channel.platform, &cmd.user_id) {
tracing::warn!(
"[remote] unauthorized: {} user {}",
cmd.channel.platform,
cmd.user_id,
);
continue;
}
if let Err(e) = self.handle_command(cmd).await {
tracing::error!("[remote] command handler error: {e}");
}
}
Ok(())
}
async fn handle_command(&self, cmd: IncomingCommand) -> Result<()> {
let channel = cmd.channel;
match cmd.command {
RemoteCommand::Message { text } => {
if let Some(dir) = text.strip_prefix("/project-detail ") {
self.send_project_detail(&channel, dir.trim()).await?;
return Ok(());
}
if let Some(model) = text.strip_prefix("/set-model ") {
self.set_model(&channel, model.trim()).await?;
return Ok(());
}
if let Some(agent) = text.strip_prefix("/set-agent ") {
self.set_agent(&channel, agent.trim()).await?;
return Ok(());
}
if let Some(req_id) = text.strip_prefix("/tool-approve ") {
self.resolve_tool_approval(req_id.trim(), ApprovalResponse::Approve)
.await;
return Ok(());
}
if let Some(req_id) = text.strip_prefix("/tool-approve-session ") {
self.resolve_tool_approval(req_id.trim(), ApprovalResponse::ApproveAll)
.await;
return Ok(());
}
if let Some(req_id) = text.strip_prefix("/tool-deny ") {
self.resolve_tool_approval(req_id.trim(), ApprovalResponse::Deny)
.await;
return Ok(());
}
let pending = self.pending.lock().await.remove(&channel);
if let Some(action) = pending {
match action {
PendingAction::NewProjectDir => {
self.new_session(&channel, Some(text.trim())).await?;
return Ok(());
}
PendingAction::TakeoverConfirm {
snapshot,
project_dir,
} => {
if text.trim() == "/takeover-yes" {
self.restore_takeover_session(&channel, *snapshot, &project_dir)
.await?;
} else {
self.create_fresh_session_for_takeover(&channel, &project_dir)
.await?;
self.run_agent_message(&channel, &text).await?;
}
return Ok(());
}
}
}
if let Some(slash) = text.strip_prefix('/') {
let (skill_name, skill_args) = slash
.split_once(char::is_whitespace)
.map(|(n, a)| (n, a.trim()))
.unwrap_or((slash, ""));
if let Some(inv) = self.try_invoke_skill(skill_name) {
self.run_skill(&channel, inv, skill_args).await?;
return Ok(());
}
}
self.run_agent_message(&channel, &text).await?;
}
RemoteCommand::Cancel => {
self.cancel_agent(&channel).await;
let ch = channel.clone();
let cmd_tx = self.cmd_tx.lock().unwrap().clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
if let Some(tx) = cmd_tx {
let _ = tx.send(IncomingCommand {
channel: ch,
user_id: String::new(),
command: RemoteCommand::Message {
text: "The user has cancelled the current task. Exit plan mode immediately if you are in plan mode. Mark any in-progress tasks as completed. Do not continue any planned work. Just acknowledge briefly.".to_string(),
},
});
}
});
}
RemoteCommand::Status => {
self.send_status(&channel).await?;
}
RemoteCommand::Help => {
self.send_help(&channel).await?;
}
RemoteCommand::Projects => {
self.send_project_list(&channel).await?;
}
RemoteCommand::Sessions => {
self.send_session_list(&channel).await?;
}
RemoteCommand::Resume { session_id } => {
self.resume_session(&channel, session_id.as_deref()).await?;
}
RemoteCommand::New { project_dir } => {
self.new_session(&channel, project_dir.as_deref()).await?;
}
RemoteCommand::Models => {
self.send_model_list(&channel).await?;
}
RemoteCommand::Agents => {
self.send_agent_list(&channel).await?;
}
RemoteCommand::Stream { level } => {
self.set_streaming(&channel, level.as_deref().unwrap_or(""))
.await?;
}
RemoteCommand::Workspace { scope } => {
self.set_workspace(&channel, &scope).await?;
}
RemoteCommand::Approve => {
self.approve_plan(&channel).await?;
}
RemoteCommand::Reject => {
self.reject_plan(&channel).await?;
}
RemoteCommand::Switch { name } => {
self.switch_project(&channel, &name).await?;
}
RemoteCommand::History => {
self.send_history(&channel).await?;
}
RemoteCommand::DeleteSession { session_id } => {
self.delete_session(&channel, session_id.as_deref()).await?;
}
}
Ok(())
}
async fn run_agent_message(&self, channel: &ChannelId, text: &str) -> Result<()> {
let typing_fut = async {
if let Some(adapter) = self.find_adapter(&channel.platform) {
let _ = adapter.send_typing(channel).await;
}
};
let (_, session_result) = tokio::join!(typing_fut, self.ensure_session(channel));
session_result?;
let mut pool = self.session_pool.lock().await;
let session = match pool.get_mut(channel) {
Some(s) => s,
None => return Ok(()),
};
if session.busy {
session.enqueue(text.to_string());
let queue_len = session.message_queue.len();
drop(pool);
self.send_buttons_to_channel(
channel,
&format!("📥 Message queued (#{queue_len}). Will process after current task."),
&[("/cancel".to_string(), "🛑 Cancel".to_string())],
)
.await?;
return Ok(());
}
session.busy = true;
session.touch();
let cancel = CancellationToken::new();
session.cancel_token = Some(cancel.clone());
let context = std::mem::replace(
&mut session.context,
ConversationContext::with_budget("".to_string(), 1000, 0.8),
);
let fresh = self.load_config();
let model = session
.model_override
.clone()
.unwrap_or_else(|| fresh.model.clone());
let mut config = fresh;
config.model = model;
let client = OpenAiCompatibleProvider::from_config(&config)?;
let working_dir = session.project_dir.clone();
let streaming_level = session.streaming_level;
let user_msg = text.to_string();
let lsp_manager = crate::lsp::manager::LspManager::new(working_dir.clone());
let trust_level = match session.workspace_scope {
WorkspaceScope::Full | WorkspaceScope::Workspace => crate::trust::TrustLevel::Full,
WorkspaceScope::Project => crate::trust::TrustLevel::ReadOnly,
};
let (event_tx, event_rx) = mpsc::unbounded_channel::<AgentEvent>();
let (approval_gate, approval_rx_opt) = self.build_approval_gate(channel, session);
let shared_mcp = session.mcp.clone();
let shared_skills = session.skills.clone();
let shared_tool_index = session.tool_index.clone();
tokio::spawn(async move {
let agent_params = crate::agent::r#loop::AgentParams {
client,
config,
context,
user_msg,
working_dir,
event_tx,
cancel,
lsp_manager,
trust_level,
approval_gate,
images: Vec::new(),
};
if let Some(mcp) = shared_mcp {
crate::agent::r#loop::run_with_shared_mcp(
agent_params,
crate::agent::r#loop::SwarmParams {
mcp_manager: mcp,
shared_knowledge: None,
shared_tool_index,
shared_skill_registry: shared_skills,
instruction_rx: None,
},
)
.await;
} else {
crate::agent::r#loop::run_with_mode(agent_params).await;
}
});
if let Some(approval_rx) = approval_rx_opt {
let channel_for_approval = channel.clone();
let adapter_for_approval = self.find_adapter(&channel.platform);
let pending_approvals = Arc::clone(&self.pending_tool_approvals);
let permissions = self.permissions.clone();
let session_approvals_clone = session.session_approvals.clone();
tokio::spawn(async move {
remote_approval_handler(
approval_rx,
channel_for_approval,
adapter_for_approval,
pending_approvals,
permissions,
session_approvals_clone,
)
.await;
});
}
let channel_clone = channel.clone();
let pool_clone = Arc::clone(&self.session_pool);
let pcache = Arc::clone(&self.project_cache);
let adapter = self.find_adapter(&channel.platform);
let cmd_tx_clone = self.cmd_tx.lock().unwrap().clone();
tokio::spawn(async move {
if let Some(adapter) = adapter {
stream_events_to_channel(
adapter,
&channel_clone,
event_rx,
streaming_level,
pool_clone,
pcache,
cmd_tx_clone,
)
.await;
}
});
drop(pool);
Ok(())
}
async fn cancel_agent(&self, channel: &ChannelId) {
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
if let Some(ref cancel) = session.cancel_token {
cancel.cancel();
session.session_approvals.clear().await;
session.clear_queue();
session.suppress();
drop(pool);
let _ = self
.send_to_channel_inner(channel, "🛑 Task cancelled.")
.await;
} else {
drop(pool);
let _ = self
.send_to_channel_inner(channel, "No active agent to cancel.")
.await;
}
} else {
drop(pool);
}
}
async fn approve_plan(&self, channel: &ChannelId) -> Result<()> {
let pending = {
let mut pool = self.session_pool.lock().await;
pool.get_mut(channel).and_then(|s| s.pending_plan.take())
};
match pending {
Some(p) => self.execute_approved_plan(channel, p).await,
None => {
self.send_to_channel(channel, "No pending plan to approve.")
.await
}
}
}
async fn reject_plan(&self, channel: &ChannelId) -> Result<()> {
let had_plan = {
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
let had = session.pending_plan.is_some();
session.pending_plan = None;
had
} else {
false
}
};
if had_plan {
self.send_to_channel(channel, "❌ Plan rejected.").await
} else {
self.send_to_channel(channel, "No pending plan to reject.")
.await
}
}
async fn execute_approved_plan(&self, channel: &ChannelId, pending: PendingPlan) -> Result<()> {
if let Some(adapter) = self.find_adapter(&channel.platform) {
let _ = adapter.send_typing(channel).await;
}
let mut pool = self.session_pool.lock().await;
let session = match pool.get_mut(channel) {
Some(s) => s,
None => {
return self.send_to_channel(channel, "Session not found.").await;
}
};
if session.busy {
return self
.send_buttons_to_channel(
channel,
"⏳ Agent is busy.",
&[("/cancel".to_string(), "🛑 Cancel".to_string())],
)
.await;
}
session.busy = true;
session.touch();
let cancel = CancellationToken::new();
session.cancel_token = Some(cancel.clone());
let fresh = self.load_config();
let model = session
.model_override
.clone()
.unwrap_or_else(|| fresh.model.clone());
let mut config = fresh;
config.model = model;
let client = OpenAiCompatibleProvider::from_config(&config)?;
let working_dir = session.project_dir.clone();
let streaming_level = session.streaming_level;
let lsp_manager = crate::lsp::manager::LspManager::new(working_dir.clone());
let session_approvals = session.session_approvals.clone();
drop(pool);
let (approval_gate, approval_rx_opt) = match self.approval_mode {
RemoteApprovalMode::Cautious => {
let (tx, rx) = mpsc::unbounded_channel::<crate::agent::approval::ApprovalRequest>();
let gate = crate::agent::approval::ApprovalGate::new_with_session(
crate::agent::approval::SharedApproveMode::new(
crate::agent::approval::ApproveMode::Manual,
),
tx,
session_approvals.clone(),
);
(gate, Some(rx))
}
_ => (crate::agent::approval::ApprovalGate::yolo(), None),
};
let (event_tx, event_rx) = mpsc::unbounded_channel::<AgentEvent>();
let plan = pending.plan.clone();
let user_msg = pending.user_msg.clone();
let system_prompt = pending.system_prompt.clone();
let arch_context = pending.arch_context;
tokio::spawn(async move {
crate::agent::r#loop::execute_plan(crate::agent::r#loop::ExecutePlanParams {
client,
config,
system_prompt,
plan,
user_msg,
working_dir,
event_tx,
cancel,
lsp_manager,
arch_context: Some(arch_context),
approval_gate,
})
.await;
});
if let Some(approval_rx) = approval_rx_opt {
let channel_for_approval = channel.clone();
let adapter_for_approval = self.find_adapter(&channel.platform);
let pending_approvals = Arc::clone(&self.pending_tool_approvals);
let permissions = self.permissions.clone();
tokio::spawn(async move {
remote_approval_handler(
approval_rx,
channel_for_approval,
adapter_for_approval,
pending_approvals,
permissions,
session_approvals,
)
.await;
});
}
let channel_clone = channel.clone();
let pool_clone = Arc::clone(&self.session_pool);
let pcache = Arc::clone(&self.project_cache);
let cmd_tx_clone = self.cmd_tx.lock().unwrap().clone();
let adapter = self.find_adapter(&channel.platform);
tokio::spawn(async move {
if let Some(adapter) = adapter {
stream_events_to_channel(
adapter,
&channel_clone,
event_rx,
streaming_level,
pool_clone,
pcache,
cmd_tx_clone,
)
.await;
}
});
Ok(())
}
async fn ensure_session(&self, channel: &ChannelId) -> Result<()> {
{
let pool = self.session_pool.lock().await;
if pool.contains(channel) {
return Ok(());
}
}
let entry = self
.channel_map
.resolve(&channel.platform, &channel.channel);
let project_dir = entry
.and_then(|e| e.project.clone())
.or_else(|| default_workspace_dir(self.default_workspace_dir.as_deref()))
.unwrap_or_else(|| {
std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| ".".to_string())
});
let channel_agent = entry
.and_then(|e| e.agent.as_deref())
.and_then(|name| self.config.agents.iter().find(|a| a.name == name));
if let Some(entry) = entry
&& let Some(ref agent_name) = entry.agent
{
if channel_agent.is_some() {
tracing::info!(
"[remote] channel {}/{} auto-assigned agent '{}' (model={})",
entry.platform,
entry.channel,
agent_name,
channel_agent.map(|a| a.model.as_str()).unwrap_or("?"),
);
} else {
tracing::warn!(
"[remote] channel {}/{} specifies unknown agent '{}'",
entry.platform,
entry.channel,
agent_name
);
}
}
let store = SessionStore::new(&project_dir);
if let Some(snapshot) = store.find_incomplete().await {
let task_raw = snapshot
.user_task
.clone()
.unwrap_or_else(|| "(unknown task)".to_string());
let task_preview = task_raw[..task_raw.len().min(80)].to_string();
let short = snapshot.session_id[..8.min(snapshot.session_id.len())].to_string();
let msg_count = snapshot.messages.len();
self.pending.lock().await.insert(
channel.clone(),
PendingAction::TakeoverConfirm {
snapshot: Box::new(snapshot),
project_dir,
},
);
let msg = format!(
"📂 Incomplete session detected: \"{task_preview}\"\n \
Session: {short} · {msg_count} messages"
);
if let Some(adapter) = self.find_adapter(&channel.platform) {
adapter
.send_buttons(
channel,
&msg,
&[("/takeover-yes".to_string(), "▶ Continue".to_string())],
)
.await?;
}
return Ok(());
}
let context = self
.build_context_with_agent(&project_dir, channel_agent)
.await;
let mut session = AgentSession::new(
project_dir,
context,
self.default_streaming,
self.default_workspace,
);
if let Some(agent_def) = channel_agent {
session.model_override = Some(agent_def.model.clone());
session.agent_override = Some(agent_def.name.clone());
}
let project_dir_for_mcp = session.project_dir.clone();
let (mcp, skills, idx) = self.init_session_resources(&project_dir_for_mcp).await;
session.mcp = Some(mcp);
session.skills = Some(skills);
session.tool_index = Some(idx);
let mut pool = self.session_pool.lock().await;
if !pool.contains(channel) {
pool.insert(channel.clone(), session);
}
Ok(())
}
async fn restore_takeover_session(
&self,
channel: &ChannelId,
snapshot: crate::agent::session::SessionSnapshot,
project_dir: &str,
) -> Result<()> {
let entry = self
.channel_map
.resolve(&channel.platform, &channel.channel);
let channel_agent = entry
.and_then(|e| e.agent.as_deref())
.and_then(|name| self.config.agents.iter().find(|a| a.name == name));
let context = ConversationContext::restore_with_budget(
snapshot.system_prompt.clone(),
snapshot.messages.clone(),
snapshot.last_reasoning.clone(),
self.config.context_max_tokens,
self.config.compaction_threshold,
);
let mut session = AgentSession::new(
project_dir.to_string(),
context,
self.default_streaming,
self.default_workspace,
);
session.session_id = snapshot.session_id.clone();
if let Some(agent_def) = channel_agent {
session.model_override = Some(agent_def.model.clone());
session.agent_override = Some(agent_def.name.clone());
} else if let Some(ref model) = snapshot.model {
session.model_override = Some(model.clone());
}
let (mcp, skills, idx) = self.init_session_resources(project_dir).await;
session.mcp = Some(mcp);
session.skills = Some(skills);
session.tool_index = Some(idx);
self.session_pool
.lock()
.await
.insert(channel.clone(), session);
let task = snapshot.user_task.as_deref().unwrap_or("(unknown task)");
let short = &snapshot.session_id[..8.min(snapshot.session_id.len())];
let msg_count = snapshot.messages.len();
self.send_to_channel(
channel,
&format!(
"✅ Session restored ({short}, {msg_count} messages)\n \
Task: \"{}\"\n Continue from where you left off.",
&task[..task.len().min(80)]
),
)
.await
}
async fn create_fresh_session_for_takeover(
&self,
channel: &ChannelId,
project_dir: &str,
) -> Result<()> {
let entry = self
.channel_map
.resolve(&channel.platform, &channel.channel);
let channel_agent = entry
.and_then(|e| e.agent.as_deref())
.and_then(|name| self.config.agents.iter().find(|a| a.name == name));
let context = self
.build_context_with_agent(project_dir, channel_agent)
.await;
let mut session = AgentSession::new(
project_dir.to_string(),
context,
self.default_streaming,
self.default_workspace,
);
if let Some(agent_def) = channel_agent {
session.model_override = Some(agent_def.model.clone());
session.agent_override = Some(agent_def.name.clone());
}
let (mcp, skills, idx) = self.init_session_resources(project_dir).await;
session.mcp = Some(mcp);
session.skills = Some(skills);
session.tool_index = Some(idx);
self.session_pool
.lock()
.await
.insert(channel.clone(), session);
Ok(())
}
async fn save_current_session(&self, channel: &ChannelId) {
let mut pool = self.session_pool.lock().await;
if let Some(old) = pool.remove(channel) {
let store = SessionStore::new(&old.project_dir);
let _ = store
.save(&crate::agent::session::SessionSnapshot {
session_id: old.session_id,
working_dir: old.project_dir.clone(),
system_prompt: String::new(),
messages: old.context.messages().to_vec(),
last_reasoning: None,
timestamp: chrono::Utc::now().to_rfc3339(),
completed: false,
user_task: None,
model: old.model_override,
ui_state: None,
})
.await;
}
}
async fn new_session(&self, channel: &ChannelId, project_dir: Option<&str>) -> Result<()> {
self.save_current_session(channel).await;
if let Some(raw_dir) = project_dir {
let dir_expanded = expand_path(raw_dir);
let dir = dir_expanded.as_str();
let path = std::path::Path::new(dir);
if !path.is_dir() {
return self
.send_to_channel(
channel,
&format!(
"❌ Directory not found: `{dir}`\n\nUsage: `/new /path/to/project`"
),
)
.await;
}
let context = self.build_context(dir).await;
let mut session = AgentSession::new(
dir.to_string(),
context,
self.default_streaming,
self.default_workspace,
);
let (mcp, skills, idx) = self.init_session_resources(dir).await;
session.mcp = Some(mcp);
session.skills = Some(skills);
session.tool_index = Some(idx);
let mut pool = self.session_pool.lock().await;
pool.insert(channel.clone(), session);
let short = shorten_path(dir);
self.send_to_channel_inner(channel, &format!("🆕 New session started for `{short}`."))
.await
} else {
self.pending
.lock()
.await
.insert(channel.clone(), PendingAction::NewProjectDir);
self.send_to_channel(
channel,
"📁 Please enter the project folder path.\n\nExample: `/home/user/my-app`",
)
.await
}
}
async fn resume_session(&self, channel: &ChannelId, session_id: Option<&str>) -> Result<()> {
self.save_current_session(channel).await;
let snap = if let Some(id) = session_id {
self.find_session_by_id(id).await
} else {
let all = self.scan_all_projects().await;
if let Some((_wd, sessions)) = all.first() {
if let Some((sid, _, _)) = sessions.first() {
self.find_session_by_id(sid).await
} else {
None
}
} else {
None
}
};
let snap = match snap {
Some(s) => s,
None => {
self.send_to_channel(channel, "Session not found.").await?;
return Ok(());
}
};
let context = ConversationContext::restore(
snap.system_prompt.clone(),
snap.messages.clone(),
snap.last_reasoning.clone(),
);
let short_id = &snap.session_id[..8.min(snap.session_id.len())];
let short_dir = shorten_path(&snap.working_dir);
let session = AgentSession::new(
snap.working_dir,
context,
self.default_streaming,
self.default_workspace,
);
let mut pool = self.session_pool.lock().await;
pool.insert(channel.clone(), session);
self.send_to_channel_inner(channel, &format!("📂 Resumed `{short_id}` ({short_dir})"))
.await?;
Ok(())
}
async fn find_session_by_id(
&self,
session_id: &str,
) -> Option<crate::agent::session::SessionSnapshot> {
let projects_dir =
crate::config::collet_home(self.config.collet_home.to_str()).join("projects");
let mut entries = tokio::fs::read_dir(&projects_dir).await.ok()?;
while let Ok(Some(entry)) = entries.next_entry().await {
let sess_path = entry
.path()
.join("sessions")
.join(format!("{session_id}.json"));
if let Ok(content) = tokio::fs::read_to_string(&sess_path).await
&& let Ok(snap) =
serde_json::from_str::<crate::agent::session::SessionSnapshot>(&content)
{
return Some(snap);
}
}
None
}
async fn send_session_list(&self, channel: &ChannelId) -> Result<()> {
let pool = self.session_pool.lock().await;
let project_dir = pool.get(channel).map(|s| s.project_dir.clone());
let current_session_id = pool.get(channel).map(|s| s.session_id.clone());
drop(pool);
let mut recent: Vec<(String, String, String, bool)> = Vec::new();
if let Some(dir) = &project_dir {
let store = SessionStore::new(dir);
let sessions = store.list().await;
for (id, ts, completed) in sessions {
recent.push((id, ts, dir.clone(), completed));
}
}
if recent.len() < 10 {
let all = self.scan_all_projects().await;
for (wd, sessions) in &all {
if project_dir.as_deref() == Some(wd.as_str()) {
continue; }
for (id, ts, completed) in sessions {
recent.push((id.clone(), ts.clone(), wd.clone(), *completed));
}
}
}
recent.sort_by(|a, b| b.1.cmp(&a.1));
recent.truncate(10);
if recent.is_empty() {
return self
.send_to_channel(
channel,
"No sessions found.\n\nStart a new session:\n `/new /path/to/project`",
)
.await;
}
let mut buttons: Vec<(String, String)> = Vec::new();
let mut text_parts: Vec<String> = Vec::new();
for (i, (id, ts, wd, completed)) in recent.iter().enumerate() {
let is_current = current_session_id.as_deref() == Some(id.as_str());
let icon = if *completed { "✅" } else { "🔄" };
let short_id = &id[..id.len().min(8)];
let short_dir = shorten_path(wd);
let time_ago = formatter::format_time_ago(ts);
let marker = if is_current { " [current]" } else { "" };
let header = if is_current {
format!("{}. **{short_dir}**{marker}", i + 1)
} else {
format!("{}. **{short_dir}**", i + 1)
};
let line2 = format!(" {icon} `{short_id}` · {time_ago}");
text_parts.push(format!("{header}\n{line2}"));
buttons.push((
format!("/resume {id}"),
format!("{icon} {short_dir} · {time_ago}{marker}"),
));
}
buttons.push(("/new".to_string(), "➕ New Session".to_string()));
let display_text = format!("📋 **Recent Sessions**\n\n{}", text_parts.join("\n\n"));
self.send_buttons_to_channel(channel, &display_text, &buttons)
.await
}
async fn send_status(&self, channel: &ChannelId) -> Result<()> {
let pool = self.session_pool.lock().await;
if let Some(session) = pool.get(channel) {
let model = session
.model_override
.as_deref()
.unwrap_or(&self.config.model);
let streaming = match session.streaming_level {
StreamingLevel::Compact => "compact",
StreamingLevel::Full => "full",
};
let workspace = match session.workspace_scope {
WorkspaceScope::Project => "project",
WorkspaceScope::Workspace => "workspace",
WorkspaceScope::Full => "full",
};
let text = formatter::format_status(
&session.session_id,
&session.project_dir,
model,
session.busy,
session.message_queue.len(),
streaming,
workspace,
session.suppressed,
);
drop(pool);
let buttons = vec![
(
"/stream".to_string(),
if streaming == "compact" {
"📡 Full streaming"
} else {
"📡 Compact"
}
.to_string(),
),
("/models".to_string(), "🤖 Models".to_string()),
("/cancel".to_string(), "🛑 Cancel".to_string()),
];
self.send_buttons_to_channel(channel, &text, &buttons).await
} else {
self.send_to_channel(channel, "No active session. Send a message to start one.")
.await
}
}
fn try_invoke_skill(&self, name: &str) -> Option<crate::skills::registry::SkillInvocation> {
if let Ok(inv) = self.skill_registry.invoke(name) {
return Some(inv);
}
let query_norm = super::commands::normalize_command_name(name)?;
let matched = self
.skill_registry
.all()
.iter()
.find(|s| {
super::commands::normalize_command_name(&s.name).as_deref()
== Some(query_norm.as_str())
})?
.name
.clone();
self.skill_registry.invoke(&matched).ok()
}
async fn run_skill(
&self,
channel: &ChannelId,
invocation: crate::skills::registry::SkillInvocation,
args: &str,
) -> Result<()> {
let context_str = invocation.to_context_string();
let user_msg = if args.is_empty() {
format!("{context_str}\n\nPlease execute these instructions.")
} else {
format!("{context_str}\n\nUser request: {args}")
};
self.run_agent_message(channel, &user_msg).await
}
async fn send_help(&self, channel: &ChannelId) -> Result<()> {
let help = "\
**collet Remote Control**
📝 *Just type a message* — runs the agent
`/projects` `/p` — browse projects
`/sessions` `/s` — recent sessions
`/status` — current session info
`/cancel` — stop running agent
`/models` — list available models
`/agents` — list agents
`/stream compact|full` — set detail level
`/history` `/hist` — conversation history\n\
`/delete` — delete a session\n\
`/help` — this message";
let mut msg = help.to_string();
if self.skill_registry.count() > 0 {
msg.push_str("\n\n**Skills** (`/<name> [args]`):");
for skill in self.skill_registry.all() {
let shown = super::commands::normalize_command_name(&skill.name)
.unwrap_or_else(|| skill.name.clone());
msg.push_str(&format!("\n`/{}` — {}", shown, skill.description));
}
}
let buttons = vec![
("/projects".to_string(), "📂 Projects".to_string()),
("/sessions".to_string(), "📋 Sessions".to_string()),
("/status".to_string(), "📊 Status".to_string()),
("/history".to_string(), "📜 History".to_string()),
("/new".to_string(), "➕ New".to_string()),
("/cancel".to_string(), "🛑 Cancel".to_string()),
];
self.send_buttons_to_channel(channel, &msg, &buttons).await
}
async fn send_history(&self, channel: &ChannelId) -> Result<()> {
let pool = self.session_pool.lock().await;
if let Some(session) = pool.get(channel) {
let messages = session.context.messages();
if messages.is_empty() {
drop(pool);
return self
.send_to_channel(channel, "No conversation history yet.")
.await;
}
let recent: Vec<_> = messages
.iter()
.rev()
.take(6)
.collect::<Vec<_>>()
.into_iter()
.rev()
.collect();
let mut parts: Vec<String> = Vec::new();
for msg in &recent {
let role = &msg.role;
let text = msg
.content
.as_ref()
.map(|c| c.text_content())
.unwrap_or_default();
if text.is_empty() {
continue;
}
let label = match role.as_str() {
"user" => "You",
"assistant" => "Bot",
"system" => "System",
"tool" => continue,
_ => continue,
};
let truncated = if text.chars().count() > 300 {
let mut s: String = text.chars().take(297).collect();
s.push('…');
s
} else {
text.clone()
};
parts.push(format!(
"**{label}:** {}",
formatter::strip_ansi(&truncated)
));
}
drop(pool);
if parts.is_empty() {
return self
.send_to_channel(channel, "No conversation history yet.")
.await;
}
let history_text = parts.join("\n\n");
self.send_to_channel(channel, &format!("📜 **Recent History**\n\n{history_text}"))
.await
} else {
self.send_to_channel(channel, "No active session.").await
}
}
async fn delete_session(&self, channel: &ChannelId, session_id: Option<&str>) -> Result<()> {
let sid = match session_id {
Some(id) => id.to_string(),
None => {
let pool = self.session_pool.lock().await;
let project_dir = pool.get(channel).map(|s| s.project_dir.clone());
drop(pool);
let mut recent: Vec<(String, String, String, bool)> = Vec::new();
if let Some(dir) = &project_dir {
let store = SessionStore::new(dir);
let sessions = store.list().await;
for (id, ts, completed) in sessions {
recent.push((id, ts, dir.clone(), completed));
}
}
if recent.is_empty() {
return self
.send_to_channel(channel, "No sessions to delete.")
.await;
}
recent.sort_by(|a, b| b.1.cmp(&a.1));
recent.truncate(10);
let mut buttons: Vec<(String, String)> = Vec::new();
for (id, ts, _wd, completed) in &recent {
let icon = if *completed { "✅" } else { "🔄" };
let short_id = &id[..id.len().min(8)];
let time_ago = formatter::format_time_ago(ts);
buttons.push((
format!("/delete {id}"),
format!("{icon} {short_id} · {time_ago}"),
));
}
return self
.send_buttons_to_channel(channel, "🗑 **Select session to delete:**", &buttons)
.await;
}
};
let pool = self.session_pool.lock().await;
let project_dir = pool.get(channel).map(|s| s.project_dir.clone());
drop(pool);
let dir = match project_dir {
Some(d) => d,
None => return self.send_to_channel(channel, "No active session.").await,
};
let store = SessionStore::new(&dir);
match store.delete(&sid).await {
true => {
let short_id = &sid[..sid.len().min(8)];
self.send_to_channel(channel, &format!("🗑 Deleted session `{short_id}`"))
.await
}
false => {
let short_id = &sid[..sid.len().min(8)];
self.send_to_channel(channel, &format!("Session `{short_id}` not found."))
.await
}
}
}
async fn send_project_list(&self, channel: &ChannelId) -> Result<()> {
let discovered = self.scan_all_projects().await;
if discovered.is_empty() {
return self
.send_to_channel(
channel,
"No projects found.\n\nStart a new session:\n `/new /path/to/project`",
)
.await;
}
let mut buttons: Vec<(String, String)> = Vec::new();
let mut text_parts: Vec<String> = Vec::new();
for (i, (working_dir, sessions)) in discovered.iter().enumerate() {
let name = shorten_path(working_dir);
let tilde = tilde_path(working_dir);
let count = sessions.len();
let count_label = if count > 5 { "5+" } else { &count.to_string() };
let last_ts = sessions.first().map(|(_, ts, _)| ts.as_str()).unwrap_or("");
let time_ago = if last_ts.is_empty() {
"—".to_string()
} else {
formatter::format_time_ago(last_ts)
};
let header = format!("{}. **{}**", i + 1, name);
let detail = format!(" {count_label} sessions · {time_ago}");
text_parts.push(format!("{header}\n{detail}"));
buttons.push((
format!("/project-detail {tilde}"),
format!("📂 {name} · {count_label}s · {time_ago}"),
));
}
buttons.push(("/new".to_string(), "➕ New Project".to_string()));
let display_text = format!("📂 **Projects**\n\n{}", text_parts.join("\n\n"));
self.send_buttons_to_channel(channel, &display_text, &buttons)
.await
}
async fn send_project_detail(&self, channel: &ChannelId, project_dir: &str) -> Result<()> {
let expanded = expand_path(project_dir);
let all = self.scan_all_projects().await;
let sessions = all
.iter()
.find(|(wd, _)| wd == &expanded)
.map(|(_, s)| s.as_slice())
.unwrap_or(&[]);
let pool = self.session_pool.lock().await;
let current_session_id = pool.get(channel).map(|s| s.session_id.clone());
drop(pool);
let name = shorten_path(&expanded);
let display_path = formatter::shorten_path_for_display(&expanded);
if sessions.is_empty() {
return self
.send_to_channel(
channel,
&format!("📂 **{name}**\n`{display_path}`\n\nNo sessions found."),
)
.await;
}
let mut buttons: Vec<(String, String)> = Vec::new();
let mut text_parts: Vec<String> = Vec::new();
for (i, (id, ts, completed)) in sessions.iter().take(10).enumerate() {
let is_current = current_session_id.as_deref() == Some(id.as_str());
let icon = if *completed { "✅" } else { "🔄" };
let short_id = &id[..id.len().min(8)];
let time_ago = formatter::format_time_ago(ts);
let marker = if is_current { " [current]" } else { "" };
let header = if is_current {
format!("{}. `{short_id}`{marker}", i + 1)
} else {
format!("{}. `{short_id}`", i + 1)
};
text_parts.push(format!("{header}\n {icon} {time_ago}"));
buttons.push((
format!("/resume {id}"),
format!("{icon} {short_id} · {time_ago}{marker}"),
));
}
let tilde = tilde_path(&expanded);
buttons.push((format!("/new {tilde}"), "➕ New Session".to_string()));
let display_text = format!(
"📂 **{name}**\n`{display_path}`\n\n{}",
text_parts.join("\n\n")
);
self.send_buttons_to_channel(channel, &display_text, &buttons)
.await
}
async fn switch_project(&self, channel: &ChannelId, name: &str) -> Result<()> {
{
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
session.suppress();
session.clear_queue();
session.session_approvals.clear().await;
}
}
self.save_current_session(channel).await;
if let Some(entry) = self.channel_map.find_by_name(name) {
let project = entry
.project
.clone()
.or_else(|| default_workspace_dir(self.default_workspace_dir.as_deref()))
.unwrap_or_else(|| {
std::env::current_dir()
.map(|p| p.to_string_lossy().to_string())
.unwrap_or_else(|_| ".".to_string())
});
let project_name = entry.name.clone();
let context = self.build_context(&project).await;
let session = AgentSession::new(
project,
context,
self.default_streaming,
self.default_workspace,
);
let mut pool = self.session_pool.lock().await;
pool.insert(channel.clone(), session);
return self
.send_to_channel_inner(
channel,
&format!("🔀 Switched to project **{project_name}**."),
)
.await;
}
let path = std::path::Path::new(name);
if path.is_dir() {
let context = self.build_context(name).await;
let session = AgentSession::new(
name.to_string(),
context,
self.default_streaming,
self.default_workspace,
);
let mut pool = self.session_pool.lock().await;
pool.insert(channel.clone(), session);
let short = shorten_path(name);
return self
.send_to_channel_inner(channel, &format!("🔀 Switched to `{short}`."))
.await;
}
self.send_to_channel(
channel,
&format!("Project '{name}' not found.\n\nUse a channel map name or directory path:\n `/switch my-project`\n `/switch /path/to/project`"),
).await
}
async fn send_model_list(&self, channel: &ChannelId) -> Result<()> {
let pool = self.session_pool.lock().await;
let current = pool
.get(channel)
.and_then(|s| s.model_override.clone())
.unwrap_or_else(|| self.config.model.clone());
drop(pool);
let mut buttons: Vec<(String, String)> = Vec::new();
let mut seen = std::collections::HashSet::new();
seen.insert(self.config.model.clone());
let icon = if current == self.config.model {
"✅"
} else {
"⬜"
};
buttons.push((
format!("/set-model {}", self.config.model),
format!("{icon} {}", self.config.model),
));
for agent in &self.config.agents {
if seen.insert(agent.model.clone()) {
let icon = if current == agent.model { "✅" } else { "⬜" };
buttons.push((
format!("/set-model {}", agent.model),
format!("{icon} {}", agent.model),
));
}
}
if let Ok(cf) = crate::config::load_config_file() {
for provider in &cf.providers {
for model in provider.all_models() {
if seen.insert(model.to_string()) {
let icon = if current == model { "✅" } else { "⬜" };
buttons.push((format!("/set-model {model}"), format!("{icon} {model}")));
}
}
}
}
self.send_buttons_to_channel(
channel,
&format!("🤖 **Models** — current: `{current}`"),
&buttons,
)
.await
}
async fn set_model(&self, channel: &ChannelId, name: &str) -> Result<()> {
self.ensure_session(channel).await?;
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
session.model_override = Some(name.to_string());
self.send_to_channel_inner(channel, &format!("🤖 Model → `{name}`"))
.await
} else {
Ok(())
}
}
async fn send_agent_list(&self, channel: &ChannelId) -> Result<()> {
let pool = self.session_pool.lock().await;
let _current_model = pool
.get(channel)
.and_then(|s| s.model_override.as_ref())
.unwrap_or(&self.config.model);
drop(pool);
let mut buttons: Vec<(String, String)> = Vec::new();
for agent in &self.config.agents {
buttons.push((
format!("/set-agent {}", agent.name),
format!("🔧 {} ({})", agent.name, agent.model),
));
}
self.send_buttons_to_channel(channel, "🤖 **Agents**", &buttons)
.await
}
async fn set_agent(&self, channel: &ChannelId, name: &str) -> Result<()> {
if let Some(agent) = self.config.agents.iter().find(|a| a.name == name) {
self.ensure_session(channel).await?;
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
session.model_override = Some(agent.model.clone());
}
let model = &agent.model;
self.send_to_channel_inner(channel, &format!("🤖 Agent → `{name}` ({model})"))
.await
} else {
self.send_to_channel(channel, &format!("Agent `{name}` not found."))
.await
}
}
async fn set_streaming(&self, channel: &ChannelId, level: &str) -> Result<()> {
self.ensure_session(channel).await?;
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
if level.is_empty() {
let toggled = match session.streaming_level {
StreamingLevel::Compact => StreamingLevel::Full,
StreamingLevel::Full => StreamingLevel::Compact,
};
session.streaming_level = toggled;
let label = match toggled {
StreamingLevel::Compact => "compact",
StreamingLevel::Full => "full",
};
self.send_to_channel_inner(channel, &format!("📡 Streaming toggled to `{label}`."))
.await
} else if let Some(lvl) = StreamingLevel::parse(level) {
session.streaming_level = lvl;
self.send_to_channel_inner(channel, &format!("📡 Streaming set to `{level}`."))
.await
} else {
self.send_to_channel(channel, "Invalid level. Use `compact` or `full`.")
.await
}
} else {
self.send_to_channel(channel, "No active session.").await
}
}
async fn set_workspace(&self, channel: &ChannelId, scope: &str) -> Result<()> {
if let Some(ws) = WorkspaceScope::parse(scope) {
self.ensure_session(channel).await?;
let mut pool = self.session_pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
session.workspace_scope = ws;
}
self.send_to_channel_inner(channel, &format!("📁 Workspace scope set to `{scope}`."))
.await
} else {
self.send_to_channel(
channel,
"Invalid scope. Use `project`, `workspace`, or `full`.",
)
.await
}
}
async fn scan_all_projects(&self) -> Vec<(String, Vec<SessionEntry>)> {
let projects_dir =
crate::config::collet_home(self.config.collet_home.to_str()).join("projects");
let mut result: Vec<(String, Vec<SessionEntry>)> = Vec::new();
let mut entries = match tokio::fs::read_dir(&projects_dir).await {
Ok(e) => e,
Err(_) => return result,
};
while let Ok(Some(entry)) = entries.next_entry().await {
let meta = match entry.metadata().await {
Ok(m) => m,
Err(_) => continue,
};
if !meta.is_dir() {
continue;
}
let sessions_dir = entry.path().join("sessions");
if !sessions_dir.is_dir() {
continue;
}
let mut working_dir: Option<String> = None;
let mut sessions: Vec<(String, String, bool)> = Vec::new();
let mut reader = match tokio::fs::read_dir(&sessions_dir).await {
Ok(r) => r,
Err(_) => continue,
};
while let Ok(Some(sess_entry)) = reader.next_entry().await {
let name = sess_entry.file_name().to_string_lossy().to_string();
if !name.ends_with(".json") || name == "latest.json" {
continue;
}
if let Ok(content) = tokio::fs::read_to_string(sess_entry.path()).await
&& let Ok(snap) =
serde_json::from_str::<crate::agent::session::SessionSnapshot>(&content)
{
if working_dir.is_none() {
working_dir = Some(snap.working_dir.clone());
}
sessions.push((snap.session_id, snap.timestamp, snap.completed));
}
}
sessions.sort_by(|a, b| b.1.cmp(&a.1));
if let Some(wd) = working_dir {
if wd.starts_with("/tmp")
|| wd.starts_with("/private/tmp")
|| wd.starts_with("/var/tmp")
|| wd.contains("/tmp/")
{
continue;
}
result.push((wd, sessions));
}
}
result.sort_by(|a, b| {
let a_ts = a.1.first().map(|s| s.1.as_str()).unwrap_or("");
let b_ts = b.1.first().map(|s| s.1.as_str()).unwrap_or("");
b_ts.cmp(a_ts)
});
result
}
async fn build_context(&self, project_dir: &str) -> ConversationContext {
self.build_context_with_agent(project_dir, None).await
}
async fn init_session_resources(
&self,
project_dir: &str,
) -> (
std::sync::Arc<crate::mcp::manager::McpManager>,
std::sync::Arc<crate::skills::SkillRegistry>,
std::sync::Arc<crate::tools::tool_index::ToolIndex>,
) {
let mcp = crate::mcp::manager::McpManager::connect_all(project_dir).await;
let skills = crate::skills::SkillRegistry::discover(std::path::Path::new(project_dir));
let mut idx = crate::tools::tool_index::ToolIndex::new();
idx.reindex_mcp_tools(&mcp);
idx.reindex_skills(&skills);
idx.reindex_agents(&self.config.agents);
(
std::sync::Arc::new(mcp),
std::sync::Arc::new(skills),
std::sync::Arc::new(idx),
)
}
async fn build_context_with_agent(
&self,
project_dir: &str,
agent_override: Option<&crate::config::types::AgentDef>,
) -> ConversationContext {
let effective_agent = agent_override
.or_else(|| self.config.agents.first())
.cloned();
let collet_home = self.config.collet_home.clone();
let context_max_tokens = self.config.context_max_tokens;
let compaction_threshold = self.config.compaction_threshold;
let project_dir = project_dir.to_string();
let soul_enabled_cfg = self.config.clone();
let cache_mgr = Arc::clone(&self.project_cache);
tokio::task::spawn_blocking(move || {
let cache = cache_mgr.get_or_build(&project_dir);
let map_string = cache.map_string();
let file_count = cache.file_count();
let symbol_count = cache.symbol_count();
let soul_content =
if crate::agent::soul::is_enabled(&soul_enabled_cfg, effective_agent.as_ref()) {
let name = effective_agent
.as_ref()
.map(|a| a.name.as_str())
.unwrap_or("agent");
crate::agent::soul::load(&collet_home, name)
} else {
None
};
let system_prompt = prompt::build_prompt_with_agent(
&map_string,
file_count,
symbol_count,
None,
effective_agent
.as_ref()
.map(|a| a.system_prompt.as_str())
.filter(|s| !s.is_empty()),
None,
soul_content.as_deref(),
);
ConversationContext::with_budget(
system_prompt,
context_max_tokens,
compaction_threshold,
)
})
.await
.expect("build_context_with_agent panicked in spawn_blocking")
}
fn find_adapter(&self, platform: &str) -> Option<Arc<dyn PlatformAdapter>> {
self.adapters
.iter()
.find(|a| a.platform_name() == platform)
.cloned()
}
async fn send_to_channel(&self, channel: &ChannelId, text: &str) -> Result<()> {
self.send_to_channel_inner(channel, text).await
}
async fn send_to_channel_inner(&self, channel: &ChannelId, text: &str) -> Result<()> {
if let Some(adapter) = self.find_adapter(&channel.platform) {
let max_len = adapter.max_message_length();
let chunks = formatter::split_message(text, max_len);
for chunk in chunks {
adapter.send_message(channel, &chunk).await?;
}
}
Ok(())
}
async fn send_buttons_to_channel(
&self,
channel: &ChannelId,
text: &str,
buttons: &[(String, String)],
) -> Result<()> {
if let Some(adapter) = self.find_adapter(&channel.platform) {
adapter.send_buttons(channel, text, buttons).await?;
}
Ok(())
}
fn build_approval_gate(
&self,
_channel: &ChannelId,
session: &super::session_pool::AgentSession,
) -> (
ApprovalGate,
Option<mpsc::UnboundedReceiver<ApprovalRequest>>,
) {
match self.approval_mode {
RemoteApprovalMode::Cautious => {
let (tx, rx) = mpsc::unbounded_channel::<ApprovalRequest>();
let gate = ApprovalGate::new_with_session(
SharedApproveMode::new(ApproveMode::Manual),
tx,
session.session_approvals.clone(),
);
(gate, Some(rx))
}
_ => (ApprovalGate::yolo(), None),
}
}
async fn resolve_tool_approval(&self, request_id: &str, response: ApprovalResponse) {
let mut pending = self.pending_tool_approvals.lock().await;
if let Some(tx) = pending.remove(request_id) {
let _ = tx.send(response);
} else {
tracing::debug!(
"[remote] resolve_tool_approval: request_id {request_id:?} not found (may have timed out)"
);
}
}
}
async fn remote_approval_handler(
mut approval_rx: mpsc::UnboundedReceiver<ApprovalRequest>,
channel: ChannelId,
adapter: Option<Arc<dyn PlatformAdapter>>,
pending_approvals: Arc<Mutex<HashMap<String, oneshot::Sender<ApprovalResponse>>>>,
permissions: crate::config::types::RemotePermissionsSection,
session_approvals: SessionApprovals,
) {
const TIMEOUT_SECS: u64 = 120;
while let Some(request) = approval_rx.recv().await {
let tool_name = &request.tool_name;
let tool_args = &request.tool_args;
if permissions
.always_deny
.iter()
.any(|p| matches_permission_pattern(p, tool_name, tool_args))
{
let _ = request.response_tx.send(ApprovalResponse::Deny);
continue;
}
if permissions
.always_allow
.iter()
.any(|p| matches_permission_pattern(p, tool_name, tool_args))
{
let _ = request.response_tx.send(ApprovalResponse::Approve);
continue;
}
let Some(ref adapter) = adapter else {
tracing::warn!(
"[remote] cautious mode: no adapter for channel {channel}, denying {tool_name}"
);
let _ = request.response_tx.send(ApprovalResponse::Deny);
continue;
};
let request_id = uuid::Uuid::new_v4().to_string();
let display_args = if tool_args.len() > 200 {
format!("{}…", &tool_args[..200])
} else {
tool_args.clone()
};
let msg = format!(
"🔐 **Tool approval required**\n`{tool_name}` wants to run:\n```\n{display_args}\n```"
);
let buttons = vec![
(
format!("/tool-approve {request_id}"),
"✅ Allow".to_string(),
),
(
format!("/tool-approve-session {request_id}"),
"✅✅ Allow for session".to_string(),
),
(format!("/tool-deny {request_id}"), "❌ Deny".to_string()),
];
if let Err(e) = adapter.send_buttons(&channel, &msg, &buttons).await {
tracing::warn!("[remote] approval button send failed for {tool_name}: {e}");
let _ = request.response_tx.send(ApprovalResponse::Deny);
continue;
}
let _ = adapter.send_typing(&channel).await;
let (resp_tx, resp_rx) = oneshot::channel::<ApprovalResponse>();
pending_approvals
.lock()
.await
.insert(request_id.clone(), resp_tx);
let response = tokio::time::timeout(Duration::from_secs(TIMEOUT_SECS), resp_rx).await;
match response {
Ok(Ok(resp)) => {
if resp == ApprovalResponse::ApproveAll {
session_approvals.resolve(tool_name, true).await;
}
let _ = request.response_tx.send(resp);
}
_ => {
pending_approvals.lock().await.remove(&request_id);
let _ = adapter
.send_message(
&channel,
&format!(
"⏱ Approval timed out ({TIMEOUT_SECS}s). \
Tool `{tool_name}` was denied."
),
)
.await;
let _ = request.response_tx.send(ApprovalResponse::Deny);
}
}
let _ = adapter.send_typing(&channel).await;
}
}
fn matches_permission_pattern(pattern: &str, tool_name: &str, tool_args: &str) -> bool {
if let Some(paren_pos) = pattern.find('(') {
let name = &pattern[..paren_pos];
if name != tool_name {
return false;
}
let arg_pattern = pattern[paren_pos + 1..].trim_end_matches(')');
if let Some(prefix) = arg_pattern.strip_suffix('*') {
tool_args.contains(prefix)
} else {
tool_args.contains(arg_pattern)
}
} else {
pattern == tool_name
}
}
fn shorten_path(path: &str) -> String {
std::path::Path::new(path)
.file_name()
.map(|n| n.to_string_lossy().to_string())
.unwrap_or_else(|| path.to_string())
}
fn tilde_path(path: &str) -> String {
if let Some(home) = dirs::home_dir() {
let home_str = home.to_string_lossy();
if let Some(rest) = path.strip_prefix(home_str.as_ref()) {
return format!("~{rest}");
}
}
path.to_string()
}
fn default_workspace_dir(configured: Option<&str>) -> Option<String> {
let path = match configured.map(str::trim) {
Some(p) if !p.is_empty() => expand_path(p),
_ => {
let home = dirs::home_dir()?;
home.join(".collet")
.join("workspace")
.to_string_lossy()
.to_string()
}
};
if let Err(err) = std::fs::create_dir_all(&path) {
tracing::warn!(
"[remote] failed to create default workspace {}: {}",
path,
err
);
return None;
}
Some(path)
}
fn expand_path(path: &str) -> String {
if let Some(rest) = path.strip_prefix('~')
&& let Some(home) = dirs::home_dir()
{
return format!("{}{rest}", home.to_string_lossy());
}
path.to_string()
}
fn format_completion_summary(msg_count: usize, elapsed: u64) -> String {
formatter::format_completion_summary(msg_count, elapsed)
}
async fn stream_events_to_channel(
adapter: Arc<dyn PlatformAdapter>,
channel: &ChannelId,
mut event_rx: mpsc::UnboundedReceiver<AgentEvent>,
streaming_level: StreamingLevel,
pool: Arc<Mutex<SessionPool>>,
project_cache: Arc<crate::project_cache::ProjectCacheManager>,
cmd_tx: Option<mpsc::UnboundedSender<IncomingCommand>>,
) {
let max_len = adapter.max_message_length();
let mut token_buffer = String::new();
let mut last_flush = std::time::Instant::now();
let flush_interval = Duration::from_millis(200);
let typing_paused = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
let typing_adapter = adapter.clone();
let typing_channel = channel.clone();
let typing_pause_flag = typing_paused.clone();
let typing_task = tokio::spawn(async move {
loop {
if !typing_pause_flag.load(std::sync::atomic::Ordering::Relaxed) {
let _ = typing_adapter.send_typing(&typing_channel).await;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
while let Some(event) = event_rx.recv().await {
let is_suppressed = {
let pool = pool.lock().await;
pool.get(channel).map(|s| s.suppressed).unwrap_or(false)
};
if is_suppressed {
if matches!(event, AgentEvent::Done { .. } | AgentEvent::GuardStop(_)) {
let mut pool = pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
session.busy = false;
session.cancel_token = None;
session.suppressed = false;
session.touch();
}
typing_task.abort();
}
continue;
}
match event {
AgentEvent::Token(token) => {
if streaming_level == StreamingLevel::Full {
token_buffer.push_str(&token);
let remote_status = crate::remote::adapter::RemoteEvent::Status(format!(
"streaming: {} chars buffered",
token_buffer.len()
));
if let crate::remote::adapter::RemoteEvent::Status(msg) = &remote_status {
tracing::trace!(status = %msg, "Remote streaming status");
}
let at_natural_break = token_buffer.ends_with('\n')
|| token_buffer.ends_with('。')
|| token_buffer.ends_with('!')
|| token_buffer.ends_with('?')
|| token_buffer.ends_with(". ")
|| token_buffer.ends_with("! ")
|| token_buffer.ends_with("? ");
if (at_natural_break || last_flush.elapsed() >= flush_interval)
&& !token_buffer.is_empty()
{
let text = formatter::strip_ansi(&token_buffer);
for chunk in formatter::split_message(&text, max_len) {
let _ = adapter.send_message(channel, &chunk).await;
}
token_buffer.clear();
last_flush = std::time::Instant::now();
}
}
}
AgentEvent::Response(text) => {
token_buffer.clear();
let remote_ev = RemoteEvent::Response(text.clone());
let formatted = match &remote_ev {
RemoteEvent::Response(t) => formatter::format_response(t),
_ => unreachable!(),
};
if !formatted.is_empty() {
let chunks = formatter::split_message(&formatted, max_len);
if chunks.len() > 3 {
let _ = adapter
.send_long_message(channel, &formatted, Some("response.md"))
.await;
} else {
for chunk in chunks {
let _ = adapter.send_message(channel, &chunk).await;
}
}
}
}
AgentEvent::ToolCall { name, args, .. } => {
if streaming_level == StreamingLevel::Full {
let summary = formatter::tool_call_summary(&name, &args, max_len);
let remote_ev = RemoteEvent::ToolCall {
name: name.clone(),
summary: summary.clone(),
};
let text = match remote_ev {
RemoteEvent::ToolCall {
name: tool_name,
summary,
} => {
tracing::trace!(tool = %tool_name, "Forwarding tool call to remote");
summary
}
_ => unreachable!(),
};
let _ = adapter.send_message(channel, &text).await;
let _ = adapter.send_typing(channel).await;
}
}
AgentEvent::ToolResult {
name,
success,
result,
..
} => match streaming_level {
StreamingLevel::Full => {
let preview = formatter::truncate(&result, 200);
let remote_ev = RemoteEvent::ToolResult {
name: name.clone(),
preview: preview.clone(),
success,
};
let display = match &remote_ev {
RemoteEvent::ToolResult {
name,
preview,
success,
} => {
let icon = if *success { "✓" } else { "✗" };
format!(" {icon} {name}: {preview}")
}
_ => unreachable!(),
};
let _ = adapter.send_message(channel, &display).await;
let _ = adapter.send_typing(channel).await;
}
StreamingLevel::Compact => {
if !success {
let preview = formatter::truncate(&result, 200);
let _ = adapter
.send_message(channel, &format!("✗ {name}: {preview}"))
.await;
let _ = adapter.send_typing(channel).await;
}
}
},
AgentEvent::PlanReady {
plan,
context: arch_context,
user_msg,
} => {
{
let mut pool = pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
let system_prompt = arch_context.system_prompt().to_string();
session.pending_plan = Some(PendingPlan {
plan: plan.clone(),
arch_context,
user_msg,
system_prompt,
});
session.busy = false;
session.cancel_token = None;
session.touch();
}
}
let remote_ev = RemoteEvent::PlanReady { plan: plan.clone() };
let formatted = match &remote_ev {
RemoteEvent::PlanReady { plan } => formatter::format_plan(plan),
_ => unreachable!(),
};
let chunks = formatter::split_message(&formatted, max_len);
for chunk in chunks {
let _ = adapter.send_message(channel, &chunk).await;
}
let _ = adapter
.send_buttons(
channel,
"Approve this plan?",
&[
("✅ Approve".to_string(), "/approve".to_string()),
("❌ Reject".to_string(), "/reject".to_string()),
],
)
.await;
typing_task.abort();
break;
}
AgentEvent::Error(msg) => {
let display = formatter::format_user_error(msg.as_str());
let _ = adapter.send_message(channel, &display).await;
let _ = adapter.send_typing(channel).await;
}
AgentEvent::Done { context, .. } => {
typing_task.abort();
let mut pool = pool.lock().await;
if let Some(session) = pool.get_mut(channel) {
let done_ev = RemoteEvent::Done {
iterations: session.context.messages().len() as u32,
elapsed_secs: 0,
};
if let RemoteEvent::Done {
iterations,
elapsed_secs,
} = done_ev
{
tracing::debug!(
channel = %channel,
iterations,
elapsed_secs,
"Remote agent task complete"
);
if let Some(ref msg_id) = session.streaming_message_id
&& !session.streaming_buffer.is_empty()
{
let _ = adapter
.edit_message(channel, msg_id, &session.streaming_buffer)
.await;
}
}
session.context = context;
session.busy = false;
session.cancel_token = None;
session.streaming_buffer.clear();
session.streaming_message_id = None;
session.touch();
let next_msg = session.drain_next();
let ch_clone = channel.clone();
let user_id = session.session_id.clone();
let elapsed = session.last_activity.elapsed().as_secs();
let msg_count = session.context.messages().len();
let has_queue = next_msg.is_some();
drop(pool);
if !has_queue {
let summary = format_completion_summary(msg_count, elapsed);
let _ = adapter.send_message(channel, &summary).await;
}
if let Some(queued_text) = next_msg {
tracing::info!(
"[remote] draining queued message for {} ({} chars)",
ch_clone,
queued_text.len()
);
if let Some(ref tx) = cmd_tx {
let _ = tx.send(IncomingCommand {
channel: ch_clone,
user_id,
command: RemoteCommand::Message { text: queued_text },
});
}
}
}
break;
}
AgentEvent::Status {
iteration,
elapsed_secs,
..
} => {
if streaming_level == StreamingLevel::Full {
let _ = adapter
.send_message(
channel,
&format!("⏱ Iteration {iteration} ({elapsed_secs}s)"),
)
.await;
let _ = adapter.send_typing(channel).await;
}
}
AgentEvent::GuardStop(msg) => {
let remote_ev = RemoteEvent::Text(format!("🛑 {msg}"));
if let RemoteEvent::Text(text) = remote_ev {
let _ = adapter.send_message(channel, &text).await;
}
}
AgentEvent::PhaseChange { label } => {
let _ = adapter.send_message(channel, &format!("--- {label}")).await;
let _ = adapter.send_typing(channel).await;
}
AgentEvent::SwarmDone {
merged_response,
agent_count,
total_tool_calls,
..
} => {
match streaming_level {
StreamingLevel::Full => {
if !merged_response.is_empty() {
let chunks = formatter::split_message(&merged_response, max_len);
for chunk in chunks {
let _ = adapter.send_message(channel, &chunk).await;
}
}
}
StreamingLevel::Compact => {
let _ = adapter
.send_message(
channel,
&format!(
"✅ Hive done: {agent_count} agents, {total_tool_calls} tools"
),
)
.await;
}
}
}
AgentEvent::FileModified { ref path } => {
project_cache.notify_file_modified(path);
}
AgentEvent::StreamRetry { .. }
| AgentEvent::SwarmAgentStarted { .. }
| AgentEvent::SwarmAgentProgress { .. }
| AgentEvent::SwarmAgentDone { .. }
| AgentEvent::SwarmConflict { .. }
| AgentEvent::SwarmWorkerApproaching { .. }
| AgentEvent::SwarmModeSwitch { .. }
| AgentEvent::SwarmResolvedToSingle { .. }
| AgentEvent::SwarmAgentToolCall { .. }
| AgentEvent::SwarmAgentToolResult { .. }
| AgentEvent::SwarmAgentToken { .. }
| AgentEvent::SwarmAgentResponse { .. }
| AgentEvent::SwarmWorkersDispatched
| AgentEvent::SwarmWorkerPaused { .. }
| AgentEvent::SwarmWorkerResumed { .. }
| AgentEvent::PerformanceUpdate { .. }
| AgentEvent::LspInstalled { .. }
| AgentEvent::McpPids { .. }
| AgentEvent::SoulReflecting { .. }
| AgentEvent::ImageNotice { .. }
| AgentEvent::ApprovalRequired { .. }
| AgentEvent::ApprovalDenied { .. }
| AgentEvent::Evolution(_)
| AgentEvent::ShellOutput { .. }
| AgentEvent::ToolBatchProgress { .. }
| AgentEvent::StreamWaiting { .. }
| AgentEvent::CompactionStarted { .. }
| AgentEvent::CompactionDone { .. }
| AgentEvent::ToolResultTruncated { .. } => {}
}
}
typing_task.abort();
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn permission_pattern_name_only() {
assert!(matches_permission_pattern("bash", "bash", "echo hello"));
assert!(!matches_permission_pattern(
"bash",
"file_write",
"echo hello"
));
assert!(matches_permission_pattern("file_write", "file_write", "{}"));
}
#[test]
fn permission_pattern_with_args_prefix() {
assert!(matches_permission_pattern(
"bash(git status*)",
"bash",
"git status --short"
));
assert!(!matches_permission_pattern(
"bash(git status*)",
"bash",
"git push --force"
));
assert!(!matches_permission_pattern(
"bash(git status*)",
"file_write",
"git status"
));
}
#[test]
fn permission_pattern_exact_arg() {
assert!(matches_permission_pattern(
"bash(cargo test)",
"bash",
"cargo test --all-features"
));
assert!(!matches_permission_pattern(
"bash(cargo test)",
"bash",
"cargo build"
));
}
#[test]
fn remote_approval_mode_parse() {
assert_eq!(RemoteApprovalMode::parse("yolo"), RemoteApprovalMode::Yolo);
assert_eq!(
RemoteApprovalMode::parse("plan-only"),
RemoteApprovalMode::PlanOnly
);
assert_eq!(
RemoteApprovalMode::parse("cautious"),
RemoteApprovalMode::Cautious
);
assert_eq!(
RemoteApprovalMode::parse("unknown"),
RemoteApprovalMode::Yolo
);
}
}