Skip to main content

a3s_code_core/session/
mod.rs

1//! Session management
2//!
3//! Provides session-based conversation management:
4//! - Multiple independent sessions per agent
5//! - Conversation history tracking
6//! - Context usage monitoring
7//! - Per-session LLM client configuration
8//! - Session state management (Active, Paused, Completed, Error)
9//! - Per-session command queue with lane-based priority
10//! - Human-in-the-Loop (HITL) confirmation support
11//! - Session persistence (JSONL file storage)
12
13
14pub(crate) mod compaction;
15pub mod manager;
16
17pub use manager::SessionManager;
18
19#[cfg(test)]
20#[path = "tests.rs"]
21mod tests_file;
22
23use crate::agent::AgentEvent;
24use crate::hitl::{ConfirmationManager, ConfirmationPolicy, ConfirmationProvider};
25use crate::llm::{LlmClient, Message, TokenUsage, ToolDefinition};
26use crate::permissions::{PermissionChecker, PermissionDecision, PermissionPolicy};
27use crate::planning::Task;
28use crate::queue::{ExternalTaskResult, LaneHandlerConfig, SessionQueueConfig};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::store::{LlmConfigData, SessionData};
31use anyhow::Result;
32use serde::{Deserialize, Serialize};
33use std::sync::Arc;
34use tokio::sync::{broadcast, RwLock};
35
36/// Session state
37#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
38pub enum SessionState {
39    #[default]
40    Unknown = 0,
41    Active = 1,
42    Paused = 2,
43    Completed = 3,
44    Error = 4,
45}
46
47/// Context usage statistics
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ContextUsage {
50    pub used_tokens: usize,
51    pub max_tokens: usize,
52    pub percent: f32,
53    pub turns: usize,
54}
55
56impl Default for ContextUsage {
57    fn default() -> Self {
58        Self {
59            used_tokens: 0,
60            max_tokens: 200_000,
61            percent: 0.0,
62            turns: 0,
63        }
64    }
65}
66
/// Fraction of the context window (0.0–1.0) at which auto-compaction is
/// triggered by default: 0.80, i.e. 80%.
pub const DEFAULT_AUTO_COMPACT_THRESHOLD: f32 = 0.80;

