// a3s_code_core/session/mod.rs

//! Session management
//!
//! Provides session-based conversation management:
//! - Multiple independent sessions per agent
//! - Conversation history tracking
//! - Context usage monitoring
//! - Per-session LLM client configuration
//! - Session state management (Active, Paused, Completed, Error)
//! - Per-session command queue with lane-based priority
//! - Human-in-the-Loop (HITL) confirmation support
//! - Session persistence (JSONL file storage)
12
13pub(crate) mod compaction;
14pub mod manager;
15
16pub use manager::SessionManager;
17
18#[cfg(test)]
19#[path = "tests.rs"]
20mod tests_file;
21
22use crate::agent::AgentEvent;
23use crate::hitl::{ConfirmationManager, ConfirmationPolicy, ConfirmationProvider};
24use crate::llm::{LlmClient, Message, TokenUsage, ToolDefinition};
25use crate::permissions::{PermissionChecker, PermissionDecision, PermissionPolicy};
26use crate::planning::Task;
27use crate::queue::{ExternalTaskResult, LaneHandlerConfig, SessionQueueConfig};
28use crate::session_lane_queue::SessionLaneQueue;
29use crate::store::{LlmConfigData, SessionData};
30use anyhow::Result;
31use serde::{Deserialize, Serialize};
32use std::sync::Arc;
33use tokio::sync::{broadcast, RwLock};
34
35/// Session state
36#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
37pub enum SessionState {
38    #[default]
39    Unknown = 0,
40    Active = 1,
41    Paused = 2,
42    Completed = 3,
43    Error = 4,
44}
45
46/// Context usage statistics
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContextUsage {
49    pub used_tokens: usize,
50    pub max_tokens: usize,
51    pub percent: f32,
52    pub turns: usize,
53}
54
55impl Default for ContextUsage {
56    fn default() -> Self {
57        Self {
58            used_tokens: 0,
59            max_tokens: 200_000,
60            percent: 0.0,
61            turns: 0,
62        }
63    }
64}
65
/// Fraction of the context window (0.80 = 80%) at which auto-compaction
/// triggers when no explicit threshold is configured.
pub const DEFAULT_AUTO_COMPACT_THRESHOLD: f32 = 0.80;

