use std::collections::VecDeque;
/// A unit of work that can be queued on a [`Dispatcher`].
///
/// The variant decides both the source label counted in
/// [`DispatcherStats::by_source`] and the score the dispatcher assigns to the
/// task; tasks with lower scores are dequeued first.
#[derive(Debug, Clone)]
pub enum DispatchTask {
/// Task scored purely by its agent layer. Counted under the `"time_driven"` source.
/// NOTE(review): presumably produced by a timer/schedule — confirm with the producer.
TimeDriven {
/// Name attached to the task; not used for scoring.
name: String,
/// Task payload text; not interpreted by the dispatcher.
task: String,
/// Layer that determines the score: `Safety` -> 0, `Core` -> 1000, `Growth` -> 2000.
layer: crate::AgentLayer,
},
/// Task scored from an issue priority and an optional PageRank value.
/// Counted under the `"issue_driven"` source.
IssueDriven {
/// Issue identifier; not used for scoring.
identifier: String,
/// Issue title; not used for scoring.
title: String,
/// Contributes `priority * 100` to the score; `None` contributes 500.
priority: Option<i32>,
/// Subtracts `pagerank_score * 100` (truncated to an integer) from the score,
/// so a higher PageRank is dequeued earlier; `None` contributes nothing.
pagerank_score: Option<f64>,
},
/// Task with the fixed score 200. Counted under the `"mention_driven"` source.
/// None of the fields below are read by the dispatcher.
MentionDriven {
/// NOTE(review): presumably the agent that was mentioned — confirm with the producer.
agent_name: String,
/// NOTE(review): presumably the issue on which the mention occurred — confirm.
issue_number: u64,
/// NOTE(review): presumably the comment containing the mention — confirm.
comment_id: u64,
/// Free-form context text carried along with the mention.
context: String,
},
}
/// Queue entry pairing a [`DispatchTask`] with the keys used to order it.
#[derive(Debug)]
struct PrioritizedTask {
/// Sequence number taken from the dispatcher's counter at enqueue time;
/// orders entries that share the same score (earlier enqueue first).
seq: u64,
/// Priority score computed at enqueue time; lower scores sit nearer the front.
score: i64,
/// The task handed back to the caller on dequeue.
task: DispatchTask,
}
/// Priority queue of [`DispatchTask`]s that also tracks usage statistics.
///
/// Tasks are stored in ascending score order (lowest score at the front) and
/// tasks with equal scores keep their enqueue order.
#[derive(Debug)]
pub struct Dispatcher {
    /// Pending tasks, sorted by score and then by enqueue sequence number.
    queue: VecDeque<PrioritizedTask>,
    /// Sequence number that will be assigned to the next enqueued task.
    seq_counter: u64,
    /// Running counters, exposed read-only through `stats()`.
    stats: DispatcherStats,
}
/// Counters describing a [`Dispatcher`]'s activity. All values start at zero / empty.
#[derive(Debug, Default, Clone)]
pub struct DispatcherStats {
/// Number of tasks ever enqueued; incremented once per `enqueue` call.
pub total_enqueued: u64,
/// Number of tasks ever dequeued; incremented once per `dequeue` that returned a task.
pub total_dequeued: u64,
/// Queue length recorded after the most recent enqueue or successful dequeue.
pub current_depth: usize,
/// Per-source counter keyed by `"time_driven"`, `"issue_driven"` or `"mention_driven"`.
/// Incremented on enqueue and decremented on dequeue, so it reflects tasks still queued.
pub by_source: std::collections::HashMap<String, u64>,
}
impl Dispatcher {
    /// Creates an empty dispatcher with zeroed statistics.
    pub fn new() -> Self {
        Self {
            queue: VecDeque::new(),
            seq_counter: 0,
            stats: DispatcherStats::default(),
        }
    }

    /// Adds `task` to the queue and updates the statistics.
    ///
    /// The task is scored once, at enqueue time, and placed behind every
    /// queued task whose score is lower or equal, so equal-score tasks are
    /// served in the order they were enqueued.
    pub fn enqueue(&mut self, task: DispatchTask) {
        let seq = self.seq_counter;
        self.seq_counter += 1;
        let score = self.compute_priority(&task);
        let source = self.task_source(&task);

        // The queue is always sorted by (score, seq), so the insertion slot is
        // the end of the prefix of entries that order at or before the new one.
        let insert_pos = self
            .queue
            .partition_point(|existing| (existing.score, existing.seq) <= (score, seq));
        self.queue
            .insert(insert_pos, PrioritizedTask { seq, score, task });

        self.stats.total_enqueued += 1;
        self.stats.current_depth = self.queue.len();
        *self.stats.by_source.entry(source.to_owned()).or_insert(0) += 1;
        tracing::debug!(score, depth = self.stats.current_depth, "task enqueued");
    }

