use std::cell::RefCell;
use std::collections::HashMap;
use std::path::PathBuf;
use std::rc::Rc;
use agent_client_protocol::{
AgentCapabilities, AgentSideConnection, AuthenticateRequest, AuthenticateResponse,
AvailableCommand, AvailableCommandInput, AvailableCommandsUpdate, BlobResourceContents,
CancelNotification, Client, ContentBlock, ContentChunk, EmbeddedResourceResource, ExtRequest,
ExtResponse, Implementation, InitializeRequest, InitializeResponse, LoadSessionRequest,
LoadSessionResponse, McpCapabilities, McpServer, NewSessionRequest, NewSessionResponse,
PromptCapabilities, PromptRequest, PromptResponse, ProtocolVersion, SessionNotification,
SessionUpdate, StopReason, ToolCall, ToolCallStatus, ToolCallUpdate, ToolCallUpdateFields,
UnstructuredCommandInput,
};
use crate::config::mcp::McpServerConfig;
use crate::config::Config;
use crate::session::cancellation::SessionCancellation;
use crate::session::chat::session::{
execute_api_call_and_process_response, prepare_for_api_call, process_layers_if_enabled,
setup_and_initialize_session, setup_system_prompt_and_cache, ChatSession, GenericSessionArgs,
};
use crate::session::output::{OutputMode, WebSocketSink};
use crate::websocket::ServerMessage;
use crate::{log_debug, log_error, log_info};
/// ACP agent state shared between the protocol handlers and the per-session
/// inbox monitor tasks.
///
/// All fields use single-threaded interior mutability (`Rc`/`RefCell`).
pub struct OctomindAgent {
/// Base configuration; slash commands handled in `prompt` may write a
/// modified copy back into this cell.
config: RefCell<Config>,
/// Role name passed to the session/MCP helpers for every session.
role: String,
/// Live sessions keyed by session id, each stored with its working directory.
/// Handlers remove an entry while they operate on it and re-insert it after.
sessions: Rc<RefCell<HashMap<String, (ChatSession, PathBuf)>>>,
/// Per-session cancellation handles keyed by session id.
cancellations: Rc<RefCell<HashMap<String, SessionCancellation>>>,
/// Client connection used to push session notifications; `None` until
/// `set_connection` is called.
conn: Rc<RefCell<Option<Rc<AgentSideConnection>>>>,
}
impl OctomindAgent {
    /// Builds an agent for `role` with no sessions, no cancellation handles
    /// and no client connection attached yet.
    pub fn new(config: Config, role: String) -> Self {
        let config = RefCell::new(config);
        let sessions = Rc::new(RefCell::new(HashMap::new()));
        let cancellations = Rc::new(RefCell::new(HashMap::new()));
        let conn = Rc::new(RefCell::new(None));
        Self {
            config,
            role,
            sessions,
            cancellations,
            conn,
        }
    }

