use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::time::{Duration, Instant};
/// Lane a session command is queued on.
///
/// `priority()` assigns each variant a number (`Control` = 0 through
/// `Generate` = 3) and `SessionQueueConfig` holds one concurrency limit per
/// lane.
///
/// NOTE(review): presumably a lower number means a higher scheduling
/// priority; confirm against the scheduler, which is not part of this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SessionLane {
/// Priority 0. Never produced by `from_tool_name`.
Control,
/// Priority 1. `from_tool_name` routes `read`, `glob`, `ls`, `grep`,
/// `list_files`, `search`, `web_fetch` and `web_search` here.
Query,
/// Priority 2. `from_tool_name` routes every other tool name here.
Execute,
/// Priority 3. Never produced by `from_tool_name`.
Generate,
}
impl SessionLane {
pub fn priority(&self) -> u8 {
match self {
SessionLane::Control => 0,
SessionLane::Query => 1,
SessionLane::Execute => 2,
SessionLane::Generate => 3,
}
}
pub fn from_tool_name(tool_name: &str) -> Self {
match tool_name {
"read" | "glob" | "ls" | "grep" | "list_files" | "search" | "web_fetch"
| "web_search" => SessionLane::Query,
"bash" | "write" | "edit" | "delete" | "move" | "copy" | "execute" => {
SessionLane::Execute
}
_ => SessionLane::Execute,
}
}
}
/// Selects how tasks on a lane are handled.
///
/// Only the variant names and the default are defined here; what each mode
/// actually does is decided by the code that consumes this value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum TaskHandlerMode {
/// The default mode (marked `#[default]`).
#[default]
Internal,
/// NOTE(review): presumably hands the task to an external handler (see
/// `ExternalTask`); confirm against the consumer.
External,
/// NOTE(review): presumably a combination of internal and external
/// handling; confirm against the consumer.
Hybrid,
}
/// Handler settings for a single lane.
///
/// Instances are stored per lane in `SessionQueueConfig::lane_handlers` and
/// looked up through `SessionQueueConfig::handler_config`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaneHandlerConfig {
/// Handling mode for the lane; `Default` uses `TaskHandlerMode::Internal`.
pub mode: TaskHandlerMode,
/// Timeout in milliseconds (unit per the field name); `Default` uses 60_000.
pub timeout_ms: u64,
}
impl Default for LaneHandlerConfig {
fn default() -> Self {
Self {
mode: TaskHandlerMode::Internal,
timeout_ms: 60_000,
}
}
}
/// Serializable description of a task, carrying its lane, command type,
/// payload and timeout budget.
///
/// NOTE(review): the name suggests this is the record handed to an external
/// handler; the producer and consumer are not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalTask {
/// Identifier of this task.
pub task_id: String,
/// Identifier of the session the task belongs to.
pub session_id: String,
/// Lane the task is assigned to.
pub lane: SessionLane,
/// Command type string (compare `SessionCommand::command_type`).
pub command_type: String,
/// Arbitrary JSON payload for the command.
pub payload: serde_json::Value,
/// Timeout budget in milliseconds; `is_timed_out` and `remaining_ms`
/// measure it from `created_at`.
pub timeout_ms: u64,
/// Start of the timeout window. Skipped by serde, so it is never written
/// out and is `None` after deserialization; a `None` value makes
/// `is_timed_out` return `false` and `remaining_ms` return `timeout_ms`.
#[serde(skip)]
pub created_at: Option<Instant>,
}
impl ExternalTask {
    /// Reports whether more than `timeout_ms` milliseconds have elapsed since
    /// `created_at`.
    ///
    /// A task without a start time (`created_at == None`, e.g. right after
    /// deserialization) is never considered timed out.
    pub fn is_timed_out(&self) -> bool {
        match self.created_at {
            Some(started) => started.elapsed() > Duration::from_millis(self.timeout_ms),
            None => false,
        }
    }