    /// Removes and returns the task with the lowest score, or `None` when the
    /// queue is empty. Statistics are only touched when a task is returned.
    pub fn dequeue(&mut self) -> Option<DispatchTask> {
        let PrioritizedTask { task, .. } = self.queue.pop_front()?;
        let source = self.task_source(&task);

        self.stats.total_dequeued += 1;
        self.stats.current_depth = self.queue.len();
        if let Some(count) = self.stats.by_source.get_mut(source) {
            // Every queued task bumped its source counter in `enqueue`, so the
            // counter is at least 1 here.
            *count -= 1;
        }
        tracing::debug!(depth = self.stats.current_depth, "task dequeued");
        Some(task)
    }

    /// Returns the task that `dequeue` would return next, without removing it.
    pub fn peek(&self) -> Option<&DispatchTask> {
        self.queue.front().map(|pt| &pt.task)
    }

    /// Returns the number of tasks currently queued.
    pub fn depth(&self) -> usize {
        self.queue.len()
    }

    /// Returns the statistics gathered so far.
    pub fn stats(&self) -> &DispatcherStats {
        &self.stats
    }

    /// Computes the ordering score for `task`; lower scores are served first.
    ///
    /// * `TimeDriven`: 0, 1000 or 2000 for the `Safety`, `Core` and `Growth` layers.
    /// * `MentionDriven`: always 200.
    /// * `IssueDriven`: `3000 + priority * 100 - pagerank * 100`, where a
    ///   missing priority counts as 500 and a missing PageRank as 0.
    fn compute_priority(&self, task: &DispatchTask) -> i64 {
        match task {
            DispatchTask::TimeDriven { layer, .. } => match layer {
                crate::AgentLayer::Safety => 0,
                crate::AgentLayer::Core => 1000,
                crate::AgentLayer::Growth => 2000,
            },
            DispatchTask::IssueDriven {
                priority,
                pagerank_score,
                ..
            } => {
                let base = priority.map_or(500, |p| i64::from(p) * 100);
                // A float -> int `as` cast saturates at the i64 bounds and maps
                // NaN to 0, so the bonus may be i64::MIN or i64::MAX for extreme
                // inputs.
                let pagerank_bonus = pagerank_score.map_or(0, |pr| (-(pr * 100.0)) as i64);
                // Saturate instead of overflowing when the bonus sits at a bound.
                base.saturating_add(pagerank_bonus).saturating_add(3000)
            }
            DispatchTask::MentionDriven { .. } => 200,
        }
    }

    /// Returns the label under which `task` is counted in
    /// `DispatcherStats::by_source`.
    fn task_source(&self, task: &DispatchTask) -> &'static str {
        match task {
            DispatchTask::TimeDriven { .. } => "time_driven",
            DispatchTask::IssueDriven { .. } => "issue_driven",
            DispatchTask::MentionDriven { .. } => "mention_driven",
        }
    }
}
impl Default for Dispatcher {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    // Builds a `TimeDriven` task with a fixed payload.
    fn time_task(name: &str, layer: crate::AgentLayer) -> DispatchTask {
        DispatchTask::TimeDriven {
            name: name.into(),
            task: "task".into(),
            layer,
        }
    }

    // Builds an `IssueDriven` task.
    fn issue_task(
        identifier: &str,
        title: &str,
        priority: Option<i32>,
        pagerank_score: Option<f64>,
    ) -> DispatchTask {
        DispatchTask::IssueDriven {
            identifier: identifier.into(),
            title: title.into(),
            priority,
            pagerank_score,
        }
    }

    // Dequeues one task and returns its name, failing the test if the queue is
    // empty or the task is not `TimeDriven`.
    fn dequeue_time_name(dispatcher: &mut Dispatcher) -> String {
        match dispatcher.dequeue() {
            Some(DispatchTask::TimeDriven { name, .. }) => name,
            other => panic!("expected a TimeDriven task, got {:?}", other),
        }
    }