/// `serde` default hook for `SessionConfig::auto_compact_threshold`.
///
/// Always returns [`DEFAULT_AUTO_COMPACT_THRESHOLD`].
fn default_auto_compact_threshold() -> f32 {
    DEFAULT_AUTO_COMPACT_THRESHOLD
}
74
75/// Session configuration
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct SessionConfig {
78    pub name: String,
79    pub workspace: String,
80    pub system_prompt: Option<String>,
81    pub max_context_length: u32,
82    pub auto_compact: bool,
83    /// Context usage percentage threshold to trigger auto-compaction (0.0 - 1.0).
84    /// Only used when `auto_compact` is true. Default: 0.80 (80%).
85    #[serde(default = "default_auto_compact_threshold")]
86    pub auto_compact_threshold: f32,
87    /// Storage type for this session
88    #[serde(default)]
89    pub storage_type: crate::config::StorageBackend,
90    /// Queue configuration (optional, uses defaults if None)
91    #[serde(skip_serializing_if = "Option::is_none")]
92    pub queue_config: Option<SessionQueueConfig>,
93    /// Confirmation policy (optional, uses defaults if None)
94    #[serde(skip_serializing_if = "Option::is_none")]
95    pub confirmation_policy: Option<ConfirmationPolicy>,
96    /// Permission policy (optional, uses defaults if None)
97    #[serde(skip_serializing_if = "Option::is_none")]
98    pub permission_policy: Option<PermissionPolicy>,
99    /// Parent session ID (for subagent sessions)
100    #[serde(skip_serializing_if = "Option::is_none")]
101    pub parent_id: Option<String>,
102    /// Security configuration (optional, enables security features)
103    #[serde(skip_serializing_if = "Option::is_none")]
104    pub security_config: Option<crate::security::SecurityConfig>,
105    /// Shared hook engine for lifecycle events
106    #[serde(skip)]
107    pub hook_engine: Option<std::sync::Arc<dyn crate::hooks::HookExecutor>>,
108    /// Enable planning phase before execution
109    #[serde(default)]
110    pub planning_enabled: bool,
111    /// Enable goal tracking
112    #[serde(default)]
113    pub goal_tracking: bool,
114}
115
116impl Default for SessionConfig {
117    fn default() -> Self {
118        Self {
119            name: String::new(),
120            workspace: String::new(),
121            system_prompt: None,
122            max_context_length: 0,
123            auto_compact: false,
124            auto_compact_threshold: DEFAULT_AUTO_COMPACT_THRESHOLD,
125            storage_type: crate::config::StorageBackend::default(),
126            queue_config: None,
127            confirmation_policy: None,
128            permission_policy: None,
129            parent_id: None,
130            security_config: None,
131            hook_engine: None,
132            planning_enabled: false,
133            goal_tracking: false,
134        }
135    }
136}
137
138pub struct Session {
139    pub id: String,
140    pub config: SessionConfig,
141    pub state: SessionState,
142    pub messages: Vec<Message>,
143    pub context_usage: ContextUsage,
144    pub total_usage: TokenUsage,
145    /// Cumulative dollar cost for this session
146    pub total_cost: f64,
147    /// Model name for cost calculation
148    pub model_name: Option<String>,
149    pub tools: Vec<ToolDefinition>,
150    pub thinking_enabled: bool,
151    pub thinking_budget: Option<usize>,
152    /// Per-session LLM client (overrides default if set)
153    pub llm_client: Option<Arc<dyn LlmClient>>,
154    /// Creation timestamp (Unix epoch seconds)
155    pub created_at: i64,
156    /// Last update timestamp (Unix epoch seconds)
157    pub updated_at: i64,
158    /// Per-session command queue (a3s-lane backed)
159    pub command_queue: SessionLaneQueue,
160    /// HITL confirmation manager
161    pub confirmation_manager: Arc<dyn ConfirmationProvider>,
162    /// Permission checker for tool execution
163    pub permission_checker: Arc<dyn PermissionChecker>,
164    /// Event broadcaster for this session
165    event_tx: broadcast::Sender<AgentEvent>,
166    /// Context providers for augmenting prompts with external context
167    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
168    /// Task list for tracking
169    pub tasks: Vec<Task>,
170    /// Parent session ID (for subagent sessions)
171    pub parent_id: Option<String>,
172    /// Agent memory system for this session (externally injected)
173    pub memory: Option<Arc<RwLock<crate::memory::AgentMemory>>>,
174    /// Current execution plan (if any)
175    pub current_plan: Arc<RwLock<Option<crate::planning::ExecutionPlan>>>,
176    /// Security guard (if enabled)
177    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
178    /// Per-session tool execution metrics
179    pub tool_metrics: Arc<RwLock<crate::telemetry::ToolMetrics>>,
180    /// Per-call LLM cost records for cross-session aggregation
181    pub cost_records: Vec<crate::telemetry::LlmCostRecord>,
182}
183
184/// Validate that an identifier is safe for use in file paths.
185/// Rejects path traversal attempts and non-alphanumeric characters.
186fn validate_path_safe_id(id: &str, label: &str) -> Result<()> {
187    if id.is_empty() {
188        anyhow::bail!("{label} must not be empty");
189    }
190    // Allow alphanumeric, hyphens, underscores, and dots (but not leading dots)
191    let is_safe = id
192        .chars()
193        .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.')
194        && !id.starts_with('.')
195        && !id.contains("..");
196    if !is_safe {
197        anyhow::bail!("{label} contains unsafe characters: {id:?}");
198    }
199    Ok(())
200}
201
202impl Session {
203    /// Create a new session (async due to SessionLaneQueue initialization)
204    pub async fn new(
205        id: String,
206        config: SessionConfig,
207        tools: Vec<ToolDefinition>,
208    ) -> Result<Self> {
209        // Validate session ID to prevent path traversal
210        validate_path_safe_id(&id, "Session ID")?;
211
212        let now = std::time::SystemTime::now()
213            .duration_since(std::time::UNIX_EPOCH)
214            .map(|d| d.as_secs() as i64)
215            .unwrap_or(0);
216
217        // Create event broadcaster
218        let (event_tx, _) = broadcast::channel(100);
219
220        // Create command queue with config or defaults
221        let queue_config = config.queue_config.clone().unwrap_or_default();
222        let command_queue = SessionLaneQueue::new(&id, queue_config, event_tx.clone()).await?;
223
224        // Create confirmation manager with policy or secure default (HITL enabled)
225        let confirmation_policy = config
226            .confirmation_policy
227            .clone()
228            .unwrap_or_else(ConfirmationPolicy::enabled);
229        let confirmation_manager = Arc::new(ConfirmationManager::new(
230            confirmation_policy,
231            event_tx.clone(),
232        ));
233
234        // Create permission checker with config or defaults
235        let permission_checker: Arc<dyn PermissionChecker> = Arc::new(
236            config.permission_policy.clone().unwrap_or_default(),
237        );
238
239        // Extract parent_id from config
240        let parent_id = config.parent_id.clone();
241
242        // Memory is externally injected; default to None
243        let memory = None;
244
245        let context_providers: Vec<Arc<dyn crate::context::ContextProvider>> = vec![];
246
247        // Initialize empty plan
248        let current_plan = Arc::new(RwLock::new(None));
249
250        // Create security provider if security config is enabled
251        let security_provider: Option<Arc<dyn crate::security::SecurityProvider>> =
252            config.security_config.as_ref().and_then(|sc| {
253                if sc.enabled {
254                    Some(Arc::new(crate::security::NoOpSecurityProvider) as Arc<dyn crate::security::SecurityProvider>)
255                } else {
256                    None
257                }
258            });
259
260        Ok(Self {
261            id,
262            config,
263            state: SessionState::Active,
264            messages: Vec::new(),
265            context_usage: ContextUsage::default(),
266            total_usage: TokenUsage::default(),
267            total_cost: 0.0,
268            model_name: None,
269            tools,
270            thinking_enabled: false,
271            thinking_budget: None,
272            llm_client: None,
273            created_at: now,
274            updated_at: now,
275            command_queue,
276            confirmation_manager,
277            permission_checker,
278            event_tx,
279            context_providers,
280            tasks: Vec::new(),
281            parent_id,
282            memory,
283            current_plan,
284
285            security_provider,
286            tool_metrics: Arc::new(RwLock::new(crate::telemetry::ToolMetrics::new())),
287            cost_records: Vec::new(),
288
289
290        })
291    }
292
293    /// Check if this is a child session (has a parent)
294    pub fn is_child_session(&self) -> bool {
295        self.parent_id.is_some()
296    }
297
298    /// Get the parent session ID if this is a child session
299    pub fn parent_session_id(&self) -> Option<&str> {
300        self.parent_id.as_deref()
301    }
302
303    /// Get a receiver for session events
304    pub fn subscribe_events(&self) -> broadcast::Receiver<AgentEvent> {
305        self.event_tx.subscribe()
306    }
307
308    /// Get the event broadcaster
309    pub fn event_tx(&self) -> broadcast::Sender<AgentEvent> {
310        self.event_tx.clone()
311    }
312
313    /// Update the confirmation policy
314    pub async fn set_confirmation_policy(&self, policy: ConfirmationPolicy) {
315        self.confirmation_manager.set_policy(policy).await;
316    }
317
318    /// Get the current confirmation policy
319    pub async fn confirmation_policy(&self) -> ConfirmationPolicy {
320        self.confirmation_manager.policy().await
321    }
322
323    /// Check permission for a tool invocation
324    pub fn check_permission(
325        &self,
326        tool_name: &str,
327        args: &serde_json::Value,
328    ) -> PermissionDecision {
329        self.permission_checker.check(tool_name, args)
330    }
331
332    /// Add a context provider to the session
333    pub fn add_context_provider(&mut self, provider: Arc<dyn crate::context::ContextProvider>) {
334        self.context_providers.push(provider);
335    }
336
337    /// Remove a context provider by name
338    ///
339    /// Returns true if a provider was removed, false otherwise.
340    pub fn remove_context_provider(&mut self, name: &str) -> bool {
341        let initial_len = self.context_providers.len();
342        self.context_providers.retain(|p| p.name() != name);
343        self.context_providers.len() < initial_len
344    }
345
346    /// Get the names of all registered context providers
347    pub fn context_provider_names(&self) -> Vec<String> {
348        self.context_providers
349            .iter()
350            .map(|p| p.name().to_string())
351            .collect()
352    }
353
354    // ========================================================================
355    // Task Management
356    // ========================================================================
357
358    /// Get the current task list
359    pub fn get_tasks(&self) -> &[Task] {
360        &self.tasks
361    }
362
363    /// Set the task list (replaces entire list)
364    ///
365    /// Broadcasts a TaskUpdated event after updating.
366    pub fn set_tasks(&mut self, tasks: Vec<Task>) {
367        self.tasks = tasks.clone();
368        self.touch();
369
370        // Broadcast event
371        let _ = self.event_tx.send(AgentEvent::TaskUpdated {
372            session_id: self.id.clone(),
373            tasks,
374        });
375    }
376
377    /// Get count of active (non-completed, non-cancelled) tasks
378    pub fn active_task_count(&self) -> usize {
379        self.tasks.iter().filter(|t| t.is_active()).count()
380    }
381
382    /// Set handler mode for a lane
383    pub async fn set_lane_handler(
384        &self,
385        lane: crate::hitl::SessionLane,
386        config: LaneHandlerConfig,
387    ) {
388        self.command_queue.set_lane_handler(lane, config).await;
389    }
390
391    /// Get handler config for a lane
392    pub async fn get_lane_handler(&self, lane: crate::hitl::SessionLane) -> LaneHandlerConfig {
393        self.command_queue.get_lane_handler(lane).await
394    }
395
396    /// Complete an external task
397    pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
398        self.command_queue
399            .complete_external_task(task_id, result)
400            .await
401    }
402
403    /// Get pending external tasks
404    pub async fn pending_external_tasks(&self) -> Vec<crate::queue::ExternalTask> {
405        self.command_queue.pending_external_tasks().await
406    }
407
408    /// Get dead letters from the queue's DLQ
409    pub async fn dead_letters(&self) -> Vec<a3s_lane::DeadLetter> {
410        self.command_queue.dead_letters().await
411    }
412
413    /// Get queue metrics snapshot
414    pub async fn queue_metrics(&self) -> Option<a3s_lane::MetricsSnapshot> {
415        self.command_queue.metrics_snapshot().await
416    }
417
418    /// Get queue statistics
419    pub async fn queue_stats(&self) -> crate::queue::SessionQueueStats {
420        self.command_queue.stats().await
421    }
422
423    /// Start the command queue scheduler
424    pub async fn start_queue(&self) -> Result<()> {
425        self.command_queue.start().await
426    }
427
428    /// Stop the command queue scheduler
429    pub async fn stop_queue(&self) {
430        self.command_queue.stop().await;
431    }
432
433    /// Get the system prompt from config
434    pub fn system(&self) -> Option<&str> {
435        self.config.system_prompt.as_deref()
436    }
437
438    /// Get conversation history
439    pub fn history(&self) -> &[Message] {
440        &self.messages
441    }
442
443    /// Add a message to history
444    pub fn add_message(&mut self, message: Message) {
445        self.messages.push(message);
446        self.context_usage.turns = self.messages.len();
447        self.touch();
448    }
449
450    /// Update context usage after a response
451    pub fn update_usage(&mut self, usage: &TokenUsage) {
452        self.total_usage.prompt_tokens += usage.prompt_tokens;
453        self.total_usage.completion_tokens += usage.completion_tokens;
454        self.total_usage.total_tokens += usage.total_tokens;
455
456        // Calculate cost if model pricing is available
457        let cost_usd = if let Some(ref model) = self.model_name {
458            let pricing_map = crate::telemetry::default_model_pricing();
459            if let Some(pricing) = pricing_map.get(model) {
460                let cost = pricing.calculate_cost(usage.prompt_tokens, usage.completion_tokens);
461                self.total_cost += cost;
462                Some(cost)
463            } else {
464                None
465            }
466        } else {
467            None
468        };
469
470        // Record per-call cost for aggregation
471        let model_str = self.model_name.clone().unwrap_or_default();
472        self.cost_records.push(crate::telemetry::LlmCostRecord {
473            model: model_str.clone(),
474            provider: String::new(),
475            prompt_tokens: usage.prompt_tokens,
476            completion_tokens: usage.completion_tokens,
477            total_tokens: usage.total_tokens,
478            cost_usd,
479            timestamp: chrono::Utc::now(),
480            session_id: Some(self.id.clone()),
481        });
482
483        // Record OTLP metrics (counters only; duration recorded in agent loop)
484        crate::telemetry::record_llm_metrics(
485            if model_str.is_empty() {
486                "unknown"
487            } else {
488                &model_str
489            },
490            usage.prompt_tokens,
491            usage.completion_tokens,
492            cost_usd.unwrap_or(0.0),
493            0.0, // Duration not available here; recorded via spans
494        );
495
496        // Estimate context usage (rough approximation)
497        self.context_usage.used_tokens = usage.prompt_tokens;
498        self.context_usage.percent =
499            self.context_usage.used_tokens as f32 / self.context_usage.max_tokens as f32;
500        self.touch();
501    }
502
503    /// Clear conversation history
504    pub fn clear(&mut self) {
505        self.messages.clear();
506        self.context_usage = ContextUsage::default();
507        self.touch();
508    }
509
510    /// Compact context by summarizing old messages
511    pub async fn compact(&mut self, llm_client: &Arc<dyn LlmClient>) -> Result<()> {
512        if let Some(new_messages) =
513            compaction::compact_messages(&self.id, &self.messages, llm_client).await?
514        {
515            self.messages = new_messages;
516            self.touch();
517        }
518        Ok(())
519    }
520
521    /// Pause the session
522    pub fn pause(&mut self) -> bool {
523        if self.state == SessionState::Active {
524            self.state = SessionState::Paused;
525            self.touch();
526            true
527        } else {
528            false
529        }
530    }
531
532    /// Resume the session
533    pub fn resume(&mut self) -> bool {
534        if self.state == SessionState::Paused {
535            self.state = SessionState::Active;
536            self.touch();
537            true
538        } else {
539            false
540        }
541    }
542
543    /// Set session state to error
544    pub fn set_error(&mut self) {
545        self.state = SessionState::Error;
546        self.touch();
547    }
548
549    /// Set session state to completed
550    pub fn set_completed(&mut self) {
551        self.state = SessionState::Completed;
552        self.touch();
553    }
554
555    /// Update the updated_at timestamp
556    fn touch(&mut self) {
557        self.updated_at = std::time::SystemTime::now()
558            .duration_since(std::time::UNIX_EPOCH)
559            .map(|d| d.as_secs() as i64)
560            .unwrap_or(0);
561    }
562
563    /// Convert to serializable SessionData for persistence
564    pub fn to_session_data(&self, llm_config: Option<LlmConfigData>) -> SessionData {
565        SessionData {
566            id: self.id.clone(),
567            config: self.config.clone(),
568            state: self.state,
569            messages: self.messages.clone(),
570            context_usage: self.context_usage.clone(),
571            total_usage: self.total_usage.clone(),
572            total_cost: self.total_cost,
573            model_name: self.model_name.clone(),
574            cost_records: self.cost_records.clone(),
575            tool_names: SessionData::tool_names_from_definitions(&self.tools),
576            thinking_enabled: self.thinking_enabled,
577            thinking_budget: self.thinking_budget,
578            created_at: self.created_at,
579            updated_at: self.updated_at,
580            llm_config,
581            tasks: self.tasks.clone(),
582            parent_id: self.parent_id.clone(),
583        }
584    }
585
586    /// Restore session state from SessionData
587    ///
588    /// Note: This only restores serializable fields. Non-serializable fields
589    /// (event_tx, command_queue, confirmation_manager) are already initialized
590    /// in Session::new().
591    pub fn restore_from_data(&mut self, data: &SessionData) {
592        self.state = data.state;
593        self.messages = data.messages.clone();
594        self.context_usage = data.context_usage.clone();
595        self.total_usage = data.total_usage.clone();
596        self.total_cost = data.total_cost;
597        self.model_name = data.model_name.clone();
598        self.cost_records = data.cost_records.clone();
599        self.thinking_enabled = data.thinking_enabled;
600        self.thinking_budget = data.thinking_budget;
601        self.created_at = data.created_at;
602        self.updated_at = data.updated_at;
603        self.tasks = data.tasks.clone();
604        self.parent_id = data.parent_id.clone();
605    }
606}