use super::{ContextUsage, Session, SessionConfig, SessionState};
use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
use crate::hitl::ConfirmationPolicy;
use crate::llm::{self, LlmClient, LlmConfig, Message};
use crate::mcp::McpManager;
use crate::memory::AgentMemory;
use crate::permissions::PermissionPolicy;
use crate::prompts::SystemPromptSlots;
use crate::skills::SkillRegistry;
use crate::store::{FileSessionStore, LlmConfigData, SessionData, SessionStore};
use crate::text::truncate_utf8;
use crate::tools::ToolExecutor;
use crate::PlanningMode;
use a3s_memory::MemoryStore;
use anyhow::{Context, Result};
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
/// Owns every live [`Session`] together with the shared resources sessions run
/// against: the default LLM client, the tool executor, persistence stores,
/// skill registries, the memory store, and the MCP manager.
///
/// Every field is an `Arc` (most wrap a `tokio::sync::RwLock`), so cloning the
/// manager is cheap and all clones observe and mutate the same state.
#[derive(Clone)]
pub struct SessionManager {
    /// Live sessions keyed by session id.
    pub(crate) sessions: Arc<RwLock<HashMap<String, Arc<RwLock<Session>>>>>,
    /// Manager-wide default LLM client, used when a session has no client of its own.
    pub(crate) llm_client: Arc<RwLock<Option<Arc<dyn LlmClient>>>>,
    /// Executor for all tools; its definitions are handed to each new session.
    pub(crate) tool_executor: Arc<ToolExecutor>,
    /// Persistence backends keyed by storage type.
    pub(crate) stores: Arc<RwLock<HashMap<crate::config::StorageBackend, Arc<dyn SessionStore>>>>,
    /// Storage backend chosen for each session id. Sessions with no entry, or
    /// whose entry is `StorageBackend::Memory`, are never written to a store.
    pub(crate) session_storage_types: Arc<RwLock<HashMap<String, crate::config::StorageBackend>>>,
    /// Per-session LLM configuration, saved alongside the session data.
    pub(crate) llm_configs: Arc<RwLock<HashMap<String, LlmConfigData>>>,
    /// Cancellation tokens for in-flight streaming turns, keyed by session id.
    pub(crate) ongoing_operations:
        Arc<RwLock<HashMap<String, tokio_util::sync::CancellationToken>>>,
    /// Manager-wide skill registry, used when a session has no registry of its own.
    pub(crate) skill_registry: Arc<RwLock<Option<Arc<SkillRegistry>>>>,
    /// Per-session skill registries; these take precedence over `skill_registry`.
    pub(crate) session_skill_registries: Arc<RwLock<HashMap<String, Arc<SkillRegistry>>>>,
    /// Optional memory backend; when set, each agent turn gets an `AgentMemory` over it.
    pub(crate) memory_store: Arc<RwLock<Option<Arc<dyn MemoryStore>>>>,
    /// MCP manager whose servers' tools are registered on `tool_executor`.
    pub(crate) mcp_manager: Arc<RwLock<Option<Arc<McpManager>>>>,
}
impl SessionManager {
/// Renders a JSON value as a one-line summary for prompt context.
///
/// `null` becomes an empty string and strings are used verbatim (no quotes).
/// Anything else is serialized to JSON. All whitespace runs collapse to single
/// spaces. If the result is longer than 180 bytes, it is cut with
/// `truncate_utf8` (presumably at a char boundary; confirm) and `...` is appended.
fn compact_json_value(value: &serde_json::Value) -> String {
    const MAX_LEN: usize = 180;
    let rendered = match value {
        serde_json::Value::Null => String::new(),
        serde_json::Value::String(text) => text.clone(),
        other => serde_json::to_string(other).unwrap_or_default(),
    };
    let single_line = rendered.split_whitespace().collect::<Vec<_>>().join(" ");
    if single_line.len() <= MAX_LEN {
        return single_line;
    }
    format!("{}...", truncate_utf8(&single_line, MAX_LEN))
}
pub fn new(llm_client: Option<Arc<dyn LlmClient>>, tool_executor: Arc<ToolExecutor>) -> Self {
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
llm_client: Arc::new(RwLock::new(llm_client)),
tool_executor,
stores: Arc::new(RwLock::new(HashMap::new())),
session_storage_types: Arc::new(RwLock::new(HashMap::new())),
llm_configs: Arc::new(RwLock::new(HashMap::new())),
ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
skill_registry: Arc::new(RwLock::new(None)),
session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
memory_store: Arc::new(RwLock::new(None)),
mcp_manager: Arc::new(RwLock::new(None)),
}
}
pub async fn with_persistence<P: AsRef<std::path::Path>>(
llm_client: Option<Arc<dyn LlmClient>>,
tool_executor: Arc<ToolExecutor>,
sessions_dir: P,
) -> Result<Self> {
let store = FileSessionStore::new(sessions_dir).await?;
let mut stores = HashMap::new();
stores.insert(
crate::config::StorageBackend::File,
Arc::new(store) as Arc<dyn SessionStore>,
);
let manager = Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
llm_client: Arc::new(RwLock::new(llm_client)),
tool_executor,
stores: Arc::new(RwLock::new(stores)),
session_storage_types: Arc::new(RwLock::new(HashMap::new())),
llm_configs: Arc::new(RwLock::new(HashMap::new())),
ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
skill_registry: Arc::new(RwLock::new(None)),
session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
memory_store: Arc::new(RwLock::new(None)),
mcp_manager: Arc::new(RwLock::new(None)),
};
Ok(manager)
}
pub fn with_store(
llm_client: Option<Arc<dyn LlmClient>>,
tool_executor: Arc<ToolExecutor>,
store: Arc<dyn SessionStore>,
backend: crate::config::StorageBackend,
) -> Self {
let mut stores = HashMap::new();
stores.insert(backend, store);
Self {
sessions: Arc::new(RwLock::new(HashMap::new())),
llm_client: Arc::new(RwLock::new(llm_client)),
tool_executor,
stores: Arc::new(RwLock::new(stores)),
session_storage_types: Arc::new(RwLock::new(HashMap::new())),
llm_configs: Arc::new(RwLock::new(HashMap::new())),
ongoing_operations: Arc::new(RwLock::new(HashMap::new())),
skill_registry: Arc::new(RwLock::new(None)),
session_skill_registries: Arc::new(RwLock::new(HashMap::new())),
memory_store: Arc::new(RwLock::new(None)),
mcp_manager: Arc::new(RwLock::new(None)),
}
}
/// Replaces the manager-wide default LLM client (`None` clears it).
pub async fn set_default_llm(&self, client: Option<Arc<dyn LlmClient>>) {
    let mut default_client = self.llm_client.write().await;
    *default_client = client;
}
/// Installs `registry` as the global skill registry.
///
/// Also registers a `ManageSkillTool` over the same registry and `skills_dir`
/// on the tool executor.
pub async fn set_skill_registry(
    &self,
    registry: Arc<SkillRegistry>,
    skills_dir: std::path::PathBuf,
) {
    let manage_tool = Arc::new(crate::skills::ManageSkillTool::new(
        Arc::clone(&registry),
        skills_dir,
    ));
    self.tool_executor.register_dynamic_tool(manage_tool);
    let mut slot = self.skill_registry.write().await;
    *slot = Some(registry);
}

