ggen_core/codegen/
concurrent.rs

1use crate::manifest::GenerationRule;
2use ggen_utils::error::{Error, Result};
3use std::collections::{HashMap, HashSet};
4use std::sync::Arc;
5use tokio::task::JoinSet;
6
/// Stateless namespace for analysing dependencies between [`GenerationRule`]s
/// and running them concurrently.
///
/// The type carries no data; all of its functionality is exposed through
/// associated functions.
#[derive(Debug, Clone, Copy, Default)]
pub struct ConcurrentRuleExecutor;
8
9impl ConcurrentRuleExecutor {
10    pub fn detect_rule_dependencies(rules: &[GenerationRule]) -> HashMap<String, HashSet<String>> {
11        let mut dependencies: HashMap<String, HashSet<String>> = HashMap::new();
12
13        for rule in rules {
14            let mut deps = HashSet::new();
15
16            // Extract variable references from output_file
17            for var in Self::extract_variables(&rule.output_file) {
18                // Find which rules produce this variable
19                for other_rule in rules {
20                    if Self::rule_produces_variable(other_rule, &var) {
21                        deps.insert(other_rule.name.clone());
22                    }
23                }
24            }
25
26            dependencies.insert(rule.name.clone(), deps);
27        }
28
29        dependencies
30    }
31
32    pub fn find_independent_rules(
33        rules: &[GenerationRule], dependencies: &HashMap<String, HashSet<String>>,
34    ) -> Vec<Vec<String>> {
35        let mut batches: Vec<Vec<String>> = vec![];
36        let mut processed = HashSet::new();
37
38        while processed.len() < rules.len() {
39            let mut batch = vec![];
40
41            for rule in rules {
42                if processed.contains(&rule.name) {
43                    continue;
44                }
45
46                let empty_deps = HashSet::new();
47                let rule_deps = dependencies.get(&rule.name).unwrap_or(&empty_deps);
48                if rule_deps.iter().all(|dep| processed.contains(dep)) {
49                    batch.push(rule.name.clone());
50                }
51            }
52
53            if batch.is_empty() {
54                break;
55            }
56
57            for rule_name in &batch {
58                processed.insert(rule_name.clone());
59            }
60
61            batches.push(batch);
62        }
63
64        batches
65    }
66
67    pub async fn execute_rules_concurrent<F>(
68        rules: &[GenerationRule], max_parallelism: Option<usize>, executor: F,
69    ) -> Result<Vec<(String, Result<()>)>>
70    where
71        F: Fn(GenerationRule) -> futures::future::BoxFuture<'static, Result<()>>
72            + Send
73            + Sync
74            + 'static,
75    {
76        let dependencies = Self::detect_rule_dependencies(rules);
77        let batches = Self::find_independent_rules(rules, &dependencies);
78
79        let executor = Arc::new(executor);
80        let mut all_results = vec![];
81
82        for batch in batches {
83            let mut join_set = JoinSet::new();
84            let parallelism = max_parallelism.unwrap_or(num_cpus::get());
85
86            for (idx, rule_name) in batch.iter().enumerate() {
87                if idx > 0 && idx % parallelism == 0 {
88                    while let Some(result) = join_set.join_next().await {
89                        match result {
90                            Ok((name, res)) => {
91                                all_results.push((name, res));
92                            }
93                            Err(e) => {
94                                return Err(Error::new(&e.to_string()));
95                            }
96                        }
97                    }
98                }
99
100                let rule = rules
101                    .iter()
102                    .find(|r| r.name == *rule_name)
103                    .ok_or_else(|| Error::new(&format!("Rule {} not found", rule_name)))?
104                    .clone();
105
106                let executor_clone = Arc::clone(&executor);
107                join_set.spawn(async move {
108                    let res = executor_clone(rule.clone()).await;
109                    (rule.name, res)
110                });
111            }
112
113            while let Some(result) = join_set.join_next().await {
114                match result {
115                    Ok((name, res)) => {
116                        all_results.push((name, res));
117                    }
118                    Err(e) => {
119                        return Err(Error::new(&e.to_string()));
120                    }
121                }
122            }
123        }
124
125        Ok(all_results)
126    }
127
128    fn extract_variables(template: &str) -> Vec<String> {
129        let mut vars = vec![];
130        let mut chars_iter = template.chars().peekable();
131
132        while let Some(ch) = chars_iter.next() {
133            // Look for {{ pattern
134            if ch == '{' && chars_iter.peek() == Some(&'{') {
135                chars_iter.next(); // consume second {
136
137                // Read variable name until }}
138                let mut var_name = String::new();
139                while let Some(c) = chars_iter.next() {
140                    if c == '}' && chars_iter.peek() == Some(&'}') {
141                        chars_iter.next(); // consume second }
142                        if !var_name.is_empty() {
143                            vars.push(var_name.trim().to_string());
144                        }
145                        break;
146                    }
147                    var_name.push(c);
148                }
149            }
150        }
151
152        vars
153    }
154
155    fn rule_produces_variable(rule: &GenerationRule, var: &str) -> bool {
156        // Simple heuristic: rule produces variables if its query selects them
157        // In a full implementation, would parse the SPARQL query
158        rule.name.contains(&var.to_lowercase())
159            || rule.name.replace("-", "_").contains(&var.to_lowercase())
160    }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a rule with the given name and output template; every other
    /// field takes its default value.
    fn make_rule(name: &str, output_file: &str) -> GenerationRule {
        GenerationRule {
            name: name.to_string(),
            output_file: output_file.to_string(),
            ..Default::default()
        }
    }

    /// Placeholders are returned in the order they appear in the template.
    #[test]
    fn test_extract_variables() {
        let found = ConcurrentRuleExecutor::extract_variables("src/{{module}}/{{name}}.rs");
        assert_eq!(found, vec!["module", "name"]);
    }

    /// Two rules without placeholders have no dependencies and share one batch.
    #[test]
    fn test_detect_independent_rules() {
        let rules = vec![
            make_rule("rule1", "file1.rs"),
            make_rule("rule2", "file2.rs"),
        ];

        let dependency_map = ConcurrentRuleExecutor::detect_rule_dependencies(&rules);
        let schedule = ConcurrentRuleExecutor::find_independent_rules(&rules, &dependency_map);

        let batch_sizes: Vec<usize> = schedule.iter().map(Vec::len).collect();
        assert_eq!(batch_sizes, vec![2]);
    }
}