static_graph/lib.rs
1//! This crate provides the ability to generate static graphs by analysing the node dependencies in DSL. It allows only one input and one output in a graph, and independent nodes can run in maximum parallel.
2
3//! For example, in the following graph(the number represents the execution time of the node), run it in serial will take 6 seconds, but run it in maximum parallel will just take 2 seconds.
4
5//! ```mermaid
6//! graph TD;
7//! A/0-->B/1;
8//! A/0-->C/2;
9//! A/0-->D/1;
10//! A/0-->E/1;
11//! B/1-->F/0;
12//! C/2-->F/0;
13//! D/1-->G/1;
14//! E/1-->G/1;
15//! F/0-->H/0;
16//! G/1-->H/0;
17//! ```
18
19//! ## Usage
20
21//! Add this to your `Cargo.toml`:
22
23//! ```toml
24//! [build-dependencies]
25//! static-graph = "0.2"
26//! ```
27
28//! ## Example
29
30//! Write a graph description in `example.graph` file:
31
32//! ```txt
33//! node E -> (X, Y) {
34//! #[default = "crate::Custom::new"]
35//! custom: crate::Custom,
36//! }
37
38//! node X -> O {
39//! x: list<string>,
40//! }
41
42//! node Y -> O {
43//! y: map<i32, string>,
44//! }
45
46//! node O {
47//! #[editable = "true"]
48//! o: string,
49//! }
50
51//! graph G(E)
52//! ```
53
54//! Then, in `build.rs`:
55
56//! ```rust
57//! static_graph::configure()
58//! .file_name("example.rs")
59//! .compile("example.graph")
60//! .unwrap();
61//! ```
62
63//! Finally, in `main.rs` write your own logic for your nodes in the graph. The generated code will be in the `OUT_DIR` directory by default, the graph name is `G`, and the nodes name are `E`, `X`, `Y`, `O`. You should implement the `Runnable` trait for each node, and then you can automatically run the graph in maximum parallel by calling `G::new().run()`.
64
65//! ```rust
66//! use std::{
67//! sync::Arc,
68//! time::{Duration, Instant},
69//! };
70
71//! use gen_graph::{Runnable, E, G, O, X, Y};
72
73//! #[allow(warnings, clippy::all)]
74//! pub mod gen_graph {
75//! static_graph::include_graph!("example.rs");
76//! }
77
78//! #[derive(Default)]
79//! pub struct Custom;
80
81//! impl Custom {
82//! pub fn new() -> Self {
83//! Self
84//! }
85//! }
86
87//! #[tokio::main]
88//! async fn main() {
89//! let start = Instant::now();
90//! let resp = G::new()
91//! .run::<Request, EResponse, XResponse, YResponse, OResponse, ()>(Request {
92//! msg: "**Hello, world!**".to_string(),
93//! user_age: 18,
94//! })
95//! .await;
96//! let duration = start.elapsed();
97
98//! println!("Time elapsed is {duration:?}, resp is {resp:?}");
99//! }
100
101//! #[derive(Clone)]
102//! pub struct Request {
103//! msg: String,
104//! user_age: u8,
105//! }
106
107//! #[derive(Clone)]
108//! pub struct EResponse(Duration);
109
110//! #[async_trait::async_trait]
111//! impl Runnable<Request, ()> for E {
112//! type Resp = EResponse;
113//! type Error = ();
114
115//! async fn run(&self, _req: Request, _prev_resp: ()) -> Result<Self::Resp, Self::Error> {
116//! tokio::time::sleep(Duration::from_secs(1)).await;
117//! Ok(EResponse(Duration::from_secs(1)))
118//! }
119//! }
120
121//! #[derive(Clone)]
122//! pub struct XResponse(bool);
123
124//! #[async_trait::async_trait]
125//! impl Runnable<Request, EResponse> for X {
126//! type Resp = XResponse;
127//! type Error = ();
128
129//! async fn run(&self, req: Request, prev_resp: EResponse) -> Result<Self::Resp, Self::Error> {
130//! tokio::time::sleep(prev_resp.0).await;
131//! Ok(XResponse(!req.msg.contains('*')))
132//! }
133//! }
134
135//! #[derive(Clone)]
136//! pub struct YResponse(bool);
137
138//! #[async_trait::async_trait]
139//! impl Runnable<Request, EResponse> for Y {
140//! type Resp = YResponse;
141//! type Error = ();
142
143//! async fn run(&self, req: Request, prev_resp: EResponse) -> Result<Self::Resp, Self::Error> {
144//! tokio::time::sleep(prev_resp.0).await;
145//! Ok(YResponse(req.user_age >= 18))
146//! }
147//! }
148
149//! #[derive(Clone, Debug)]
150//! pub struct OResponse(String);
151
152//! #[async_trait::async_trait]
153//! impl Runnable<Request, (XResponse, YResponse)> for O {
154//! type Resp = OResponse;
155//! type Error = ();
156
157//! async fn run(
158//! &self,
159//! req: Request,
160//! prev_resp: (XResponse, YResponse),
161//! ) -> Result<Self::Resp, Self::Error> {
162//! self.o.store(Arc::new(req.msg.clone()));
163//! println!("O: {:#?}", self.o.load());
164//! if prev_resp.0 .0 && prev_resp.1 .0 {
165//! Ok(OResponse(req.msg))
166//! } else {
167//! Ok(OResponse("Ban".to_string()))
168//! }
169//! }
170//! }
171//! ```
172//!
173pub mod codegen;
174pub mod context;
175pub mod index;
176pub mod parser;
177pub mod resolver;
178pub mod symbol;
179pub mod tags;
180
181pub use arc_swap::*;
182pub use async_trait::*;
183pub use tokio::*;
184
185use crate::{
186 codegen::Codegen,
187 context::Context,
188 parser::{document::Document, Parser},
189 resolver::{ResolveResult, Resolver},
190};
191use std::{
192 io::{self, Write},
193 path::{Path, PathBuf},
194 process::{exit, Command},
195};
196
197#[macro_export]
198macro_rules! include_graph {
199 ($graph: tt) => {
200 include!(concat!(env!("OUT_DIR"), concat!("/", $graph)));
201 };
202}
203
204#[must_use]
205pub fn configure() -> Builder {
206 Builder {
207 emit_rerun_if_changed: std::env::var_os("CARGO").is_some(),
208 out_dir: None,
209 file_name: "gen_graph.rs".into(),
210 }
211}
212
213#[derive(Debug, Clone)]
214pub struct Builder {
215 emit_rerun_if_changed: bool,
216 out_dir: Option<PathBuf>,
217 file_name: PathBuf,
218}
219
220impl Builder {
221 #[must_use]
222 pub fn out_dir(mut self, out_dir: impl AsRef<Path>) -> Self {
223 self.out_dir = Some(out_dir.as_ref().to_path_buf());
224 self
225 }
226
227 #[must_use]
228 pub fn file_name(mut self, file_name: impl AsRef<Path>) -> Self {
229 self.file_name = file_name.as_ref().to_path_buf();
230 self
231 }
232
233 #[must_use]
234 pub fn emit_rerun_if_changed(mut self, enable: bool) -> Self {
235 self.emit_rerun_if_changed = enable;
236 self
237 }
238
239 pub fn compile(self, graph: impl AsRef<Path>) -> io::Result<()> {
240 let out_dir = if let Some(out_dir) = self.out_dir.as_ref() {
241 out_dir.clone()
242 } else {
243 PathBuf::from(std::env::var("OUT_DIR").unwrap())
244 };
245
246 if self.emit_rerun_if_changed {
247 println!("cargo:rerun-if-changed={}", graph.as_ref().display());
248 }
249
250 let input = unsafe { String::from_utf8_unchecked(std::fs::read(graph).unwrap()) };
251
252 let document = Document::parse(&input).unwrap().1;
253 let ResolveResult {
254 graphs,
255 nodes,
256 fields,
257 tags,
258 entrys,
259 } = Resolver::default().resolve_document(document);
260
261 let mut cx = Context::new();
262 cx.set_graphs(graphs);
263 cx.set_nodes(nodes);
264 cx.set_fields(fields);
265 cx.set_tags(tags);
266
267 let mut cg = Codegen::new(cx);
268 let stream = cg.write_document(entrys);
269 let out = out_dir.join(self.file_name);
270 let mut file = std::io::BufWriter::new(std::fs::File::create(&out).unwrap());
271 file.write_all(stream.to_string().as_bytes()).unwrap();
272 file.flush().unwrap();
273 fmt_file(out);
274
275 Ok(())
276 }
277}
278
279fn fmt_file<P: AsRef<Path>>(file: P) {
280 let file = file.as_ref();
281 if let Some(a) = file.extension() {
282 if a != "rs" {
283 return;
284 }
285 };
286
287 let result = Command::new(std::env::var("RUSTFMT").unwrap_or_else(|_| "rustfmt".to_owned()))
288 .arg("--emit")
289 .arg("files")
290 .arg("--edition")
291 .arg("2021")
292 .arg(file)
293 .output();
294
295 match result {
296 Err(e) => eprintln!("{e:?}",),
297 Ok(output) => {
298 if !output.status.success() {
299 std::io::stderr().write_all(&output.stderr).unwrap();
300 exit(output.status.code().unwrap_or(1))
301 }
302 }
303 }
304}