Skip to main content

swink_agent/agent/
queueing.rs

1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3
4use crate::message_provider::MessageProvider;
5use crate::types::AgentMessage;
6
7use super::{Agent, FollowUpMode, SteeringMode};
8
9impl Agent {
10    // ── Queue Management ─────────────────────────────────────────────────
11
12    /// Push a steering message into the queue.
13    ///
14    /// The message is delivered to the agent at the next turn boundary — after
15    /// the current LLM response or tool-execution batch completes. This preserves
16    /// the agent's in-progress work rather than aborting it mid-generation.
17    pub fn steer(&mut self, message: AgentMessage) {
18        self.steering_queue
19            .lock()
20            .unwrap_or_else(std::sync::PoisonError::into_inner)
21            .push_back(message);
22    }
23
24    /// Push a follow-up message into the queue.
25    pub fn follow_up(&mut self, message: AgentMessage) {
26        self.follow_up_queue
27            .lock()
28            .unwrap_or_else(std::sync::PoisonError::into_inner)
29            .push_back(message);
30    }
31
32    /// Clear all steering messages.
33    pub fn clear_steering(&mut self) {
34        self.steering_queue
35            .lock()
36            .unwrap_or_else(std::sync::PoisonError::into_inner)
37            .clear();
38    }
39
40    /// Clear all follow-up messages.
41    pub fn clear_follow_up(&mut self) {
42        self.follow_up_queue
43            .lock()
44            .unwrap_or_else(std::sync::PoisonError::into_inner)
45            .clear();
46    }
47
48    /// Clear both steering and follow-up queues.
49    pub fn clear_queues(&mut self) {
50        self.clear_steering();
51        self.clear_follow_up();
52    }
53
54    /// Returns `true` if there are pending steering or follow-up messages.
55    #[must_use]
56    pub fn has_pending_messages(&self) -> bool {
57        let steering_empty = self
58            .steering_queue
59            .lock()
60            .unwrap_or_else(std::sync::PoisonError::into_inner)
61            .is_empty();
62        let follow_up_empty = self
63            .follow_up_queue
64            .lock()
65            .unwrap_or_else(std::sync::PoisonError::into_inner)
66            .is_empty();
67        !steering_empty || !follow_up_empty
68    }
69}
70
71/// [`MessageProvider`] backed by shared steering and follow-up queues.
72///
73/// Drains messages according to the configured [`SteeringMode`] and
74/// [`FollowUpMode`] — either one at a time or all at once.
75pub(super) struct QueueMessageProvider {
76    pub(super) steering_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
77    pub(super) follow_up_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
78    pub(super) steering_mode: SteeringMode,
79    pub(super) follow_up_mode: FollowUpMode,
80    pub(super) pending_message_snapshot: Arc<crate::pause_state::PendingMessageSnapshot>,
81}
82
83impl MessageProvider for QueueMessageProvider {
84    fn poll_steering(&self) -> Vec<AgentMessage> {
85        let drained = drain_queue(
86            &self.steering_queue,
87            self.steering_mode == SteeringMode::OneAtATime,
88        );
89        self.pending_message_snapshot.append(&drained);
90        drained
91    }
92
93    fn poll_follow_up(&self) -> Vec<AgentMessage> {
94        let drained = drain_queue(
95            &self.follow_up_queue,
96            self.follow_up_mode == FollowUpMode::OneAtATime,
97        );
98        self.pending_message_snapshot.append(&drained);
99        drained
100    }
101
102    fn has_steering(&self) -> bool {
103        let guard = self
104            .steering_queue
105            .lock()
106            .unwrap_or_else(std::sync::PoisonError::into_inner);
107        !guard.is_empty()
108    }
109}
110
111/// Drain the queue and return every queued [`AgentMessage`].
112pub(super) fn drain_messages_from_queue(
113    queue: &Arc<Mutex<VecDeque<AgentMessage>>>,
114) -> Vec<AgentMessage> {
115    queue
116        .lock()
117        .unwrap_or_else(std::sync::PoisonError::into_inner)
118        .drain(..)
119        .collect()
120}
121
122fn drain_queue(queue: &Mutex<VecDeque<AgentMessage>>, one_at_a_time: bool) -> Vec<AgentMessage> {
123    let mut guard = queue
124        .lock()
125        .unwrap_or_else(std::sync::PoisonError::into_inner);
126    if guard.is_empty() {
127        return Vec::new();
128    }
129    if one_at_a_time {
130        guard.pop_front().into_iter().collect()
131    } else {
132        guard.drain(..).collect()
133    }
134}