Skip to main content

a3s_code_core/
queue.rs

//! Per-session command queue with lane-based priority scheduling
//!
//! Provides session-isolated command queues where each session has its own
//! set of lanes with configurable concurrency limits and priorities.
//!
//! ## External Task Handling
//!
//! Supports pluggable task handlers allowing SDK users to implement custom
//! processing logic for different lanes:
//!
//! - **Internal**: Default, tasks executed within the runtime
//! - **External**: Tasks sent to SDK, wait for callback completion
//! - **Hybrid**: Internal execution with external notification
//!
//! ## Implementation
//!
//! The actual queue implementation is in `SessionLaneQueue` which is backed
//! by a3s-lane with features like DLQ, metrics, retry policies, and rate limiting.
19
20use anyhow::Result;
21use async_trait::async_trait;
22use serde::{Deserialize, Serialize};
23use std::collections::HashMap;
24use std::time::{Duration, Instant};
25
// ============================================================================
// Session Lane
// ============================================================================
29
30/// Session lane for queue priority scheduling and HITL auto-approval
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
32pub enum SessionLane {
33    /// Control operations (P0) - pause, resume, cancel
34    Control,
35    /// Query operations (P1) - read, glob, ls, grep
36    Query,
37    /// Execute operations (P2) - bash, write, edit
38    Execute,
39    /// Generate operations (P3) - LLM calls
40    Generate,
41}
42
43impl SessionLane {
44    /// Get the priority level (lower = higher priority)
45    pub fn priority(&self) -> u8 {
46        match self {
47            SessionLane::Control => 0,
48            SessionLane::Query => 1,
49            SessionLane::Execute => 2,
50            SessionLane::Generate => 3,
51        }
52    }
53
54    /// Map a tool name to its lane
55    pub fn from_tool_name(tool_name: &str) -> Self {
56        match tool_name {
57            "read" | "glob" | "ls" | "grep" | "list_files" | "search" | "web_fetch"
58            | "web_search" => SessionLane::Query,
59            "bash" | "write" | "edit" | "delete" | "move" | "copy" | "execute" => {
60                SessionLane::Execute
61            }
62            _ => SessionLane::Execute,
63        }
64    }
65}
66
// ============================================================================
// Task Handler Configuration
// ============================================================================
70
71/// Task handler mode determines how tasks in a lane are processed
72#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
73pub enum TaskHandlerMode {
74    /// Tasks are executed internally within the runtime (default)
75    #[default]
76    Internal,
77    /// Tasks are sent to external handler (SDK), wait for callback
78    External,
79    /// Tasks are executed internally but also notify external handler
80    Hybrid,
81}
82
83/// Configuration for a lane's task handler
84#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct LaneHandlerConfig {
86    /// Processing mode
87    pub mode: TaskHandlerMode,
88    /// Timeout for external processing (ms), default 60000 (60s)
89    pub timeout_ms: u64,
90}
91
92impl Default for LaneHandlerConfig {
93    fn default() -> Self {
94        Self {
95            mode: TaskHandlerMode::Internal,
96            timeout_ms: 60_000,
97        }
98    }
99}
100
// ============================================================================
// External Task Types
// ============================================================================
104
105/// An external task that needs to be processed by SDK
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ExternalTask {
108    /// Unique task identifier
109    pub task_id: String,
110    /// Session this task belongs to
111    pub session_id: String,
112    /// Lane the task is in
113    pub lane: SessionLane,
114    /// Type of command (e.g., "bash", "read", "write")
115    pub command_type: String,
116    /// Task payload as JSON
117    pub payload: serde_json::Value,
118    /// Timeout in milliseconds
119    pub timeout_ms: u64,
120    /// When the task was created
121    #[serde(skip)]
122    pub created_at: Option<Instant>,
123}
124
125impl ExternalTask {
126    /// Check if this task has timed out
127    pub fn is_timed_out(&self) -> bool {
128        self.created_at
129            .map(|t| t.elapsed() > Duration::from_millis(self.timeout_ms))
130            .unwrap_or(false)
131    }
132
133    /// Get remaining time until timeout in milliseconds
134    pub fn remaining_ms(&self) -> u64 {
135        self.created_at
136            .map(|t| {
137                let elapsed = t.elapsed().as_millis() as u64;
138                self.timeout_ms.saturating_sub(elapsed)
139            })
140            .unwrap_or(self.timeout_ms)
141    }
142}
143
144/// Result of external task processing
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct ExternalTaskResult {
147    /// Whether the task succeeded
148    pub success: bool,
149    /// Result data (JSON)
150    pub result: serde_json::Value,
151    /// Error message if failed
152    pub error: Option<String>,
153}
154
// ============================================================================
// Configuration
// ============================================================================
158
159/// Configuration for a session command queue
160#[derive(Debug, Clone, Serialize, Deserialize)]
161#[serde(rename_all = "camelCase")]
162pub struct SessionQueueConfig {
163    /// Max concurrency for Control lane (P0)
164    #[serde(default = "default_control_concurrency")]
165    pub control_max_concurrency: usize,
166    /// Max concurrency for Query lane (P1)
167    #[serde(default = "default_query_concurrency")]
168    pub query_max_concurrency: usize,
169    /// Max concurrency for Execute lane (P2)
170    #[serde(default = "default_execute_concurrency")]
171    pub execute_max_concurrency: usize,
172    /// Max concurrency for Generate lane (P3)
173    #[serde(default = "default_generate_concurrency")]
174    pub generate_max_concurrency: usize,
175    /// Handler configurations per lane
176    #[serde(default)]
177    pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>,
178
179    // ========================================================================
180    // a3s-lane v0.4.0 integration features
181    // ========================================================================
182    /// Enable dead letter queue for failed commands
183    #[serde(default)]
184    pub enable_dlq: bool,
185    /// Max size of dead letter queue (None = use default 1000)
186    #[serde(default)]
187    pub dlq_max_size: Option<usize>,
188    /// Enable metrics collection
189    #[serde(default)]
190    pub enable_metrics: bool,
191    /// Enable queue alerts
192    #[serde(default)]
193    pub enable_alerts: bool,
194    /// Default timeout for commands in milliseconds
195    #[serde(default)]
196    pub default_timeout_ms: Option<u64>,
197    /// Persistent storage path (None = in-memory only)
198    #[serde(default)]
199    pub storage_path: Option<std::path::PathBuf>,
200
201    // ========================================================================
202    // a3s-lane v0.4.0 advanced features
203    // ========================================================================
204    /// Retry policy configuration
205    #[serde(default)]
206    pub retry_policy: Option<RetryPolicyConfig>,
207    /// Rate limit configuration
208    #[serde(default)]
209    pub rate_limit: Option<RateLimitConfig>,
210    /// Priority boost configuration
211    #[serde(default)]
212    pub priority_boost: Option<PriorityBoostConfig>,
213    /// Pressure threshold for emitting pressure/idle events
214    #[serde(default)]
215    pub pressure_threshold: Option<usize>,
216    /// Per-lane timeout overrides in milliseconds
217    #[serde(default)]
218    pub lane_timeouts: HashMap<SessionLane, u64>,
219}
220
221/// Retry policy configuration
222#[derive(Debug, Clone, Serialize, Deserialize)]
223#[serde(rename_all = "camelCase")]
224pub struct RetryPolicyConfig {
225    /// Retry strategy: "exponential", "fixed", or "none"
226    pub strategy: String,
227    /// Maximum number of retries
228    #[serde(default = "default_max_retries")]
229    pub max_retries: u32,
230    /// Initial delay in milliseconds (for exponential)
231    #[serde(default = "default_initial_delay_ms")]
232    pub initial_delay_ms: u64,
233    /// Fixed delay in milliseconds (for fixed strategy)
234    #[serde(default)]
235    pub fixed_delay_ms: Option<u64>,
236}
237
/// Serde fallback for `RetryPolicyConfig::max_retries`.
fn default_max_retries() -> u32 {
    const MAX_RETRIES: u32 = 3;
    MAX_RETRIES
}
241
/// Serde fallback for `RetryPolicyConfig::initial_delay_ms`, in milliseconds.
fn default_initial_delay_ms() -> u64 {
    const INITIAL_DELAY_MS: u64 = 100;
    INITIAL_DELAY_MS
}
245
246/// Rate limit configuration
247#[derive(Debug, Clone, Serialize, Deserialize)]
248#[serde(rename_all = "camelCase")]
249pub struct RateLimitConfig {
250    /// Rate limit type: "per_second", "per_minute", "per_hour", or "unlimited"
251    pub limit_type: String,
252    /// Maximum number of operations per time period
253    #[serde(default)]
254    pub max_operations: Option<u64>,
255}
256
257/// Priority boost configuration
258#[derive(Debug, Clone, Serialize, Deserialize)]
259#[serde(rename_all = "camelCase")]
260pub struct PriorityBoostConfig {
261    /// Boost strategy: "standard", "aggressive", or "disabled"
262    pub strategy: String,
263    /// Deadline in milliseconds
264    #[serde(default)]
265    pub deadline_ms: Option<u64>,
266}
267
/// Serde fallback for `SessionQueueConfig::control_max_concurrency`.
fn default_control_concurrency() -> usize {
    const CONTROL_CONCURRENCY: usize = 4;
    CONTROL_CONCURRENCY
}
271
/// Serde fallback for `SessionQueueConfig::query_max_concurrency`.
fn default_query_concurrency() -> usize {
    // Balanced choice between 8 and 16: better stability than 8 while
    // keeping good performance.
    const QUERY_CONCURRENCY: usize = 12;
    QUERY_CONCURRENCY
}
275
/// Serde fallback for `SessionQueueConfig::execute_max_concurrency`.
fn default_execute_concurrency() -> usize {
    const EXECUTE_CONCURRENCY: usize = 4;
    EXECUTE_CONCURRENCY
}
279
/// Serde fallback for `SessionQueueConfig::generate_max_concurrency`.
fn default_generate_concurrency() -> usize {
    const GENERATE_CONCURRENCY: usize = 2;
    GENERATE_CONCURRENCY
}
283
284impl Default for SessionQueueConfig {
285    fn default() -> Self {
286        Self {
287            control_max_concurrency: 2,
288            query_max_concurrency: 4,
289            execute_max_concurrency: 2,
290            generate_max_concurrency: 1,
291            lane_handlers: HashMap::new(),
292            enable_dlq: false,
293            dlq_max_size: None,
294            enable_metrics: false,
295            enable_alerts: false,
296            default_timeout_ms: None,
297            storage_path: None,
298            retry_policy: None,
299            rate_limit: None,
300            priority_boost: None,
301            pressure_threshold: None,
302            lane_timeouts: HashMap::new(),
303        }
304    }
305}
306
307impl SessionQueueConfig {
308    /// Get max concurrency for a lane
309    pub fn max_concurrency(&self, lane: SessionLane) -> usize {
310        match lane {
311            SessionLane::Control => self.control_max_concurrency,
312            SessionLane::Query => self.query_max_concurrency,
313            SessionLane::Execute => self.execute_max_concurrency,
314            SessionLane::Generate => self.generate_max_concurrency,
315        }
316    }
317
318    /// Get handler config for a lane (returns default if not configured)
319    pub fn handler_config(&self, lane: SessionLane) -> LaneHandlerConfig {
320        self.lane_handlers.get(&lane).cloned().unwrap_or_default()
321    }
322
323    /// Enable dead letter queue with optional max size
324    pub fn with_dlq(mut self, max_size: Option<usize>) -> Self {
325        self.enable_dlq = true;
326        self.dlq_max_size = max_size;
327        self
328    }
329
330    /// Enable metrics collection
331    pub fn with_metrics(mut self) -> Self {
332        self.enable_metrics = true;
333        self
334    }
335
336    /// Enable queue alerts
337    pub fn with_alerts(mut self) -> Self {
338        self.enable_alerts = true;
339        self
340    }
341
342    /// Set default timeout for commands
343    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
344        self.default_timeout_ms = Some(timeout_ms);
345        self
346    }
347
348    /// Set persistent storage path
349    pub fn with_storage(mut self, path: impl Into<std::path::PathBuf>) -> Self {
350        self.storage_path = Some(path.into());
351        self
352    }
353
354    /// Enable all a3s-lane features with sensible defaults
355    pub fn with_lane_features(mut self) -> Self {
356        self.enable_dlq = true;
357        self.dlq_max_size = Some(1000);
358        self.enable_metrics = true;
359        self.enable_alerts = true;
360        self.default_timeout_ms = Some(60_000);
361        self
362    }
363}
364
// ============================================================================
// Session Command Trait
// ============================================================================
368
369/// Command to be executed in a session queue
370#[async_trait]
371pub trait SessionCommand: Send + Sync {
372    /// Execute the command
373    async fn execute(&self) -> Result<serde_json::Value>;
374
375    /// Get command type (for logging/debugging)
376    fn command_type(&self) -> &str;
377
378    /// Get command payload as JSON (for external handling)
379    fn payload(&self) -> serde_json::Value {
380        serde_json::json!({})
381    }
382}
383
// ============================================================================
// Queue Status Types
// ============================================================================
387
388/// Status of a single lane
389#[derive(Debug, Clone, Serialize, Deserialize)]
390pub struct LaneStatus {
391    pub lane: SessionLane,
392    pub pending: usize,
393    pub active: usize,
394    pub max_concurrency: usize,
395    pub handler_mode: TaskHandlerMode,
396}
397
398/// Statistics for a session queue
399#[derive(Debug, Clone, Default, Serialize, Deserialize)]
400pub struct SessionQueueStats {
401    pub total_pending: usize,
402    pub total_active: usize,
403    pub external_pending: usize,
404    pub lanes: HashMap<String, LaneStatus>,
405}
406
// ============================================================================
// Tests
// ============================================================================
410
#[cfg(test)]
mod tests {
    use super::*;

