use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use crate::message_provider::MessageProvider;
use crate::types::AgentMessage;
use super::{Agent, FollowUpMode, SteeringMode};
impl Agent {
    /// Appends `message` to the back of the steering queue.
    ///
    /// A poisoned queue lock is recovered instead of panicking.
    pub fn steer(&mut self, message: AgentMessage) {
        let mut pending = self
            .steering_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        pending.push_back(message);
    }

    /// Appends `message` to the back of the follow-up queue.
    ///
    /// A poisoned queue lock is recovered instead of panicking.
    pub fn follow_up(&mut self, message: AgentMessage) {
        let mut pending = self
            .follow_up_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        pending.push_back(message);
    }

    /// Discards every message currently waiting in the steering queue.
    pub fn clear_steering(&mut self) {
        let mut pending = self
            .steering_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        pending.clear();
    }

    /// Discards every message currently waiting in the follow-up queue.
    pub fn clear_follow_up(&mut self) {
        let mut pending = self
            .follow_up_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        pending.clear();
    }

    /// Empties both queues: steering first, then follow-up.
    pub fn clear_queues(&mut self) {
        self.clear_steering();
        self.clear_follow_up();
    }

    /// Returns `true` when at least one of the two queues holds a message.
    ///
    /// The queues are inspected one after the other; each lock is released
    /// before the next one is taken, so the two locks are never held together.
    #[must_use]
    pub fn has_pending_messages(&self) -> bool {
        let steering_pending = !self
            .steering_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_empty();
        let follow_up_pending = !self
            .follow_up_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_empty();
        steering_pending || follow_up_pending
    }
}
/// `MessageProvider` implementation backed by two mutex-guarded message queues.
///
/// Each poll removes messages from the matching queue and passes the removed
/// batch to `pending_message_snapshot` before returning it.
pub(super) struct QueueMessageProvider {
/// Queue drained by `poll_steering` and inspected by `has_steering`.
pub(super) steering_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
/// Queue drained by `poll_follow_up`.
pub(super) follow_up_queue: Arc<Mutex<VecDeque<AgentMessage>>>,
/// When equal to `SteeringMode::OneAtATime`, `poll_steering` takes a single
/// message per call; otherwise it takes the whole queue.
pub(super) steering_mode: SteeringMode,
/// When equal to `FollowUpMode::OneAtATime`, `poll_follow_up` takes a single
/// message per call; otherwise it takes the whole queue.
pub(super) follow_up_mode: FollowUpMode,
/// Every batch returned by a poll is handed to this snapshot's `append`.
/// NOTE(review): presumably records in-flight messages for pause handling —
/// confirm against `crate::pause_state`.
pub(super) pending_message_snapshot: Arc<crate::pause_state::PendingMessageSnapshot>,
}
impl MessageProvider for QueueMessageProvider {
    /// Removes messages from the steering queue and returns them in queue order.
    ///
    /// Takes a single message when the steering mode is
    /// `SteeringMode::OneAtATime`, otherwise the entire queue. The returned
    /// batch is also passed to the pending-message snapshot.
    fn poll_steering(&self) -> Vec<AgentMessage> {
        let one_at_a_time = self.steering_mode == SteeringMode::OneAtATime;
        let batch = drain_queue(&self.steering_queue, one_at_a_time);
        self.pending_message_snapshot.append(&batch);
        batch
    }

    /// Removes messages from the follow-up queue and returns them in queue order.
    ///
    /// Takes a single message when the follow-up mode is
    /// `FollowUpMode::OneAtATime`, otherwise the entire queue. The returned
    /// batch is also passed to the pending-message snapshot.
    fn poll_follow_up(&self) -> Vec<AgentMessage> {
        let one_at_a_time = self.follow_up_mode == FollowUpMode::OneAtATime;
        let batch = drain_queue(&self.follow_up_queue, one_at_a_time);
        self.pending_message_snapshot.append(&batch);
        batch
    }

    /// Reports whether the steering queue currently holds any message,
    /// without removing anything. A poisoned lock is recovered.
    fn has_steering(&self) -> bool {
        !self
            .steering_queue
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .is_empty()
    }
}
/// Removes every message from `queue` and returns them in queue order.
///
/// The queue is left empty. A poisoned lock is recovered instead of panicking.
pub(super) fn drain_messages_from_queue(
    queue: &Arc<Mutex<VecDeque<AgentMessage>>>,
) -> Vec<AgentMessage> {
    let mut pending = queue
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    pending.drain(..).collect()
}
/// Removes messages from the front of `queue` and returns them in queue order.
///
/// With `one_at_a_time` set, at most one message (the oldest) is taken;
/// otherwise the whole queue is emptied. An empty queue yields an empty `Vec`.
/// A poisoned lock is recovered instead of panicking.
fn drain_queue(queue: &Mutex<VecDeque<AgentMessage>>, one_at_a_time: bool) -> Vec<AgentMessage> {
    let mut pending = queue
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner);
    // Number of messages to take: capped at one in one-at-a-time mode, and
    // never more than what is queued, so the range below is always in bounds.
    let take = if one_at_a_time {
        pending.len().min(1)
    } else {
        pending.len()
    };
    pending.drain(..take).collect()
}