1pub use a3s_lane::MetricsSnapshot;
21use anyhow::Result;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::time::{Duration, Instant};
26
27#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub enum SessionLane {
34 Control,
36 Query,
38 Execute,
40 Generate,
42}
43
44impl SessionLane {
45 pub fn priority(&self) -> u8 {
47 match self {
48 SessionLane::Control => 0,
49 SessionLane::Query => 1,
50 SessionLane::Execute => 2,
51 SessionLane::Generate => 3,
52 }
53 }
54
55 pub fn from_tool_name(tool_name: &str) -> Self {
57 match tool_name {
58 "read" | "glob" | "ls" | "grep" | "list_files" | "search" | "web_fetch"
59 | "web_search" => SessionLane::Query,
60 "bash" | "write" | "edit" | "delete" | "move" | "copy" | "execute" => {
61 SessionLane::Execute
62 }
63 _ => SessionLane::Execute,
64 }
65 }
66}
67
68#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
74pub enum TaskHandlerMode {
75 #[default]
77 Internal,
78 External,
80 Hybrid,
82}
83
84#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct LaneHandlerConfig {
87 pub mode: TaskHandlerMode,
89 pub timeout_ms: u64,
91}
92
93impl Default for LaneHandlerConfig {
94 fn default() -> Self {
95 Self {
96 mode: TaskHandlerMode::Internal,
97 timeout_ms: 60_000,
98 }
99 }
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ExternalTask {
109 pub task_id: String,
111 pub session_id: String,
113 pub lane: SessionLane,
115 pub command_type: String,
117 pub payload: serde_json::Value,
119 pub timeout_ms: u64,
121 #[serde(skip)]
123 pub created_at: Option<Instant>,
124}
125
126impl ExternalTask {
127 pub fn is_timed_out(&self) -> bool {
129 self.created_at
130 .map(|t| t.elapsed() > Duration::from_millis(self.timeout_ms))
131 .unwrap_or(false)
132 }
133
134 pub fn remaining_ms(&self) -> u64 {
136 self.created_at
137 .map(|t| {
138 let elapsed = t.elapsed().as_millis() as u64;
139 self.timeout_ms.saturating_sub(elapsed)
140 })
141 .unwrap_or(self.timeout_ms)
142 }
143}
144
145#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct ExternalTaskResult {
148 pub success: bool,
150 pub result: serde_json::Value,
152 pub error: Option<String>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub struct SessionQueueConfig {
164 #[serde(default = "default_control_concurrency")]
166 pub control_max_concurrency: usize,
167 #[serde(default = "default_query_concurrency")]
169 pub query_max_concurrency: usize,
170 #[serde(default = "default_execute_concurrency")]
172 pub execute_max_concurrency: usize,
173 #[serde(default = "default_generate_concurrency")]
175 pub generate_max_concurrency: usize,
176 #[serde(default)]
178 pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>,
179
180 #[serde(default)]
185 pub enable_dlq: bool,
186 #[serde(default)]
188 pub dlq_max_size: Option<usize>,
189 #[serde(default)]
191 pub enable_metrics: bool,
192 #[serde(default)]
194 pub enable_alerts: bool,
195 #[serde(default)]
197 pub default_timeout_ms: Option<u64>,
198 #[serde(default)]
200 pub storage_path: Option<std::path::PathBuf>,
201
202 #[serde(default)]
207 pub retry_policy: Option<RetryPolicyConfig>,
208 #[serde(default)]
210 pub rate_limit: Option<RateLimitConfig>,
211 #[serde(default)]
213 pub priority_boost: Option<PriorityBoostConfig>,
214 #[serde(default)]
216 pub pressure_threshold: Option<usize>,
217 #[serde(default)]
219 pub lane_timeouts: HashMap<SessionLane, u64>,
220}
221
222#[derive(Debug, Clone, Serialize, Deserialize)]
224#[serde(rename_all = "camelCase")]
225pub struct RetryPolicyConfig {
226 pub strategy: String,
228 #[serde(default = "default_max_retries")]
230 pub max_retries: u32,
231 #[serde(default = "default_initial_delay_ms")]
233 pub initial_delay_ms: u64,
234 #[serde(default)]
236 pub fixed_delay_ms: Option<u64>,
237}
238
/// Serde fallback for `RetryPolicyConfig::max_retries` when the key is absent.
fn default_max_retries() -> u32 {
    const FALLBACK_MAX_RETRIES: u32 = 3;
    FALLBACK_MAX_RETRIES
}
242
/// Serde fallback for `RetryPolicyConfig::initial_delay_ms` when the key is absent.
fn default_initial_delay_ms() -> u64 {
    const FALLBACK_INITIAL_DELAY_MS: u64 = 100;
    FALLBACK_INITIAL_DELAY_MS
}
246
247#[derive(Debug, Clone, Serialize, Deserialize)]
249#[serde(rename_all = "camelCase")]
250pub struct RateLimitConfig {
251 pub limit_type: String,
253 #[serde(default)]
255 pub max_operations: Option<u64>,
256}
257
258#[derive(Debug, Clone, Serialize, Deserialize)]
260#[serde(rename_all = "camelCase")]
261pub struct PriorityBoostConfig {
262 pub strategy: String,
264 #[serde(default)]
266 pub deadline_ms: Option<u64>,
267}
268
/// Serde fallback for `SessionQueueConfig::control_max_concurrency` when the key is absent.
///
/// NOTE(review): `SessionQueueConfig::default()` sets this field to 2, not 4 — confirm
/// which value is intended.
fn default_control_concurrency() -> usize {
    const FALLBACK_CONTROL_CONCURRENCY: usize = 4;
    FALLBACK_CONTROL_CONCURRENCY
}
272
/// Serde fallback for `SessionQueueConfig::query_max_concurrency` when the key is absent.
///
/// NOTE(review): `SessionQueueConfig::default()` sets this field to 4, not 12 — confirm
/// which value is intended.
fn default_query_concurrency() -> usize {
    const FALLBACK_QUERY_CONCURRENCY: usize = 12;
    FALLBACK_QUERY_CONCURRENCY
}
276
/// Serde fallback for `SessionQueueConfig::execute_max_concurrency` when the key is absent.
///
/// NOTE(review): `SessionQueueConfig::default()` sets this field to 2, not 4 — confirm
/// which value is intended.
fn default_execute_concurrency() -> usize {
    const FALLBACK_EXECUTE_CONCURRENCY: usize = 4;
    FALLBACK_EXECUTE_CONCURRENCY
}
280
/// Serde fallback for `SessionQueueConfig::generate_max_concurrency` when the key is absent.
///
/// NOTE(review): `SessionQueueConfig::default()` sets this field to 1, not 2 — confirm
/// which value is intended.
fn default_generate_concurrency() -> usize {
    const FALLBACK_GENERATE_CONCURRENCY: usize = 2;
    FALLBACK_GENERATE_CONCURRENCY
}
284
285impl Default for SessionQueueConfig {
286 fn default() -> Self {
287 Self {
288 control_max_concurrency: 2,
289 query_max_concurrency: 4,
290 execute_max_concurrency: 2,
291 generate_max_concurrency: 1,
292 lane_handlers: HashMap::new(),
293 enable_dlq: false,
294 dlq_max_size: None,
295 enable_metrics: false,
296 enable_alerts: false,
297 default_timeout_ms: None,
298 storage_path: None,
299 retry_policy: None,
300 rate_limit: None,
301 priority_boost: None,
302 pressure_threshold: None,
303 lane_timeouts: HashMap::new(),
304 }
305 }
306}
307
308impl SessionQueueConfig {
309 pub fn max_concurrency(&self, lane: SessionLane) -> usize {
311 match lane {
312 SessionLane::Control => self.control_max_concurrency,
313 SessionLane::Query => self.query_max_concurrency,
314 SessionLane::Execute => self.execute_max_concurrency,
315 SessionLane::Generate => self.generate_max_concurrency,
316 }
317 }
318
319 pub fn handler_config(&self, lane: SessionLane) -> LaneHandlerConfig {
321 self.lane_handlers.get(&lane).cloned().unwrap_or_default()
322 }
323
324 pub fn with_dlq(mut self, max_size: Option<usize>) -> Self {
326 self.enable_dlq = true;
327 self.dlq_max_size = max_size;
328 self
329 }
330
331 pub fn with_metrics(mut self) -> Self {
333 self.enable_metrics = true;
334 self
335 }
336
337 pub fn with_alerts(mut self) -> Self {
339 self.enable_alerts = true;
340 self
341 }
342
343 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
345 self.default_timeout_ms = Some(timeout_ms);
346 self
347 }
348
349 pub fn with_storage(mut self, path: impl Into<std::path::PathBuf>) -> Self {
351 self.storage_path = Some(path.into());
352 self
353 }
354
355 pub fn with_lane_features(mut self) -> Self {
357 self.enable_dlq = true;
358 self.dlq_max_size = Some(1000);
359 self.enable_metrics = true;
360 self.enable_alerts = true;
361 self.default_timeout_ms = Some(60_000);
362 self
363 }
364}
365
366#[async_trait]
372pub trait SessionCommand: Send + Sync {
373 async fn execute(&self) -> Result<serde_json::Value>;
375
376 fn command_type(&self) -> &str;
378
379 fn payload(&self) -> serde_json::Value {
381 serde_json::json!({})
382 }
383}
384
385#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct LaneStatus {
392 pub lane: SessionLane,
393 pub pending: usize,
394 pub active: usize,
395 pub max_concurrency: usize,
396 pub handler_mode: TaskHandlerMode,
397}
398
399#[derive(Debug, Clone, Default, Serialize, Deserialize)]
401pub struct SessionQueueStats {
402 pub total_pending: usize,
403 pub total_active: usize,
404 pub external_pending: usize,
405 pub lanes: HashMap<String, LaneStatus>,
406}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// `TaskHandlerMode` defaults to `Internal`.
    #[test]
    fn test_task_handler_mode_default() {
        assert_eq!(TaskHandlerMode::default(), TaskHandlerMode::Internal);
    }

    /// `LaneHandlerConfig` defaults to internal handling with a 60 s timeout.
    #[test]
    fn test_lane_handler_config_default() {
        let LaneHandlerConfig { mode, timeout_ms } = LaneHandlerConfig::default();
        assert_eq!(mode, TaskHandlerMode::Internal);
        assert_eq!(timeout_ms, 60_000);
    }

    /// A freshly created task has not timed out and has at most its full timeout left.
    #[test]
    fn test_external_task_timeout() {
        let fresh_task = ExternalTask {
            task_id: String::from("test"),
            session_id: String::from("session"),
            lane: SessionLane::Query,
            command_type: String::from("read"),
            payload: serde_json::json!({}),
            timeout_ms: 100,
            created_at: Some(Instant::now()),
        };

        assert!(!fresh_task.is_timed_out());
        assert!(fresh_task.remaining_ms() <= 100);
    }

    /// The programmatic default uses limits 2 / 4 / 2 / 1 and disables all features.
    #[test]
    fn test_session_queue_config_default() {
        let defaults = SessionQueueConfig::default();

        assert_eq!(defaults.control_max_concurrency, 2);
        assert_eq!(defaults.query_max_concurrency, 4);
        assert_eq!(defaults.execute_max_concurrency, 2);
        assert_eq!(defaults.generate_max_concurrency, 1);

        assert!(!defaults.enable_dlq);
        assert!(!defaults.enable_metrics);
        assert!(!defaults.enable_alerts);
    }

    /// `max_concurrency` returns the field that belongs to the requested lane.
    #[test]
    fn test_session_queue_config_max_concurrency() {
        let defaults = SessionQueueConfig::default();
        let expected_limits = [
            (SessionLane::Control, 2),
            (SessionLane::Query, 4),
            (SessionLane::Execute, 2),
            (SessionLane::Generate, 1),
        ];

        for &(lane, limit) in expected_limits.iter() {
            assert_eq!(defaults.max_concurrency(lane), limit);
        }
    }

    /// Lanes without an explicit handler entry get the default handler config.
    #[test]
    fn test_session_queue_config_handler_config() {
        let LaneHandlerConfig { mode, timeout_ms } =
            SessionQueueConfig::default().handler_config(SessionLane::Execute);
        assert_eq!(mode, TaskHandlerMode::Internal);
        assert_eq!(timeout_ms, 60_000);
    }

    /// Chained builders each set their own field(s).
    #[test]
    fn test_session_queue_config_builders() {
        let built = SessionQueueConfig::default()
            .with_dlq(Some(500))
            .with_metrics()
            .with_alerts()
            .with_timeout(30_000);

        assert!(built.enable_dlq);
        assert_eq!(built.dlq_max_size, Some(500));
        assert!(built.enable_metrics);
        assert!(built.enable_alerts);
        assert_eq!(built.default_timeout_ms, Some(30_000));
    }

    /// `with_lane_features` enables DLQ (1000), metrics, alerts and a 60 s timeout.
    #[test]
    fn test_session_queue_config_with_lane_features() {
        let featured = SessionQueueConfig::default().with_lane_features();

        assert!(featured.enable_dlq);
        assert_eq!(featured.dlq_max_size, Some(1000));
        assert!(featured.enable_metrics);
        assert!(featured.enable_alerts);
        assert_eq!(featured.default_timeout_ms, Some(60_000));
    }

    /// A successful result carries no error.
    #[test]
    fn test_external_task_result() {
        let outcome = ExternalTaskResult {
            success: true,
            result: serde_json::json!({"output": "hello"}),
            error: None,
        };

        assert!(outcome.success);
        assert!(outcome.error.is_none());
    }
}