    /// `TaskHandlerMode` defaults to `Internal`.
    #[test]
    fn test_task_handler_mode_default() {
        assert_eq!(TaskHandlerMode::default(), TaskHandlerMode::Internal);
    }

    /// `LaneHandlerConfig` defaults to internal handling with a 60s timeout.
    #[test]
    fn test_lane_handler_config_default() {
        let LaneHandlerConfig { mode, timeout_ms } = LaneHandlerConfig::default();
        assert_eq!(mode, TaskHandlerMode::Internal);
        assert_eq!(timeout_ms, 60_000);
    }

    /// A freshly created task has not timed out and has at most its full
    /// timeout remaining.
    #[test]
    fn test_external_task_timeout() {
        let created_at = Some(Instant::now());
        let task = ExternalTask {
            task_id: String::from("test"),
            session_id: String::from("session"),
            lane: SessionLane::Query,
            command_type: String::from("read"),
            payload: serde_json::json!({}),
            timeout_ms: 100,
            created_at,
        };

        assert!(!task.is_timed_out());
        assert!(task.remaining_ms() <= 100);
    }

    /// `SessionQueueConfig::default()` sets the lane limits and leaves the
    /// optional features off.
    #[test]
    fn test_session_queue_config_default() {
        let config = SessionQueueConfig::default();

        assert_eq!(config.control_max_concurrency, 2);
        assert_eq!(config.query_max_concurrency, 4);
        assert_eq!(config.execute_max_concurrency, 2);
        assert_eq!(config.generate_max_concurrency, 1);

        assert!(!config.enable_dlq);
        assert!(!config.enable_metrics);
        assert!(!config.enable_alerts);
    }

