use crate::agent::{Priority, Task, TaskResult};
use crate::error::Result;
use std::collections::HashMap;
/// An owned list of [`Task`]s that can be filtered, distributed across agents,
/// processed asynchronously, or handed over to a [`TaskTransformer`].
///
/// Every operation takes `self` by value, so a pipeline is consumed by the
/// first method that is called on it.
pub struct TaskPipeline {
    tasks: Vec<Task>,
}

impl TaskPipeline {
    /// Creates a pipeline that owns `tasks`, keeping them in the given order.
    pub fn new(tasks: Vec<Task>) -> Self {
        Self { tasks }
    }

    /// Consumes the pipeline and lazily yields the tasks whose priority is
    /// equal to `priority`, in their original order.
    pub fn filter_priority(self, priority: Priority) -> impl Iterator<Item = Task> {
        self.tasks
            .into_iter()
            .filter(move |task| task.priority == priority)
    }

    /// Consumes the pipeline and pairs every task with an agent name.
    ///
    /// Agents are handed out in slice order and the slice is cycled, so the
    /// first task gets `agents[0]`, the next `agents[1]`, and so on, wrapping
    /// around when the slice is exhausted.
    ///
    /// If `agents` is empty the returned iterator yields nothing, i.e. the
    /// tasks are dropped without being assigned.
    pub fn assign_to_agents<'a>(
        self,
        agents: &'a [String],
    ) -> impl Iterator<Item = (Task, &'a str)> + 'a {
        self.tasks
            .into_iter()
            .zip(agents.iter().cycle())
            .map(|(task, agent)| (task, agent.as_str()))
    }

    /// Consumes the pipeline, runs `processor` on every task and returns one
    /// result per task.
    ///
    /// Tasks are split into consecutive batches of at most `batch_size`
    /// tasks. The futures of a batch are driven concurrently and their
    /// results are collected as they complete, so within a batch the output
    /// order may differ from the input order.
    ///
    /// A `batch_size` of `0` is treated as `1` (tasks are processed one at a
    /// time) instead of panicking.
    pub async fn process_batch<F, Fut>(
        self,
        batch_size: usize,
        processor: F,
    ) -> Vec<Result<TaskResult>>
    where
        F: Fn(Task) -> Fut + Clone,
        Fut: std::future::Future<Output = Result<TaskResult>>,
    {
        use futures::stream::{self, StreamExt};

        // `StreamExt::chunks` panics when its capacity is zero, so clamp the
        // requested size to the smallest value that still makes progress.
        let batch_size = batch_size.max(1);

        stream::iter(self.tasks)
            .chunks(batch_size)
            .flat_map(|batch| {
                stream::iter(batch)
                    .map(processor.clone())
                    .buffer_unordered(batch_size)
            })
            .collect()
            .await
    }

    /// Consumes the pipeline and moves its tasks into a [`TaskTransformer`].
    pub fn transform(self) -> TaskTransformer {
        TaskTransformer::new(self.tasks)
    }
}
/// A chainable, by-value builder for reshaping an owned list of [`Task`]s.
///
/// The constructor is private, so values of this type are created by other
/// items in this module.
pub struct TaskTransformer {
    tasks: Vec<Task>,
}

impl TaskTransformer {
    /// Wraps `tasks` without changing their order.
    fn new(tasks: Vec<Task>) -> Self {
        TaskTransformer { tasks }
    }

    /// Sorts the tasks in ascending order of `priority`, as defined by the
    /// `Ord` implementation of [`Priority`].
    ///
    /// The sort is stable: tasks with equal priority keep their relative
    /// order.
    pub fn sort_by_priority(mut self) -> Self {
        self.tasks
            .sort_by(|lhs, rhs| lhs.priority.cmp(&rhs.priority));
        self
    }

    /// Removes every task whose `id` has already been seen earlier in the
    /// list, keeping the first occurrence of each id and the original order.
    pub fn deduplicate(self) -> Self {
        let mut seen_ids = std::collections::HashSet::new();
        let tasks = self
            .tasks
            .into_iter()
            // `insert` returns `true` only the first time an id is added.
            .filter(|task| seen_ids.insert(task.id.clone()))
            .collect();
        Self { tasks }
    }

    /// Consumes the transformer and buckets the tasks by the key that
    /// `key_fn` computes for each of them.
    ///
    /// Within a bucket the tasks keep their original relative order.
    ///
    /// The type parameter `V` does not appear in any parameter or in the
    /// return type, so it cannot be inferred and callers have to name it
    /// explicitly (for example `group_by::<_, ()>(...)`). It is kept so that
    /// existing call sites continue to compile.
    /// NOTE(review): consider removing `V` in a breaking release.
    pub fn group_by<K, V>(self, key_fn: impl Fn(&Task) -> K) -> HashMap<K, Vec<Task>>
    where
        K: Eq + std::hash::Hash,
    {
        let mut buckets: HashMap<K, Vec<Task>> = HashMap::new();
        for task in self.tasks {
            buckets.entry(key_fn(&task)).or_default().push(task);
        }
        buckets
    }

    /// Consumes the transformer and lazily applies `f` to each task, in
    /// order.
    pub fn map<F, R>(self, f: F) -> impl Iterator<Item = R>
    where
        F: Fn(Task) -> R,
    {
        let owned_tasks = self.tasks.into_iter();
        owned_tasks.map(f)
    }