    // Dequeues one task and returns its identifier, failing the test if the
    // queue is empty or the task is not `IssueDriven`.
    fn dequeue_issue_identifier(dispatcher: &mut Dispatcher) -> String {
        match dispatcher.dequeue() {
            Some(DispatchTask::IssueDriven { identifier, .. }) => identifier,
            other => panic!("expected an IssueDriven task, got {:?}", other),
        }
    }

    #[test]
    fn test_enqueue_dequeue() {
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(DispatchTask::TimeDriven {
            name: "test".into(),
            task: "do something".into(),
            layer: crate::AgentLayer::Safety,
        });
        assert_eq!(dispatcher.depth(), 1);

        let task = dispatcher.dequeue();
        assert!(task.is_some());
        assert_eq!(dispatcher.depth(), 0);
    }

    #[test]
    fn test_dequeue_empty_returns_none() {
        let mut dispatcher = Dispatcher::new();
        assert!(dispatcher.peek().is_none());
        assert!(dispatcher.dequeue().is_none());
        assert_eq!(dispatcher.stats().total_dequeued, 0);
    }

    #[test]
    fn test_priority_ordering() {
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(time_task("growth", crate::AgentLayer::Growth));
        dispatcher.enqueue(time_task("core", crate::AgentLayer::Core));
        dispatcher.enqueue(time_task("safety", crate::AgentLayer::Safety));

        assert_eq!(dequeue_time_name(&mut dispatcher), "safety");
        assert_eq!(dequeue_time_name(&mut dispatcher), "core");
        assert_eq!(dequeue_time_name(&mut dispatcher), "growth");
        assert!(dispatcher.dequeue().is_none());
    }

    #[test]
    fn test_fifo_within_same_priority() {
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(time_task("first", crate::AgentLayer::Safety));
        dispatcher.enqueue(time_task("second", crate::AgentLayer::Safety));

        assert_eq!(dequeue_time_name(&mut dispatcher), "first");
        assert_eq!(dequeue_time_name(&mut dispatcher), "second");
        assert!(dispatcher.dequeue().is_none());
    }

    #[test]
    fn test_pagerank_priority() {
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(issue_task("low-pr", "Low PageRank", Some(1), Some(0.15)));
        dispatcher.enqueue(issue_task("high-pr", "High PageRank", Some(1), Some(2.5)));

        assert_eq!(dequeue_issue_identifier(&mut dispatcher), "high-pr");
        assert_eq!(dequeue_issue_identifier(&mut dispatcher), "low-pr");
        assert!(dispatcher.dequeue().is_none());
    }

    #[test]
    fn test_mention_ordering() {
        // A mention scores 200, which places it between Safety (0) and Core (1000).
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(time_task("core", crate::AgentLayer::Core));
        dispatcher.enqueue(DispatchTask::MentionDriven {
            agent_name: "agent".into(),
            issue_number: 1,
            comment_id: 2,
            context: "ctx".into(),
        });
        dispatcher.enqueue(time_task("safety", crate::AgentLayer::Safety));

        assert_eq!(dequeue_time_name(&mut dispatcher), "safety");
        match dispatcher.dequeue() {
            Some(DispatchTask::MentionDriven { agent_name, .. }) => {
                assert_eq!(agent_name, "agent");
            }
            other => panic!("expected a MentionDriven task, got {:?}", other),
        }
        assert_eq!(dequeue_time_name(&mut dispatcher), "core");
    }

    #[test]
    fn test_stats_tracking() {
        let mut dispatcher = Dispatcher::new();
        dispatcher.enqueue(time_task("safety", crate::AgentLayer::Safety));
        dispatcher.enqueue(issue_task("issue-1", "Issue", Some(1), None));

        let stats = dispatcher.stats();
        assert_eq!(stats.total_enqueued, 2);
        assert_eq!(stats.current_depth, 2);
        assert_eq!(stats.by_source.get("time_driven"), Some(&1));
        assert_eq!(stats.by_source.get("issue_driven"), Some(&1));

        // The Safety task has the lowest score, so it is the one removed here.
        assert_eq!(dequeue_time_name(&mut dispatcher), "safety");

        let stats = dispatcher.stats();
        assert_eq!(stats.total_enqueued, 2);
        assert_eq!(stats.total_dequeued, 1);
        assert_eq!(stats.current_depth, 1);
        assert_eq!(stats.by_source.get("time_driven"), Some(&0));
        assert_eq!(stats.by_source.get("issue_driven"), Some(&1));
    }
}