1pub(crate) mod compaction;
14pub mod manager;
15
16pub use manager::SessionManager;
17
18#[cfg(test)]
19#[path = "tests.rs"]
20mod tests_file;
21
22use crate::agent::AgentEvent;
23use crate::hitl::{ConfirmationManager, ConfirmationPolicy, ConfirmationProvider};
24use crate::llm::{LlmClient, Message, TokenUsage, ToolDefinition};
25use crate::permissions::{PermissionChecker, PermissionDecision, PermissionPolicy};
26use crate::planning::Task;
27use crate::queue::{ExternalTaskResult, LaneHandlerConfig, SessionQueueConfig};
28use crate::session_lane_queue::SessionLaneQueue;
29use crate::store::{LlmConfigData, SessionData};
30use anyhow::Result;
31use serde::{Deserialize, Serialize};
32use std::sync::Arc;
33use tokio::sync::{broadcast, RwLock};
34
35#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
37pub enum SessionState {
38 #[default]
39 Unknown = 0,
40 Active = 1,
41 Paused = 2,
42 Completed = 3,
43 Error = 4,
44}
45
46#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct ContextUsage {
49 pub used_tokens: usize,
50 pub max_tokens: usize,
51 pub percent: f32,
52 pub turns: usize,
53}
54
55impl Default for ContextUsage {
56 fn default() -> Self {
57 Self {
58 used_tokens: 0,
59 max_tokens: 200_000,
60 percent: 0.0,
61 turns: 0,
62 }
63 }
64}
65
/// Fraction of the context window used as the default auto-compaction
/// threshold (`0.80`).
pub const DEFAULT_AUTO_COMPACT_THRESHOLD: f32 = 0.80;

