1use std::sync::Arc;
2
3use flarrow_runtime::prelude::*;
4
5use url::Url;
6
7#[derive(Node)]
8pub struct MyOperator {
9 pub input: Input<String>,
10 pub output: Output<String>,
11
12 counter: u32,
13}
14
15#[node(runtime = "default_runtime")]
16impl Node for MyOperator {
17 async fn new(
18 mut inputs: Inputs,
19 mut outputs: Outputs,
20 _: serde_yml::Value,
21 ) -> eyre::Result<Box<dyn Node>>
22 where
23 Self: Sized,
24 {
25 Ok(Box::new(Self {
26 input: inputs.with("in").await?,
27 output: outputs.with("out").await?,
28 counter: 0,
29 }) as Box<dyn Node>)
30 }
31
32 async fn start(mut self: Box<Self>) -> eyre::Result<()> {
33 while let Ok((_, message)) = self.input.recv_async().await {
34 self.counter += 1;
35
36 self.output
37 .send(format!("{} - {}", self.counter, message))
38 .wrap_err("Failed to send message")?;
39 }
40
41 Ok(())
42 }
43}
44
45#[tokio::main]
46async fn main() -> Result<()> {
47 let mut layout = DataflowLayout::new();
48
49 let (source, output) = layout
50 .create_node(async |io: &mut NodeIO| io.open_output("out"))
51 .await;
52
53 let (operator, (op_in, op_out)) = layout
54 .create_node(async |io: &mut NodeIO| (io.open_input("in"), io.open_output("out")))
55 .await;
56
57 let (sink, input) = layout
58 .create_node(async |io: &mut NodeIO| io.open_input("in"))
59 .await;
60
61 let layout = Arc::new(layout);
62 let flows = Flows::new(layout.clone(), async move |connector: &mut Connector| {
63 connector.connect(op_in, output)?;
64 connector.connect(input, op_out)?;
65
66 Ok(())
67 })
68 .await?;
69
70 let path = std::env::var("CARGO_MANIFEST_DIR")?;
71 let examples = format!("file://{}/../../target/debug/examples", path);
72
73 let runtime = DataflowRuntime::new(flows, None, async move |loader: &mut Loader| {
74 loader
75 .load_statically_linked::<MyOperator>(operator, serde_yml::Value::from(""))
76 .await
77 .wrap_err("Failed to load MyOperator")?;
78
79 let source_file = Url::parse("builtin:///timer")?;
80 let sink_file = Url::parse(&format!("{}/libsink.so", examples))?;
81
82 loader
83 .load_from_url(source, source_file, serde_yml::from_str("frequency: 5.0")?)
84 .await
85 .wrap_err("Failed to load source")?;
86 loader
87 .load_from_url(sink, sink_file, serde_yml::Value::from(""))
88 .await
89 .wrap_err("Failed to load sink")?;
90
91 Ok(())
92 })
93 .await?;
94
95 runtime.run().await
96}