/// Returns the global skill registry, if one has been installed.
pub async fn skill_registry(&self) -> Option<Arc<SkillRegistry>> {
    let slot = self.skill_registry.read().await;
    slot.as_ref().map(Arc::clone)
}
/// Sets a skill registry for one session, replacing any previous one.
///
/// A per-session registry takes precedence over the global registry during
/// generation.
pub async fn set_session_skill_registry(
    &self,
    session_id: impl Into<String>,
    registry: Arc<SkillRegistry>,
) {
    let mut per_session = self.session_skill_registries.write().await;
    per_session.insert(session_id.into(), registry);
}

/// Returns the skill registry set specifically for `session_id`, if any.
pub async fn session_skill_registry(&self, session_id: &str) -> Option<Arc<SkillRegistry>> {
    let per_session = self.session_skill_registries.read().await;
    per_session.get(session_id).map(Arc::clone)
}
/// Installs the memory backend used to build an `AgentMemory` for each turn.
pub async fn set_memory_store(&self, store: Arc<dyn MemoryStore>) {
    let mut slot = self.memory_store.write().await;
    *slot = Some(store);
}

/// Returns the installed memory backend, if any.
pub async fn memory_store(&self) -> Option<Arc<dyn MemoryStore>> {
    let slot = self.memory_store.read().await;
    slot.clone()
}
/// Installs `manager` as the MCP manager and registers its current tools.
///
/// Tools reported by `get_all_tools` are grouped by server name, and
/// `create_mcp_tools` is called once per server. Each resulting tool is
/// registered as a dynamic tool on the executor.
pub async fn set_mcp_manager(&self, manager: Arc<McpManager>) {
    let grouped = manager.get_all_tools().await.into_iter().fold(
        HashMap::<String, Vec<crate::mcp::McpTool>>::new(),
        |mut acc, (server, tool)| {
            acc.entry(server).or_default().push(tool);
            acc
        },
    );
    for (server, server_tools) in grouped {
        let wrapped =
            crate::mcp::tools::create_mcp_tools(&server, server_tools, Arc::clone(&manager));
        for tool in wrapped {
            self.tool_executor.register_dynamic_tool(tool);
        }
    }
    let mut slot = self.mcp_manager.write().await;
    *slot = Some(manager);
}
pub async fn add_mcp_server(&self, config: crate::mcp::McpServerConfig) -> Result<()> {
let manager = {
let guard = self.mcp_manager.read().await;
match guard.clone() {
Some(m) => m,
None => {
drop(guard);
let m = Arc::new(McpManager::new());
*self.mcp_manager.write().await = Some(Arc::clone(&m));
m
}
}
};
let name = config.name.clone();
manager.register_server(config).await;
manager.connect(&name).await?;
let tools = manager.get_server_tools(&name).await;
for tool in crate::mcp::tools::create_mcp_tools(&name, tools, Arc::clone(&manager)) {
self.tool_executor.register_dynamic_tool(tool);
}
Ok(())
}
pub async fn remove_mcp_server(&self, name: &str) -> Result<()> {
let guard = self.mcp_manager.read().await;
if let Some(ref manager) = *guard {
manager.disconnect(name).await?;
}
self.tool_executor
.unregister_tools_by_prefix(&format!("mcp__{name}__"));
Ok(())
}
/// Reports the status of every MCP server, keyed by server name.
///
/// Returns an empty map when no MCP manager is installed. The manager lock is
/// released before querying status.
pub async fn mcp_status(
    &self,
) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
    let manager = self.mcp_manager.read().await.clone();
    if let Some(manager) = manager {
        manager.get_status().await
    } else {
        std::collections::HashMap::new()
    }
}
/// Loads `session_id` from the first store that has it and restores it into
/// memory.
///
/// Returns immediately if the session is already loaded. Stores that fail to
/// load are logged and skipped. The backend the session was found in is
/// recorded as its storage type.
///
/// # Errors
///
/// Fails if no store has the session, or if restoring the loaded data fails.
pub async fn restore_session_by_id(&self, session_id: &str) -> Result<()> {
    if self.sessions.read().await.contains_key(session_id) {
        return Ok(());
    }
    let stores = self.stores.read().await;
    for (backend, store) in stores.iter() {
        let data = match store.load(session_id).await {
            Ok(Some(data)) => data,
            Ok(None) => continue,
            Err(e) => {
                tracing::warn!(
                    "Failed to load session {} from {:?}: {}",
                    session_id,
                    backend,
                    e
                );
                continue;
            }
        };
        self.session_storage_types
            .write()
            .await
            .insert(data.id.clone(), backend.clone());
        self.restore_session(data).await?;
        return Ok(());
    }
    anyhow::bail!("Session {} not found in any store", session_id)
}
/// Loads every session found in every configured store into memory.
///
/// For each backend, lists the session ids and restores each one, recording
/// the backend as that session's storage type. Listing, loading, and restore
/// failures are logged and skipped rather than aborting the whole load.
/// A session already in memory with the same id is replaced by the stored copy.
///
/// Takes `&self`: all manager state uses interior mutability, so exclusive
/// access is not needed. The former `&mut self` receiver prevented calling
/// this through a shared handle such as `Arc<SessionManager>`.
///
/// Returns the number of sessions successfully restored.
pub async fn load_all_sessions(&self) -> Result<usize> {
    let stores = self.stores.read().await;
    let mut loaded = 0;
    for (backend, store) in stores.iter() {
        let session_ids = match store.list().await {
            Ok(ids) => ids,
            Err(e) => {
                tracing::warn!("Failed to list sessions from {:?} store: {}", backend, e);
                continue;
            }
        };
        for id in session_ids {
            match store.load(&id).await {
                Ok(Some(data)) => {
                    {
                        let mut storage_types = self.session_storage_types.write().await;
                        storage_types.insert(data.id.clone(), backend.clone());
                    }
                    if let Err(e) = self.restore_session(data).await {
                        tracing::warn!("Failed to restore session {}: {}", id, e);
                    } else {
                        loaded += 1;
                    }
                }
                Ok(None) => {
                    tracing::warn!("Session {} not found in store", id);
                }
                Err(e) => {
                    tracing::warn!("Failed to load session {}: {}", id, e);
                }
            }
        }
    }
    tracing::info!("Loaded {} sessions from store", loaded);
    Ok(loaded)
}
/// Rebuilds a `Session` from persisted `data` and inserts it into memory,
/// replacing any live session with the same id.
///
/// The persisted LLM configuration, if present, is re-registered for the session.
///
/// NOTE(review): unlike `create_session` / `create_child_session`, this path
/// neither calls `start_queue()` nor applies `max_context_length` to
/// `context_usage.max_tokens`. Confirm whether `restore_from_data` covers both.
async fn restore_session(&self, data: SessionData) -> Result<()> {
    let mut session =
        Session::new(data.id.clone(), data.config.clone(), self.tool_executor.definitions())
            .await?;
    session.restore_from_data(&data);
    if let Some(llm_config) = data.llm_config.as_ref() {
        self.llm_configs
            .write()
            .await
            .insert(data.id.clone(), llm_config.clone());
    }
    self.sessions
        .write()
        .await
        .insert(data.id.clone(), Arc::new(RwLock::new(session)));
    tracing::info!("Restored session: {}", data.id);
    Ok(())
}
/// Writes the current state of `session_id` to its storage backend.
///
/// This is a no-op (returning `Ok`) when the session has no storage type, uses
/// `StorageBackend::Memory`, or its backend has no registered store (the last
/// case is logged).
///
/// # Errors
///
/// Fails if the session is not loaded, or if the store's `save` fails.
async fn save_session(&self, session_id: &str) -> Result<()> {
    let storage_type = self.session_storage_types.read().await.get(session_id).cloned();
    let Some(storage_type) = storage_type else {
        return Ok(());
    };
    if storage_type == crate::config::StorageBackend::Memory {
        return Ok(());
    }
    // Clone the store handle so the stores map is not locked during I/O.
    let store = self.stores.read().await.get(&storage_type).cloned();
    let Some(store) = store else {
        tracing::warn!("No store available for storage type: {:?}", storage_type);
        return Ok(());
    };
    let session_lock = self.get_session(session_id).await?;
    let llm_config = self.llm_configs.read().await.get(session_id).cloned();
    // Snapshot under the read lock, then release it before the store write.
    // Holding it across `save().await` blocked writers such as `generate`
    // for the whole I/O.
    let data = {
        let session = session_lock.read().await;
        session.to_session_data(llm_config)
    };
    store.save(&data).await?;
    tracing::debug!("Saved session: {}", session_id);
    Ok(())
}
/// Persists `session_id` and, on failure, logs a warning and emits an
/// `AgentEvent::PersistenceFailed` on the session's event channel.
///
/// `operation` names the action that triggered the save and is used only for
/// reporting. Failures are never propagated to the caller.
async fn persist_or_warn(&self, session_id: &str, operation: &str) {
    let Err(e) = self.save_session(session_id).await else {
        return;
    };
    tracing::warn!(
        "Failed to persist session {} after {}: {}",
        session_id,
        operation,
        e
    );
    let Ok(session_lock) = self.get_session(session_id).await else {
        return;
    };
    let event = AgentEvent::PersistenceFailed {
        session_id: session_id.to_string(),
        operation: operation.to_string(),
        error: e.to_string(),
    };
    // A send error only means nobody is listening; that is fine here.
    let _ = session_lock.read().await.event_tx().send(event);
}
/// Schedules `persist_or_warn` for `session_id` on a detached Tokio task.
///
/// The join handle is dropped, so the caller does not wait for (or observe)
/// the save. Must be called from within a Tokio runtime (uses `tokio::spawn`).
fn persist_in_background(&self, session_id: &str, operation: &str) {
    let (manager, session_id, operation) =
        (self.clone(), session_id.to_owned(), operation.to_owned());
    tokio::spawn(async move {
        manager.persist_or_warn(&session_id, &operation).await;
    });
}
/// Creates a new session `id` from `config`, starts its command queue, and
/// schedules its initial persistence.
///
/// A positive `config.max_context_length` overrides the session's
/// `context_usage.max_tokens`. An existing session with the same id is replaced.
///
/// # Errors
///
/// Fails if `Session::new` or `start_queue` fails. In that case no manager
/// state is modified.
pub async fn create_session(&self, id: String, config: SessionConfig) -> Result<String> {
    tracing::info!(name: "a3s.session.create", session_id = %id, "Creating session");
    let storage_type = config.storage_type.clone();
    let tools = self.tool_executor.definitions();
    let mut session = Session::new(id.clone(), config, tools).await?;
    session.start_queue().await?;
    if session.config.max_context_length > 0 {
        session.context_usage.max_tokens = session.config.max_context_length as usize;
    }
    // Record the storage type only after the session was built successfully.
    // Registering it up front left a stale entry behind when creation failed.
    self.session_storage_types
        .write()
        .await
        .insert(id.clone(), storage_type);
    self.sessions
        .write()
        .await
        .insert(id.clone(), Arc::new(RwLock::new(session)));
    self.persist_in_background(&id, "create");
    tracing::info!("Created session: {}", id);
    Ok(id)
}
/// Removes session `id` and all per-session state held by the manager.
///
/// Any in-flight streaming turn for the session is cancelled first, so the
/// agent does not keep running tools for a session that no longer exists.
/// If the session was persisted to a non-memory backend, it is also deleted
/// from that store. A store failure is logged, not returned.
///
/// Always returns `Ok(())`, including when `id` is unknown.
pub async fn destroy_session(&self, id: &str) -> Result<()> {
    tracing::info!(name: "a3s.session.destroy", session_id = %id, "Destroying session");
    // Previously the token was neither cancelled nor removed, leaking the
    // entry and leaving the streaming turn running.
    let in_flight = self.ongoing_operations.write().await.remove(id);
    if let Some(token) = in_flight {
        token.cancel();
        tracing::info!("Cancelled ongoing operation for destroyed session {}", id);
    }
    self.sessions.write().await.remove(id);
    self.llm_configs.write().await.remove(id);
    self.session_skill_registries.write().await.remove(id);
    // `remove` hands back the storage type, so no separate read is needed.
    let storage_type = self.session_storage_types.write().await.remove(id);
    if let Some(storage_type) = storage_type {
        if storage_type != crate::config::StorageBackend::Memory {
            let stores = self.stores.read().await;
            if let Some(store) = stores.get(&storage_type) {
                if let Err(e) = store.delete(id).await {
                    tracing::warn!("Failed to delete session {} from store: {}", id, e);
                }
            }
        }
    }
    tracing::info!("Destroyed session: {}", id);
    Ok(())
}
/// Returns the shared handle for live session `id`.
///
/// # Errors
///
/// Fails with `Session not found: {id}` when no such session is loaded.
pub async fn get_session(&self, id: &str) -> Result<Arc<RwLock<Session>>> {
    let sessions = self.sessions.read().await;
    sessions
        .get(id)
        .cloned()
        // Lazy: this lookup is on nearly every code path, so only build the
        // message when the session is actually missing.
        .with_context(|| format!("Session not found: {}", id))
}
/// Creates session `child_id` as a child of `parent_id`.
///
/// The child inherits the parent's confirmation policy unless `config`
/// specifies one. If the new session has no LLM client, it uses the parent's
/// client, falling back to the manager-wide default. Its queue is started,
/// and a positive `max_context_length` overrides `context_usage.max_tokens`.
///
/// # Errors
///
/// Fails if the parent does not exist, or if `Session::new` / `start_queue`
/// fails. No manager state is modified on failure.
pub async fn create_child_session(
    &self,
    parent_id: &str,
    child_id: String,
    mut config: SessionConfig,
) -> Result<String> {
    let parent_lock = self.get_session(parent_id).await?;
    let parent_llm_client = {
        let parent = parent_lock.read().await;
        if config.confirmation_policy.is_none() {
            let parent_policy = parent.confirmation_manager.policy().await;
            config.confirmation_policy = Some(parent_policy);
        }
        parent.llm_client.clone()
    };
    config.parent_id = Some(parent_id.to_string());
    let storage_type = config.storage_type.clone();
    let tools = self.tool_executor.definitions();
    let mut session = Session::new(child_id.clone(), config, tools).await?;
    if session.llm_client.is_none() {
        let default_llm = self.llm_client.read().await.clone();
        session.llm_client = parent_llm_client.or(default_llm);
    }
    session.start_queue().await?;
    if session.config.max_context_length > 0 {
        session.context_usage.max_tokens = session.config.max_context_length as usize;
    }
    // Record the child's storage type, as `create_session` does. Without it,
    // `save_session` silently skipped the child, so the "create_child"
    // persistence below never happened, and `destroy_session` never deleted
    // it from its store.
    self.session_storage_types
        .write()
        .await
        .insert(child_id.clone(), storage_type);
    self.sessions
        .write()
        .await
        .insert(child_id.clone(), Arc::new(RwLock::new(session)));
    self.persist_in_background(&child_id, "create_child");
    tracing::info!(
        "Created child session: {} (parent: {})",
        child_id,
        parent_id
    );
    Ok(child_id)
}
pub async fn get_child_sessions(&self, parent_id: &str) -> Vec<String> {
let sessions = self.sessions.read().await;
let mut children = Vec::new();
for (id, session_lock) in sessions.iter() {
let session = session_lock.read().await;
if session.parent_id.as_deref() == Some(parent_id) {
children.push(id.clone());
}
}
children
}
/// Reports whether `session_id` is a child session.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn is_child_session(&self, session_id: &str) -> Result<bool> {
    let lock = self.get_session(session_id).await?;
    let is_child = lock.read().await.is_child_session();
    Ok(is_child)
}
pub async fn generate(&self, session_id: &str, prompt: &str) -> Result<AgentResult> {
let session_lock = self.get_session(session_id).await?;
{
let session = session_lock.read().await;
if session.state == SessionState::Paused {
anyhow::bail!(
"Session {} is paused. Call Resume before generating.",
session_id
);
}
}
let (
history,
system,
tools,
session_llm_client,
permission_checker,
confirmation_manager,
context_providers,
session_workspace,
tool_metrics,
hook_engine,
planning_mode,
goal_tracking,
) = {
let session = session_lock.read().await;
(
session.messages.clone(),
session.system().map(String::from),
session.tools.clone(),
session.llm_client.clone(),
session.permission_checker.clone(),
session.confirmation_manager.clone(),
session.context_providers.clone(),
session.config.workspace.clone(),
session.tool_metrics.clone(),
session.config.hook_engine.clone(),
session.config.planning_mode,
session.config.goal_tracking,
)
};
let llm_client = if let Some(client) = session_llm_client {
client
} else if let Some(client) = self.llm_client.read().await.clone() {
client
} else {
anyhow::bail!(
"LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
session_id
);
};
let tool_context = if session_workspace.is_empty() {
crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
.with_session_id(session_id)
} else {
crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
.with_session_id(session_id)
};
let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
tool_context.with_command_env(command_env)
} else {
tool_context
};
let skill_registry = match self.session_skill_registry(session_id).await {
Some(registry) => Some(registry),
None => self.skill_registry.read().await.clone(),
};
let system = if let Some(ref registry) = skill_registry {
let skill_prompt = registry.to_system_prompt();
if skill_prompt.is_empty() {
system
} else {
match system {
Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
None => Some(skill_prompt),
}
}
} else {
system
};
let effective_prompt = if let Some(ref registry) = skill_registry {
let skill_content = registry.match_skills(prompt);
if skill_content.is_empty() {
prompt.to_string()
} else {
format!("{}\n\n---\n\n{}", skill_content, prompt)
}
} else {
prompt.to_string()
};
let memory = self
.memory_store
.read()
.await
.as_ref()
.map(|store| Arc::new(AgentMemory::new(store.clone())));
let config = AgentConfig {
prompt_slots: match system {
Some(s) => SystemPromptSlots::from_legacy(s),
None => SystemPromptSlots::default(),
},
tools,
max_tool_rounds: 50,
permission_checker: Some(permission_checker),
confirmation_manager: Some(confirmation_manager),
context_providers,
planning_mode,
goal_tracking,
hook_engine,
skill_registry,
memory,
..AgentConfig::default()
};
let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
.with_tool_metrics(tool_metrics);
let result = agent
.execute_with_session(&history, &effective_prompt, Some(session_id), None, None)
.await?;
{
let mut session = session_lock.write().await;
session.messages = result.messages.clone();
session.update_usage(&result.usage);
}
self.persist_in_background(session_id, "generate");
if let Err(e) = self.maybe_auto_compact(session_id).await {
tracing::warn!("Auto-compact failed for session {}: {}", session_id, e);
}
Ok(result)
}
pub async fn generate_streaming(
&self,
session_id: &str,
prompt: &str,
) -> Result<(
mpsc::Receiver<AgentEvent>,
tokio::task::JoinHandle<Result<AgentResult>>,
tokio_util::sync::CancellationToken,
)> {
let session_lock = self.get_session(session_id).await?;
{
let session = session_lock.read().await;
if session.state == SessionState::Paused {
anyhow::bail!(
"Session {} is paused. Call Resume before generating.",
session_id
);
}
}
let (
history,
system,
tools,
session_llm_client,
permission_checker,
confirmation_manager,
context_providers,
session_workspace,
tool_metrics,
hook_engine,
planning_mode,
goal_tracking,
) = {
let session = session_lock.read().await;
(
session.messages.clone(),
session.system().map(String::from),
session.tools.clone(),
session.llm_client.clone(),
session.permission_checker.clone(),
session.confirmation_manager.clone(),
session.context_providers.clone(),
session.config.workspace.clone(),
session.tool_metrics.clone(),
session.config.hook_engine.clone(),
session.config.planning_mode,
session.config.goal_tracking,
)
};
let llm_client = if let Some(client) = session_llm_client {
client
} else if let Some(client) = self.llm_client.read().await.clone() {
client
} else {
anyhow::bail!(
"LLM client not configured for session {}. Please call Configure RPC with model configuration first.",
session_id
);
};
let tool_context = if session_workspace.is_empty() {
crate::tools::ToolContext::new(self.tool_executor.workspace().clone())
.with_session_id(session_id)
} else {
crate::tools::ToolContext::new(std::path::PathBuf::from(&session_workspace))
.with_session_id(session_id)
};
let tool_context = if let Some(command_env) = self.tool_executor.command_env() {
tool_context.with_command_env(command_env)
} else {
tool_context
};
let skill_registry = match self.session_skill_registry(session_id).await {
Some(registry) => Some(registry),
None => self.skill_registry.read().await.clone(),
};
let system = if let Some(ref registry) = skill_registry {
let skill_prompt = registry.to_system_prompt();
if skill_prompt.is_empty() {
system
} else {
match system {
Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
None => Some(skill_prompt),
}
}
} else {
system
};
let effective_prompt = if let Some(ref registry) = skill_registry {
let skill_content = registry.match_skills(prompt);
if skill_content.is_empty() {
prompt.to_string()
} else {
format!("{}\n\n---\n\n{}", skill_content, prompt)
}
} else {
prompt.to_string()
};
let memory = self
.memory_store
.read()
.await
.as_ref()
.map(|store| Arc::new(AgentMemory::new(store.clone())));
let config = AgentConfig {
prompt_slots: match system {
Some(s) => SystemPromptSlots::from_legacy(s),
None => SystemPromptSlots::default(),
},
tools,
max_tool_rounds: 50,
permission_checker: Some(permission_checker),
confirmation_manager: Some(confirmation_manager),
context_providers,
planning_mode,
goal_tracking,
hook_engine,
skill_registry,
memory,
..AgentConfig::default()
};
let agent = AgentLoop::new(llm_client, self.tool_executor.clone(), tool_context, config)
.with_tool_metrics(tool_metrics);
let (rx, handle, cancel_token) =
agent.execute_streaming(&history, &effective_prompt).await?;
let cancel_token_clone = cancel_token.clone();
{
let mut ops = self.ongoing_operations.write().await;
ops.insert(session_id.to_string(), cancel_token);
}
let session_lock_clone = session_lock.clone();
let original_handle = handle;
let stores = self.stores.clone();
let session_storage_types = self.session_storage_types.clone();
let llm_configs = self.llm_configs.clone();
let session_id_owned = session_id.to_string();
let ongoing_operations = self.ongoing_operations.clone();
let session_manager = self.clone();
let wrapped_handle = tokio::spawn(async move {
let result = original_handle.await??;
{
let mut ops = ongoing_operations.write().await;
ops.remove(&session_id_owned);
}
{
let mut session = session_lock_clone.write().await;
session.messages = result.messages.clone();
session.update_usage(&result.usage);
}
let storage_type = {
let storage_types = session_storage_types.read().await;
storage_types.get(&session_id_owned).cloned()
};
if let Some(storage_type) = storage_type {
if storage_type != crate::config::StorageBackend::Memory {
let stores_guard = stores.read().await;
if let Some(store) = stores_guard.get(&storage_type) {
let session = session_lock_clone.read().await;
let llm_config = {
let configs = llm_configs.read().await;
configs.get(&session_id_owned).cloned()
};
let data = session.to_session_data(llm_config);
if let Err(e) = store.save(&data).await {
tracing::warn!(
"Failed to persist session {} after streaming: {}",
session_id_owned,
e
);
}
}
}
}
if let Err(e) = session_manager.maybe_auto_compact(&session_id_owned).await {
tracing::warn!(
"Auto-compact failed for session {}: {}",
session_id_owned,
e
);
}
Ok(result)
});
Ok((rx, wrapped_handle, cancel_token_clone))
}
/// Returns a copy of the session's current context-window usage.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn context_usage(&self, session_id: &str) -> Result<ContextUsage> {
    let lock = self.get_session(session_id).await?;
    let usage = lock.read().await.context_usage.clone();
    Ok(usage)
}