    /// Consumes the transformer and returns the tasks it holds.
    pub fn collect(self) -> Vec<Task> {
        let Self { tasks } = self;
        tasks
    }
}
/// Namespace for stateless helpers that summarise or search tasks.
pub struct TaskAggregator;

impl TaskAggregator {
    /// Consumes `tasks` and counts them overall and per priority bucket.
    ///
    /// `Priority::High` and `Priority::Critical` are both counted in
    /// `high_priority`; `Priority::Medium` and `Priority::Low` each have
    /// their own counter.
    pub fn calculate_stats(tasks: impl Iterator<Item = Task>) -> TaskStats {
        let zeroed = TaskStats {
            total: 0,
            high_priority: 0,
            medium_priority: 0,
            low_priority: 0,
        };

        tasks.fold(zeroed, |mut stats, task| {
            stats.total += 1;
            // Pick the counter that belongs to this task's priority.
            let bucket = match task.priority {
                Priority::High | Priority::Critical => &mut stats.high_priority,
                Priority::Medium => &mut stats.medium_priority,
                Priority::Low => &mut stats.low_priority,
            };
            *bucket += 1;
            stats
        })
    }

    /// Lazily yields references to the tasks in `tasks` for which
    /// `predicate` returns `true`, in slice order.
    pub fn find_matching<'a>(
        tasks: &'a [Task],
        predicate: impl Fn(&Task) -> bool + 'a,
    ) -> impl Iterator<Item = &'a Task> + 'a {
        tasks.iter().filter(move |&candidate| predicate(candidate))
    }
}
/// Task counts, overall and broken down into three priority buckets.
#[derive(Debug, PartialEq)]
pub struct TaskStats {
    /// Number of tasks counted in total.
    pub total: usize,
    /// Number of tasks counted in the high-priority bucket.
    pub high_priority: usize,
    /// Number of tasks counted in the medium-priority bucket.
    pub medium_priority: usize,
    /// Number of tasks counted in the low-priority bucket.
    pub low_priority: usize,
}
/// Iterator adaptor that groups the tasks of an inner iterator into pages.
///
/// Each item is a `Vec<Task>` holding up to `page_size` consecutive tasks;
/// only the last page may be shorter. Iteration ends when the inner iterator
/// has no more tasks to fill a page with.
///
/// With a `page_size` of `0` the paginator yields no pages and never advances
/// the inner iterator.
pub struct TaskPaginator<I> {
    iter: I,
    page_size: usize,
}

impl<I> TaskPaginator<I>
where
    I: Iterator<Item = Task>,
{
    /// Creates a paginator over `iter` that emits pages of at most
    /// `page_size` tasks.
    pub fn new(iter: I, page_size: usize) -> Self {
        Self { iter, page_size }
    }
}

impl<I> Iterator for TaskPaginator<I>
where
    I: Iterator<Item = Task>,
{
    type Item = Vec<Task>;

    /// Pulls up to `page_size` tasks from the inner iterator and returns them
    /// as one page, or `None` when no task could be pulled.
    fn next(&mut self) -> Option<Self::Item> {
        // Let `collect` size the page from the inner iterator's size hint
        // rather than reserving `page_size` slots up front: a very large
        // `page_size` must not trigger a capacity-overflow panic, and the
        // terminating call should not allocate a page it throws away.
        let page: Vec<Task> = self.iter.by_ref().take(self.page_size).collect();
        if page.is_empty() {
            None
        } else {
            Some(page)
        }
    }

    /// Derives the number of remaining pages from the inner iterator's hint
    /// by rounding the task counts up to whole pages.
    fn size_hint(&self) -> (usize, Option<usize>) {
        if self.page_size == 0 {
            // `next` always returns `None` in this configuration.
            return (0, Some(0));
        }
        let (lower, upper) = self.iter.size_hint();
        (
            lower.div_ceil(self.page_size),
            upper.map(|tasks| tasks.div_ceil(self.page_size)),
        )
    }
}
/// Extension methods available on every iterator that yields owned [`Task`]s.
pub trait TaskIteratorExt: Iterator<Item = Task> + Sized {
    /// Wraps this iterator in a [`TaskPaginator`] configured with
    /// `page_size`.
    fn paginate(self, page_size: usize) -> TaskPaginator<Self> {
        TaskPaginator::new(self, page_size)
    }

    /// Keeps only the tasks whose priority is exactly `Priority::High`.
    ///
    /// Tasks with `Priority::Critical` are filtered out.
    /// NOTE(review): `TaskAggregator::calculate_stats` counts `Critical`
    /// together with `High`; confirm that the exact match here is intended.
    fn high_priority_only(self) -> impl Iterator<Item = Task> {
        let is_high = |task: &Task| task.priority == Priority::High;
        self.filter(is_high)
    }

    /// Keeps only the tasks that have metadata containing the key `key`.
    ///
    /// Tasks without metadata are filtered out.
    fn with_metadata(self, key: String) -> impl Iterator<Item = Task> {
        self.filter(move |task| {
            matches!(task.metadata.as_ref(), Some(meta) if meta.contains_key(&key))
        })
    }
}

/// Blanket implementation: any iterator over [`Task`] gets the extension
/// methods.
impl<I> TaskIteratorExt for I where I: Iterator<Item = Task> + Sized {}