Skip to main content

a3s_code_core/session/
mod.rs

1//! Session management
2//!
3//! Provides session-based conversation management:
4//! - Multiple independent sessions per agent
5//! - Conversation history tracking
6//! - Context usage monitoring
7//! - Per-session LLM client configuration
8//! - Session state management (Active, Paused, Completed, Error)
9//! - Per-session command queue with lane-based priority
10//! - Human-in-the-Loop (HITL) confirmation support
11//! - Session persistence (JSONL file storage)
12//!
13//! ## Skill System
14//!
//! Skills are managed at the service level via the skill registry and are made available
//! to all sessions. Per-session tool access is controlled through `PermissionPolicy`.
17
18pub(crate) mod compaction;
19pub mod manager;
20
21pub use manager::SessionManager;
22
23#[cfg(test)]
24#[path = "tests.rs"]
25mod tests_file;
26
27use crate::agent::AgentEvent;
28use crate::hitl::{ConfirmationManager, ConfirmationPolicy};
29use crate::llm::{LlmClient, Message, TokenUsage, ToolDefinition};
30use crate::permissions::{PermissionDecision, PermissionPolicy};
31use crate::planning::Task;
32use crate::queue::{ExternalTaskResult, LaneHandlerConfig, SessionQueueConfig};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::store::{LlmConfigData, SessionData};
35use anyhow::Result;
36use serde::{Deserialize, Serialize};
37use std::sync::Arc;
38use tokio::sync::{broadcast, RwLock};
39
40/// Session state
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
42pub enum SessionState {
43    #[default]
44    Unknown = 0,
45    Active = 1,
46    Paused = 2,
47    Completed = 3,
48    Error = 4,
49}
50
51/// Context usage statistics
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ContextUsage {
54    pub used_tokens: usize,
55    pub max_tokens: usize,
56    pub percent: f32,
57    pub turns: usize,
58}
59
60impl Default for ContextUsage {
61    fn default() -> Self {
62        Self {
63            used_tokens: 0,
64            max_tokens: 200_000,
65            percent: 0.0,
66            turns: 0,
67        }
68    }
69}
70
/// Default fraction of the context window at which auto-compaction triggers:
/// 0.8, i.e. once the context is 80% full.
pub const DEFAULT_AUTO_COMPACT_THRESHOLD: f32 = 0.8;