/// Returns a copy of the session's message history.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn history(&self, session_id: &str) -> Result<Vec<Message>> {
    let lock = self.get_session(session_id).await?;
    let messages = lock.read().await.messages.clone();
    Ok(messages)
}
/// Clears the session via `Session::clear` and schedules persistence.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn clear(&self, session_id: &str) -> Result<()> {
    let lock = self.get_session(session_id).await?;
    lock.write().await.clear();
    self.persist_in_background(session_id, "clear");
    Ok(())
}
/// Compacts the session's context and schedules persistence.
///
/// Uses `Session::compact` with the session's LLM client, falling back to the
/// manager default. With no client available at all, it instead keeps only
/// the last 20 messages. The session write lock is held for the whole
/// operation, including the LLM call.
///
/// # Errors
///
/// Fails if the session is not loaded, or if `Session::compact` fails. In
/// that case nothing is persisted.
pub async fn compact(&self, session_id: &str) -> Result<()> {
    tracing::info!(name: "a3s.session.compact", session_id = %session_id, "Compacting session context");
    let session_lock = self.get_session(session_id).await?;
    {
        let mut session = session_lock.write().await;
        let client = match session.llm_client.clone() {
            Some(own) => Some(own),
            None => self.llm_client.read().await.clone(),
        };
        match client {
            Some(client) => {
                session.compact(&client).await?;
            }
            None => {
                tracing::warn!("No LLM client configured for compaction, using simple truncation");
                let keep_messages = 20;
                let len = session.messages.len();
                if len > keep_messages {
                    session.messages = session.messages.split_off(len - keep_messages);
                }
            }
        }
    }
    self.persist_in_background(session_id, "compact");
    Ok(())
}
/// Compacts the session if auto-compaction is enabled and context usage has
/// reached the configured threshold.
///
/// After compacting, emits `AgentEvent::ContextCompacted` on the session's
/// event channel. Returns `Ok(true)` if compaction ran, `Ok(false)` otherwise.
///
/// # Errors
///
/// Fails if the session is not loaded or `compact` fails.
pub async fn maybe_auto_compact(&self, session_id: &str) -> Result<bool> {
    let (should_compact, percent_before, messages_before) = {
        let lock = self.get_session(session_id).await?;
        let session = lock.read().await;
        if !session.config.auto_compact {
            return Ok(false);
        }
        let threshold = session.config.auto_compact_threshold;
        let usage_percent = session.context_usage.percent;
        let message_count = session.messages.len();
        tracing::debug!(
            "Auto-compact check for session {}: percent={:.2}%, threshold={:.2}%, messages={}",
            session_id,
            usage_percent * 100.0,
            threshold * 100.0,
            message_count,
        );
        (usage_percent >= threshold, usage_percent, message_count)
    };
    if !should_compact {
        return Ok(false);
    }
    tracing::info!(
        name: "a3s.session.auto_compact",
        session_id = %session_id,
        percent_before = %format!("{:.1}%", percent_before * 100.0),
        messages_before = %messages_before,
        "Auto-compacting session due to high context usage"
    );
    self.compact(session_id).await?;
    let after_lock = self.get_session(session_id).await?;
    let messages_after = after_lock.read().await.messages.len();
    // The session may be destroyed concurrently; only notify if it still exists.
    if let Ok(lock) = self.get_session(session_id).await {
        let event = AgentEvent::ContextCompacted {
            session_id: session_id.to_string(),
            before_messages: messages_before,
            after_messages: messages_after,
            percent_before,
        };
        let session = lock.read().await;
        let _ = session.event_tx.send(event);
    }
    tracing::info!(
        name: "a3s.session.auto_compact.done",
        session_id = %session_id,
        messages_before = %messages_before,
        messages_after = %messages_after,
        "Auto-compaction complete"
    );
    Ok(true)
}
/// Returns the LLM client a turn for `session_id` would use: the session's
/// own client if set, otherwise the manager-wide default (possibly `None`).
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn get_llm_for_session(
    &self,
    session_id: &str,
) -> Result<Option<Arc<dyn LlmClient>>> {
    let session_lock = self.get_session(session_id).await?;
    let session = session_lock.read().await;
    let client = match &session.llm_client {
        Some(own) => Some(Arc::clone(own)),
        None => self.llm_client.read().await.clone(),
    };
    Ok(client)
}
/// Answers a side ("by the way") question about session `session_id` without
/// modifying the session.
///
/// The question is sent to the session's LLM client (or the manager default)
/// together with:
/// - a snapshot of the session history;
/// - a generated summary of the session's runtime state: pending
///   confirmations, command-queue stats, pending external tasks, active
///   tracked tasks, and session state;
/// - the caller-supplied `runtime_context`, when non-empty.
///
/// The extra messages go only to a local copy of the history, so nothing is
/// appended to the session.
///
/// # Errors
///
/// Fails if `question` is empty after trimming, the session does not exist,
/// no LLM client is available, or the LLM call fails.
pub async fn btw_with_context(
    &self,
    session_id: &str,
    question: &str,
    runtime_context: Option<&str>,
) -> Result<crate::agent_api::BtwResult> {
    let question = question.trim();
    if question.is_empty() {
        anyhow::bail!("btw: question cannot be empty");
    }
    let session_lock = self.get_session(session_id).await?;
    // Build the runtime summary and snapshot the history under one read lock.
    let (history_snapshot, session_runtime) = {
        let session = session_lock.read().await;
        let mut sections = Vec::new();
        // Pending tool confirmations, one line each, sorted for stable output.
        let pending_confirmations = session.confirmation_manager.pending_confirmations().await;
        if !pending_confirmations.is_empty() {
            let mut lines = pending_confirmations
                .into_iter()
                .map(|pending| {
                    let arg_summary = Self::compact_json_value(&pending.args);
                    if arg_summary.is_empty() {
                        format!(
                            "- {} [{}] remaining={}ms",
                            pending.tool_name, pending.tool_id, pending.remaining_ms
                        )
                    } else {
                        format!(
                            "- {} [{}] remaining={}ms {}",
                            pending.tool_name,
                            pending.tool_id,
                            pending.remaining_ms,
                            arg_summary
                        )
                    }
                })
                .collect::<Vec<_>>();
            lines.sort();
            sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
        }
        // Queue totals, plus one line per lane that has active or pending work.
        let stats = session.command_queue.stats().await;
        if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
            let mut lines = vec![format!(
                "active={}, pending={}, external_pending={}",
                stats.total_active, stats.total_pending, stats.external_pending
            )];
            let mut lanes = stats
                .lanes
                .into_values()
                .filter(|lane| lane.active > 0 || lane.pending > 0)
                .map(|lane| {
                    format!(
                        "- {:?}: active={}, pending={}, handler={:?}",
                        lane.lane, lane.active, lane.pending, lane.handler_mode
                    )
                })
                .collect::<Vec<_>>();
            lanes.sort();
            lines.extend(lanes);
            sections.push(format!("[session queue]\n{}", lines.join("\n")));
        }
        // At most 6 pending external tasks (the first 6 returned), then sorted.
        let external_tasks = session.command_queue.pending_external_tasks().await;
        if !external_tasks.is_empty() {
            let mut lines = external_tasks
                .into_iter()
                .take(6)
                .map(|task| {
                    let payload_summary = Self::compact_json_value(&task.payload);
                    if payload_summary.is_empty() {
                        format!(
                            "- {} {:?} remaining={}ms",
                            task.command_type,
                            task.lane,
                            task.remaining_ms()
                        )
                    } else {
                        format!(
                            "- {} {:?} remaining={}ms {}",
                            task.command_type,
                            task.lane,
                            task.remaining_ms(),
                            payload_summary
                        )
                    }
                })
                .collect::<Vec<_>>();
            lines.sort();
            sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
        }
        // At most 6 active tracked tasks, in their stored order.
        let active_tasks = session
            .tasks
            .iter()
            .filter(|task| task.status.is_active())
            .take(6)
            .map(|task| match &task.tool {
                Some(tool) if !tool.is_empty() => {
                    format!("- [{}] {} ({})", task.status, task.content, tool)
                }
                _ => format!("- [{}] {}", task.status, task.content),
            })
            .collect::<Vec<_>>();
        if !active_tasks.is_empty() {
            sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
        }
        // Always included, so `session_runtime` below is never empty.
        sections.push(format!(
            "[session state]\n{}",
            match session.state {
                SessionState::Active => "active",
                SessionState::Paused => "paused",
                SessionState::Completed => "completed",
                SessionState::Error => "error",
                SessionState::Unknown => "unknown",
            }
        ));
        (session.messages.clone(), sections.join("\n\n"))
    };
    let llm_client = self
        .get_llm_for_session(session_id)
        .await?
        .context("btw failed: no model configured for session")?;
    // Work on a local copy; the session's own history is left untouched.
    let mut messages = history_snapshot;
    let mut injected_sections = Vec::new();
    if !session_runtime.is_empty() {
        injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
    }
    if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
        injected_sections.push(format!("[host runtime context]\n{}", extra));
    }
    // Runtime context goes in as its own user message ahead of the question,
    // with an instruction to treat it as background only.
    if !injected_sections.is_empty() {
        let injected_context = format!(
            "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
            injected_sections.join("\n\n")
        );
        messages.push(Message::user(&injected_context));
    }
    messages.push(Message::user(question));
    // No tools are offered (empty slice), so this is a single completion.
    let response = llm_client
        .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
        .await
        .map_err(|e| anyhow::anyhow!("btw: ephemeral LLM call failed: {e}"))?;
    Ok(crate::agent_api::BtwResult {
        question: question.to_string(),
        answer: response.text(),
        usage: response.usage,
    })
}
/// Updates the session's thinking settings and/or its LLM model, then
/// schedules persistence.
///
/// - `thinking`: when `Some`, sets `thinking_enabled`.
/// - `budget`: when `Some`, sets `thinking_budget`.
/// - `model_config`: when `Some`, builds a new client for the session and
///   records a copy of the config (provider, model, base URL) in `llm_configs`.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn configure(
    &self,
    session_id: &str,
    thinking: Option<bool>,
    budget: Option<usize>,
    model_config: Option<LlmConfig>,
) -> Result<()> {
    let session_lock = self.get_session(session_id).await?;
    {
        let mut session = session_lock.write().await;
        if let Some(enabled) = thinking {
            session.thinking_enabled = enabled;
        }
        if budget.is_some() {
            session.thinking_budget = budget;
        }
        if let Some(cfg) = model_config.as_ref() {
            tracing::info!(
                "Configuring session {} with LLM: provider={}, model={}",
                session_id,
                cfg.provider,
                cfg.model
            );
            session.model_name = Some(cfg.model.clone());
            session.llm_client = Some(llm::create_client_with_config(cfg.clone()));
        }
    }
    if let Some(cfg) = model_config {
        // api_key is deliberately left as None, presumably so credentials are
        // not written to the session store. Confirm.
        let snapshot = LlmConfigData {
            provider: cfg.provider,
            model: cfg.model,
            api_key: None,
            base_url: cfg.base_url,
        };
        self.llm_configs
            .write()
            .await
            .insert(session_id.to_string(), snapshot);
    }
    self.persist_in_background(session_id, "configure");
    Ok(())
}
/// Replaces the session's configured system prompt (`None` clears it) and
/// schedules persistence.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn set_system_prompt(
    &self,
    session_id: &str,
    system_prompt: Option<String>,
) -> Result<()> {
    let session_lock = self.get_session(session_id).await?;
    session_lock.write().await.config.system_prompt = system_prompt;
    self.persist_in_background(session_id, "set_system_prompt");
    Ok(())
}
/// Returns the number of sessions currently loaded in memory.
pub async fn session_count(&self) -> usize {
    self.sessions.read().await.len()
}
/// Runs `health_check` on every configured store.
///
/// Returns one `(backend_name, result)` pair per store, in the store map's
/// (unspecified) iteration order.
pub async fn store_health(&self) -> Vec<(String, Result<()>)> {
    let stores = self.stores.read().await;
    let mut results = Vec::with_capacity(stores.len());
    // Only the values are needed; iterating `(_, v)` pairs is clippy::for_kv_map.
    for store in stores.values() {
        results.push((store.backend_name().to_string(), store.health_check().await));
    }
    results
}
pub fn list_tools(&self) -> Vec<crate::llm::ToolDefinition> {
self.tool_executor.definitions()
}
/// Pauses the session. Returns whether `Session::pause` reported a change;
/// persistence is scheduled only in that case.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn pause_session(&self, session_id: &str) -> Result<bool> {
    let session_lock = self.get_session(session_id).await?;
    let paused = session_lock.write().await.pause();
    if paused {
        self.persist_in_background(session_id, "pause");
    }
    Ok(paused)
}

