1pub mod macros;
2
3use serde::Deserialize;
4use std::{
5 collections::{HashMap, VecDeque}, env, fs, path::{Path, PathBuf}
6};
7
8#[derive(Debug, Deserialize)]
9struct FlowPathIndex {
10 flow_path: FlowPathList,
11}
12
13#[derive(Debug, Deserialize)]
14struct FlowPathList {
15 paths: Vec<String>,
16}
17
18#[derive(Debug, Deserialize)]
19struct FlowFile {
20 flow: FlowConfig,
21}
22
23#[derive(Debug, Deserialize)]
24struct FlowConfig {
25 source: Vec<TaskConfig>,
26 processor: Vec<TaskConfig>,
27 sink: TaskConfig,
28}
29
30#[derive(Debug, Deserialize)]
31struct TaskConfig {
32 name: String,
33 dependencies: Vec<String>,
34 output: String,
35 builder: String,
36}
37
/// A task in the flow graph: a flattened [`TaskConfig`] plus the role it was declared with.
#[derive(Debug)]
struct Node {
    /// Task name as written in the flow file.
    name: String,
    /// Outputs of other nodes this node consumes.
    dependencies: Vec<String>,
    /// Output key this node produces.
    output: String,
    /// Rust expression spliced verbatim into generated code.
    builder: String,
    /// True when the task was declared under `source`.
    is_source: bool,
    /// True when the task was declared as the flow's `sink`.
    is_sink: bool,
}
47
/// Renders `path` as a string with every backslash turned into a forward slash.
///
/// Non-UTF-8 bytes are replaced lossily (via `to_string_lossy`).
fn to_unix(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|c| if c == '\\' { '/' } else { c })
        .collect()
}
51
52fn normalize_for_concat(manifest_dir: &Path, path: &Path) -> String {
53 let relative = path.strip_prefix(manifest_dir).unwrap_or(path);
54 let relative_unix = to_unix(relative);
55 if relative_unix.starts_with('/') {
56 relative_unix
57 } else {
58 format!("/{relative_unix}")
59 }
60}
61
/// Turns an output key into a Rust identifier for generated code.
///
/// The result is `out_` followed by the key, lowercased. Every character that is
/// not an ASCII letter, digit or `_` becomes `_`. A trailing `_` is appended when
/// the result would otherwise end in a digit.
fn sanitize_ident(raw: &str) -> String {
    let tail: String = raw
        .chars()
        .map(|c| {
            if c == '_' || c.is_ascii_alphanumeric() {
                c.to_ascii_lowercase()
            } else {
                '_'
            }
        })
        .collect();

    let mut ident = String::with_capacity(raw.len() + 8);
    ident.push_str("out_");
    ident.push_str(&tail);
    if ident.ends_with(|c: char| c.is_ascii_digit()) {
        ident.push('_');
    }
    ident
}
82
83fn topo_sort(nodes: &[Node]) -> Result<Vec<usize>, String> {
84 let mut output_to_index = HashMap::new();
85 for (idx, node) in nodes.iter().enumerate() {
86 if output_to_index.insert(node.output.clone(), idx).is_some() {
87 return Err(format!(
88 "duplicate output '{}' for task '{}'",
89 node.output, node.name
90 ));
91 }
92 }
93
94 let mut indegree = vec![0usize; nodes.len()];
95 let mut graph = vec![Vec::<usize>::new(); nodes.len()];
96
97 for (idx, node) in nodes.iter().enumerate() {
98 for dep in &node.dependencies {
99 let from = output_to_index.get(dep).ok_or_else(|| {
100 format!(
101 "task '{}' references unknown dependency output '{}'",
102 node.name, dep
103 )
104 })?;
105 graph[*from].push(idx);
106 indegree[idx] += 1;
107 }
108 }
109
110 let mut queue = VecDeque::new();
111 for (idx, deg) in indegree.iter().enumerate() {
112 if *deg == 0 {
113 queue.push_back(idx);
114 }
115 }
116
117 let mut order = Vec::with_capacity(nodes.len());
118 while let Some(cur) = queue.pop_front() {
119 order.push(cur);
120 for next in &graph[cur] {
121 indegree[*next] -= 1;
122 if indegree[*next] == 0 {
123 queue.push_back(*next);
124 }
125 }
126 }
127
128 if order.len() != nodes.len() {
129 return Err("flow dependency graph has cycle".to_string());
130 }
131
132 Ok(order)
133}
134
135fn render_flow_builder(func_name: &str, nodes: &[Node]) -> Result<String, String> {
136 for node in nodes {
137 if node.builder.trim().is_empty() {
138 return Err(format!("task '{}' has empty builder expression", node.name));
139 }
140 if node.is_source && !node.dependencies.is_empty() {
141 return Err(format!(
142 "source task '{}' must not have dependencies",
143 node.name
144 ));
145 }
146 if !node.is_source && node.dependencies.is_empty() {
147 return Err(format!(
148 "non-source task '{}' must have at least one dependency",
149 node.name
150 ));
151 }
152 }
153
154 let mut output_to_var = HashMap::new();
155 let mut seen_vars = HashMap::new();
156 for node in nodes {
157 let sanitized = sanitize_ident(&node.output);
158 if let Some(prev) = seen_vars.insert(sanitized.clone(), &node.output) {
159 return Err(format!(
160 "outputs '{}' and '{}' produce the same variable name '{sanitized}'",
161 prev, node.output
162 ));
163 }
164 output_to_var.insert(node.output.clone(), sanitized);
165 }
166
167 let order = topo_sort(nodes)?;
168
169 let mut sink_var_name: Option<String> = None;
170 let mut body = String::new();
171 body.push_str(&format!(
172 "fn {func_name}() -> (taskflow::tf::flow::Flow, taskflow::tf::flow::TaskId) {{\n let mut flow = taskflow::tf::flow::Flow::new();\n"
173 ));
174
175 for idx in order {
176 let node = &nodes[idx];
177 let var_name = output_to_var
178 .get(&node.output)
179 .expect("output variable must exist");
180 if node.is_source {
181 body.push_str(&format!(
182 " let {var_name} = flow.commit_source_task(\"{}\", {});\n",
183 node.name, node.builder
184 ));
185 continue;
186 }
187
188 let dependency_vars = node
189 .dependencies
190 .iter()
191 .map(|dep| {
192 output_to_var.get(dep).cloned().ok_or_else(|| {
193 format!(
194 "task '{}' references unknown dependency output '{}'",
195 node.name, dep
196 )
197 })
198 })
199 .collect::<Result<Vec<_>, _>>()?;
200
201 let deps_expr = if dependency_vars.len() == 1 {
202 dependency_vars[0].clone()
203 } else {
204 format!("({})", dependency_vars.join(", "))
205 };
206
207 body.push_str(&format!(
208 " let {var_name} = flow.commit_task(\"{}\", {}).with_dependencies({deps_expr});\n",
209 node.name, node.builder
210 ));
211
212 if node.is_sink {
213 sink_var_name = Some(var_name.clone());
214 }
215 }
216
217 let sink_var_name = sink_var_name.ok_or_else(|| "sink task not found in flow nodes".to_string())?;
218 body.push_str(&format!(
219 " let sink_task_id = {sink_var_name}.id.clone();\n (flow, sink_task_id)\n}}\n"
220 ));
221 Ok(body)
222}
223
224fn render_flow_runner(func_name: &str, nodes: &[Node]) -> Result<String, String> {
225 let mut output_to_var = HashMap::new();
226 for node in nodes {
227 output_to_var.insert(node.output.clone(), sanitize_ident(&node.output));
228 }
229
230 let order = topo_sort(nodes)?;
231 let mut sink_var_name: Option<String> = None;
232
233 let mut body = String::new();
234 body.push_str(&format!(
235 "async fn run_{func_name}() -> Result<std::sync::Arc<dyn std::any::Any + Send + Sync>, taskflow::tf::errors::FlowError> {{\n let mut flow = taskflow::tf::flow::Flow::new();\n"
236 ));
237
238 for idx in order {
239 let node = &nodes[idx];
240 let var_name = output_to_var
241 .get(&node.output)
242 .expect("output variable must exist");
243 if node.is_source {
244 body.push_str(&format!(
245 " let {var_name} = flow.commit_source_task(\"{}\", {});\n",
246 node.name, node.builder
247 ));
248 continue;
249 }
250
251 let dependency_vars = node
252 .dependencies
253 .iter()
254 .map(|dep| {
255 output_to_var.get(dep).cloned().ok_or_else(|| {
256 format!(
257 "task '{}' references unknown dependency output '{}'",
258 node.name, dep
259 )
260 })
261 })
262 .collect::<Result<Vec<_>, _>>()?;
263
264 let deps_expr = if dependency_vars.len() == 1 {
265 dependency_vars[0].clone()
266 } else {
267 format!("({})", dependency_vars.join(", "))
268 };
269
270 body.push_str(&format!(
271 " let {var_name} = flow.commit_task(\"{}\", {}).with_dependencies({deps_expr});\n",
272 node.name, node.builder
273 ));
274
275 if node.is_sink {
276 sink_var_name = Some(var_name.clone());
277 }
278 }
279
280 let sink_var_name = sink_var_name.ok_or_else(|| "sink task not found in flow nodes".to_string())?;
281 body.push_str(&format!(
282 " let output = flow.run({sink_var_name}).await?;\n Ok(std::sync::Arc::new(output) as std::sync::Arc<dyn std::any::Any + Send + Sync>)\n}}\n"
283 ));
284
285 Ok(body)
286}
287
288pub fn generate(index_path: &Path, manifest_dir: &Path, out_dir: &Path) -> Result<PathBuf, String> {
289 let index_raw = fs::read_to_string(index_path)
290 .map_err(|e| format!("failed to read flow index file {}: {e}", index_path.display()))?;
291 let index: FlowPathIndex = toml::from_str(&index_raw)
292 .map_err(|e| format!("failed to parse flow index {}: {e}", index_path.display()))?;
293
294 let index_dir = index_path
295 .parent()
296 .ok_or_else(|| format!("flow index path has no parent: {}", index_path.display()))?;
297
298 let mut path_entries = Vec::new();
299 let mut match_arms = Vec::new();
300 let mut run_match_arms = Vec::new();
301 let mut builders = Vec::new();
302 let mut runners = Vec::new();
303
304 for (flow_idx, configured) in index.flow_path.paths.iter().enumerate() {
305 let resolved = index_dir.join(configured);
306 println!("cargo:rerun-if-changed={}", resolved.display());
307
308 let flow_raw = fs::read_to_string(&resolved)
309 .map_err(|e| format!("failed to read flow file {}: {e}", resolved.display()))?;
310 let flow_file: FlowFile = toml::from_str(&flow_raw)
311 .map_err(|e| format!("failed to parse {}: {e}", resolved.display()))?;
312
313 let mut nodes = Vec::new();
314 for task in flow_file.flow.source {
315 nodes.push(Node {
316 name: task.name,
317 dependencies: task.dependencies,
318 output: task.output,
319 builder: task.builder,
320 is_source: true,
321 is_sink: false,
322 });
323 }
324 for task in flow_file.flow.processor {
325 nodes.push(Node {
326 name: task.name,
327 dependencies: task.dependencies,
328 output: task.output,
329 builder: task.builder,
330 is_source: false,
331 is_sink: false,
332 });
333 }
334 nodes.push(Node {
335 name: flow_file.flow.sink.name,
336 dependencies: flow_file.flow.sink.dependencies,
337 output: flow_file.flow.sink.output,
338 builder: flow_file.flow.sink.builder,
339 is_source: false,
340 is_sink: true,
341 });
342
343 let func_name = format!("build_flow_{flow_idx}");
344 let builder_src = render_flow_builder(&func_name, &nodes)
345 .map_err(|e| format!("{}: {e}", resolved.display()))?;
346 builders.push(builder_src);
347
348 let runner_src = render_flow_runner(&func_name, &nodes)
349 .map_err(|e| format!("{}: {e}", resolved.display()))?;
350 runners.push(runner_src);
351
352 let normalized = normalize_for_concat(manifest_dir, &resolved);
353 let path_expr = format!("concat!(env!(\"CARGO_MANIFEST_DIR\"), \"{normalized}\")");
354 path_entries.push(format!(" {path_expr}"));
355 match_arms.push(format!(" {path_expr} => Some({func_name}()),"));
356 run_match_arms.push(format!(" {path_expr} => run_{func_name}().await,"));
357 }
358
359 let generated = format!(
360 "// @generated by taskflow-build. Do not edit manually.\n\
361pub const GENERATED_FLOW_PATHS: &[&str] = &[\n{}\n];\n\
362\n\
363pub fn build_flow_by_path(path: &str) -> Option<(taskflow::tf::flow::Flow, taskflow::tf::flow::TaskId)> {{\n match path {{\n{}\n _ => None,\n }}\n}}\n\
364\n\
365pub async fn run_flow_by_path(path: &str) -> Result<std::sync::Arc<dyn std::any::Any + Send + Sync>, taskflow::tf::errors::FlowError> {{\n match path {{\n{}\n _ => Err(taskflow::tf::errors::FlowError::ConfigBuildError(format!(\"flow path '{{}}' is not generated\", path))),\n }}\n}}\n\n{}\n\n{}\n",
366 path_entries.join(",\n"),
367 match_arms.join("\n"),
368 run_match_arms.join("\n"),
369 builders.join("\n"),
370 runners.join("\n")
371 );
372
373
374 let out_file = out_dir.join("generated_typed_flows.rs");
375 fs::write(&out_file, generated)
376 .map_err(|e| format!("failed to write generated typed flow file {}: {e}", out_file.display()))?;
377 Ok(out_file)
378}
379
380pub fn run_with_default() {
381 let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set"));
382 let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set"));
383
384 let index_path = manifest_dir.join("configs/flows.toml");
386
387 println!("cargo:rerun-if-env-changed=TASKFLOW_FLOW_INDEX_PATH");
388 println!("cargo:rerun-if-changed={}", index_path.display());
389
390 generate(&index_path, &manifest_dir, &out_dir)
391 .unwrap_or_else(|err| panic!("failed to generate typed flow builders: {err}"));
392}
393
394pub fn run_with_env(env_key: &str) {
395 let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set"));
396 let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set"));
397 let env_path = env::var(env_key).expect(format!("{env_key} is not set").as_str());
398 let index_path = PathBuf::from(env_path);
399 let index_path = if index_path.is_absolute() { index_path } else { manifest_dir.join(index_path) };
400
401 println!("cargo:rerun-if-env-changed=TASKFLOW_FLOW_INDEX_PATH");
402 println!("cargo:rerun-if-changed={}", index_path.display());
403
404 generate(&index_path, &manifest_dir, &out_dir)
405 .unwrap_or_else(|err| panic!("failed to generate typed flow builders: {err}"));
406}