/// Supplies `SessionConfig::auto_compact_threshold` when the field is missing
/// from serialized input (wired up through `#[serde(default = "...")]`).
fn default_auto_compact_threshold() -> f32 {
    DEFAULT_AUTO_COMPACT_THRESHOLD
}
78
79/// Session configuration
80#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct SessionConfig {
82    pub name: String,
83    pub workspace: String,
84    pub system_prompt: Option<String>,
85    pub max_context_length: u32,
86    pub auto_compact: bool,
87    /// Context usage percentage threshold to trigger auto-compaction (0.0 - 1.0).
88    /// Only used when `auto_compact` is true. Default: 0.80 (80%).
89    #[serde(default = "default_auto_compact_threshold")]
90    pub auto_compact_threshold: f32,
91    /// Storage type for this session
92    #[serde(default)]
93    pub storage_type: crate::config::StorageBackend,
94    /// Queue configuration (optional, uses defaults if None)
95    #[serde(skip_serializing_if = "Option::is_none")]
96    pub queue_config: Option<SessionQueueConfig>,
97    /// Confirmation policy (optional, uses defaults if None)
98    #[serde(skip_serializing_if = "Option::is_none")]
99    pub confirmation_policy: Option<ConfirmationPolicy>,
100    /// Permission policy (optional, uses defaults if None)
101    #[serde(skip_serializing_if = "Option::is_none")]
102    pub permission_policy: Option<PermissionPolicy>,
103    /// Parent session ID (for subagent sessions)
104    #[serde(skip_serializing_if = "Option::is_none")]
105    pub parent_id: Option<String>,
106    /// Security configuration (optional, enables security features)
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pub security_config: Option<crate::security::SecurityConfig>,
109    /// Shared hook engine for lifecycle events
110    #[serde(skip)]
111    pub hook_engine: Option<std::sync::Arc<crate::hooks::HookEngine>>,
112    /// Enable planning phase before execution
113    #[serde(default)]
114    pub planning_enabled: bool,
115    /// Enable goal tracking
116    #[serde(default)]
117    pub goal_tracking: bool,
118}
119
120impl Default for SessionConfig {
121    fn default() -> Self {
122        Self {
123            name: String::new(),
124            workspace: String::new(),
125            system_prompt: None,
126            max_context_length: 0,
127            auto_compact: false,
128            auto_compact_threshold: DEFAULT_AUTO_COMPACT_THRESHOLD,
129            storage_type: crate::config::StorageBackend::default(),
130            queue_config: None,
131            confirmation_policy: None,
132            permission_policy: None,
133            parent_id: None,
134            security_config: None,
135            hook_engine: None,
136            planning_enabled: false,
137            goal_tracking: false,
138        }
139    }
140}
141
142pub struct Session {
143    pub id: String,
144    pub config: SessionConfig,
145    pub state: SessionState,
146    pub messages: Vec<Message>,
147    pub context_usage: ContextUsage,
148    pub total_usage: TokenUsage,
149    /// Cumulative dollar cost for this session
150    pub total_cost: f64,
151    /// Model name for cost calculation
152    pub model_name: Option<String>,
153    pub tools: Vec<ToolDefinition>,
154    pub thinking_enabled: bool,
155    pub thinking_budget: Option<usize>,
156    /// Per-session LLM client (overrides default if set)
157    pub llm_client: Option<Arc<dyn LlmClient>>,
158    /// Creation timestamp (Unix epoch seconds)
159    pub created_at: i64,
160    /// Last update timestamp (Unix epoch seconds)
161    pub updated_at: i64,
162    /// Per-session command queue (a3s-lane backed)
163    pub command_queue: SessionLaneQueue,
164    /// HITL confirmation manager
165    pub confirmation_manager: Arc<ConfirmationManager>,
166    /// Permission policy for tool execution
167    pub permission_policy: Arc<RwLock<PermissionPolicy>>,
168    /// Event broadcaster for this session
169    event_tx: broadcast::Sender<AgentEvent>,
170    /// Context providers for augmenting prompts with external context
171    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
172    /// Task list for tracking
173    pub tasks: Vec<Task>,
174    /// Parent session ID (for subagent sessions)
175    pub parent_id: Option<String>,
176    /// Agent memory system for this session
177    pub memory: Arc<RwLock<crate::memory::AgentMemory>>,
178    /// Current execution plan (if any)
179    pub current_plan: Arc<RwLock<Option<crate::planning::ExecutionPlan>>>,
180    /// Security guard (if enabled)
181    pub security_guard: Option<Arc<crate::security::SecurityGuard>>,
182    /// Per-session tool execution metrics
183    pub tool_metrics: Arc<RwLock<crate::telemetry::ToolMetrics>>,
184    /// Per-call LLM cost records for cross-session aggregation
185    pub cost_records: Vec<crate::telemetry::LlmCostRecord>,
186    /// Loaded skills for tool filter enforcement in the agent loop
187    pub loaded_skills: Vec<crate::tools::Skill>,
188    /// Context store client for semantic search over ingested content
189    #[cfg(feature = "context-store")]
190    pub context_client: Option<Arc<crate::context_store::A3SContextClient>>,
191}
192
193/// Validate that an identifier is safe for use in file paths.
194/// Rejects path traversal attempts and non-alphanumeric characters.
195fn validate_path_safe_id(id: &str, label: &str) -> Result<()> {
196    if id.is_empty() {
197        anyhow::bail!("{label} must not be empty");
198    }
199    // Allow alphanumeric, hyphens, underscores, and dots (but not leading dots)
200    let is_safe = id
201        .chars()
202        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.')
203        && !id.starts_with('.')
204        && !id.contains("..");
205    if !is_safe {
206        anyhow::bail!("{label} contains unsafe characters: {id:?}");
207    }
208    Ok(())
209}
210
211impl Session {
212    /// Create a new session (async due to SessionLaneQueue initialization)
213    pub async fn new(
214        id: String,
215        config: SessionConfig,
216        tools: Vec<ToolDefinition>,
217    ) -> Result<Self> {
218        // Validate session ID to prevent path traversal
219        validate_path_safe_id(&id, "Session ID")?;
220
221        let now = std::time::SystemTime::now()
222            .duration_since(std::time::UNIX_EPOCH)
223            .map(|d| d.as_secs() as i64)
224            .unwrap_or(0);
225
226        // Create event broadcaster
227        let (event_tx, _) = broadcast::channel(100);
228
229        // Create command queue with config or defaults
230        let queue_config = config.queue_config.clone().unwrap_or_default();
231        let command_queue = SessionLaneQueue::new(&id, queue_config, event_tx.clone()).await?;
232
233        // Create confirmation manager with policy or secure default (HITL enabled)
234        let confirmation_policy = config
235            .confirmation_policy
236            .clone()
237            .unwrap_or_else(ConfirmationPolicy::enabled);
238        let confirmation_manager = Arc::new(ConfirmationManager::new(
239            confirmation_policy,
240            event_tx.clone(),
241        ));
242
243        // Create permission policy with config or defaults
244        let permission_policy = Arc::new(RwLock::new(
245            config.permission_policy.clone().unwrap_or_default(),
246        ));
247
248        // Extract parent_id from config
249        let parent_id = config.parent_id.clone();
250
251        // Create memory system with file-based storage
252        // Memory file is stored in workspace/.a3s/memories/{session_id}.jsonl
253        let memory_dir = std::path::PathBuf::from(&config.workspace)
254            .join(".a3s")
255            .join("memories");
256        let memory_file = memory_dir.join(format!("{}.jsonl", &id));
257
258        let memory_store: Arc<dyn crate::memory::MemoryStore> =
259            match crate::memory::FileStore::new(&memory_file) {
260                Ok(store) => Arc::new(store),
261                Err(e) => {
262                    // Fall back to in-memory store if file store fails
263                    tracing::warn!(
264                    "Failed to create file-based memory store at {:?}: {}. Using in-memory store.",
265                    memory_file,
266                    e
267                );
268                    Arc::new(crate::memory::InMemoryStore::new())
269                }
270            };
271        let agent_memory = crate::memory::AgentMemory::new(memory_store);
272        let memory = Arc::new(RwLock::new(agent_memory.clone()));
273
274        // Create memory context provider to inject past memories as context
275        let memory_provider: Arc<dyn crate::context::ContextProvider> =
276            Arc::new(crate::memory::MemoryContextProvider::new(agent_memory));
277
278        // Create context store client with in-memory backend for semantic search
279        #[cfg(feature = "context-store")]
280        let (context_client, context_providers) = {
281            let mut context_store_config = crate::context_store::config::Config::default();
282            context_store_config.storage.backend =
283                crate::context_store::config::StorageBackend::Memory;
284            match crate::context_store::A3SContextClient::new(
285                context_store_config,
286                None,
287                crate::context_store::ProviderInfo::default(),
288            ) {
289                Ok(client) => {
290                    let client = Arc::new(client);
291                    let ctx_provider: Arc<dyn crate::context::ContextProvider> = Arc::new(
292                        crate::context_store::A3SContextProvider::new(client.clone()),
293                    );
294                    (Some(client), vec![memory_provider, ctx_provider])
295                }
296                Err(e) => {
297                    tracing::warn!("Failed to create context store client: {}. Skipping.", e);
298                    (None, vec![memory_provider])
299                }
300            }
301        };
302        #[cfg(not(feature = "context-store"))]
303        let context_providers = vec![memory_provider];
304
305        // Initialize empty plan
306        let current_plan = Arc::new(RwLock::new(None));
307
308        // Initialize security guard if configured, using shared hook engine when available
309        let security_guard = config.security_config.as_ref().and_then(|sc| {
310            if sc.enabled {
311                let guard = crate::security::SecurityGuard::new(id.clone(), sc.clone());
312                if let Some(ref shared) = config.hook_engine {
313                    guard.register_hooks(shared.as_ref());
314                } else {
315                    tracing::warn!(
316                        "Session {}: security guard created without a hook engine — \
317                         security hooks will not be active",
318                        id
319                    );
320                }
321                Some(Arc::new(guard))
322            } else {
323                None
324            }
325        });
326
327        Ok(Self {
328            id,
329            config,
330            state: SessionState::Active,
331            messages: Vec::new(),
332            context_usage: ContextUsage::default(),
333            total_usage: TokenUsage::default(),
334            total_cost: 0.0,
335            model_name: None,
336            tools,
337            thinking_enabled: false,
338            thinking_budget: None,
339            llm_client: None,
340            created_at: now,
341            updated_at: now,
342            command_queue,
343            confirmation_manager,
344            permission_policy,
345            event_tx,
346            context_providers,
347            tasks: Vec::new(),
348            parent_id,
349            memory,
350            current_plan,
351            security_guard,
352            tool_metrics: Arc::new(RwLock::new(crate::telemetry::ToolMetrics::new())),
353            cost_records: Vec::new(),
354            loaded_skills: Vec::new(),
355            #[cfg(feature = "context-store")]
356            context_client,
357        })
358    }
359
360    /// Check if this is a child session (has a parent)
361    pub fn is_child_session(&self) -> bool {
362        self.parent_id.is_some()
363    }
364
365    /// Get the parent session ID if this is a child session
366    pub fn parent_session_id(&self) -> Option<&str> {
367        self.parent_id.as_deref()
368    }
369
370    /// Get a receiver for session events
371    pub fn subscribe_events(&self) -> broadcast::Receiver<AgentEvent> {
372        self.event_tx.subscribe()
373    }
374
375    /// Get the event broadcaster
376    pub fn event_tx(&self) -> broadcast::Sender<AgentEvent> {
377        self.event_tx.clone()
378    }
379
380    /// Update the confirmation policy
381    pub async fn set_confirmation_policy(&self, policy: ConfirmationPolicy) {
382        self.confirmation_manager.set_policy(policy).await;
383    }
384
385    /// Get the current confirmation policy
386    pub async fn confirmation_policy(&self) -> ConfirmationPolicy {
387        self.confirmation_manager.policy().await
388    }
389
390    /// Update the permission policy
391    pub async fn set_permission_policy(&self, policy: PermissionPolicy) {
392        let mut p = self.permission_policy.write().await;
393        *p = policy;
394    }
395
396    /// Get the current permission policy
397    pub async fn permission_policy(&self) -> PermissionPolicy {
398        self.permission_policy.read().await.clone()
399    }
400
401    /// Check permission for a tool invocation
402    pub async fn check_permission(
403        &self,
404        tool_name: &str,
405        args: &serde_json::Value,
406    ) -> PermissionDecision {
407        self.permission_policy.read().await.check(tool_name, args)
408    }
409
410    /// Add an allow rule to the permission policy
411    pub async fn add_allow_rule(&self, rule: &str) {
412        let mut p = self.permission_policy.write().await;
413        p.allow.push(crate::permissions::PermissionRule::new(rule));
414    }
415
416    /// Add a deny rule to the permission policy
417    pub async fn add_deny_rule(&self, rule: &str) {
418        let mut p = self.permission_policy.write().await;
419        p.deny.push(crate::permissions::PermissionRule::new(rule));
420    }
421
422    /// Add an ask rule to the permission policy
423    pub async fn add_ask_rule(&self, rule: &str) {
424        let mut p = self.permission_policy.write().await;
425        p.ask.push(crate::permissions::PermissionRule::new(rule));
426    }
427
428    /// Add a context provider to the session
429    pub fn add_context_provider(&mut self, provider: Arc<dyn crate::context::ContextProvider>) {
430        self.context_providers.push(provider);
431    }
432
433    /// Remove a context provider by name
434    ///
435    /// Returns true if a provider was removed, false otherwise.
436    pub fn remove_context_provider(&mut self, name: &str) -> bool {
437        let initial_len = self.context_providers.len();
438        self.context_providers.retain(|p| p.name() != name);
439        self.context_providers.len() < initial_len
440    }
441
442    /// Get the names of all registered context providers
443    pub fn context_provider_names(&self) -> Vec<String> {
444        self.context_providers
445            .iter()
446            .map(|p| p.name().to_string())
447            .collect()
448    }
449
450    // ========================================================================
451    // Task Management
452    // ========================================================================
453
454    /// Get the current task list
455    pub fn get_tasks(&self) -> &[Task] {
456        &self.tasks
457    }
458
459    /// Set the task list (replaces entire list)
460    ///
461    /// Broadcasts a TaskUpdated event after updating.
462    pub fn set_tasks(&mut self, tasks: Vec<Task>) {
463        self.tasks = tasks.clone();
464        self.touch();
465
466        // Broadcast event
467        let _ = self.event_tx.send(AgentEvent::TaskUpdated {
468            session_id: self.id.clone(),
469            tasks,
470        });
471    }
472
473    /// Get count of active (non-completed, non-cancelled) tasks
474    pub fn active_task_count(&self) -> usize {
475        self.tasks.iter().filter(|t| t.is_active()).count()
476    }
477
478    /// Set handler mode for a lane
479    pub async fn set_lane_handler(
480        &self,
481        lane: crate::hitl::SessionLane,
482        config: LaneHandlerConfig,
483    ) {
484        self.command_queue.set_lane_handler(lane, config).await;
485    }
486
487    /// Get handler config for a lane
488    pub async fn get_lane_handler(&self, lane: crate::hitl::SessionLane) -> LaneHandlerConfig {
489        self.command_queue.get_lane_handler(lane).await
490    }
491
492    /// Complete an external task
493    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
494        self.command_queue
495            .complete_external_task(task_id, result)
496            .await
497    }
498
499    /// Get pending external tasks
500    pub async fn pending_external_tasks(&self) -> Vec<crate::queue::ExternalTask> {
501        self.command_queue.pending_external_tasks().await
502    }
503
504    /// Get dead letters from the queue's DLQ
505    pub async fn dead_letters(&self) -> Vec<a3s_lane::DeadLetter> {
506        self.command_queue.dead_letters().await
507    }
508
509    /// Get queue metrics snapshot
510    pub async fn queue_metrics(&self) -> Option<a3s_lane::MetricsSnapshot> {
511        self.command_queue.metrics_snapshot().await
512    }
513
514    /// Get queue statistics
515    pub async fn queue_stats(&self) -> crate::queue::SessionQueueStats {
516        self.command_queue.stats().await
517    }
518
519    /// Start the command queue scheduler
520    pub async fn start_queue(&self) -> Result<()> {
521        self.command_queue.start().await
522    }
523
524    /// Stop the command queue scheduler
525    pub async fn stop_queue(&self) {
526        self.command_queue.stop().await;
527    }
528
529    /// Get the system prompt from config
530    pub fn system(&self) -> Option<&str> {
531        self.config.system_prompt.as_deref()
532    }
533
534    /// Get conversation history
535    pub fn history(&self) -> &[Message] {
536        &self.messages
537    }
538
539    /// Add a message to history
540    pub fn add_message(&mut self, message: Message) {
541        self.messages.push(message);
542        self.context_usage.turns = self.messages.len();
543        self.touch();
544    }
545
546    /// Update context usage after a response
547    pub fn update_usage(&mut self, usage: &TokenUsage) {
548        self.total_usage.prompt_tokens += usage.prompt_tokens;
549        self.total_usage.completion_tokens += usage.completion_tokens;
550        self.total_usage.total_tokens += usage.total_tokens;
551
552        // Calculate cost if model pricing is available
553        let cost_usd = if let Some(ref model) = self.model_name {
554            let pricing_map = crate::telemetry::default_model_pricing();
555            if let Some(pricing) = pricing_map.get(model) {
556                let cost = pricing.calculate_cost(usage.prompt_tokens, usage.completion_tokens);
557                self.total_cost += cost;
558                Some(cost)
559            } else {
560                None
561            }
562        } else {
563            None
564        };
565
566        // Record per-call cost for aggregation
567        let model_str = self.model_name.clone().unwrap_or_default();
568        self.cost_records.push(crate::telemetry::LlmCostRecord {
569            model: model_str.clone(),
570            provider: String::new(),
571            prompt_tokens: usage.prompt_tokens,
572            completion_tokens: usage.completion_tokens,
573            total_tokens: usage.total_tokens,
574            cost_usd,
575            timestamp: chrono::Utc::now(),
576            session_id: Some(self.id.clone()),
577        });
578
579        // Record OTLP metrics (counters only; duration recorded in agent loop)
580        crate::telemetry::record_llm_metrics(
581            if model_str.is_empty() {
582                "unknown"
583            } else {
584                &model_str
585            },
586            usage.prompt_tokens,
587            usage.completion_tokens,
588            cost_usd.unwrap_or(0.0),
589            0.0, // Duration not available here; recorded via spans
590        );
591
592        // Estimate context usage (rough approximation)
593        self.context_usage.used_tokens = usage.prompt_tokens;
594        self.context_usage.percent =
595            self.context_usage.used_tokens as f32 / self.context_usage.max_tokens as f32;
596        self.touch();
597    }
598
599    /// Clear conversation history
600    pub fn clear(&mut self) {
601        self.messages.clear();
602        self.context_usage = ContextUsage::default();
603        self.touch();
604    }
605
606    /// Compact context by summarizing old messages
607    pub async fn compact(&mut self, llm_client: &Arc<dyn LlmClient>) -> Result<()> {
608        if let Some(new_messages) =
609            compaction::compact_messages(&self.id, &self.messages, llm_client).await?
610        {
611            self.messages = new_messages;
612            self.touch();
613        }
614        Ok(())
615    }
616
617    /// Pause the session
618    pub fn pause(&mut self) -> bool {
619        if self.state == SessionState::Active {
620            self.state = SessionState::Paused;
621            self.touch();
622            true
623        } else {
624            false
625        }
626    }
627
628    /// Resume the session
629    pub fn resume(&mut self) -> bool {
630        if self.state == SessionState::Paused {
631            self.state = SessionState::Active;
632            self.touch();
633            true
634        } else {
635            false
636        }
637    }
638
639    /// Set session state to error
640    pub fn set_error(&mut self) {
641        self.state = SessionState::Error;
642        self.touch();
643    }
644
645    /// Set session state to completed
646    pub fn set_completed(&mut self) {
647        self.state = SessionState::Completed;
648        self.touch();
649    }
650
651    /// Update the updated_at timestamp
652    fn touch(&mut self) {
653        self.updated_at = std::time::SystemTime::now()
654            .duration_since(std::time::UNIX_EPOCH)
655            .map(|d| d.as_secs() as i64)
656            .unwrap_or(0);
657    }
658
659    /// Convert to serializable SessionData for persistence
660    pub fn to_session_data(&self, llm_config: Option<LlmConfigData>) -> SessionData {
661        SessionData {
662            id: self.id.clone(),
663            config: self.config.clone(),
664            state: self.state,
665            messages: self.messages.clone(),
666            context_usage: self.context_usage.clone(),
667            total_usage: self.total_usage.clone(),
668            total_cost: self.total_cost,
669            model_name: self.model_name.clone(),
670            cost_records: self.cost_records.clone(),
671            tool_names: SessionData::tool_names_from_definitions(&self.tools),
672            thinking_enabled: self.thinking_enabled,
673            thinking_budget: self.thinking_budget,
674            created_at: self.created_at,
675            updated_at: self.updated_at,
676            llm_config,
677            tasks: self.tasks.clone(),
678            parent_id: self.parent_id.clone(),
679        }
680    }
681
682    /// Restore session state from SessionData
683    ///
684    /// Note: This only restores serializable fields. Non-serializable fields
685    /// (event_tx, command_queue, confirmation_manager) are already initialized
686    /// in Session::new().
687    pub fn restore_from_data(&mut self, data: &SessionData) {
688        self.state = data.state;
689        self.messages = data.messages.clone();
690        self.context_usage = data.context_usage.clone();
691        self.total_usage = data.total_usage.clone();
692        self.total_cost = data.total_cost;
693        self.model_name = data.model_name.clone();
694        self.cost_records = data.cost_records.clone();
695        self.thinking_enabled = data.thinking_enabled;
696        self.thinking_budget = data.thinking_budget;
697        self.created_at = data.created_at;
698        self.updated_at = data.updated_at;
699        self.tasks = data.tasks.clone();
700        self.parent_id = data.parent_id.clone();
701    }
702}