swink_agent/agent/
queueing.rs1use std::collections::VecDeque;
2use std::sync::{Arc, Mutex};
3
4use crate::message_provider::MessageProvider;
5use crate::types::AgentMessage;
6
7use super::{Agent, FollowUpMode, SteeringMode};
8
9impl Agent {
10 pub fn steer(&mut self, message: AgentMessage) {
18 self.steering_queue
19 .lock()
20 .unwrap_or_else(std::sync::PoisonError::into_inner)
21 .push_back(message);
22 }
23
24 pub fn follow_up(&mut self, message: AgentMessage) {
26 self.follow_up_queue
27 .lock()
28 .unwrap_or_else(std::sync::PoisonError::into_inner)
29 .push_back(message);
30 }
31
32 pub fn clear_steering(&mut self) {
34 self.steering_queue
35 .lock()
36 .unwrap_or_else(std::sync::PoisonError::into_inner)
37 .clear();
38 }
39
40 pub fn clear_follow_up(&mut self) {
42 self.follow_up_queue
43 .lock()
44 .unwrap_or_else(std::sync::PoisonError::into_inner)
45 .clear();
46 }
47
48 pub fn clear_queues(&mut self) {
50 self.clear_steering();
51 self.clear_follow_up();
52 }
53
54 #[must_use]
56 pub fn has_pending_messages(&self) -> bool {
57 let steering_empty = self
58 .steering_queue
59 .lock()
60 .unwrap_or_else(std::sync::PoisonError::into_inner)
61 .is_empty();
62 let follow_up_empty = self
63 .follow_up_queue
64 .lock()
65 .unwrap_or_else(std::sync::PoisonError::into_inner)
66 .is_empty();
67 !steering_empty || !follow_up_empty
68 }
69}
70
71pub(super) struct QueueMessageProvider {
76 pub(super) steering_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
77 pub(super) follow_up_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
78 pub(super) steering_mode: SteeringMode,
79 pub(super) follow_up_mode: FollowUpMode,
80 pub(super) pending_message_snapshot: Arc<crate::pause_state::PendingMessageSnapshot>,
81}
82
83impl MessageProvider for QueueMessageProvider {
84 fn poll_steering(&self) -> Vec<AgentMessage> {
85 let drained = drain_queue(
86 &self.steering_queue,
87 self.steering_mode == SteeringMode::OneAtATime,
88 );
89 self.pending_message_snapshot.append(&drained);
90 drained
91 }
92
93 fn poll_follow_up(&self) -> Vec<AgentMessage> {
94 let drained = drain_queue(
95 &self.follow_up_queue,
96 self.follow_up_mode == FollowUpMode::OneAtATime,
97 );
98 self.pending_message_snapshot.append(&drained);
99 drained
100 }
101
102 fn has_steering(&self) -> bool {
103 let guard = self
104 .steering_queue
105 .lock()
106 .unwrap_or_else(std::sync::PoisonError::into_inner);
107 !guard.is_empty()
108 }
109}
110
111pub(super) fn drain_messages_from_queue(
113 queue: &Arc<Mutex<VecDeque<AgentMessage>>>,
114) -> Vec<AgentMessage> {
115 queue
116 .lock()
117 .unwrap_or_else(std::sync::PoisonError::into_inner)
118 .drain(..)
119 .collect()
120}
121
122fn drain_queue(queue: &Mutex<VecDeque<AgentMessage>>, one_at_a_time: bool) -> Vec<AgentMessage> {
123 let mut guard = queue
124 .lock()
125 .unwrap_or_else(std::sync::PoisonError::into_inner);
126 if guard.is_empty() {
127 return Vec::new();
128 }
129 if one_at_a_time {
130 guard.pop_front().into_iter().collect()
131 } else {
132 guard.drain(..).collect()
133 }
134}