    /// Returns how many milliseconds of the timeout budget are left.
    ///
    /// The result saturates at 0 once the budget is used up. When the task
    /// has no start time the full `timeout_ms` is returned.
    pub fn remaining_ms(&self) -> u64 {
        match self.created_at {
            Some(started) => {
                // `as_millis()` yields a u128; convert with saturation instead
                // of an `as` cast so an out-of-range value can never wrap
                // around and report a bogus non-zero remainder.
                let elapsed_ms = u64::try_from(started.elapsed().as_millis()).unwrap_or(u64::MAX);
                self.timeout_ms.saturating_sub(elapsed_ms)
            }
            None => self.timeout_ms,
        }
    }
}
/// Serializable outcome of a task.
///
/// NOTE(review): presumably the reply to an `ExternalTask`; the code that
/// produces and consumes it is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalTaskResult {
/// Whether the task succeeded.
pub success: bool,
/// JSON value carrying the task's output.
pub result: serde_json::Value,
/// Optional error message. NOTE(review): presumably set only when
/// `success` is `false`; nothing here enforces that.
pub error: Option<String>,
}
/// Configuration of the session command queue.
///
/// Field names are serialized in camelCase (e.g. `controlMaxConcurrency`).
/// Every field is optional on input: the four concurrency limits fall back to
/// their `default_*_concurrency` helper, all other fields to the `Default` of
/// their type (`false`, `None` or an empty map).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SessionQueueConfig {
/// Concurrency limit for `SessionLane::Control`; falls back to
/// `default_control_concurrency()` when absent from the input.
#[serde(default = "default_control_concurrency")]
pub control_max_concurrency: usize,
/// Concurrency limit for `SessionLane::Query`; falls back to
/// `default_query_concurrency()` when absent from the input.
#[serde(default = "default_query_concurrency")]
pub query_max_concurrency: usize,
/// Concurrency limit for `SessionLane::Execute`; falls back to
/// `default_execute_concurrency()` when absent from the input.
#[serde(default = "default_execute_concurrency")]
pub execute_max_concurrency: usize,
/// Concurrency limit for `SessionLane::Generate`; falls back to
/// `default_generate_concurrency()` when absent from the input.
#[serde(default = "default_generate_concurrency")]
pub generate_max_concurrency: usize,
/// Per-lane handler overrides. Lanes without an entry get
/// `LaneHandlerConfig::default()` from `handler_config`.
#[serde(default)]
pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>,
/// Enables the DLQ feature (presumably "dead-letter queue" — confirm).
/// Set by `with_dlq` and `with_lane_features`.
#[serde(default)]
pub enable_dlq: bool,
/// Optional size limit for the DLQ; `None` leaves it unspecified.
#[serde(default)]
pub dlq_max_size: Option<usize>,
/// Enables metrics. Set by `with_metrics` and `with_lane_features`.
#[serde(default)]
pub enable_metrics: bool,
/// Enables alerts. Set by `with_alerts` and `with_lane_features`.
#[serde(default)]
pub enable_alerts: bool,
/// Optional default timeout in milliseconds. Set by `with_timeout` and
/// `with_lane_features`.
#[serde(default)]
pub default_timeout_ms: Option<u64>,
/// Optional filesystem path for storage. Set by `with_storage`.
#[serde(default)]
pub storage_path: Option<std::path::PathBuf>,
/// Optional retry policy settings.
#[serde(default)]
pub retry_policy: Option<RetryPolicyConfig>,
/// Optional rate limit settings.
#[serde(default)]
pub rate_limit: Option<RateLimitConfig>,
/// Optional priority boost settings.
#[serde(default)]
pub priority_boost: Option<PriorityBoostConfig>,
/// Optional pressure threshold. NOTE(review): unit and meaning are not
/// visible in this file — presumably a pending-item count; confirm.
#[serde(default)]
pub pressure_threshold: Option<usize>,
/// Per-lane timeout overrides. NOTE(review): presumably milliseconds, in
/// line with the other `*_ms` fields — confirm.
#[serde(default)]
pub lane_timeouts: HashMap<SessionLane, u64>,
}
/// Retry policy settings, serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RetryPolicyConfig {
/// Name of the retry strategy (required). The accepted values are defined
/// by the consumer and are not visible in this file.
pub strategy: String,
/// Maximum number of retries; falls back to `default_max_retries()` (3)
/// when absent from the input.
#[serde(default = "default_max_retries")]
pub max_retries: u32,
/// Initial delay in milliseconds; falls back to
/// `default_initial_delay_ms()` (100) when absent from the input.
#[serde(default = "default_initial_delay_ms")]
pub initial_delay_ms: u64,
/// Optional fixed delay in milliseconds. NOTE(review): presumably only
/// relevant to a fixed-delay strategy — confirm.
#[serde(default)]
pub fixed_delay_ms: Option<u64>,
}
/// Serde fallback for `RetryPolicyConfig::max_retries` when the field is
/// missing from the input.
fn default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 3;
    MAX_RETRIES
}
/// Serde fallback for `RetryPolicyConfig::initial_delay_ms` when the field is
/// missing from the input.
fn default_initial_delay_ms() -> u64 {
    const INITIAL_DELAY_MS: u64 = 100;
    INITIAL_DELAY_MS
}
/// Rate limit settings, serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RateLimitConfig {
/// Name of the limit type (required; serialized as `limitType`). The
/// accepted values are defined by the consumer and are not visible here.
pub limit_type: String,
/// Optional maximum number of operations. NOTE(review): the time window it
/// applies to presumably depends on `limit_type` — confirm.
#[serde(default)]
pub max_operations: Option<u64>,
}
/// Priority boost settings, serialized with camelCase field names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PriorityBoostConfig {
/// Name of the boost strategy (required). The accepted values are defined
/// by the consumer and are not visible in this file.
pub strategy: String,
/// Optional deadline in milliseconds. NOTE(review): presumably used by a
/// deadline-based strategy — confirm.
#[serde(default)]
pub deadline_ms: Option<u64>,
}
/// Serde fallback for `SessionQueueConfig::control_max_concurrency` when the
/// field is missing from the input.
///
/// Returns 2, the same value `SessionQueueConfig::default()` uses, so a
/// config deserialized without this field matches the programmatic default.
fn default_control_concurrency() -> usize {
    2
}
/// Serde fallback for `SessionQueueConfig::query_max_concurrency` when the
/// field is missing from the input.
///
/// Returns 4, the same value `SessionQueueConfig::default()` uses, so a
/// config deserialized without this field matches the programmatic default.
fn default_query_concurrency() -> usize {
    4
}
/// Serde fallback for `SessionQueueConfig::execute_max_concurrency` when the
/// field is missing from the input.
///
/// Returns 2, the same value `SessionQueueConfig::default()` uses, so a
/// config deserialized without this field matches the programmatic default.
fn default_execute_concurrency() -> usize {
    2
}
/// Serde fallback for `SessionQueueConfig::generate_max_concurrency` when the
/// field is missing from the input.
///
/// Returns 1, the same value `SessionQueueConfig::default()` uses, so a
/// config deserialized without this field matches the programmatic default.
fn default_generate_concurrency() -> usize {
    1
}
impl Default for SessionQueueConfig {
fn default() -> Self {
Self {
control_max_concurrency: 2,
query_max_concurrency: 4,
execute_max_concurrency: 2,
generate_max_concurrency: 1,
lane_handlers: HashMap::new(),
enable_dlq: false,
dlq_max_size: None,
enable_metrics: false,
enable_alerts: false,
default_timeout_ms: None,
storage_path: None,
retry_policy: None,
rate_limit: None,
priority_boost: None,
pressure_threshold: None,
lane_timeouts: HashMap::new(),
}
}
}
impl SessionQueueConfig {
pub fn max_concurrency(&self, lane: SessionLane) -> usize {
match lane {
SessionLane::Control => self.control_max_concurrency,
SessionLane::Query => self.query_max_concurrency,
SessionLane::Execute => self.execute_max_concurrency,
SessionLane::Generate => self.generate_max_concurrency,
}
}
pub fn handler_config(&self, lane: SessionLane) -> LaneHandlerConfig {
self.lane_handlers.get(&lane).cloned().unwrap_or_default()
}
pub fn with_dlq(mut self, max_size: Option<usize>) -> Self {
self.enable_dlq = true;
self.dlq_max_size = max_size;
self
}
pub fn with_metrics(mut self) -> Self {
self.enable_metrics = true;
self
}
pub fn with_alerts(mut self) -> Self {
self.enable_alerts = true;
self
}
pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
self.default_timeout_ms = Some(timeout_ms);
self
}
pub fn with_storage(mut self, path: impl Into<std::path::PathBuf>) -> Self {
self.storage_path = Some(path.into());
self
}
pub fn with_lane_features(mut self) -> Self {
self.enable_dlq = true;
self.dlq_max_size = Some(1000);
self.enable_metrics = true;
self.enable_alerts = true;
self.default_timeout_ms = Some(60_000);
self
}
}
/// A command that can be run asynchronously and described as data.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait SessionCommand: Send + Sync {
    /// Runs the command and returns its JSON result, or an error.
    async fn execute(&self) -> Result<serde_json::Value>;

    /// Returns the command's type string (compare
    /// `ExternalTask::command_type`).
    fn command_type(&self) -> &str;

    /// Returns the command's JSON payload.
    ///
    /// The provided implementation returns an empty JSON object; override it
    /// when the command has data to expose.
    fn payload(&self) -> serde_json::Value {
        serde_json::Value::Object(serde_json::Map::new())
    }
}
/// Serializable snapshot of one lane's state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaneStatus {
/// Lane this snapshot describes.
pub lane: SessionLane,
/// Number of pending items. NOTE(review): presumably queued but not yet
/// started — confirm against the producer of this struct.
pub pending: usize,
/// Number of active items. NOTE(review): presumably currently running —
/// confirm against the producer of this struct.
pub active: usize,
/// Concurrency limit of the lane.
pub max_concurrency: usize,
/// Handling mode of the lane.
pub handler_mode: TaskHandlerMode,
}
/// Serializable queue-wide statistics; `Default` yields all-zero counters and
/// an empty lane map.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct SessionQueueStats {
/// Total number of pending items. NOTE(review): presumably the sum over
/// all lanes — confirm against the producer of this struct.
pub total_pending: usize,
/// Total number of active items. NOTE(review): presumably the sum over
/// all lanes — confirm against the producer of this struct.
pub total_active: usize,
/// Number of pending external items. NOTE(review): presumably
/// `ExternalTask`s awaiting a result — confirm.
pub external_pending: usize,
/// Per-lane status, keyed by a string. NOTE(review): the key is presumably
/// the lane name — confirm against the producer of this struct.
pub lanes: HashMap<String, LaneStatus>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `TaskHandlerMode` defaults to `Internal`.
    #[test]
    fn test_task_handler_mode_default() {
        let mode = TaskHandlerMode::default();
        assert_eq!(mode, TaskHandlerMode::Internal);
    }

    /// `LaneHandlerConfig::default()` is internal handling with a 60 s timeout.
    #[test]
    fn test_lane_handler_config_default() {
        let config = LaneHandlerConfig::default();
        assert_eq!(config.mode, TaskHandlerMode::Internal);
        assert_eq!(config.timeout_ms, 60_000);
    }

    /// Lane priorities are 0..=3 in declaration order.
    #[test]
    fn test_session_lane_priority() {
        assert_eq!(SessionLane::Control.priority(), 0);
        assert_eq!(SessionLane::Query.priority(), 1);
        assert_eq!(SessionLane::Execute.priority(), 2);
        assert_eq!(SessionLane::Generate.priority(), 3);
    }

    /// Query tools map to `Query`; mutating and unknown tools map to `Execute`.
    #[test]
    fn test_session_lane_from_tool_name() {
        for name in [
            "read",
            "glob",
            "ls",
            "grep",
            "list_files",
            "search",
            "web_fetch",
            "web_search",
        ] {
            assert_eq!(SessionLane::from_tool_name(name), SessionLane::Query);
        }
        for name in [
            "bash", "write", "edit", "delete", "move", "copy", "execute", "", "unknown",
        ] {
            assert_eq!(SessionLane::from_tool_name(name), SessionLane::Execute);
        }
    }

    /// A freshly created task is inside its timeout budget.
    #[test]
    fn test_external_task_timeout() {
        // Use a generous budget: with a budget of only 100 ms a stalled or
        // heavily loaded test runner could make this assertion flaky.
        let task = ExternalTask {
            task_id: "test".to_string(),
            session_id: "session".to_string(),
            lane: SessionLane::Query,
            command_type: "read".to_string(),
            payload: serde_json::json!({}),
            timeout_ms: 60_000,
            created_at: Some(Instant::now()),
        };
        assert!(!task.is_timed_out());
        assert!(task.remaining_ms() <= 60_000);
    }

    /// Without a start time the task never times out and keeps its full budget.
    #[test]
    fn test_external_task_without_start_time() {
        let task = ExternalTask {
            task_id: "test".to_string(),
            session_id: "session".to_string(),
            lane: SessionLane::Execute,
            command_type: "bash".to_string(),
            payload: serde_json::json!({}),
            timeout_ms: 250,
            created_at: None,
        };
        assert!(!task.is_timed_out());
        assert_eq!(task.remaining_ms(), 250);
    }

    /// `SessionQueueConfig::default()` has the documented limits and no features.
    #[test]
    fn test_session_queue_config_default() {
        let config = SessionQueueConfig::default();
        assert_eq!(config.control_max_concurrency, 2);
        assert_eq!(config.query_max_concurrency, 4);
        assert_eq!(config.execute_max_concurrency, 2);
        assert_eq!(config.generate_max_concurrency, 1);
        assert!(!config.enable_dlq);
        assert!(!config.enable_metrics);
        assert!(!config.enable_alerts);
    }

    /// `max_concurrency` returns the limit of the requested lane.
    #[test]
    fn test_session_queue_config_max_concurrency() {
        let config = SessionQueueConfig::default();
        assert_eq!(config.max_concurrency(SessionLane::Control), 2);
        assert_eq!(config.max_concurrency(SessionLane::Query), 4);
        assert_eq!(config.max_concurrency(SessionLane::Execute), 2);
        assert_eq!(config.max_concurrency(SessionLane::Generate), 1);
    }

    /// Lanes without an override fall back to the default handler settings.
    #[test]
    fn test_session_queue_config_handler_config() {
        let config = SessionQueueConfig::default();
        let handler = config.handler_config(SessionLane::Execute);
        assert_eq!(handler.mode, TaskHandlerMode::Internal);
        assert_eq!(handler.timeout_ms, 60_000);
    }

    /// The individual builder methods set exactly their own fields.
    #[test]
    fn test_session_queue_config_builders() {
        let config = SessionQueueConfig::default()
            .with_dlq(Some(500))
            .with_metrics()
            .with_alerts()
            .with_timeout(30_000);
        assert!(config.enable_dlq);
        assert_eq!(config.dlq_max_size, Some(500));
        assert!(config.enable_metrics);
        assert!(config.enable_alerts);
        assert_eq!(config.default_timeout_ms, Some(30_000));
    }

    /// `with_lane_features` enables the whole feature bundle at once.
    #[test]
    fn test_session_queue_config_with_lane_features() {
        let config = SessionQueueConfig::default().with_lane_features();
        assert!(config.enable_dlq);
        assert_eq!(config.dlq_max_size, Some(1000));
        assert!(config.enable_metrics);
        assert!(config.enable_alerts);
        assert_eq!(config.default_timeout_ms, Some(60_000));
    }

    /// `ExternalTaskResult` can be built for a successful outcome.
    #[test]
    fn test_external_task_result() {
        let result = ExternalTaskResult {
            success: true,
            result: serde_json::json!({"output": "hello"}),
            error: None,
        };
        assert!(result.success);
        assert!(result.error.is_none());
    }
}