runtime/
runtime.rs

1use std::sync::Arc;
2
3use flarrow_runtime::prelude::*;
4
5use url::Url;
6
7#[derive(Node)]
8pub struct MyOperator {
9    pub input: Input<String>,
10    pub output: Output<String>,
11
12    counter: u32,
13}
14
15#[node(runtime = "default_runtime")]
16impl Node for MyOperator {
17    async fn new(
18        mut inputs: Inputs,
19        mut outputs: Outputs,
20        _: serde_yml::Value,
21    ) -> eyre::Result<Box<dyn Node>>
22    where
23        Self: Sized,
24    {
25        Ok(Box::new(Self {
26            input: inputs.with("in").await?,
27            output: outputs.with("out").await?,
28            counter: 0,
29        }) as Box<dyn Node>)
30    }
31
32    async fn start(mut self: Box<Self>) -> eyre::Result<()> {
33        while let Ok((_, message)) = self.input.recv_async().await {
34            self.counter += 1;
35
36            self.output
37                .send(format!("{} - {}", self.counter, message))
38                .wrap_err("Failed to send message")?;
39        }
40
41        Ok(())
42    }
43}
44
45#[tokio::main]
46async fn main() -> Result<()> {
47    let mut layout = DataflowLayout::new();
48
49    let (source, output) = layout
50        .create_node(async |io: &mut NodeIO| io.open_output("out"))
51        .await;
52
53    let (operator, (op_in, op_out)) = layout
54        .create_node(async |io: &mut NodeIO| (io.open_input("in"), io.open_output("out")))
55        .await;
56
57    let (sink, input) = layout
58        .create_node(async |io: &mut NodeIO| io.open_input("in"))
59        .await;
60
61    let layout = Arc::new(layout);
62    let flows = Flows::new(layout.clone(), async move |connector: &mut Connector| {
63        connector.connect(op_in, output)?;
64        connector.connect(input, op_out)?;
65
66        Ok(())
67    })
68    .await?;
69
70    let path = std::env::var("CARGO_MANIFEST_DIR")?;
71    let examples = format!("file://{}/../../target/debug/examples", path);
72
73    let runtime = DataflowRuntime::new(flows, None, async move |loader: &mut Loader| {
74        loader
75            .load_statically_linked::<MyOperator>(operator, serde_yml::Value::from(""))
76            .await
77            .wrap_err("Failed to load MyOperator")?;
78
79        let source_file = Url::parse("builtin:///timer")?;
80        let sink_file = Url::parse(&format!("{}/libsink.so", examples))?;
81
82        loader
83            .load_from_url(source, source_file, serde_yml::from_str("frequency: 5.0")?)
84            .await
85            .wrap_err("Failed to load source")?;
86        loader
87            .load_from_url(sink, sink_file, serde_yml::Value::from(""))
88            .await
89            .wrap_err("Failed to load sink")?;
90
91        Ok(())
92    })
93    .await?;
94
95    runtime.run().await
96}