pub struct DataflowLayout {
pub inputs: HashSet<Uuid>,
pub outputs: HashSet<Uuid>,
pub queries: HashSet<Uuid>,
pub queryables: HashSet<Uuid>,
pub labels: HashMap<Uuid, String>,
pub nodes: HashMap<Uuid, HashSet<Uuid>>,
}
Represents a flattened dataflow layout.
Fields§
inputs: HashSet<Uuid> — Inputs present in the dataflow
outputs: HashSet<Uuid> — Outputs present in the dataflow
queries: HashSet<Uuid> — Queries present in the dataflow
queryables: HashSet<Uuid> — Queryables present in the dataflow
labels: HashMap<Uuid, String> — Labels for nodes and IO, useful for debugging and visualization
nodes: HashMap<Uuid, HashSet<Uuid>> — Nodes representation with their IO, useful for debugging and visualization
Implementations§
Source§impl DataflowLayout
impl DataflowLayout
Source
pub fn new() -> DataflowLayout
pub fn new() -> DataflowLayout
Creates a new empty dataflow layout
Examples found in repository?
examples/simple_runtime.rs (line 7)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (source, output) = layout
10 .node("source", async |builder: &mut NodeIOBuilder| {
11 builder.output("out")
12 })
13 .await;
14
15 let (operator, (op_in, op_out)) = layout
16 .node("operator", async |builder: &mut NodeIOBuilder| {
17 (builder.input("in"), builder.output("out"))
18 })
19 .await;
20
21 let (sink, input) = layout
22 .node("sink", async |builder: &mut NodeIOBuilder| {
23 builder.input("in")
24 })
25 .await;
26
27 let layout = layout.build();
28
29 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
30 builder.connect(op_in, output, None)?;
31 builder.connect(input, op_out, None)?;
32
33 Ok(())
34 })
35 .await?;
36
37 let runtime = Runtime::new(
38 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
39 Ok(())
40 },
41 )
42 .await?;
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 loader
47 .load::<Timer>(source, serde_yml::from_str("frequency: 1.0")?)
48 .await?;
49
50 loader
51 .load::<Transport>(operator, serde_yml::from_str("")?)
52 .await?;
53
54 loader
55 .load::<Printer>(sink, serde_yml::from_str("")?)
56 .await?;
57
58 Ok(())
59 })
60 .await
61 }
More examples
examples/service_runtime.rs (line 7)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (service, (compare_to_128, compare_to_64)) = layout
10 .node("service", async |builder: &mut NodeIOBuilder| {
11 (
12 builder.queryable("compare_to_128"),
13 builder.queryable("compare_to_64"),
14 )
15 })
16 .await;
17
18 let (client, (ask_128, ask_64)) = layout
19 .node("client", async |builder: &mut NodeIOBuilder| {
20 (builder.query("ask_128"), builder.query("ask_64"))
21 })
22 .await;
23
24 let layout = layout.build();
25
26 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
27 builder.connect(ask_128, compare_to_128, None)?;
28 builder.connect(ask_64, compare_to_64, None)?;
29
30 Ok(())
31 })
32 .await?;
33
34 let runtime = Runtime::new(
35 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
36 Ok(())
37 },
38 )
39 .await?;
40
41 let path = std::env::var("CARGO_MANIFEST_DIR")?;
42 let examples = format!("file://{}/../../target/debug/examples", path);
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 let service_file = Url::parse(&format!("{}/libservice.so", examples))?;
47 let client_file = Url::parse(&format!("{}/libclient.so", examples))?;
48
49 loader
50 .load_url(service_file, service, serde_yml::from_str("")?)
51 .await?;
52
53 loader
54 .load_url(client_file, client, serde_yml::from_str("")?)
55 .await?;
56
57 Ok(())
58 })
59 .await
60 }
Source
pub async fn node<T>(
&mut self,
label: impl Into<String>,
builder_function: impl AsyncFnOnce(&mut NodeIOBuilder) -> T,
) -> (NodeLayout, T)
pub async fn node<T>( &mut self, label: impl Into<String>, builder_function: impl AsyncFnOnce(&mut NodeIOBuilder) -> T, ) -> (NodeLayout, T)
Adds a new node to the dataflow layout, providing its label and a builder function for its IO. For convenience, the node layout is returned, together with the result of the builder function.
Examples found in repository?
examples/simple_runtime.rs (lines 10-12)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (source, output) = layout
10 .node("source", async |builder: &mut NodeIOBuilder| {
11 builder.output("out")
12 })
13 .await;
14
15 let (operator, (op_in, op_out)) = layout
16 .node("operator", async |builder: &mut NodeIOBuilder| {
17 (builder.input("in"), builder.output("out"))
18 })
19 .await;
20
21 let (sink, input) = layout
22 .node("sink", async |builder: &mut NodeIOBuilder| {
23 builder.input("in")
24 })
25 .await;
26
27 let layout = layout.build();
28
29 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
30 builder.connect(op_in, output, None)?;
31 builder.connect(input, op_out, None)?;
32
33 Ok(())
34 })
35 .await?;
36
37 let runtime = Runtime::new(
38 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
39 Ok(())
40 },
41 )
42 .await?;
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 loader
47 .load::<Timer>(source, serde_yml::from_str("frequency: 1.0")?)
48 .await?;
49
50 loader
51 .load::<Transport>(operator, serde_yml::from_str("")?)
52 .await?;
53
54 loader
55 .load::<Printer>(sink, serde_yml::from_str("")?)
56 .await?;
57
58 Ok(())
59 })
60 .await
61 }
More examples
examples/service_runtime.rs (lines 10-15)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (service, (compare_to_128, compare_to_64)) = layout
10 .node("service", async |builder: &mut NodeIOBuilder| {
11 (
12 builder.queryable("compare_to_128"),
13 builder.queryable("compare_to_64"),
14 )
15 })
16 .await;
17
18 let (client, (ask_128, ask_64)) = layout
19 .node("client", async |builder: &mut NodeIOBuilder| {
20 (builder.query("ask_128"), builder.query("ask_64"))
21 })
22 .await;
23
24 let layout = layout.build();
25
26 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
27 builder.connect(ask_128, compare_to_128, None)?;
28 builder.connect(ask_64, compare_to_64, None)?;
29
30 Ok(())
31 })
32 .await?;
33
34 let runtime = Runtime::new(
35 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
36 Ok(())
37 },
38 )
39 .await?;
40
41 let path = std::env::var("CARGO_MANIFEST_DIR")?;
42 let examples = format!("file://{}/../../target/debug/examples", path);
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 let service_file = Url::parse(&format!("{}/libservice.so", examples))?;
47 let client_file = Url::parse(&format!("{}/libclient.so", examples))?;
48
49 loader
50 .load_url(service_file, service, serde_yml::from_str("")?)
51 .await?;
52
53 loader
54 .load_url(client_file, client, serde_yml::from_str("")?)
55 .await?;
56
57 Ok(())
58 })
59 .await
60 }
Source
pub fn label(&self, uuid: impl Into<Uuid>) -> String
pub fn label(&self, uuid: impl Into<Uuid>) -> String
Access the label of an entity in the dataflow layout (node, input, output, query, queryable)
Source
pub fn build(self) -> Arc<DataflowLayout>
pub fn build(self) -> Arc<DataflowLayout>
Build the dataflow layout by making it immutable and returning an Arc
Examples found in repository?
examples/simple_runtime.rs (line 27)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (source, output) = layout
10 .node("source", async |builder: &mut NodeIOBuilder| {
11 builder.output("out")
12 })
13 .await;
14
15 let (operator, (op_in, op_out)) = layout
16 .node("operator", async |builder: &mut NodeIOBuilder| {
17 (builder.input("in"), builder.output("out"))
18 })
19 .await;
20
21 let (sink, input) = layout
22 .node("sink", async |builder: &mut NodeIOBuilder| {
23 builder.input("in")
24 })
25 .await;
26
27 let layout = layout.build();
28
29 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
30 builder.connect(op_in, output, None)?;
31 builder.connect(input, op_out, None)?;
32
33 Ok(())
34 })
35 .await?;
36
37 let runtime = Runtime::new(
38 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
39 Ok(())
40 },
41 )
42 .await?;
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 loader
47 .load::<Timer>(source, serde_yml::from_str("frequency: 1.0")?)
48 .await?;
49
50 loader
51 .load::<Transport>(operator, serde_yml::from_str("")?)
52 .await?;
53
54 loader
55 .load::<Printer>(sink, serde_yml::from_str("")?)
56 .await?;
57
58 Ok(())
59 })
60 .await
61 }
More examples
examples/service_runtime.rs (line 24)
4 async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (service, (compare_to_128, compare_to_64)) = layout
10 .node("service", async |builder: &mut NodeIOBuilder| {
11 (
12 builder.queryable("compare_to_128"),
13 builder.queryable("compare_to_64"),
14 )
15 })
16 .await;
17
18 let (client, (ask_128, ask_64)) = layout
19 .node("client", async |builder: &mut NodeIOBuilder| {
20 (builder.query("ask_128"), builder.query("ask_64"))
21 })
22 .await;
23
24 let layout = layout.build();
25
26 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
27 builder.connect(ask_128, compare_to_128, None)?;
28 builder.connect(ask_64, compare_to_64, None)?;
29
30 Ok(())
31 })
32 .await?;
33
34 let runtime = Runtime::new(
35 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
36 Ok(())
37 },
38 )
39 .await?;
40
41 let path = std::env::var("CARGO_MANIFEST_DIR")?;
42 let examples = format!("file://{}/../../target/debug/examples", path);
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 let service_file = Url::parse(&format!("{}/libservice.so", examples))?;
47 let client_file = Url::parse(&format!("{}/libclient.so", examples))?;
48
49 loader
50 .load_url(service_file, service, serde_yml::from_str("")?)
51 .await?;
52
53 loader
54 .load_url(client_file, client, serde_yml::from_str("")?)
55 .await?;
56
57 Ok(())
58 })
59 .await
60 }
Trait Implementations§
Source§impl Debug for DataflowLayout
impl Debug for DataflowLayout
Source§impl Default for DataflowLayout
impl Default for DataflowLayout
Source§fn default() -> DataflowLayout
fn default() -> DataflowLayout
Returns the “default value” for a type. Read more
Auto Trait Implementations§
impl Freeze for DataflowLayout
impl RefUnwindSafe for DataflowLayout
impl Send for DataflowLayout
impl Sync for DataflowLayout
impl Unpin for DataflowLayout
impl UnwindSafe for DataflowLayout
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more