/// Serde default provider for `SessionConfig::auto_compact_threshold`.
///
/// Serde's `default = "..."` attribute needs a function path, so this simply
/// hands back [`DEFAULT_AUTO_COMPACT_THRESHOLD`].
fn default_auto_compact_threshold() -> f32 {
    let threshold: f32 = DEFAULT_AUTO_COMPACT_THRESHOLD;
    threshold
}
73
74#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct SessionConfig {
77 pub name: String,
78 pub workspace: String,
79 pub system_prompt: Option<String>,
80 pub max_context_length: u32,
81 pub auto_compact: bool,
82 #[serde(default = "default_auto_compact_threshold")]
85 pub auto_compact_threshold: f32,
86 #[serde(default)]
88 pub storage_type: crate::config::StorageBackend,
89 #[serde(skip_serializing_if = "Option::is_none")]
91 pub queue_config: Option<SessionQueueConfig>,
92 #[serde(skip_serializing_if = "Option::is_none")]
94 pub confirmation_policy: Option<ConfirmationPolicy>,
95 #[serde(skip_serializing_if = "Option::is_none")]
97 pub permission_policy: Option<PermissionPolicy>,
98 #[serde(skip_serializing_if = "Option::is_none")]
100 pub parent_id: Option<String>,
101 #[serde(skip_serializing_if = "Option::is_none")]
103 pub security_config: Option<crate::security::SecurityConfig>,
104 #[serde(skip)]
106 pub hook_engine: Option<std::sync::Arc<dyn crate::hooks::HookExecutor>>,
107 #[serde(default)]
109 pub planning_enabled: bool,
110 #[serde(default)]
112 pub goal_tracking: bool,
113}
114
115impl Default for SessionConfig {
116 fn default() -> Self {
117 Self {
118 name: String::new(),
119 workspace: String::new(),
120 system_prompt: None,
121 max_context_length: 0,
122 auto_compact: false,
123 auto_compact_threshold: DEFAULT_AUTO_COMPACT_THRESHOLD,
124 storage_type: crate::config::StorageBackend::default(),
125 queue_config: None,
126 confirmation_policy: None,
127 permission_policy: None,
128 parent_id: None,
129 security_config: None,
130 hook_engine: None,
131 planning_enabled: false,
132 goal_tracking: false,
133 }
134 }
135}
136
137pub struct Session {
138 pub id: String,
139 pub config: SessionConfig,
140 pub state: SessionState,
141 pub messages: Vec<Message>,
142 pub context_usage: ContextUsage,
143 pub total_usage: TokenUsage,
144 pub total_cost: f64,
146 pub model_name: Option<String>,
148 pub tools: Vec<ToolDefinition>,
149 pub thinking_enabled: bool,
150 pub thinking_budget: Option<usize>,
151 pub llm_client: Option<Arc<dyn LlmClient>>,
153 pub created_at: i64,
155 pub updated_at: i64,
157 pub command_queue: SessionLaneQueue,
159 pub confirmation_manager: Arc<dyn ConfirmationProvider>,
161 pub permission_checker: Arc<dyn PermissionChecker>,
163 event_tx: broadcast::Sender<AgentEvent>,
165 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
167 pub tasks: Vec<Task>,
169 pub parent_id: Option<String>,
171 pub memory: Option<Arc<RwLock<crate::memory::AgentMemory>>>,
173 pub current_plan: Arc<RwLock<Option<crate::planning::ExecutionPlan>>>,
175 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
177 pub tool_metrics: Arc<RwLock<crate::telemetry::ToolMetrics>>,
179 pub cost_records: Vec<crate::telemetry::LlmCostRecord>,
181}
182
183fn validate_path_safe_id(id: &str, label: &str) -> Result<()> {
186 if id.is_empty() {
187 anyhow::bail!("{label} must not be empty");
188 }
189 let is_safe = id
191 .chars()
192 .all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_' || c == '.')
193 && !id.starts_with('.')
194 && !id.contains("..");
195 if !is_safe {
196 anyhow::bail!("{label} contains unsafe characters: {id:?}");
197 }
198 Ok(())
199}
200
201impl Session {
202 pub async fn new(
204 id: String,
205 config: SessionConfig,
206 tools: Vec<ToolDefinition>,
207 ) -> Result<Self> {
208 validate_path_safe_id(&id, "Session ID")?;
210
211 let now = std::time::SystemTime::now()
212 .duration_since(std::time::UNIX_EPOCH)
213 .map(|d| d.as_secs() as i64)
214 .unwrap_or(0);
215
216 let (event_tx, _) = broadcast::channel(100);
218
219 let queue_config = config.queue_config.clone().unwrap_or_default();
221 let command_queue = SessionLaneQueue::new(&id, queue_config, event_tx.clone()).await?;
222
223 let confirmation_policy = config
225 .confirmation_policy
226 .clone()
227 .unwrap_or_else(ConfirmationPolicy::enabled);
228 let confirmation_manager = Arc::new(ConfirmationManager::new(
229 confirmation_policy,
230 event_tx.clone(),
231 ));
232
233 let permission_checker: Arc<dyn PermissionChecker> =
235 Arc::new(config.permission_policy.clone().unwrap_or_default());
236
237 let parent_id = config.parent_id.clone();
239
240 let memory = None;
242
243 let context_providers: Vec<Arc<dyn crate::context::ContextProvider>> = vec![];
244
245 let current_plan = Arc::new(RwLock::new(None));
247
248 let security_provider: Option<Arc<dyn crate::security::SecurityProvider>> =
250 config.security_config.as_ref().and_then(|sc| {
251 if sc.enabled {
252 Some(Arc::new(crate::security::NoOpSecurityProvider)
253 as Arc<dyn crate::security::SecurityProvider>)
254 } else {
255 None
256 }
257 });
258
259 Ok(Self {
260 id,
261 config,
262 state: SessionState::Active,
263 messages: Vec::new(),
264 context_usage: ContextUsage::default(),
265 total_usage: TokenUsage::default(),
266 total_cost: 0.0,
267 model_name: None,
268 tools,
269 thinking_enabled: false,
270 thinking_budget: None,
271 llm_client: None,
272 created_at: now,
273 updated_at: now,
274 command_queue,
275 confirmation_manager,
276 permission_checker,
277 event_tx,
278 context_providers,
279 tasks: Vec::new(),
280 parent_id,
281 memory,
282 current_plan,
283
284 security_provider,
285 tool_metrics: Arc::new(RwLock::new(crate::telemetry::ToolMetrics::new())),
286 cost_records: Vec::new(),
287 })
288 }
289
290 pub fn is_child_session(&self) -> bool {
292 self.parent_id.is_some()
293 }
294
295 pub fn parent_session_id(&self) -> Option<&str> {
297 self.parent_id.as_deref()
298 }
299
300 pub fn subscribe_events(&self) -> broadcast::Receiver<AgentEvent> {
302 self.event_tx.subscribe()
303 }
304
305 pub fn event_tx(&self) -> broadcast::Sender<AgentEvent> {
307 self.event_tx.clone()
308 }
309
310 pub async fn set_confirmation_policy(&self, policy: ConfirmationPolicy) {
312 self.confirmation_manager.set_policy(policy).await;
313 }
314
315 pub async fn confirmation_policy(&self) -> ConfirmationPolicy {
317 self.confirmation_manager.policy().await
318 }
319
320 pub fn check_permission(
322 &self,
323 tool_name: &str,
324 args: &serde_json::Value,
325 ) -> PermissionDecision {
326 self.permission_checker.check(tool_name, args)
327 }
328
329 pub fn add_context_provider(&mut self, provider: Arc<dyn crate::context::ContextProvider>) {
331 self.context_providers.push(provider);
332 }
333
334 pub fn remove_context_provider(&mut self, name: &str) -> bool {
338 let initial_len = self.context_providers.len();
339 self.context_providers.retain(|p| p.name() != name);
340 self.context_providers.len() < initial_len
341 }
342
343 pub fn context_provider_names(&self) -> Vec<String> {
345 self.context_providers
346 .iter()
347 .map(|p| p.name().to_string())
348 .collect()
349 }
350
351 pub fn get_tasks(&self) -> &[Task] {
357 &self.tasks
358 }
359
360 pub fn set_tasks(&mut self, tasks: Vec<Task>) {
364 self.tasks = tasks.clone();
365 self.touch();
366
367 let _ = self.event_tx.send(AgentEvent::TaskUpdated {
369 session_id: self.id.clone(),
370 tasks,
371 });
372 }
373
374 pub fn active_task_count(&self) -> usize {
376 self.tasks.iter().filter(|t| t.is_active()).count()
377 }
378
379 pub async fn set_lane_handler(
381 &self,
382 lane: crate::hitl::SessionLane,
383 config: LaneHandlerConfig,
384 ) {
385 self.command_queue.set_lane_handler(lane, config).await;
386 }
387
388 pub async fn get_lane_handler(&self, lane: crate::hitl::SessionLane) -> LaneHandlerConfig {
390 self.command_queue.get_lane_handler(lane).await
391 }
392
393 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
395 self.command_queue
396 .complete_external_task(task_id, result)
397 .await
398 }
399
400 pub async fn pending_external_tasks(&self) -> Vec<crate::queue::ExternalTask> {
402 self.command_queue.pending_external_tasks().await
403 }
404
405 pub async fn dead_letters(&self) -> Vec<a3s_lane::DeadLetter> {
407 self.command_queue.dead_letters().await
408 }
409
410 pub async fn queue_metrics(&self) -> Option<a3s_lane::MetricsSnapshot> {
412 self.command_queue.metrics_snapshot().await
413 }
414
415 pub async fn queue_stats(&self) -> crate::queue::SessionQueueStats {
417 self.command_queue.stats().await
418 }
419
420 pub async fn start_queue(&self) -> Result<()> {
422 self.command_queue.start().await
423 }
424
425 pub async fn stop_queue(&self) {
427 self.command_queue.stop().await;
428 }
429
430 pub fn system(&self) -> Option<&str> {
432 self.config.system_prompt.as_deref()
433 }
434
435 pub fn history(&self) -> &[Message] {
437 &self.messages
438 }
439
440 pub fn add_message(&mut self, message: Message) {
442 self.messages.push(message);
443 self.context_usage.turns = self.messages.len();
444 self.touch();
445 }
446
447 pub fn update_usage(&mut self, usage: &TokenUsage) {
449 self.total_usage.prompt_tokens += usage.prompt_tokens;
450 self.total_usage.completion_tokens += usage.completion_tokens;
451 self.total_usage.total_tokens += usage.total_tokens;
452
453 let cost_usd = if let Some(ref model) = self.model_name {
455 let pricing_map = crate::telemetry::default_model_pricing();
456 if let Some(pricing) = pricing_map.get(model) {
457 let cost = pricing.calculate_cost(usage.prompt_tokens, usage.completion_tokens);
458 self.total_cost += cost;
459 Some(cost)
460 } else {
461 None
462 }
463 } else {
464 None
465 };
466
467 let model_str = self.model_name.clone().unwrap_or_default();
469 self.cost_records.push(crate::telemetry::LlmCostRecord {
470 model: model_str.clone(),
471 provider: String::new(),
472 prompt_tokens: usage.prompt_tokens,
473 completion_tokens: usage.completion_tokens,
474 total_tokens: usage.total_tokens,
475 cost_usd,
476 timestamp: chrono::Utc::now(),
477 session_id: Some(self.id.clone()),
478 });
479
480 crate::telemetry::record_llm_metrics(
482 if model_str.is_empty() {
483 "unknown"
484 } else {
485 &model_str
486 },
487 usage.prompt_tokens,
488 usage.completion_tokens,
489 cost_usd.unwrap_or(0.0),
490 0.0, );
492
493 self.context_usage.used_tokens = usage.prompt_tokens;
495 self.context_usage.percent =
496 self.context_usage.used_tokens as f32 / self.context_usage.max_tokens as f32;
497 self.touch();
498 }
499
500 pub fn clear(&mut self) {
502 self.messages.clear();
503 self.context_usage = ContextUsage::default();
504 self.touch();
505 }
506
507 pub async fn compact(&mut self, llm_client: &Arc<dyn LlmClient>) -> Result<()> {
509 if let Some(new_messages) =
510 compaction::compact_messages(&self.id, &self.messages, llm_client).await?
511 {
512 self.messages = new_messages;
513 self.touch();
514 }
515 Ok(())
516 }
517
518 pub fn pause(&mut self) -> bool {
520 if self.state == SessionState::Active {
521 self.state = SessionState::Paused;
522 self.touch();
523 true
524 } else {
525 false
526 }
527 }
528
529 pub fn resume(&mut self) -> bool {
531 if self.state == SessionState::Paused {
532 self.state = SessionState::Active;
533 self.touch();
534 true
535 } else {
536 false
537 }
538 }
539
540 pub fn set_error(&mut self) {
542 self.state = SessionState::Error;
543 self.touch();
544 }
545
546 pub fn set_completed(&mut self) {
548 self.state = SessionState::Completed;
549 self.touch();
550 }
551
552 fn touch(&mut self) {
554 self.updated_at = std::time::SystemTime::now()
555 .duration_since(std::time::UNIX_EPOCH)
556 .map(|d| d.as_secs() as i64)
557 .unwrap_or(0);
558 }
559
560 pub fn to_session_data(&self, llm_config: Option<LlmConfigData>) -> SessionData {
562 SessionData {
563 id: self.id.clone(),
564 config: self.config.clone(),
565 state: self.state,
566 messages: self.messages.clone(),
567 context_usage: self.context_usage.clone(),
568 total_usage: self.total_usage.clone(),
569 total_cost: self.total_cost,
570 model_name: self.model_name.clone(),
571 cost_records: self.cost_records.clone(),
572 tool_names: SessionData::tool_names_from_definitions(&self.tools),
573 thinking_enabled: self.thinking_enabled,
574 thinking_budget: self.thinking_budget,
575 created_at: self.created_at,
576 updated_at: self.updated_at,
577 llm_config,
578 tasks: self.tasks.clone(),
579 parent_id: self.parent_id.clone(),
580 }
581 }
582
583 pub fn restore_from_data(&mut self, data: &SessionData) {
589 self.state = data.state;
590 self.messages = data.messages.clone();
591 self.context_usage = data.context_usage.clone();
592 self.total_usage = data.total_usage.clone();
593 self.total_cost = data.total_cost;
594 self.model_name = data.model_name.clone();
595 self.cost_records = data.cost_records.clone();
596 self.thinking_enabled = data.thinking_enabled;
597 self.thinking_budget = data.thinking_budget;
598 self.created_at = data.created_at;
599 self.updated_at = data.updated_at;
600 self.tasks = data.tasks.clone();
601 self.parent_id = data.parent_id.clone();
602 }
603}