    /// `max_concurrency` returns the matching per-lane limit.
    #[test]
    fn test_session_queue_config_max_concurrency() {
        let config = SessionQueueConfig::default();
        let expected = [
            (SessionLane::Control, 2),
            (SessionLane::Query, 4),
            (SessionLane::Execute, 2),
            (SessionLane::Generate, 1),
        ];

        for (lane, limit) in expected {
            assert_eq!(config.max_concurrency(lane), limit);
        }
    }

    /// A lane without an explicit handler entry gets the default handler.
    #[test]
    fn test_session_queue_config_handler_config() {
        let handler = SessionQueueConfig::default().handler_config(SessionLane::Execute);
        assert_eq!(handler.mode, TaskHandlerMode::Internal);
        assert_eq!(handler.timeout_ms, 60_000);
    }

    /// The individual builder methods set their respective fields.
    #[test]
    fn test_session_queue_config_builders() {
        let base = SessionQueueConfig::default();
        let config = base
            .with_dlq(Some(500))
            .with_metrics()
            .with_alerts()
            .with_timeout(30_000);

        assert!(config.enable_dlq);
        assert_eq!(config.dlq_max_size, Some(500));
        assert!(config.enable_metrics);
        assert!(config.enable_alerts);
        assert_eq!(config.default_timeout_ms, Some(30_000));
    }

    /// `with_lane_features` enables DLQ, metrics and alerts, and sets the
    /// default timeout.
    #[test]
    fn test_session_queue_config_with_lane_features() {
        let config = SessionQueueConfig::default().with_lane_features();

        assert!(config.enable_dlq);
        assert_eq!(config.dlq_max_size, Some(1000));
        assert!(config.enable_metrics);
        assert!(config.enable_alerts);
        assert_eq!(config.default_timeout_ms, Some(60_000));
    }

    /// A successful result carries no error message.
    #[test]
    fn test_external_task_result() {
        let outcome = ExternalTaskResult {
            success: true,
            result: serde_json::json!({"output": "hello"}),
            error: None,
        };

        assert!(outcome.success);
        assert!(outcome.error.is_none());
    }
}