/// Resumes the session. Returns whether `Session::resume` reported a change;
/// persistence is scheduled only in that case.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn resume_session(&self, session_id: &str) -> Result<bool> {
    let session_lock = self.get_session(session_id).await?;
    let resumed = session_lock.write().await.resume();
    if resumed {
        self.persist_in_background(session_id, "resume");
    }
    Ok(resumed)
}
/// Cancels all pending tool confirmations and any in-flight streaming turn
/// for the session.
///
/// Returns `Ok(true)` if either anything was cancelled, `Ok(false)` otherwise.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn cancel_operation(&self, session_id: &str) -> Result<bool> {
    let session_lock = self.get_session(session_id).await?;
    let cancelled_confirmations = {
        let session = session_lock.read().await;
        session.confirmation_manager.cancel_all().await
    };
    if cancelled_confirmations > 0 {
        tracing::info!(
            "Cancelled {} pending confirmations for session {}",
            cancelled_confirmations,
            session_id
        );
    }
    let token = self.ongoing_operations.write().await.remove(session_id);
    match token {
        Some(token) => {
            token.cancel();
            tracing::info!("Cancelled ongoing operation for session {}", session_id);
            Ok(true)
        }
        None if cancelled_confirmations > 0 => Ok(true),
        None => {
            tracing::debug!("No ongoing operation to cancel for session {}", session_id);
            Ok(false)
        }
    }
}
/// Returns handles to every live session (order unspecified).
pub async fn get_all_sessions(&self) -> Vec<Arc<RwLock<Session>>> {
    let sessions = self.sessions.read().await;
    sessions.values().map(Arc::clone).collect()
}