/// Supplies the fallback for `auto_compact_threshold` when the field is absent
/// from deserialized input (referenced by name from a `#[serde(default = ...)]`
/// attribute).
fn default_auto_compact_threshold() -> f32 {
    DEFAULT_AUTO_COMPACT_THRESHOLD
}
73
74/// Session configuration
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct SessionConfig {
77    pub name: String,
78    pub workspace: String,
79    pub system_prompt: Option<String>,
80    pub max_context_length: u32,
81    pub auto_compact: bool,
82    /// Context usage percentage threshold to trigger auto-compaction (0.0 - 1.0).
83    /// Only used when `auto_compact` is true. Default: 0.80 (80%).
84    #[serde(default = "default_auto_compact_threshold")]
85    pub auto_compact_threshold: f32,
86    /// Storage type for this session
87    #[serde(default)]
88    pub storage_type: crate::config::StorageBackend,
89    /// Queue configuration (optional, uses defaults if None)
90    #[serde(skip_serializing_if = "Option::is_none")]
91    pub queue_config: Option<SessionQueueConfig>,
92    /// Confirmation policy (optional, uses defaults if None)
93    #[serde(skip_serializing_if = "Option::is_none")]
94    pub confirmation_policy: Option<ConfirmationPolicy>,
95    /// Permission policy (optional, uses defaults if None)
96    #[serde(skip_serializing_if = "Option::is_none")]
97    pub permission_policy: Option<PermissionPolicy>,
98    /// Parent session ID (for subagent sessions)
99    #[serde(skip_serializing_if = "Option::is_none")]
100    pub parent_id: Option<String>,
101    /// Security configuration (optional, enables security features)
102    #[serde(skip_serializing_if = "Option::is_none")]
103    pub security_config: Option<crate::security::SecurityConfig>,
104    /// Shared hook engine for lifecycle events
105    #[serde(skip)]
106    pub hook_engine: Option<std::sync::Arc<dyn crate::hooks::HookExecutor>>,
107    /// Enable planning phase before execution
108    #[serde(default)]
109    pub planning_enabled: bool,
110    /// Enable goal tracking
111    #[serde(default)]
112    pub goal_tracking: bool,
113}
114
115impl Default for SessionConfig {
116    fn default() -> Self {
117        Self {
118            name: String::new(),
119            workspace: String::new(),
120            system_prompt: None,
121            max_context_length: 0,
122            auto_compact: false,
123            auto_compact_threshold: DEFAULT_AUTO_COMPACT_THRESHOLD,
124            storage_type: crate::config::StorageBackend::default(),
125            queue_config: None,
126            confirmation_policy: None,
127            permission_policy: None,
128            parent_id: None,
129            security_config: None,
130            hook_engine: None,
131            planning_enabled: false,
132            goal_tracking: false,
133        }
134    }
135}
136
137pub struct Session {
138    pub id: String,
139    pub config: SessionConfig,
140    pub state: SessionState,
141    pub messages: Vec<Message>,
142    pub context_usage: ContextUsage,
143    pub total_usage: TokenUsage,
144    /// Cumulative dollar cost for this session
145    pub total_cost: f64,
146    /// Model name for cost calculation
147    pub model_name: Option<String>,
148    pub tools: Vec<ToolDefinition>,
149    pub thinking_enabled: bool,
150    pub thinking_budget: Option<usize>,
151    /// Per-session LLM client (overrides default if set)
152    pub llm_client: Option<Arc<dyn LlmClient>>,
153    /// Creation timestamp (Unix epoch seconds)
154    pub created_at: i64,
155    /// Last update timestamp (Unix epoch seconds)
156    pub updated_at: i64,
157    /// Per-session command queue (a3s-lane backed)
158    pub command_queue: SessionLaneQueue,
159    /// HITL confirmation manager
160    pub confirmation_manager: Arc<dyn ConfirmationProvider>,
161    /// Permission checker for tool execution
162    pub permission_checker: Arc<dyn PermissionChecker>,
163    /// Event broadcaster for this session
164    event_tx: broadcast::Sender<AgentEvent>,
165    /// Context providers for augmenting prompts with external context
166    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
167    /// Task list for tracking
168    pub tasks: Vec<Task>,
169    /// Parent session ID (for subagent sessions)
170    pub parent_id: Option<String>,
171    /// Agent memory system for this session (externally injected)
172    pub memory: Option<Arc<RwLock<crate::memory::AgentMemory>>>,
173    /// Current execution plan (if any)
174    pub current_plan: Arc<RwLock<Option<crate::planning::ExecutionPlan>>>,
175    /// Security guard (if enabled)
176    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
177    /// Per-session tool execution metrics
178    pub tool_metrics: Arc<RwLock<crate::telemetry::ToolMetrics>>,
179    /// Per-call LLM cost records for cross-session aggregation
180    pub cost_records: Vec<crate::telemetry::LlmCostRecord>,
181}
182
183/// Validate that an identifier is safe for use in file paths.
184/// Rejects path traversal attempts and non-alphanumeric characters.
185fn validate_path_safe_id(id: &str, label: &str) -> Result<()> {
186    if id.is_empty() {
187        anyhow::bail!("{label} must not be empty");
188    }
189    // Allow alphanumeric, hyphens, underscores, and dots (but not leading dots)
190    let is_safe = id
191        .chars()
192        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.')
193        && !id.starts_with('.')
194        && !id.contains("..");
195    if !is_safe {
196        anyhow::bail!("{label} contains unsafe characters: {id:?}");
197    }
198    Ok(())
199}
200
201impl Session {
202    /// Create a new session (async due to SessionLaneQueue initialization)
203    pub async fn new(
204        id: String,
205        config: SessionConfig,
206        tools: Vec<ToolDefinition>,
207    ) -> Result<Self> {
208        // Validate session ID to prevent path traversal
209        validate_path_safe_id(&id, "Session ID")?;
210
211        let now = std::time::SystemTime::now()
212            .duration_since(std::time::UNIX_EPOCH)
213            .map(|d| d.as_secs() as i64)
214            .unwrap_or(0);
215
216        // Create event broadcaster
217        let (event_tx, _) = broadcast::channel(100);
218
219        // Create command queue with config or defaults
220        let queue_config = config.queue_config.clone().unwrap_or_default();
221        let command_queue = SessionLaneQueue::new(&id, queue_config, event_tx.clone()).await?;
222
223        // Create confirmation manager with policy or secure default (HITL enabled)
224        let confirmation_policy = config
225            .confirmation_policy
226            .clone()
227            .unwrap_or_else(ConfirmationPolicy::enabled);
228        let confirmation_manager = Arc::new(ConfirmationManager::new(
229            confirmation_policy,
230            event_tx.clone(),
231        ));
232
233        // Create permission checker with config or defaults
234        let permission_checker: Arc<dyn PermissionChecker> =
235            Arc::new(config.permission_policy.clone().unwrap_or_default());
236
237        // Extract parent_id from config
238        let parent_id = config.parent_id.clone();
239
240        // Memory is externally injected; default to None
241        let memory = None;
242
243        let context_providers: Vec<Arc<dyn crate::context::ContextProvider>> = vec![];
244
245        // Initialize empty plan
246        let current_plan = Arc::new(RwLock::new(None));
247
248        // Create security provider if security config is enabled
249        let security_provider: Option<Arc<dyn crate::security::SecurityProvider>> =
250            config.security_config.as_ref().and_then(|sc| {
251                if sc.enabled {
252                    Some(Arc::new(crate::security::NoOpSecurityProvider)
253                        as Arc<dyn crate::security::SecurityProvider>)
254                } else {
255                    None
256                }
257            });
258
259        Ok(Self {
260            id,
261            config,
262            state: SessionState::Active,
263            messages: Vec::new(),
264            context_usage: ContextUsage::default(),
265            total_usage: TokenUsage::default(),
266            total_cost: 0.0,
267            model_name: None,
268            tools,
269            thinking_enabled: false,
270            thinking_budget: None,
271            llm_client: None,
272            created_at: now,
273            updated_at: now,
274            command_queue,
275            confirmation_manager,
276            permission_checker,
277            event_tx,
278            context_providers,
279            tasks: Vec::new(),
280            parent_id,
281            memory,
282            current_plan,
283
284            security_provider,
285            tool_metrics: Arc::new(RwLock::new(crate::telemetry::ToolMetrics::new())),
286            cost_records: Vec::new(),
287        })
288    }
289
290    /// Check if this is a child session (has a parent)
291    pub fn is_child_session(&self) -> bool {
292        self.parent_id.is_some()
293    }
294
295    /// Get the parent session ID if this is a child session
296    pub fn parent_session_id(&self) -> Option<&str> {
297        self.parent_id.as_deref()
298    }
299
300    /// Get a receiver for session events
301    pub fn subscribe_events(&self) -> broadcast::Receiver<AgentEvent> {
302        self.event_tx.subscribe()
303    }
304
305    /// Get the event broadcaster
306    pub fn event_tx(&self) -> broadcast::Sender<AgentEvent> {
307        self.event_tx.clone()
308    }
309
310    /// Update the confirmation policy
311    pub async fn set_confirmation_policy(&self, policy: ConfirmationPolicy) {
312        self.confirmation_manager.set_policy(policy).await;
313    }
314
315    /// Update the permission policy used for tool execution checks.
316    pub fn set_permission_policy(&mut self, policy: PermissionPolicy) {
317        self.permission_checker = Arc::new(policy.clone());
318        self.config.permission_policy = Some(policy);
319    }
320
321    /// Get the current confirmation policy
322    pub async fn confirmation_policy(&self) -> ConfirmationPolicy {
323        self.confirmation_manager.policy().await
324    }
325
326    /// Check permission for a tool invocation
327    pub fn check_permission(
328        &self,
329        tool_name: &str,
330        args: &serde_json::Value,
331    ) -> PermissionDecision {
332        self.permission_checker.check(tool_name, args)
333    }
334
335    /// Add a context provider to the session
336    pub fn add_context_provider(&mut self, provider: Arc<dyn crate::context::ContextProvider>) {
337        self.context_providers.push(provider);
338    }
339
340    /// Remove a context provider by name
341    ///
342    /// Returns true if a provider was removed, false otherwise.
343    pub fn remove_context_provider(&mut self, name: &str) -> bool {
344        let initial_len = self.context_providers.len();
345        self.context_providers.retain(|p| p.name() != name);
346        self.context_providers.len() < initial_len
347    }
348
349    /// Get the names of all registered context providers
350    pub fn context_provider_names(&self) -> Vec<String> {
351        self.context_providers
352            .iter()
353            .map(|p| p.name().to_string())
354            .collect()
355    }
356
357    // ========================================================================
358    // Task Management
359    // ========================================================================
360
361    /// Get the current task list
362    pub fn get_tasks(&self) -> &[Task] {
363        &self.tasks
364    }
365
366    /// Set the task list (replaces entire list)
367    ///
368    /// Broadcasts a TaskUpdated event after updating.
369    pub fn set_tasks(&mut self, tasks: Vec<Task>) {
370        self.tasks = tasks.clone();
371        self.touch();
372
373        // Broadcast event
374        let _ = self.event_tx.send(AgentEvent::TaskUpdated {
375            session_id: self.id.clone(),
376            tasks,
377        });
378    }
379
380    /// Get count of active (non-completed, non-cancelled) tasks
381    pub fn active_task_count(&self) -> usize {
382        self.tasks.iter().filter(|t| t.is_active()).count()
383    }
384
385    /// Set handler mode for a lane
386    pub async fn set_lane_handler(
387        &self,
388        lane: crate::hitl::SessionLane,
389        config: LaneHandlerConfig,
390    ) {
391        self.command_queue.set_lane_handler(lane, config).await;
392    }
393
394    /// Get handler config for a lane
395    pub async fn get_lane_handler(&self, lane: crate::hitl::SessionLane) -> LaneHandlerConfig {
396        self.command_queue.get_lane_handler(lane).await
397    }
398
399    /// Complete an external task
400    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
401        self.command_queue
402            .complete_external_task(task_id, result)
403            .await
404    }
405
406    /// Get pending external tasks
407    pub async fn pending_external_tasks(&self) -> Vec<crate::queue::ExternalTask> {
408        self.command_queue.pending_external_tasks().await
409    }
410
411    /// Get dead letters from the queue's DLQ
412    pub async fn dead_letters(&self) -> Vec<a3s_lane::DeadLetter> {
413        self.command_queue.dead_letters().await
414    }
415
416    /// Get queue metrics snapshot
417    pub async fn queue_metrics(&self) -> Option<a3s_lane::MetricsSnapshot> {
418        self.command_queue.metrics_snapshot().await
419    }
420
421    /// Get queue statistics
422    pub async fn queue_stats(&self) -> crate::queue::SessionQueueStats {
423        self.command_queue.stats().await
424    }
425
426    /// Start the command queue scheduler
427    pub async fn start_queue(&self) -> Result<()> {
428        self.command_queue.start().await
429    }
430
431    /// Stop the command queue scheduler
432    pub async fn stop_queue(&self) {
433        self.command_queue.stop().await;
434    }
435
436    /// Get the system prompt from config
437    pub fn system(&self) -> Option<&str> {
438        self.config.system_prompt.as_deref()
439    }
440
441    /// Get conversation history
442    pub fn history(&self) -> &[Message] {
443        &self.messages
444    }
445
446    /// Add a message to history
447    pub fn add_message(&mut self, message: Message) {
448        self.messages.push(message);
449        self.context_usage.turns = self.messages.len();
450        self.touch();
451    }
452
453    /// Update context usage after a response
454    pub fn update_usage(&mut self, usage: &TokenUsage) {
455        self.total_usage.prompt_tokens += usage.prompt_tokens;
456        self.total_usage.completion_tokens += usage.completion_tokens;
457        self.total_usage.total_tokens += usage.total_tokens;
458
459        // Calculate cost if model pricing is available
460        let cost_usd = if let Some(ref model) = self.model_name {
461            let pricing_map = crate::telemetry::default_model_pricing();
462            if let Some(pricing) = pricing_map.get(model) {
463                let cost = pricing.calculate_cost(usage.prompt_tokens, usage.completion_tokens);
464                self.total_cost += cost;
465                Some(cost)
466            } else {
467                None
468            }
469        } else {
470            None
471        };
472
473        // Record per-call cost for aggregation
474        let model_str = self.model_name.clone().unwrap_or_default();
475        self.cost_records.push(crate::telemetry::LlmCostRecord {
476            model: model_str.clone(),
477            provider: String::new(),
478            prompt_tokens: usage.prompt_tokens,
479            completion_tokens: usage.completion_tokens,
480            total_tokens: usage.total_tokens,
481            cost_usd,
482            timestamp: chrono::Utc::now(),
483            session_id: Some(self.id.clone()),
484        });
485
486        // Record OTLP metrics (counters only; duration recorded in agent loop)
487        crate::telemetry::record_llm_metrics(
488            if model_str.is_empty() {
489                "unknown"
490            } else {
491                &model_str
492            },
493            usage.prompt_tokens,
494            usage.completion_tokens,
495            cost_usd.unwrap_or(0.0),
496            0.0, // Duration not available here; recorded via spans
497        );
498
499        // Estimate context usage (rough approximation)
500        self.context_usage.used_tokens = usage.prompt_tokens;
501        self.context_usage.percent =
502            self.context_usage.used_tokens as f32 / self.context_usage.max_tokens as f32;
503        self.touch();
504    }
505
506    /// Clear conversation history
507    pub fn clear(&mut self) {
508        self.messages.clear();
509        self.context_usage = ContextUsage::default();
510        self.touch();
511    }
512
513    /// Compact context by summarizing old messages
514    pub async fn compact(&mut self, llm_client: &Arc<dyn LlmClient>) -> Result<()> {
515        if let Some(new_messages) =
516            compaction::compact_messages(&self.id, &self.messages, llm_client).await?
517        {
518            self.messages = new_messages;
519            self.touch();
520        }
521        Ok(())
522    }
523
524    /// Pause the session
525    pub fn pause(&mut self) -> bool {
526        if self.state == SessionState::Active {
527            self.state = SessionState::Paused;
528            self.touch();
529            true
530        } else {
531            false
532        }
533    }
534
535    /// Resume the session
536    pub fn resume(&mut self) -> bool {
537        if self.state == SessionState::Paused {
538            self.state = SessionState::Active;
539            self.touch();
540            true
541        } else {
542            false
543        }
544    }
545
546    /// Set session state to error
547    pub fn set_error(&mut self) {
548        self.state = SessionState::Error;
549        self.touch();
550    }
551
552    /// Set session state to completed
553    pub fn set_completed(&mut self) {
554        self.state = SessionState::Completed;
555        self.touch();
556    }
557
558    /// Update the updated_at timestamp
559    fn touch(&mut self) {
560        self.updated_at = std::time::SystemTime::now()
561            .duration_since(std::time::UNIX_EPOCH)
562            .map(|d| d.as_secs() as i64)
563            .unwrap_or(0);
564    }
565
566    /// Convert to serializable SessionData for persistence
567    pub fn to_session_data(&self, llm_config: Option<LlmConfigData>) -> SessionData {
568        SessionData {
569            id: self.id.clone(),
570            config: self.config.clone(),
571            state: self.state,
572            messages: self.messages.clone(),
573            context_usage: self.context_usage.clone(),
574            total_usage: self.total_usage.clone(),
575            total_cost: self.total_cost,
576            model_name: self.model_name.clone(),
577            cost_records: self.cost_records.clone(),
578            tool_names: SessionData::tool_names_from_definitions(&self.tools),
579            thinking_enabled: self.thinking_enabled,
580            thinking_budget: self.thinking_budget,
581            created_at: self.created_at,
582            updated_at: self.updated_at,
583            llm_config,
584            tasks: self.tasks.clone(),
585            parent_id: self.parent_id.clone(),
586        }
587    }
588
589    /// Restore session state from SessionData
590    ///
591    /// Note: This only restores serializable fields. Non-serializable fields
592    /// (event_tx, command_queue, confirmation_manager) are already initialized
593    /// in Session::new().
594    pub fn restore_from_data(&mut self, data: &SessionData) {
595        self.state = data.state;
596        self.messages = data.messages.clone();
597        self.context_usage = data.context_usage.clone();
598        self.total_usage = data.total_usage.clone();
599        self.total_cost = data.total_cost;
600        self.model_name = data.model_name.clone();
601        self.cost_records = data.cost_records.clone();
602        self.thinking_enabled = data.thinking_enabled;
603        self.thinking_budget = data.thinking_budget;
604        self.created_at = data.created_at;
605        self.updated_at = data.updated_at;
606        self.tasks = data.tasks.clone();
607        self.parent_id = data.parent_id.clone();
608    }
609}