ccswarm 0.4.0

AI-powered multi-agent orchestration system with proactive intelligence, security monitoring, and session management
Documentation
/// Task execution pipeline using Rust's iterator patterns
///
/// This module implements efficient task processing using
/// zero-cost abstractions and functional programming patterns.
use crate::agent::{Priority, Task, TaskResult};
use crate::error::Result;
use std::collections::HashMap;

/// Task pipeline for efficient batch processing
pub struct TaskPipeline {
    tasks: Vec<Task>,
}

impl TaskPipeline {
    pub fn new(tasks: Vec<Task>) -> Self {
        Self { tasks }
    }

    /// Filter tasks by priority using iterator adapters
    pub fn filter_priority(self, priority: Priority) -> impl Iterator<Item = Task> {
        self.tasks
            .into_iter()
            .filter(move |task| task.priority == priority)
    }

    /// Map tasks to assignments with agents
    pub fn assign_to_agents<'a>(
        self,
        agents: &'a [String],
    ) -> impl Iterator<Item = (Task, &'a str)> + 'a {
        self.tasks
            .into_iter()
            .zip(agents.iter().cycle())
            .map(|(task, agent)| (task, agent.as_str()))
    }

    /// Process tasks in parallel batches
    pub async fn process_batch<F, Fut>(
        self,
        batch_size: usize,
        processor: F,
    ) -> Vec<Result<TaskResult>>
    where
        F: Fn(Task) -> Fut + Clone,
        Fut: std::future::Future<Output = Result<TaskResult>>,
    {
        use futures::stream::{self, StreamExt};

        stream::iter(self.tasks)
            .chunks(batch_size)
            .flat_map(|batch| {
                stream::iter(batch)
                    .map(processor.clone())
                    .buffer_unordered(batch_size)
            })
            .collect()
            .await
    }

    /// Chain multiple transformations efficiently
    pub fn transform(self) -> TaskTransformer {
        TaskTransformer::new(self.tasks)
    }
}

/// Fluent API for task transformations
pub struct TaskTransformer {
    tasks: Vec<Task>,
}

impl TaskTransformer {
    fn new(tasks: Vec<Task>) -> Self {
        Self { tasks }
    }

    /// Sort by priority (in-place for efficiency)
    pub fn sort_by_priority(mut self) -> Self {
        self.tasks.sort_by_key(|t| t.priority);
        self
    }

    /// Deduplicate tasks by ID
    pub fn deduplicate(mut self) -> Self {
        use std::collections::HashSet;

        let mut seen = HashSet::new();
        self.tasks.retain(|task| seen.insert(task.id.clone()));
        self
    }

    /// Group by metadata field
    pub fn group_by<K, V>(self, key_fn: impl Fn(&Task) -> K) -> HashMap<K, Vec<Task>>
    where
        K: Eq + std::hash::Hash,
    {
        let mut groups = HashMap::new();

        for task in self.tasks {
            let key = key_fn(&task);
            groups.entry(key).or_insert_with(Vec::new).push(task);
        }

        groups
    }

    /// Apply a transformation function
    pub fn map<F, R>(self, f: F) -> impl Iterator<Item = R>
    where
        F: Fn(Task) -> R,
    {
        self.tasks.into_iter().map(f)
    }

    /// Collect results
    pub fn collect(self) -> Vec<Task> {
        self.tasks
    }
}

/// Efficient task aggregation using iterators
pub struct TaskAggregator;

impl TaskAggregator {
    /// Calculate statistics without allocating intermediate collections
    pub fn calculate_stats(tasks: impl Iterator<Item = Task>) -> TaskStats {
        let mut total = 0;
        let mut high_priority = 0;
        let mut medium_priority = 0;
        let mut low_priority = 0;

        for task in tasks {
            total += 1;
            match task.priority {
                Priority::High | Priority::Critical => high_priority += 1,
                Priority::Medium => medium_priority += 1,
                Priority::Low => low_priority += 1,
            }
        }

        TaskStats {
            total,
            high_priority,
            medium_priority,
            low_priority,
        }
    }

    /// Find tasks matching criteria without allocation
    pub fn find_matching<'a>(
        tasks: &'a [Task],
        predicate: impl Fn(&Task) -> bool + 'a,
    ) -> impl Iterator<Item = &'a Task> + 'a {
        tasks.iter().filter(move |task| predicate(task))
    }
}

#[derive(Debug, PartialEq)]
pub struct TaskStats {
    pub total: usize,
    pub high_priority: usize,
    pub medium_priority: usize,
    pub low_priority: usize,
}

/// Custom iterator for task pagination
pub struct TaskPaginator<I> {
    iter: I,
    page_size: usize,
}

impl<I> TaskPaginator<I>
where
    I: Iterator<Item = Task>,
{
    pub fn new(iter: I, page_size: usize) -> Self {
        Self { iter, page_size }
    }
}

impl<I> Iterator for TaskPaginator<I>
where
    I: Iterator<Item = Task>,
{
    type Item = Vec<Task>;

    fn next(&mut self) -> Option<Self::Item> {
        let mut page = Vec::with_capacity(self.page_size);

        for _ in 0..self.page_size {
            match self.iter.next() {
                Some(task) => page.push(task),
                None => break,
            }
        }

        if page.is_empty() { None } else { Some(page) }
    }
}

/// Extension trait for task iterators
pub trait TaskIteratorExt: Iterator<Item = Task> + Sized {
    fn paginate(self, page_size: usize) -> TaskPaginator<Self> {
        TaskPaginator::new(self, page_size)
    }

    fn high_priority_only(self) -> impl Iterator<Item = Task> {
        self.filter(|task| task.priority == Priority::High)
    }

    fn with_metadata(self, key: String) -> impl Iterator<Item = Task> {
        self.filter(move |task| task.metadata.as_ref().is_some_and(|m| m.contains_key(&key)))
    }
}

impl<I: Iterator<Item = Task> + Sized> TaskIteratorExt for I {}