Skip to main content

taskflow_build/
lib.rs

1pub mod macros;
2
3use serde::Deserialize;
4use std::{
5    collections::{HashMap, VecDeque}, env, fs, path::{Path, PathBuf}
6};
7
8#[derive(Debug, Deserialize)]
9struct FlowPathIndex {
10    flow_path: FlowPathList,
11}
12
13#[derive(Debug, Deserialize)]
14struct FlowPathList {
15    paths: Vec<String>,
16}
17
18#[derive(Debug, Deserialize)]
19struct FlowFile {
20    flow: FlowConfig,
21}
22
23#[derive(Debug, Deserialize)]
24struct FlowConfig {
25    source: Vec<TaskConfig>,
26    processor: Vec<TaskConfig>,
27    sink: TaskConfig,
28}
29
30#[derive(Debug, Deserialize)]
31struct TaskConfig {
32    name: String,
33    dependencies: Vec<String>,
34    output: String,
35    builder: String,
36}
37
/// A task of a flow after the source / processor / sink lists have been
/// flattened into one list by `generate`.
#[derive(Debug)]
struct Node {
    /// Task name, used in generated code and in error messages.
    name: String,
    /// Output names of the tasks this node depends on.
    dependencies: Vec<String>,
    /// Name of the output this node produces.
    output: String,
    /// Rust expression emitted verbatim as the task builder argument.
    builder: String,
    /// `true` for nodes created from the flow's `source` list.
    is_source: bool,
    /// `true` for the node created from the flow's `sink` entry.
    is_sink: bool,
}
47
/// Renders `path` as text with every backslash replaced by a forward slash.
///
/// The conversion to text is lossy: any part of the path that is not valid
/// Unicode is substituted by `to_string_lossy`.
fn to_unix(path: &Path) -> String {
    path.to_string_lossy()
        .chars()
        .map(|ch| if ch == '\\' { '/' } else { ch })
        .collect()
}
51
52fn normalize_for_concat(manifest_dir: &Path, path: &Path) -> String {
53    let relative = path.strip_prefix(manifest_dir).unwrap_or(path);
54    let relative_unix = to_unix(relative);
55    if relative_unix.starts_with('/') {
56        relative_unix
57    } else {
58        format!("/{relative_unix}")
59    }
60}
61
/// Derives a variable name for the generated code from an output name.
///
/// The result is `out_` followed by `raw` with ASCII letters lowercased,
/// ASCII digits and `_` kept, and every other character replaced by `_`.
/// If the result would end in a digit, one more `_` is appended.
/// Distinct inputs can map to the same name (for example `A` and `a`).
fn sanitize_ident(raw: &str) -> String {
    let mut ident = String::with_capacity(raw.len() + 8);
    ident.push_str("out_");
    ident.extend(raw.chars().map(|ch| {
        if ch.is_ascii_alphanumeric() {
            ch.to_ascii_lowercase()
        } else {
            // Covers `_` itself as well as every character that is replaced.
            '_'
        }
    }));
    if ident.ends_with(|c: char| c.is_ascii_digit()) {
        ident.push('_');
    }
    ident
}
82
83fn topo_sort(nodes: &[Node]) -> Result<Vec<usize>, String> {
84    let mut output_to_index = HashMap::new();
85    for (idx, node) in nodes.iter().enumerate() {
86        if output_to_index.insert(node.output.clone(), idx).is_some() {
87            return Err(format!(
88                "duplicate output '{}' for task '{}'",
89                node.output, node.name
90            ));
91        }
92    }
93
94    let mut indegree = vec![0usize; nodes.len()];
95    let mut graph = vec![Vec::<usize>::new(); nodes.len()];
96
97    for (idx, node) in nodes.iter().enumerate() {
98        for dep in &node.dependencies {
99            let from = output_to_index.get(dep).ok_or_else(|| {
100                format!(
101                    "task '{}' references unknown dependency output '{}'",
102                    node.name, dep
103                )
104            })?;
105            graph[*from].push(idx);
106            indegree[idx] += 1;
107        }
108    }
109
110    let mut queue = VecDeque::new();
111    for (idx, deg) in indegree.iter().enumerate() {
112        if *deg == 0 {
113            queue.push_back(idx);
114        }
115    }
116
117    let mut order = Vec::with_capacity(nodes.len());
118    while let Some(cur) = queue.pop_front() {
119        order.push(cur);
120        for next in &graph[cur] {
121            indegree[*next] -= 1;
122            if indegree[*next] == 0 {
123                queue.push_back(*next);
124            }
125        }
126    }
127
128    if order.len() != nodes.len() {
129        return Err("flow dependency graph has cycle".to_string());
130    }
131
132    Ok(order)
133}
134
135fn render_flow_builder(func_name: &str, nodes: &[Node]) -> Result<String, String> {
136    for node in nodes {
137        if node.builder.trim().is_empty() {
138            return Err(format!("task '{}' has empty builder expression", node.name));
139        }
140        if node.is_source && !node.dependencies.is_empty() {
141            return Err(format!(
142                "source task '{}' must not have dependencies",
143                node.name
144            ));
145        }
146        if !node.is_source && node.dependencies.is_empty() {
147            return Err(format!(
148                "non-source task '{}' must have at least one dependency",
149                node.name
150            ));
151        }
152    }
153
154    let mut output_to_var = HashMap::new();
155    let mut seen_vars = HashMap::new();
156    for node in nodes {
157        let sanitized = sanitize_ident(&node.output);
158        if let Some(prev) = seen_vars.insert(sanitized.clone(), &node.output) {
159            return Err(format!(
160                "outputs '{}' and '{}' produce the same variable name '{sanitized}'",
161                prev, node.output
162            ));
163        }
164        output_to_var.insert(node.output.clone(), sanitized);
165    }
166
167    let order = topo_sort(nodes)?;
168
169    let mut sink_var_name: Option<String> = None;
170    let mut body = String::new();
171    body.push_str(&format!(
172        "fn {func_name}() -> (taskflow::tf::flow::Flow, taskflow::tf::flow::TaskId) {{\n    let mut flow = taskflow::tf::flow::Flow::new();\n"
173    ));
174
175    for idx in order {
176        let node = &nodes[idx];
177        let var_name = output_to_var
178            .get(&node.output)
179            .expect("output variable must exist");
180        if node.is_source {
181            body.push_str(&format!(
182                "    let {var_name} = flow.commit_source_task(\"{}\", {});\n",
183                node.name, node.builder
184            ));
185            continue;
186        }
187
188        let dependency_vars = node
189            .dependencies
190            .iter()
191            .map(|dep| {
192                output_to_var.get(dep).cloned().ok_or_else(|| {
193                    format!(
194                        "task '{}' references unknown dependency output '{}'",
195                        node.name, dep
196                    )
197                })
198            })
199            .collect::<Result<Vec<_>, _>>()?;
200
201        let deps_expr = if dependency_vars.len() == 1 {
202            dependency_vars[0].clone()
203        } else {
204            format!("({})", dependency_vars.join(", "))
205        };
206
207        body.push_str(&format!(
208            "    let {var_name} = flow.commit_task(\"{}\", {}).with_dependencies({deps_expr});\n",
209            node.name, node.builder
210        ));
211
212        if node.is_sink {
213            sink_var_name = Some(var_name.clone());
214        }
215    }
216
217    let sink_var_name = sink_var_name.ok_or_else(|| "sink task not found in flow nodes".to_string())?;
218    body.push_str(&format!(
219        "    let sink_task_id = {sink_var_name}.id.clone();\n    (flow, sink_task_id)\n}}\n"
220    ));
221    Ok(body)
222}
223
224fn render_flow_runner(func_name: &str, nodes: &[Node]) -> Result<String, String> {
225    let mut output_to_var = HashMap::new();
226    for node in nodes {
227        output_to_var.insert(node.output.clone(), sanitize_ident(&node.output));
228    }
229
230    let order = topo_sort(nodes)?;
231    let mut sink_var_name: Option<String> = None;
232
233    let mut body = String::new();
234    body.push_str(&format!(
235        "async fn run_{func_name}() -> Result<std::sync::Arc<dyn std::any::Any + Send + Sync>, taskflow::tf::errors::FlowError> {{\n    let mut flow = taskflow::tf::flow::Flow::new();\n"
236    ));
237
238    for idx in order {
239        let node = &nodes[idx];
240        let var_name = output_to_var
241            .get(&node.output)
242            .expect("output variable must exist");
243        if node.is_source {
244            body.push_str(&format!(
245                "    let {var_name} = flow.commit_source_task(\"{}\", {});\n",
246                node.name, node.builder
247            ));
248            continue;
249        }
250
251        let dependency_vars = node
252            .dependencies
253            .iter()
254            .map(|dep| {
255                output_to_var.get(dep).cloned().ok_or_else(|| {
256                    format!(
257                        "task '{}' references unknown dependency output '{}'",
258                        node.name, dep
259                    )
260                })
261            })
262            .collect::<Result<Vec<_>, _>>()?;
263
264        let deps_expr = if dependency_vars.len() == 1 {
265            dependency_vars[0].clone()
266        } else {
267            format!("({})", dependency_vars.join(", "))
268        };
269
270        body.push_str(&format!(
271            "    let {var_name} = flow.commit_task(\"{}\", {}).with_dependencies({deps_expr});\n",
272            node.name, node.builder
273        ));
274
275        if node.is_sink {
276            sink_var_name = Some(var_name.clone());
277        }
278    }
279
280    let sink_var_name = sink_var_name.ok_or_else(|| "sink task not found in flow nodes".to_string())?;
281    body.push_str(&format!(
282        "    let output = flow.run({sink_var_name}).await?;\n    Ok(std::sync::Arc::new(output) as std::sync::Arc<dyn std::any::Any + Send + Sync>)\n}}\n"
283    ));
284
285    Ok(body)
286}
287
288pub fn generate(index_path: &Path, manifest_dir: &Path, out_dir: &Path) -> Result<PathBuf, String> {
289    let index_raw = fs::read_to_string(index_path)
290        .map_err(|e| format!("failed to read flow index file {}: {e}", index_path.display()))?;
291    let index: FlowPathIndex = toml::from_str(&index_raw)
292        .map_err(|e| format!("failed to parse flow index {}: {e}", index_path.display()))?;
293
294    let index_dir = index_path
295        .parent()
296        .ok_or_else(|| format!("flow index path has no parent: {}", index_path.display()))?;
297
298    let mut path_entries = Vec::new();
299    let mut match_arms = Vec::new();
300    let mut run_match_arms = Vec::new();
301    let mut builders = Vec::new();
302    let mut runners = Vec::new();
303
304    for (flow_idx, configured) in index.flow_path.paths.iter().enumerate() {
305        let resolved = index_dir.join(configured);
306        println!("cargo:rerun-if-changed={}", resolved.display());
307
308        let flow_raw = fs::read_to_string(&resolved)
309            .map_err(|e| format!("failed to read flow file {}: {e}", resolved.display()))?;
310        let flow_file: FlowFile = toml::from_str(&flow_raw)
311            .map_err(|e| format!("failed to parse {}: {e}", resolved.display()))?;
312
313        let mut nodes = Vec::new();
314        for task in flow_file.flow.source {
315            nodes.push(Node {
316                name: task.name,
317                dependencies: task.dependencies,
318                output: task.output,
319                builder: task.builder,
320                is_source: true,
321                is_sink: false,
322            });
323        }
324        for task in flow_file.flow.processor {
325            nodes.push(Node {
326                name: task.name,
327                dependencies: task.dependencies,
328                output: task.output,
329                builder: task.builder,
330                is_source: false,
331                is_sink: false,
332            });
333        }
334        nodes.push(Node {
335            name: flow_file.flow.sink.name,
336            dependencies: flow_file.flow.sink.dependencies,
337            output: flow_file.flow.sink.output,
338            builder: flow_file.flow.sink.builder,
339            is_source: false,
340            is_sink: true,
341        });
342
343        let func_name = format!("build_flow_{flow_idx}");
344        let builder_src = render_flow_builder(&func_name, &nodes)
345            .map_err(|e| format!("{}: {e}", resolved.display()))?;
346        builders.push(builder_src);
347
348        let runner_src = render_flow_runner(&func_name, &nodes)
349            .map_err(|e| format!("{}: {e}", resolved.display()))?;
350        runners.push(runner_src);
351
352        let normalized = normalize_for_concat(manifest_dir, &resolved);
353        let path_expr = format!("concat!(env!(\"CARGO_MANIFEST_DIR\"), \"{normalized}\")");
354        path_entries.push(format!("    {path_expr}"));
355        match_arms.push(format!("        {path_expr} => Some({func_name}()),"));
356        run_match_arms.push(format!("        {path_expr} => run_{func_name}().await,"));
357    }
358
359    let generated = format!(
360        "// @generated by taskflow-build. Do not edit manually.\n\
361pub const GENERATED_FLOW_PATHS: &[&str] = &[\n{}\n];\n\
362\n\
363pub fn build_flow_by_path(path: &str) -> Option<(taskflow::tf::flow::Flow, taskflow::tf::flow::TaskId)> {{\n    match path {{\n{}\n        _ => None,\n    }}\n}}\n\
364\n\
365pub async fn run_flow_by_path(path: &str) -> Result<std::sync::Arc<dyn std::any::Any + Send + Sync>, taskflow::tf::errors::FlowError> {{\n    match path {{\n{}\n        _ => Err(taskflow::tf::errors::FlowError::ConfigBuildError(format!(\"flow path '{{}}' is not generated\", path))),\n    }}\n}}\n\n{}\n\n{}\n",
366        path_entries.join(",\n"),
367        match_arms.join("\n"),
368        run_match_arms.join("\n"),
369        builders.join("\n"),
370        runners.join("\n")
371    );
372
373
374    let out_file = out_dir.join("generated_typed_flows.rs");
375    fs::write(&out_file, generated)
376        .map_err(|e| format!("failed to write generated typed flow file {}: {e}", out_file.display()))?;
377    Ok(out_file)
378}
379
380pub fn run_with_default() {
381    let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set"));
382    let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set"));
383
384    // use CARGO_MANIFEST_DIR/configs/flows.toml by default
385    let index_path = manifest_dir.join("configs/flows.toml");
386
387    println!("cargo:rerun-if-env-changed=TASKFLOW_FLOW_INDEX_PATH");
388    println!("cargo:rerun-if-changed={}", index_path.display());
389
390    generate(&index_path, &manifest_dir, &out_dir)
391        .unwrap_or_else(|err| panic!("failed to generate typed flow builders: {err}"));
392}
393
394pub fn run_with_env(env_key: &str) {
395    let manifest_dir = PathBuf::from(env::var("CARGO_MANIFEST_DIR").expect("CARGO_MANIFEST_DIR is not set"));
396    let out_dir = PathBuf::from(env::var("OUT_DIR").expect("OUT_DIR is not set"));
397    let env_path = env::var(env_key).expect(format!("{env_key} is not set").as_str());
398    let index_path = PathBuf::from(env_path);
399    let index_path = if index_path.is_absolute() { index_path } else { manifest_dir.join(index_path) };
400
401    println!("cargo:rerun-if-env-changed=TASKFLOW_FLOW_INDEX_PATH");
402    println!("cargo:rerun-if-changed={}", index_path.display());
403
404    generate(&index_path, &manifest_dir, &out_dir)
405        .unwrap_or_else(|err| panic!("failed to generate typed flow builders: {err}"));
406}