ggen_core/codegen/
swarm_execution.rs

1use crate::manifest::GenerationRule;
2use ggen_utils::error::{Error, Result};
3use std::collections::{HashMap, VecDeque};
4use std::sync::Arc;
5use tokio::sync::RwLock;
6
7pub struct Agent {
8    pub id: String,
9    pub capability_level: u8,
10    pub assigned_rules: Vec<String>,
11    pub completed_rules: Vec<String>,
12    pub failed_rules: Vec<String>,
13}
14
15pub struct SwarmCoordinator {
16    agents: Arc<RwLock<HashMap<String, Agent>>>,
17    task_queue: Arc<RwLock<VecDeque<GenerationRule>>>,
18    rule_assignments: Arc<RwLock<HashMap<String, String>>>, // rule_name -> agent_id
19}
20
21impl SwarmCoordinator {
22    pub fn new(agent_count: usize) -> Self {
23        let mut agents = HashMap::new();
24
25        for i in 0..agent_count {
26            let agent_id = format!("agent-{}", i);
27            agents.insert(
28                agent_id.clone(),
29                Agent {
30                    id: agent_id,
31                    capability_level: ((i % 3) + 1) as u8,
32                    assigned_rules: vec![],
33                    completed_rules: vec![],
34                    failed_rules: vec![],
35                },
36            );
37        }
38
39        Self {
40            agents: Arc::new(RwLock::new(agents)),
41            task_queue: Arc::new(RwLock::new(VecDeque::new())),
42            rule_assignments: Arc::new(RwLock::new(HashMap::new())),
43        }
44    }
45
46    pub async fn enqueue_rules(&self, rules: Vec<GenerationRule>) -> Result<()> {
47        let mut queue = self.task_queue.write().await;
48        for rule in rules {
49            queue.push_back(rule);
50        }
51        Ok(())
52    }
53
54    pub async fn distribute_work(&self) -> Result<()> {
55        let mut queue = self.task_queue.write().await;
56        let mut agents = self.agents.write().await;
57        let mut assignments = self.rule_assignments.write().await;
58
59        // Priority: assign to agents with lowest workload first
60        while let Some(rule) = queue.pop_front() {
61            // Find agent with least assigned work
62            let best_agent = agents
63                .values_mut()
64                .min_by_key(|a| a.assigned_rules.len())
65                .ok_or_else(|| Error::new("No agents available"))?;
66
67            best_agent.assigned_rules.push(rule.name.clone());
68            assignments.insert(rule.name, best_agent.id.clone());
69        }
70
71        Ok(())
72    }
73
74    pub async fn record_completion(&self, rule_name: &str) -> Result<()> {
75        let assignments = self.rule_assignments.read().await;
76        let agent_id = assignments
77            .get(rule_name)
78            .ok_or_else(|| Error::new("Rule not assigned"))?
79            .clone();
80
81        let mut agents = self.agents.write().await;
82        if let Some(agent) = agents.get_mut(&agent_id) {
83            agent.assigned_rules.retain(|r| r != rule_name);
84            agent.completed_rules.push(rule_name.to_string());
85        }
86
87        Ok(())
88    }
89
90    pub async fn record_failure(&self, rule_name: &str) -> Result<()> {
91        let assignments = self.rule_assignments.read().await;
92        let agent_id = assignments
93            .get(rule_name)
94            .ok_or_else(|| Error::new("Rule not assigned"))?
95            .clone();
96
97        let mut agents = self.agents.write().await;
98        if let Some(agent) = agents.get_mut(&agent_id) {
99            agent.assigned_rules.retain(|r| r != rule_name);
100            agent.failed_rules.push(rule_name.to_string());
101        }
102
103        Ok(())
104    }
105
106    pub async fn get_agent_status(&self, agent_id: &str) -> Result<AgentStatus> {
107        let agents = self.agents.read().await;
108        let agent = agents
109            .get(agent_id)
110            .ok_or_else(|| Error::new("Agent not found"))?;
111
112        Ok(AgentStatus {
113            id: agent.id.clone(),
114            capability_level: agent.capability_level,
115            assigned_count: agent.assigned_rules.len(),
116            completed_count: agent.completed_rules.len(),
117            failed_count: agent.failed_rules.len(),
118            efficiency: Self::calculate_efficiency(
119                agent.completed_rules.len(),
120                agent.failed_rules.len(),
121            ),
122        })
123    }
124
125    pub async fn get_swarm_summary(&self) -> Result<SwarmSummary> {
126        let agents = self.agents.read().await;
127
128        let total_assigned = agents.values().map(|a| a.assigned_rules.len()).sum();
129        let total_completed = agents.values().map(|a| a.completed_rules.len()).sum();
130        let total_failed = agents.values().map(|a| a.failed_rules.len()).sum();
131
132        let idle_agents = agents
133            .values()
134            .filter(|a| a.assigned_rules.is_empty())
135            .count();
136
137        Ok(SwarmSummary {
138            total_agents: agents.len(),
139            active_agents: agents.len() - idle_agents,
140            idle_agents,
141            total_assigned,
142            total_completed,
143            total_failed,
144            utilization: if agents.is_empty() {
145                0.0
146            } else {
147                (total_assigned as f64 / agents.len() as f64) * 100.0
148            },
149        })
150    }
151
152    fn calculate_efficiency(completed: usize, failed: usize) -> f64 {
153        let total = completed + failed;
154        if total == 0 {
155            100.0
156        } else {
157            (completed as f64 / total as f64) * 100.0
158        }
159    }
160
161    pub async fn rebalance_work(&self) -> Result<()> {
162        let mut agents = self.agents.write().await;
163
164        // Find overloaded agents (assigned > 2x average)
165        let avg_load = agents
166            .values()
167            .map(|a| a.assigned_rules.len())
168            .sum::<usize>() as f64
169            / agents.len() as f64;
170
171        for agent in agents.values_mut() {
172            if agent.assigned_rules.len() as f64 > avg_load * 2.0 {
173                // This agent is overloaded, but actual redistribution
174                // happens in distribute_work() after queue updates
175                agent.assigned_rules.clear();
176            }
177        }
178
179        Ok(())
180    }
181}
182
183pub struct AgentStatus {
184    pub id: String,
185    pub capability_level: u8,
186    pub assigned_count: usize,
187    pub completed_count: usize,
188    pub failed_count: usize,
189    pub efficiency: f64,
190}
191
192pub struct SwarmSummary {
193    pub total_agents: usize,
194    pub active_agents: usize,
195    pub idle_agents: usize,
196    pub total_assigned: usize,
197    pub total_completed: usize,
198    pub total_failed: usize,
199    pub utilization: f64,
200}
201
202#[cfg(test)]
203mod tests {
204    use super::*;
205    use crate::manifest::{QuerySource, TemplateSource};
206
207    #[tokio::test]
208    async fn test_swarm_coordinator_creation() {
209        let swarm = SwarmCoordinator::new(4);
210        let summary = swarm.get_swarm_summary().await.unwrap();
211
212        assert_eq!(summary.total_agents, 4);
213        assert_eq!(summary.active_agents, 0);
214        assert_eq!(summary.idle_agents, 4);
215    }
216
217    #[tokio::test]
218    async fn test_agent_status_retrieval() {
219        let swarm = SwarmCoordinator::new(2);
220        let status = swarm.get_agent_status("agent-0").await.unwrap();
221
222        assert_eq!(status.id, "agent-0");
223        assert_eq!(status.capability_level, 1);
224        assert_eq!(status.efficiency, 100.0);
225    }
226
227    #[tokio::test]
228    async fn test_work_distribution() {
229        let swarm = SwarmCoordinator::new(3);
230
231        let rules = vec![
232            GenerationRule {
233                name: "rule-1".to_string(),
234                query: QuerySource::Inline {
235                    inline: "SELECT * WHERE {}".to_string(),
236                },
237                template: TemplateSource::Inline {
238                    inline: "t.tera".to_string(),
239                },
240                output_file: "out.rs".to_string(),
241                skip_empty: false,
242                mode: Default::default(),
243                when: None,
244            },
245            GenerationRule {
246                name: "rule-2".to_string(),
247                query: QuerySource::Inline {
248                    inline: "SELECT * WHERE {}".to_string(),
249                },
250                template: TemplateSource::Inline {
251                    inline: "t.tera".to_string(),
252                },
253                output_file: "out.rs".to_string(),
254                skip_empty: false,
255                mode: Default::default(),
256                when: None,
257            },
258        ];
259
260        swarm.enqueue_rules(rules).await.unwrap();
261        swarm.distribute_work().await.unwrap();
262
263        let summary = swarm.get_swarm_summary().await.unwrap();
264        assert_eq!(summary.total_assigned, 2);
265    }
266
267    #[tokio::test]
268    async fn test_completion_tracking() {
269        let swarm = SwarmCoordinator::new(2);
270
271        let rules = vec![GenerationRule {
272            name: "rule-1".to_string(),
273            query: QuerySource::Inline {
274                inline: "SELECT * WHERE {}".to_string(),
275            },
276            template: TemplateSource::Inline {
277                inline: "t.tera".to_string(),
278            },
279            output_file: "out.rs".to_string(),
280            skip_empty: false,
281            mode: Default::default(),
282            when: None,
283        }];
284
285        swarm.enqueue_rules(rules).await.unwrap();
286        swarm.distribute_work().await.unwrap();
287        swarm.record_completion("rule-1").await.unwrap();
288
289        let summary = swarm.get_swarm_summary().await.unwrap();
290        assert_eq!(summary.total_assigned, 0);
291        assert_eq!(summary.total_completed, 1);
292    }
293
294    #[test]
295    fn test_efficiency_calculation() {
296        assert_eq!(SwarmCoordinator::calculate_efficiency(10, 0), 100.0);
297        assert_eq!(SwarmCoordinator::calculate_efficiency(5, 5), 50.0);
298        assert_eq!(SwarmCoordinator::calculate_efficiency(0, 0), 100.0);
299    }
300}