Skip to main content

a3s_code_core/
queue.rs

1//! Per-session command queue with lane-based priority scheduling
2//!
3//! Provides session-isolated command queues where each session has its own
4//! set of lanes with configurable concurrency limits and priorities.
5//!
6//! ## External Task Handling
7//!
8//! Supports pluggable task handlers allowing SDK users to implement custom
9//! processing logic for different lanes:
10//!
11//! - **Internal**: Default, tasks executed within the runtime
12//! - **External**: Tasks sent to SDK, wait for callback completion
13//! - **Hybrid**: Internal execution with external notification
14//!
15//! ## Implementation
16//!
17//! The actual queue implementation is in `SessionLaneQueue` which is backed
18//! by a3s-lane with features like DLQ, metrics, retry policies, and rate limiting.
19
20pub use a3s_lane::MetricsSnapshot;
21use anyhow::Result;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24use std::collections::HashMap;
25use std::time::{Duration, Instant};
26
27// ============================================================================
28// Session Lane
29// ============================================================================
30
31/// Session lane for queue priority scheduling and HITL auto-approval
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33pub enum SessionLane {
34    /// Control operations (P0) - pause, resume, cancel
35    Control,
36    /// Query operations (P1) - read, glob, ls, grep
37    Query,
38    /// Execute operations (P2) - bash, write, edit
39    Execute,
40    /// Generate operations (P3) - LLM calls
41    Generate,
42}
43
44impl SessionLane {
45    /// Get the priority level (lower = higher priority)
46    pub fn priority(&self) -> u8 {
47        match self {
48            SessionLane::Control => 0,
49            SessionLane::Query => 1,
50            SessionLane::Execute => 2,
51            SessionLane::Generate => 3,
52        }
53    }
54
55    /// Map a tool name to its lane
56    pub fn from_tool_name(tool_name: &str) -> Self {
57        match tool_name {
58            "read" | "glob" | "ls" | "grep" | "list_files" | "search" | "web_fetch"
59            | "web_search" => SessionLane::Query,
60            "bash" | "write" | "edit" | "delete" | "move" | "copy" | "execute" => {
61                SessionLane::Execute
62            }
63            _ => SessionLane::Execute,
64        }
65    }
66}
67
68// ============================================================================
69// Task Handler Configuration
70// ============================================================================
71
72/// Task handler mode determines how tasks in a lane are processed
73#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
74pub enum TaskHandlerMode {
75    /// Tasks are executed internally within the runtime (default)
76    #[default]
77    Internal,
78    /// Tasks are sent to external handler (SDK), wait for callback
79    External,
80    /// Tasks are executed internally but also notify external handler
81    Hybrid,
82}
83
84/// Configuration for a lane's task handler
85#[derive(Debug, Clone, Serialize, Deserialize)]
86pub struct LaneHandlerConfig {
87    /// Processing mode
88    pub mode: TaskHandlerMode,
89    /// Timeout for external processing (ms), default 60000 (60s)
90    pub timeout_ms: u64,
91}
92
93impl Default for LaneHandlerConfig {
94    fn default() -> Self {
95        Self {
96            mode: TaskHandlerMode::Internal,
97            timeout_ms: 60_000,
98        }
99    }
100}
101
102// ============================================================================
103// External Task Types
104// ============================================================================
105
106/// An external task that needs to be processed by SDK
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct ExternalTask {
109    /// Unique task identifier
110    pub task_id: String,
111    /// Session this task belongs to
112    pub session_id: String,
113    /// Lane the task is in
114    pub lane: SessionLane,
115    /// Type of command (e.g., "bash", "read", "write")
116    pub command_type: String,
117    /// Task payload as JSON
118    pub payload: serde_json::Value,
119    /// Timeout in milliseconds
120    pub timeout_ms: u64,
121    /// When the task was created
122    #[serde(skip)]
123    pub created_at: Option<Instant>,
124}
125
126impl ExternalTask {
127    /// Check if this task has timed out
128    pub fn is_timed_out(&self) -> bool {
129        self.created_at
130            .map(|t| t.elapsed() > Duration::from_millis(self.timeout_ms))
131            .unwrap_or(false)
132    }
133
134    /// Get remaining time until timeout in milliseconds
135    pub fn remaining_ms(&self) -> u64 {
136        self.created_at
137            .map(|t| {
138                let elapsed = t.elapsed().as_millis() as u64;
139                self.timeout_ms.saturating_sub(elapsed)
140            })
141            .unwrap_or(self.timeout_ms)
142    }
143}
144
145/// Result of external task processing
146#[derive(Debug, Clone, Serialize, Deserialize)]
147pub struct ExternalTaskResult {
148    /// Whether the task succeeded
149    pub success: bool,
150    /// Result data (JSON)
151    pub result: serde_json::Value,
152    /// Error message if failed
153    pub error: Option<String>,
154}
155
156// ============================================================================
157// Configuration
158// ============================================================================
159
160/// Configuration for a session command queue
161#[derive(Debug, Clone, Serialize, Deserialize)]
162#[serde(rename_all = "camelCase")]
163pub struct SessionQueueConfig {
164    /// Max concurrency for Control lane (P0)
165    #[serde(default = "default_control_concurrency")]
166    pub control_max_concurrency: usize,
167    /// Max concurrency for Query lane (P1)
168    #[serde(default = "default_query_concurrency")]
169    pub query_max_concurrency: usize,
170    /// Max concurrency for Execute lane (P2)
171    #[serde(default = "default_execute_concurrency")]
172    pub execute_max_concurrency: usize,
173    /// Max concurrency for Generate lane (P3)
174    #[serde(default = "default_generate_concurrency")]
175    pub generate_max_concurrency: usize,
176    /// Handler configurations per lane
177    #[serde(default)]
178    pub lane_handlers: HashMap<SessionLane, LaneHandlerConfig>,
179
180    // ========================================================================
181    // a3s-lane v0.4.0 integration features
182    // ========================================================================
183    /// Enable dead letter queue for failed commands
184    #[serde(default)]
185    pub enable_dlq: bool,
186    /// Max size of dead letter queue (None = use default 1000)
187    #[serde(default)]
188    pub dlq_max_size: Option<usize>,
189    /// Enable metrics collection
190    #[serde(default)]
191    pub enable_metrics: bool,
192    /// Enable queue alerts
193    #[serde(default)]
194    pub enable_alerts: bool,
195    /// Default timeout for commands in milliseconds
196    #[serde(default)]
197    pub default_timeout_ms: Option<u64>,
198    /// Persistent storage path (None = in-memory only)
199    #[serde(default)]
200    pub storage_path: Option<std::path::PathBuf>,
201
202    // ========================================================================
203    // a3s-lane v0.4.0 advanced features
204    // ========================================================================
205    /// Retry policy configuration
206    #[serde(default)]
207    pub retry_policy: Option<RetryPolicyConfig>,
208    /// Rate limit configuration
209    #[serde(default)]
210    pub rate_limit: Option<RateLimitConfig>,
211    /// Priority boost configuration
212    #[serde(default)]
213    pub priority_boost: Option<PriorityBoostConfig>,
214    /// Pressure threshold for emitting pressure/idle events
215    #[serde(default)]
216    pub pressure_threshold: Option<usize>,
217    /// Per-lane timeout overrides in milliseconds
218    #[serde(default)]
219    pub lane_timeouts: HashMap<SessionLane, u64>,
220}
221
222/// Retry policy configuration
223#[derive(Debug, Clone, Serialize, Deserialize)]
224#[serde(rename_all = "camelCase")]
225pub struct RetryPolicyConfig {
226    /// Retry strategy: "exponential", "fixed", or "none"
227    pub strategy: String,
228    /// Maximum number of retries
229    #[serde(default = "default_max_retries")]
230    pub max_retries: u32,
231    /// Initial delay in milliseconds (for exponential)
232    #[serde(default = "default_initial_delay_ms")]
233    pub initial_delay_ms: u64,
234    /// Fixed delay in milliseconds (for fixed strategy)
235    #[serde(default)]
236    pub fixed_delay_ms: Option<u64>,
237}
238
239fn default_max_retries() -> u32 {
240    3
241}
242
243fn default_initial_delay_ms() -> u64 {
244    100
245}
246
247/// Rate limit configuration
248#[derive(Debug, Clone, Serialize, Deserialize)]
249#[serde(rename_all = "camelCase")]
250pub struct RateLimitConfig {
251    /// Rate limit type: "per_second", "per_minute", "per_hour", or "unlimited"
252    pub limit_type: String,
253    /// Maximum number of operations per time period
254    #[serde(default)]
255    pub max_operations: Option<u64>,
256}
257
258/// Priority boost configuration
259#[derive(Debug, Clone, Serialize, Deserialize)]
260#[serde(rename_all = "camelCase")]
261pub struct PriorityBoostConfig {
262    /// Boost strategy: "standard", "aggressive", or "disabled"
263    pub strategy: String,
264    /// Deadline in milliseconds
265    #[serde(default)]
266    pub deadline_ms: Option<u64>,
267}
268
269fn default_control_concurrency() -> usize {
270    4
271}
272
273fn default_query_concurrency() -> usize {
274    12 // Balanced: better stability than 8, good performance (between 8 and 16)
275}
276
277fn default_execute_concurrency() -> usize {
278    4
279}
280
281fn default_generate_concurrency() -> usize {
282    2
283}
284
285impl Default for SessionQueueConfig {
286    fn default() -> Self {
287        Self {
288            control_max_concurrency: 2,
289            query_max_concurrency: 4,
290            execute_max_concurrency: 2,
291            generate_max_concurrency: 1,
292            lane_handlers: HashMap::new(),
293            enable_dlq: false,
294            dlq_max_size: None,
295            enable_metrics: false,
296            enable_alerts: false,
297            default_timeout_ms: None,
298            storage_path: None,
299            retry_policy: None,
300            rate_limit: None,
301            priority_boost: None,
302            pressure_threshold: None,
303            lane_timeouts: HashMap::new(),
304        }
305    }
306}
307
308impl SessionQueueConfig {
309    /// Get max concurrency for a lane
310    pub fn max_concurrency(&self, lane: SessionLane) -> usize {
311        match lane {
312            SessionLane::Control => self.control_max_concurrency,
313            SessionLane::Query => self.query_max_concurrency,
314            SessionLane::Execute => self.execute_max_concurrency,
315            SessionLane::Generate => self.generate_max_concurrency,
316        }
317    }
318
319    /// Get handler config for a lane (returns default if not configured)
320    pub fn handler_config(&self, lane: SessionLane) -> LaneHandlerConfig {
321        self.lane_handlers.get(&lane).cloned().unwrap_or_default()
322    }
323
324    /// Enable dead letter queue with optional max size
325    pub fn with_dlq(mut self, max_size: Option<usize>) -> Self {
326        self.enable_dlq = true;
327        self.dlq_max_size = max_size;
328        self
329    }
330
331    /// Enable metrics collection
332    pub fn with_metrics(mut self) -> Self {
333        self.enable_metrics = true;
334        self
335    }
336
337    /// Enable queue alerts
338    pub fn with_alerts(mut self) -> Self {
339        self.enable_alerts = true;
340        self
341    }
342
343    /// Set default timeout for commands
344    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
345        self.default_timeout_ms = Some(timeout_ms);
346        self
347    }
348
349    /// Set persistent storage path
350    pub fn with_storage(mut self, path: impl Into<std::path::PathBuf>) -> Self {
351        self.storage_path = Some(path.into());
352        self
353    }
354
355    /// Enable all a3s-lane features with sensible defaults
356    pub fn with_lane_features(mut self) -> Self {
357        self.enable_dlq = true;
358        self.dlq_max_size = Some(1000);
359        self.enable_metrics = true;
360        self.enable_alerts = true;
361        self.default_timeout_ms = Some(60_000);
362        self
363    }
364}
365
366// ============================================================================
367// Session Command Trait
368// ============================================================================
369
370/// Command to be executed in a session queue
371#[async_trait]
372pub trait SessionCommand: Send + Sync {
373    /// Execute the command
374    async fn execute(&self) -> Result<serde_json::Value>;
375
376    /// Get command type (for logging/debugging)
377    fn command_type(&self) -> &str;
378
379    /// Get command payload as JSON (for external handling)
380    fn payload(&self) -> serde_json::Value {
381        serde_json::json!({})
382    }
383}
384
385// ============================================================================
386// Queue Status Types
387// ============================================================================
388
389/// Status of a single lane
390#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct LaneStatus {
392    pub lane: SessionLane,
393    pub pending: usize,
394    pub active: usize,
395    pub max_concurrency: usize,
396    pub handler_mode: TaskHandlerMode,
397}
398
399/// Statistics for a session queue
400#[derive(Debug, Clone, Default, Serialize, Deserialize)]
401pub struct SessionQueueStats {
402    pub total_pending: usize,
403    pub total_active: usize,
404    pub external_pending: usize,
405    pub lanes: HashMap<String, LaneStatus>,
406}
407
408// ============================================================================
409// Tests
410// ============================================================================
411
412#[cfg(test)]
413mod tests {
414    use super::*;
415
416    #[test]
417    fn test_task_handler_mode_default() {
418        let mode = TaskHandlerMode::default();
419        assert_eq!(mode, TaskHandlerMode::Internal);
420    }
421
422    #[test]
423    fn test_lane_handler_config_default() {
424        let config = LaneHandlerConfig::default();
425        assert_eq!(config.mode, TaskHandlerMode::Internal);
426        assert_eq!(config.timeout_ms, 60_000);
427    }
428
429    #[test]
430    fn test_external_task_timeout() {
431        let task = ExternalTask {
432            task_id: "test".to_string(),
433            session_id: "session".to_string(),
434            lane: SessionLane::Query,
435            command_type: "read".to_string(),
436            payload: serde_json::json!({}),
437            timeout_ms: 100,
438            created_at: Some(Instant::now()),
439        };
440
441        assert!(!task.is_timed_out());
442        assert!(task.remaining_ms() <= 100);
443    }
444
445    #[test]
446    fn test_session_queue_config_default() {
447        let config = SessionQueueConfig::default();
448        assert_eq!(config.control_max_concurrency, 2);
449        assert_eq!(config.query_max_concurrency, 4);
450        assert_eq!(config.execute_max_concurrency, 2);
451        assert_eq!(config.generate_max_concurrency, 1);
452        assert!(!config.enable_dlq);
453        assert!(!config.enable_metrics);
454        assert!(!config.enable_alerts);
455    }
456
457    #[test]
458    fn test_session_queue_config_max_concurrency() {
459        let config = SessionQueueConfig::default();
460        assert_eq!(config.max_concurrency(SessionLane::Control), 2);
461        assert_eq!(config.max_concurrency(SessionLane::Query), 4);
462        assert_eq!(config.max_concurrency(SessionLane::Execute), 2);
463        assert_eq!(config.max_concurrency(SessionLane::Generate), 1);
464    }
465
466    #[test]
467    fn test_session_queue_config_handler_config() {
468        let config = SessionQueueConfig::default();
469        let handler = config.handler_config(SessionLane::Execute);
470        assert_eq!(handler.mode, TaskHandlerMode::Internal);
471        assert_eq!(handler.timeout_ms, 60_000);
472    }
473
474    #[test]
475    fn test_session_queue_config_builders() {
476        let config = SessionQueueConfig::default()
477            .with_dlq(Some(500))
478            .with_metrics()
479            .with_alerts()
480            .with_timeout(30_000);
481
482        assert!(config.enable_dlq);
483        assert_eq!(config.dlq_max_size, Some(500));
484        assert!(config.enable_metrics);
485        assert!(config.enable_alerts);
486        assert_eq!(config.default_timeout_ms, Some(30_000));
487    }
488
489    #[test]
490    fn test_session_queue_config_with_lane_features() {
491        let config = SessionQueueConfig::default().with_lane_features();
492
493        assert!(config.enable_dlq);
494        assert_eq!(config.dlq_max_size, Some(1000));
495        assert!(config.enable_metrics);
496        assert!(config.enable_alerts);
497        assert_eq!(config.default_timeout_ms, Some(60_000));
498    }
499
500    #[test]
501    fn test_external_task_result() {
502        let result = ExternalTaskResult {
503            success: true,
504            result: serde_json::json!({"output": "hello"}),
505            error: None,
506        };
507        assert!(result.success);
508        assert!(result.error.is_none());
509    }
510}