/// Returns the shared tool executor.
pub fn tool_executor(&self) -> &Arc<ToolExecutor> {
    &self.tool_executor
}
/// Approves or rejects the pending tool call `tool_id` in the session.
///
/// Returns the boolean produced by the confirmation manager.
///
/// # Errors
///
/// Fails if the session is not loaded, or if the confirmation manager
/// reports an error (converted into `anyhow::Error`).
pub async fn confirm_tool(
    &self,
    session_id: &str,
    tool_id: &str,
    approved: bool,
    reason: Option<String>,
) -> Result<bool> {
    let session_lock = self.get_session(session_id).await?;
    let session = session_lock.read().await;
    let outcome = session
        .confirmation_manager
        .confirm(tool_id, approved, reason)
        .await;
    outcome.map_err(|e| anyhow::anyhow!(e))
}
/// Sets the session's confirmation policy, both on its live confirmation
/// manager and in its config, then schedules persistence.
///
/// Returns the policy that was applied.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn set_confirmation_policy(
    &self,
    session_id: &str,
    policy: ConfirmationPolicy,
) -> Result<ConfirmationPolicy> {
    let session_lock = self.get_session(session_id).await?;
    {
        // Update both copies under one write lock. Previously they were set
        // under separate acquisitions, so concurrent calls could interleave
        // and leave the manager and the config holding different policies.
        let mut session = session_lock.write().await;
        session.set_confirmation_policy(policy.clone()).await;
        session.config.confirmation_policy = Some(policy.clone());
    }
    self.persist_in_background(session_id, "set_confirmation_policy");
    Ok(policy)
}
/// Sets the session's permission policy and schedules persistence.
///
/// Returns the policy that was applied.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn set_permission_policy(
    &self,
    session_id: &str,
    policy: PermissionPolicy,
) -> Result<PermissionPolicy> {
    let session_lock = self.get_session(session_id).await?;
    session_lock
        .write()
        .await
        .set_permission_policy(policy.clone());
    self.persist_in_background(session_id, "set_permission_policy");
    Ok(policy)
}
/// Returns the session's current confirmation policy.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn get_confirmation_policy(&self, session_id: &str) -> Result<ConfirmationPolicy> {
    let session_lock = self.get_session(session_id).await?;
    let policy = {
        let session = session_lock.read().await;
        session.confirmation_policy().await
    };
    Ok(policy)
}
/// Sets the session's planning mode and schedules persistence.
///
/// Returns the mode that was applied.
///
/// # Errors
///
/// Fails if the session is not loaded.
pub async fn set_planning_mode(
    &self,
    session_id: &str,
    mode: PlanningMode,
) -> Result<PlanningMode> {
    let session_lock = self.get_session(session_id).await?;
    session_lock.write().await.config.planning_mode = mode;
    self.persist_in_background(session_id, "set_planning_mode");
    Ok(mode)
}
}