    /// Stores the client connection used for outgoing notifications,
    /// replacing any connection that was set earlier.
    pub fn set_connection(&self, conn: Rc<AgentSideConnection>) {
        let mut slot = self.conn.borrow_mut();
        *slot = Some(conn);
    }
}
/// Returns a copy of `base_config` extended with the MCP servers supplied by
/// the client.
///
/// Stdio and HTTP servers are converted to `McpServerConfig` entries (timeout
/// argument `30`, empty trailing list); SSE and any other transport are logged
/// and skipped. A server is appended only when no server with the same name is
/// already configured, and its name is added to the `role` entry's server
/// references when that role exists and does not list it yet.
fn build_config_with_injected_servers(
    base_config: &Config,
    role: &str,
    servers: &[McpServer],
) -> Config {
    let mut merged = base_config.clone();

    for server in servers {
        let injected = match server {
            McpServer::Stdio(stdio) => {
                let argv: Vec<String> = stdio.args.iter().map(|arg| arg.to_string()).collect();
                let command = stdio.command.to_string_lossy();
                McpServerConfig::stdin(&stdio.name, command.as_ref(), argv, 30, vec![])
            }
            McpServer::Http(http) => McpServerConfig::http(&http.name, &http.url, 30, vec![]),
            McpServer::Sse(sse) => {
                log_info!("ACP: skipping SSE MCP server '{}' (not supported)", sse.name);
                continue;
            }
            _ => {
                log_info!("ACP: skipping unknown MCP server transport (not supported)");
                continue;
            }
        };

        let name = injected.name().to_string();

        // Register the server definition once, keyed by name.
        let already_registered = merged
            .mcp
            .servers
            .iter()
            .any(|existing| existing.name() == name);
        if !already_registered {
            merged.mcp.servers.push(injected);
        }

        // Make the server visible to the active role, if that role is configured.
        if let Some(role_entry) = merged.role_map.get_mut(role) {
            let refs = &mut role_entry.mcp.server_refs;
            if !refs.contains(&name) {
                refs.push(name);
            }
        }
    }

    merged
}
/// Builds the list of slash commands advertised to the client.
///
/// Each entry is `(name, description, optional input hint)`; entries with a
/// hint get an unstructured input attached, the others are advertised without
/// input.
fn build_available_commands() -> Vec<AvailableCommand> {
    const COMMAND_SPECS: &[(&str, &str, Option<&str>)] = &[
        ("help", "Show available commands", None),
        ("role", "View or change current role", Some("<role_name>")),
        (
            "model",
            "View or change current AI model",
            Some("<provider:model>"),
        ),
        (
            "done",
            "Finalize task with memorization, summarization, and auto-commit",
            None,
        ),
        ("save", "Save the current session", None),
        (
            "info",
            "Display token and cost breakdown for this session",
            None,
        ),
        ("clear", "Clear the screen", None),
        ("copy", "Copy last response to clipboard", None),
        (
            "context",
            "Display session context",
            Some("[all|assistant|user|tool|large]"),
        ),
        (
            "truncate",
            "Smart context truncation to reduce token usage",
            None,
        ),
        (
            "summarize",
            "Summarize entire conversation to reduce token usage",
            None,
        ),
        (
            "cache",
            "Manage cache checkpoints",
            Some("[stats|clear|threshold]"),
        ),
        ("list", "List all available sessions", Some("[page]")),
        (
            "session",
            "Switch to or create a session",
            Some("[session_name]"),
        ),
        ("run", "Execute a command layer", Some("<command_name>")),
        (
            "workflow",
            "Execute a workflow",
            Some("<workflow_name> [input]"),
        ),
        (
            "mcp",
            "MCP server management",
            Some("[info|list|full|health|dump|validate]"),
        ),
        (
            "plan",
            "Display current plan stored in MCP plan tool",
            None,
        ),
        ("prompt", "Manage prompt templates", Some("[template_name]")),
        ("image", "Attach image to next message", Some("<path>")),
        ("video", "Attach video to next message", Some("<path>")),
        ("loglevel", "Set logging level", Some("[none|info|debug]")),
        (
            "report",
            "Generate detailed usage report for this session",
            None,
        ),
        ("exit", "Exit the session", None),
    ];

    let mut commands = Vec::with_capacity(COMMAND_SPECS.len());
    for &(name, description, hint) in COMMAND_SPECS {
        let command = AvailableCommand::new(name, description);
        let command = match hint {
            Some(hint) => command.input(AvailableCommandInput::Unstructured(
                UnstructuredCommandInput::new(hint),
            )),
            None => command,
        };
        commands.push(command);
    }
    commands
}
/// Pushes the `available_commands_update` notification for `session_id` to the
/// client. Does nothing when no connection is available; a send failure is
/// logged and otherwise ignored.
async fn send_available_commands(conn: Option<std::rc::Rc<AgentSideConnection>>, session_id: &str) {
    let connection = match conn {
        Some(connection) => connection,
        None => return,
    };

    let commands = AvailableCommandsUpdate::new(build_available_commands());
    let sid: std::sync::Arc<str> = std::sync::Arc::from(session_id);
    let notification =
        SessionNotification::new(sid, SessionUpdate::AvailableCommandsUpdate(commands));

    if let Err(err) = connection.session_notification(notification).await {
        log_error!("ACP: failed to send available_commands_update: {}", err);
    }
}
/// Spawns a background task (via `tokio::task::spawn_local`) that drains the
/// inbox of `session_id` while no prompt is holding the session.
///
/// Each loop iteration runs inside `with_session_id(session_id, ..)` and:
/// 1. calls `flush_due_to_inbox`, then processes queued inbox messages one by
///    one as user messages, streaming model output to the client as ACP
///    session notifications;
/// 2. exits when `get_inbox_notify()` returns `None`;
/// 3. otherwise waits for either `next_schedule_sleep()` or an inbox notify.
///
/// `config` is owned by the task; the callers in this file pass a clone of the
/// agent's config taken at spawn time.
fn spawn_inbox_monitor(
session_id: String,
sessions: Rc<RefCell<HashMap<String, (ChatSession, PathBuf)>>>,
cancellations: Rc<RefCell<HashMap<String, SessionCancellation>>>,
config: RefCell<Config>,
role: String,
conn: Rc<RefCell<Option<Rc<AgentSideConnection>>>>,
) {
tokio::task::spawn_local(async move {
log_debug!("ACP: inbox monitor started for session: {}", session_id);
loop {
// NOTE(review): every path through this block yields `false`, so the
// `should_exit` break below is currently never taken.
let should_exit = crate::session::context::with_session_id(session_id.clone(), async {
crate::mcp::core::flush_due_to_inbox();
// Only process while the session is parked in the map; when it is absent
// another handler has checked it out.
while crate::session::inbox::has_inbox_messages()
&& sessions.borrow().contains_key(&session_id)
{
let inbox_msg = match crate::session::inbox::try_pop_inbox_message() {
Some(msg) => msg,
None => break,
};
log_debug!(
"ACP monitor: processing inbox message from {:?} for {}",
inbox_msg.source,
session_id
);
// Check the session out of the shared map so no RefCell borrow is held
// across the awaits below.
let entry = sessions.borrow_mut().remove(&session_id);
let (mut chat_session, session_cwd) = match entry {
Some(s) => s,
None => {
// Session vanished: put the message back and stop this pass.
crate::session::inbox::push_inbox_message(inbox_msg);
return false;
}
};
crate::mcp::set_session_working_directory(session_cwd.clone());
let config_for_role = config.borrow().get_merged_config_for_role(&role);
// Obtain an operation receiver from this session's cancellation handle
// (created on demand).
let op_rx = cancellations
.borrow_mut()
.entry(session_id.clone())
.or_default()
.new_operation();
// On either failure below the session is returned to the map and the
// message is dropped (it is not re-queued).
if let Err(e) = chat_session.add_user_message(&inbox_msg.content) {
log_error!("ACP monitor: failed to add inbox message: {}", e);
sessions
.borrow_mut()
.insert(session_id.clone(), (chat_session, session_cwd));
continue;
}
if let Err(e) =
prepare_for_api_call(&mut chat_session, &config_for_role, op_rx.clone())
.await
{
log_error!("ACP monitor: failed to prepare API call: {}", e);
sessions
.borrow_mut()
.insert(session_id.clone(), (chat_session, session_cwd));
continue;
}
// Output is produced as `ServerMessage`s on this channel and translated
// into ACP notifications by the forwarding task below.
let (ws_tx, mut ws_rx) =
tokio::sync::mpsc::unbounded_channel::<ServerMessage>();
let ws_sink = WebSocketSink::new(ws_tx.clone());
crate::mcp::process::set_notification_sender(Some(session_id.clone()), ws_tx);
let sid_arc: std::sync::Arc<str> = session_id.as_str().into();
let conn_for_fwd = conn.borrow().as_ref().cloned();
let forward_task = tokio::task::spawn_local(async move {
while let Some(msg) = ws_rx.recv().await {
// Map the message kinds that have an ACP equivalent; all others are dropped.
let update = match msg {
ServerMessage::Assistant(p) => {
Some(SessionUpdate::AgentMessageChunk(ContentChunk::new(
p.content.into(),
)))
}
ServerMessage::Thinking(p) => {
Some(SessionUpdate::AgentThoughtChunk(ContentChunk::new(
p.content.into(),
)))
}
ServerMessage::ToolUse(p) => {
let tc = ToolCall::new(p.tool_id.clone(), p.tool.clone())
.status(ToolCallStatus::InProgress)
.raw_input(p.params.clone());
Some(SessionUpdate::ToolCall(tc))
}
ServerMessage::ToolResult(p) => {
let status = if p.success {
ToolCallStatus::Completed
} else {
ToolCallStatus::Failed
};
// Tool output is sent as parsed JSON when it parses, else as a plain string.
let upd = ToolCallUpdate::new(
p.tool_id.clone(),
ToolCallUpdateFields::new().status(status).raw_output(
serde_json::from_str::<serde_json::Value>(&p.content)
.unwrap_or(serde_json::Value::String(p.content)),
),
);
Some(SessionUpdate::ToolCallUpdate(upd))
}
_ => None,
};
if let (Some(update), Some(c)) = (update, conn_for_fwd.as_ref()) {
let notif = SessionNotification::new(sid_arc.clone(), update);
if let Err(e) = c.session_notification(notif).await {
log_error!("ACP monitor: failed to send notification: {}", e);
}
}
}
});
let result = execute_api_call_and_process_response(
&mut chat_session,
&config_for_role,
&role,
op_rx,
OutputMode::WebSocket,
ws_sink,
)
.await;
// Presumably this drops the remaining sender so the forwarding loop can
// finish; the task is awaited before the session is restored.
crate::mcp::process::clear_notification_sender(Some(session_id.clone()));
let _ = forward_task.await;
if let Err(e) = result {
log_debug!("ACP monitor: error processing inbox message: {}", e);
}
// Return the session to the shared map.
sessions
.borrow_mut()
.insert(session_id.clone(), (chat_session, session_cwd));
}
false })
.await;
if should_exit {
break;
}
// Stop monitoring once the session context no longer exposes an inbox notify handle.
let inbox_gone = crate::session::context::with_session_id(session_id.clone(), async {
crate::session::inbox::get_inbox_notify().is_none()
})
.await;
if inbox_gone {
log_debug!("ACP monitor: inbox cleared for {}, exiting", session_id);
break;
}
// Sleep until the next scheduled wake-up or until the inbox is notified,
// whichever comes first.
crate::session::context::with_session_id(session_id.clone(), async {
let inbox_notify = crate::session::inbox::get_inbox_notify();
tokio::select! {
_ = crate::mcp::core::next_schedule_sleep() => {}
_ = async {
if let Some(notify) = inbox_notify {
notify.notified().await;
} else {
// No notify handle: this branch never completes, only the sleep branch can wake us.
std::future::pending::<()>().await;
}
} => {}
}
})
.await;
}
log_debug!("ACP: inbox monitor exited for session: {}", session_id);
});
}
#[async_trait::async_trait(?Send)]
impl agent_client_protocol::Agent for OctomindAgent {
/// Answers the ACP `initialize` handshake.
///
/// Advertises the latest protocol version, session loading, HTTP MCP
/// servers, image and embedded-context prompts, and an `octomind.dev`
/// metadata entry flagging command support.
async fn initialize(
    &self,
    args: InitializeRequest,
) -> agent_client_protocol::Result<InitializeResponse> {
    log_debug!("ACP: initialize from {:?}", args.client_info);

    let mut meta = agent_client_protocol::Meta::new();
    meta.insert(
        "octomind.dev".to_string(),
        serde_json::json!({
            "commands": true
        }),
    );

    let mcp_caps = McpCapabilities::new().http(true);
    let prompt_caps = PromptCapabilities::default()
        .image(true)
        .embedded_context(true);
    let capabilities = AgentCapabilities::default()
        .load_session(true)
        .mcp_capabilities(mcp_caps)
        .prompt_capabilities(prompt_caps)
        .meta(meta);
    let agent_info = Implementation::new("octomind", env!("CARGO_PKG_VERSION"));

    Ok(InitializeResponse::new(ProtocolVersion::LATEST)
        .agent_capabilities(capabilities)
        .agent_info(agent_info))
}
/// Accepts every authentication request; the request contents are ignored
/// and a default response is returned.
async fn authenticate(
    &self,
    _args: AuthenticateRequest,
) -> agent_client_protocol::Result<AuthenticateResponse> {
    let response = AuthenticateResponse::default();
    Ok(response)
}
/// Creates a new chat session rooted at `args.cwd` and returns its id.
///
/// Steps, in order: inject the client-supplied MCP servers into a config
/// snapshot, initialize MCP for the role, set up the session and system
/// prompt, initialize per-session services and restore the plan, register the
/// session and a fresh cancellation handle, load environment skills, announce
/// the available commands, and start the inbox monitor.
///
/// Any failure from the MCP/session setup helpers is returned as an ACP
/// internal error carrying the error text.
async fn new_session(
&self,
args: NewSessionRequest,
) -> agent_client_protocol::Result<NewSessionResponse> {
crate::mcp::set_session_working_directory(args.cwd.clone());
let session_cwd = args.cwd.clone();
// Per-session config: the agent's base config is not modified here.
let config_snapshot = build_config_with_injected_servers(
&self.config.borrow(),
&self.role,
&args.mcp_servers,
);
crate::mcp::initialize_mcp_for_role(&self.role, &config_snapshot)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
let session_args = GenericSessionArgs {
role: self.role.clone(),
mode: "websocket".into(),
..Default::default()
};
let (mut chat_session, config_for_role, session_role, _, _) =
setup_and_initialize_session(&session_args, &config_snapshot)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
setup_system_prompt_and_cache(&mut chat_session, &config_for_role, &session_role, false)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
// The session's own name doubles as the ACP session id.
let session_id = chat_session.session.info.name.clone();
log_debug!("ACP: new_session created: {}", session_id);
let role_for_pool = self.role.clone();
let session_id_for_restore = session_id.clone();
// Session-scoped services are initialized inside the session-id context.
crate::session::context::with_session_id(session_id.clone(), async move {
crate::session::context::init_session_services(&role_for_pool);
crate::mcp::core::plan::core::restore_plan_for_session(&session_id_for_restore);
})
.await;
self.sessions
.borrow_mut()
.insert(session_id.clone(), (chat_session, session_cwd));
self.cancellations
.borrow_mut()
.insert(session_id.clone(), SessionCancellation::new());
{
// Check the session out of the map while skills load so no RefCell borrow
// is held across the await, then put it back.
let entry = self.sessions.borrow_mut().remove(&session_id);
if let Some((mut session, cwd)) = entry {
let sid = session_id.clone();
crate::session::context::with_session_id(sid, async {
crate::mcp::core::skill_auto::load_env_skills(&mut session).await;
})
.await;
self.sessions
.borrow_mut()
.insert(session_id.clone(), (session, cwd));
}
}
let conn = self.conn.borrow().clone();
send_available_commands(conn, &session_id).await;
// The monitor receives a clone of the config as it is right now.
spawn_inbox_monitor(
session_id.clone(),
Rc::clone(&self.sessions),
Rc::clone(&self.cancellations),
RefCell::new(self.config.borrow().clone()),
self.role.clone(),
Rc::clone(&self.conn),
);
Ok(NewSessionResponse::new(session_id))
}
/// Handles one ACP prompt turn for an existing session.
///
/// Text blocks are joined with newlines; image blocks and `video/*` blob
/// resources become attachments (only the first of each kind is used).
/// An empty prompt ends the turn immediately.
///
/// Input starting with `/` is dispatched to the slash-command processor and
/// its result is sent back as a single agent message chunk. Any other input
/// first drains pending inbox messages, then runs layer processing and the
/// model call, streaming output to the client as session notifications.
///
/// The session is checked out of the shared map for the duration of the turn
/// and is put back on every exit path, including errors.
///
/// # Errors
/// Returns `invalid_params` when the session id is unknown, and
/// `internal_error` (with the underlying error text) when layer processing,
/// adding the user message, API preparation or the API call fails.
async fn prompt(&self, args: PromptRequest) -> agent_client_protocol::Result<PromptResponse> {
    let session_id = args.session_id.to_string();

    // Split the prompt into text, image and video parts; other block kinds are ignored.
    let mut text_parts = Vec::new();
    let mut images = Vec::new();
    let mut videos = Vec::new();
    for block in &args.prompt {
        match block {
            ContentBlock::Text(t) => text_parts.push(t.text.as_str()),
            ContentBlock::Image(img) => {
                images.push(crate::session::image::ImageAttachment {
                    data: crate::session::image::ImageData::Base64(img.data.clone()),
                    media_type: img.mime_type.clone(),
                    source_type: crate::session::image::SourceType::Url,
                    dimensions: None,
                    size_bytes: None,
                });
            }
            ContentBlock::Resource(res) => {
                if let EmbeddedResourceResource::BlobResourceContents(BlobResourceContents {
                    blob,
                    mime_type: Some(mime),
                    ..
                }) = &res.resource
                {
                    if mime.starts_with("video/") {
                        videos.push(crate::session::video::VideoAttachment {
                            data: crate::session::video::VideoData::Base64(blob.clone()),
                            media_type: mime.clone(),
                            source_type: crate::session::video::SourceType::Url,
                            dimensions: None,
                            size_bytes: None,
                            duration_secs: None,
                        });
                    }
                }
            }
            _ => {}
        }
    }

    let input: String = text_parts.join("\n");
    if input.trim().is_empty() && images.is_empty() && videos.is_empty() {
        return Ok(PromptResponse::new(StopReason::EndTurn));
    }

    crate::session::context::with_session_id(session_id.clone(), async {
        // ---- Slash commands -------------------------------------------------
        if input.trim_start().starts_with('/') {
            let (mut chat_session, session_cwd) =
                match self.sessions.borrow_mut().remove(&session_id) {
                    Some(s) => s,
                    None => {
                        return Err(agent_client_protocol::Error::invalid_params()
                            .data(format!("session not found: {session_id}")));
                    }
                };
            crate::mcp::set_session_working_directory(session_cwd.clone());
            let operation_rx = self
                .cancellations
                .borrow_mut()
                .entry(session_id.clone())
                .or_default()
                .new_operation();

            // Commands work on a copy of the config so no RefCell borrow is held
            // across the await; the (possibly modified) copy is written back after.
            let mut config = self.config.borrow().clone();
            let result = crate::session::chat::session::commands::process_command(
                &mut chat_session,
                input.trim(),
                &mut config,
                &self.role,
                operation_rx,
            )
            .await;
            *self.config.borrow_mut() = config;
            self.sessions
                .borrow_mut()
                .insert(session_id.clone(), (chat_session, session_cwd));

            let text = match result {
                Ok(
                    crate::session::chat::session::commands::CommandResult::HandledWithOutput(
                        output,
                    ),
                ) => serde_json::to_string_pretty(&output.to_json())
                    .unwrap_or_else(|_| "Command executed.".to_string()),
                Ok(crate::session::chat::session::commands::CommandResult::Handled) => {
                    "Command executed.".to_string()
                }
                Ok(crate::session::chat::session::commands::CommandResult::Exit) => {
                    "Session exit requested.".to_string()
                }
                Ok(
                    crate::session::chat::session::commands::CommandResult::TreatAsUserInput,
                ) => {
                    let available: Vec<&str> = crate::session::chat::COMMANDS.to_vec();
                    format!(
                        "The {} command is not supported by Octomind.\n\nAvailable commands: {}",
                        input.trim(),
                        available.join(", ")
                    )
                }
                Err(e) => format!("Command failed: {e}"),
            };

            let conn = self.conn.borrow().clone();
            if let Some(conn) = conn {
                let update = SessionUpdate::AgentMessageChunk(ContentChunk::new(text.into()));
                let notif = SessionNotification::new(
                    std::sync::Arc::<str>::from(session_id.as_str()),
                    update,
                );
                if let Err(e) = conn.session_notification(notif).await {
                    log_error!("ACP: failed to send command result: {}", e);
                }
            }
            return Ok(PromptResponse::new(StopReason::EndTurn));
        }

        // ---- Regular prompt -------------------------------------------------
        // Check the session out of the shared map for the duration of the turn.
        let (mut chat_session, session_cwd) =
            match self.sessions.borrow_mut().remove(&session_id) {
                Some(s) => s,
                None => {
                    return Err(agent_client_protocol::Error::invalid_params()
                        .data(format!("session not found: {session_id}")));
                }
            };
        crate::mcp::set_session_working_directory(session_cwd.clone());
        let config_for_role = self.config.borrow().get_merged_config_for_role(&self.role);
        let current_dir = session_cwd.clone();

        // Reset the session's cancellation handle and start a new operation for this turn.
        let mut cancellation = self
            .cancellations
            .borrow_mut()
            .remove(&session_id)
            .unwrap_or_default();
        cancellation.reset();
        let operation_rx = cancellation.new_operation();
        self.cancellations
            .borrow_mut()
            .insert(session_id.clone(), cancellation);

        // Process inbox messages that arrived before this prompt. Their output
        // goes to a channel nobody reads, so it is not forwarded to the client.
        {
            crate::mcp::core::flush_due_to_inbox();
            while let Some(inbox_msg) = crate::session::inbox::try_pop_inbox_message() {
                log_debug!(
                    "ACP pre-user: processing inbox message from {:?}",
                    inbox_msg.source
                );
                if let Err(e) = chat_session.add_user_message(&inbox_msg.content) {
                    log_error!("ACP: failed to add inbox message: {}", e);
                    continue;
                }
                let op_rx = self
                    .cancellations
                    .borrow_mut()
                    .entry(session_id.to_string())
                    .or_default()
                    .new_operation();
                if let Err(e) =
                    prepare_for_api_call(&mut chat_session, &config_for_role, op_rx.clone())
                        .await
                {
                    log_error!("ACP: failed to prepare inbox API call: {}", e);
                    continue;
                }
                let (tx, _rx) = tokio::sync::mpsc::unbounded_channel();
                let sink = WebSocketSink::new(tx);
                if let Err(e) = execute_api_call_and_process_response(
                    &mut chat_session,
                    &config_for_role,
                    &self.role,
                    op_rx,
                    OutputMode::WebSocket,
                    sink,
                )
                .await
                {
                    log_debug!("ACP: error processing pre-user inbox message: {}", e);
                }
            }
        }

        let first_message_processed = !chat_session.session.messages.is_empty();
        let layer_outcome = process_layers_if_enabled(
            &input,
            &mut chat_session,
            &config_for_role,
            &self.role,
            first_message_processed,
            operation_rx.clone(),
        )
        .await;
        let (processed_input, layers_modified_session, layer_cancelled) = match layer_outcome {
            Ok(outcome) => outcome,
            Err(e) => {
                // Put the session back before reporting the failure; otherwise
                // it would be dropped and every later request for this id would
                // fail with "session not found".
                self.sessions
                    .borrow_mut()
                    .insert(session_id.clone(), (chat_session, session_cwd));
                return Err(agent_client_protocol::Error::internal_error().data(e.to_string()));
            }
        };
        if layer_cancelled {
            self.sessions
                .borrow_mut()
                .insert(session_id.clone(), (chat_session, session_cwd.clone()));
            return Ok(PromptResponse::new(StopReason::Cancelled));
        }

        // Only the first image and the first video of the prompt are attached.
        if let Some(first_image) = images.into_iter().next() {
            chat_session.pending_image = Some(first_image);
        }
        if let Some(first_video) = videos.into_iter().next() {
            chat_session.pending_video = Some(first_video);
        }

        // When layers already modified the session, the user message is not added again.
        if !layers_modified_session {
            let final_input =
                crate::session::chat::session::utils::append_constraints_if_exists(
                    &processed_input,
                    &config_for_role.custom_constraints_file_name,
                    &current_dir,
                );
            if let Err(e) = chat_session.add_user_message(&final_input) {
                self.sessions
                    .borrow_mut()
                    .insert(session_id.clone(), (chat_session, session_cwd));
                return Err(agent_client_protocol::Error::internal_error().data(e.to_string()));
            }
        }

        if let Err(e) =
            prepare_for_api_call(&mut chat_session, &config_for_role, operation_rx.clone())
                .await
        {
            self.sessions
                .borrow_mut()
                .insert(session_id.clone(), (chat_session, session_cwd));
            return Err(agent_client_protocol::Error::internal_error().data(e.to_string()));
        }

        // Output is produced as `ServerMessage`s on this channel and translated
        // into ACP session notifications by the forwarding task.
        let (ws_tx, mut ws_rx) = tokio::sync::mpsc::unbounded_channel::<ServerMessage>();
        let ws_sink = WebSocketSink::new(ws_tx.clone());
        crate::mcp::process::set_notification_sender(Some(session_id.clone()), ws_tx);
        let session_id_for_task: std::sync::Arc<str> = session_id.as_str().into();
        let conn_for_task = self.conn.borrow().as_ref().cloned();
        let forward_task = tokio::task::spawn_local(async move {
            while let Some(msg) = ws_rx.recv().await {
                // Message kinds without an ACP equivalent are dropped.
                let update = match msg {
                    ServerMessage::Assistant(p) => Some(SessionUpdate::AgentMessageChunk(
                        ContentChunk::new(p.content.into()),
                    )),
                    ServerMessage::Thinking(p) => Some(SessionUpdate::AgentThoughtChunk(
                        ContentChunk::new(p.content.into()),
                    )),
                    ServerMessage::ToolUse(p) => {
                        let tool_call = ToolCall::new(p.tool_id.clone(), p.tool.clone())
                            .status(ToolCallStatus::InProgress)
                            .raw_input(p.params.clone());
                        Some(SessionUpdate::ToolCall(tool_call))
                    }
                    ServerMessage::ToolResult(p) => {
                        let status = if p.success {
                            ToolCallStatus::Completed
                        } else {
                            ToolCallStatus::Failed
                        };
                        // Tool output is sent as parsed JSON when it parses,
                        // else as a plain string.
                        let update = ToolCallUpdate::new(
                            p.tool_id.clone(),
                            ToolCallUpdateFields::new().status(status).raw_output(
                                serde_json::from_str::<serde_json::Value>(&p.content)
                                    .unwrap_or(serde_json::Value::String(p.content)),
                            ),
                        );
                        Some(SessionUpdate::ToolCallUpdate(update))
                    }
                    _ => None,
                };
                if let (Some(update), Some(conn)) = (update, conn_for_task.as_ref()) {
                    let notif = SessionNotification::new(session_id_for_task.clone(), update);
                    if let Err(e) = conn.session_notification(notif).await {
                        log_error!("ACP: failed to send session notification: {}", e);
                    }
                }
            }
        });

        let api_result = execute_api_call_and_process_response(
            &mut chat_session,
            &config_for_role,
            &self.role,
            operation_rx.clone(),
            OutputMode::WebSocket,
            ws_sink,
        )
        .await;

        // Presumably this drops the remaining sender so the forwarding loop can
        // finish; the task is awaited before the session is restored.
        crate::mcp::process::clear_notification_sender(Some(session_id.clone()));
        let _ = forward_task.await;
        self.sessions
            .borrow_mut()
            .insert(session_id.to_string(), (chat_session, session_cwd));

        // Wake the inbox monitor now that the session is back in the map.
        if let Some(notify) = crate::session::inbox::get_inbox_notify() {
            notify.notify_one();
        }

        match api_result {
            Ok(_) => {
                // A `true` value on the operation receiver is reported as a cancelled turn.
                if *operation_rx.borrow() {
                    Ok(PromptResponse::new(StopReason::Cancelled))
                } else {
                    Ok(PromptResponse::new(StopReason::EndTurn))
                }
            }
            Err(e) => {
                log_error!("ACP: prompt API call failed: {}", e);
                Err(agent_client_protocol::Error::internal_error().data(e.to_string()))
            }
        }
    })
    .await
}
/// Handles an ACP cancel notification by calling `shutdown` on the
/// session's cancellation handle. Unknown session ids are ignored.
async fn cancel(&self, args: CancelNotification) -> agent_client_protocol::Result<()> {
    let session_id = args.session_id.to_string();
    log_debug!("ACP: cancel requested for session: {}", session_id);

    {
        let registry = self.cancellations.borrow();
        if let Some(handle) = registry.get(&session_id) {
            handle.shutdown();
        }
    }

    Ok(())
}
/// Resumes an existing session identified by `args.session_id`, rooted at
/// `args.cwd`.
///
/// Mirrors `new_session`, except that the session is set up with
/// `resume: Some(session_id)`. Any failure from the MCP/session setup helpers
/// is returned as an ACP internal error carrying the error text.
async fn load_session(
&self,
args: LoadSessionRequest,
) -> agent_client_protocol::Result<LoadSessionResponse> {
let session_id = args.session_id.to_string();
log_debug!("ACP: load_session requested: {}", session_id);
crate::mcp::set_session_working_directory(args.cwd.clone());
let session_cwd = args.cwd.clone();
// Per-session config: the agent's base config is not modified here.
let config_snapshot = build_config_with_injected_servers(
&self.config.borrow(),
&self.role,
&args.mcp_servers,
);
crate::mcp::initialize_mcp_for_role(&self.role, &config_snapshot)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
let session_args = GenericSessionArgs {
resume: Some(session_id.clone()),
role: self.role.clone(),
mode: "websocket".into(),
..Default::default()
};
let (mut chat_session, config_for_role, session_role, _, _) =
setup_and_initialize_session(&session_args, &config_snapshot)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
setup_system_prompt_and_cache(&mut chat_session, &config_for_role, &session_role, false)
.await
.map_err(|e| agent_client_protocol::Error::internal_error().data(e.to_string()))?;
// NOTE(review): services and skills are initialized under the name reported
// by the loaded session (`actual_session_id`), while the session map,
// cancellation handle, command announcement and inbox monitor below all use
// the requested `session_id`. Confirm the two are always equal.
let actual_session_id = chat_session.session.info.name.clone();
let role_for_pool = self.role.clone();
let session_id_for_restore = actual_session_id.clone();
crate::session::context::with_session_id(actual_session_id.clone(), async move {
crate::session::context::init_session_services(&role_for_pool);
crate::mcp::core::plan::core::restore_plan_for_session(&session_id_for_restore);
})
.await;
self.sessions
.borrow_mut()
.insert(session_id.clone(), (chat_session, session_cwd));
self.cancellations
.borrow_mut()
.insert(session_id.clone(), SessionCancellation::new());
{
// Check the session out of the map while skills load so no RefCell borrow
// is held across the await, then put it back.
let entry = self.sessions.borrow_mut().remove(&session_id);
if let Some((mut session, cwd)) = entry {
let sid = actual_session_id.clone();
crate::session::context::with_session_id(sid, async {
crate::mcp::core::skill_auto::load_env_skills(&mut session).await;
})
.await;
self.sessions
.borrow_mut()
.insert(session_id.clone(), (session, cwd));
}
}
let conn = self.conn.borrow().clone();
send_available_commands(conn, &session_id).await;
// The monitor receives a clone of the config as it is right now.
spawn_inbox_monitor(
session_id.clone(),
Rc::clone(&self.sessions),
Rc::clone(&self.cancellations),
RefCell::new(self.config.borrow().clone()),
self.role.clone(),
Rc::clone(&self.conn),
);
Ok(LoadSessionResponse::new())
}
/// Forwards an ACP extension request to `super::commands::handle_ext_method`
/// together with the agent's shared state, and returns its result unchanged.
async fn ext_method(&self, args: ExtRequest) -> agent_client_protocol::Result<ExtResponse> {
    let sessions = &self.sessions;
    let config = &self.config;
    let role = &self.role;
    let cancellations = &self.cancellations;

    super::commands::handle_ext_method(args, sessions, config, role, cancellations).await
}
}