pub struct Flows {
pub inputs_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>,
pub outputs_senders: Arc<Mutex<HashMap<Uuid, Vec<Sender<DataflowMessage>>>>>,
pub queries_senders: Arc<Mutex<HashMap<Uuid, Sender<DataflowMessage>>>>,
pub queries_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>,
pub queryables_senders: Arc<Mutex<HashMap<Uuid, HashMap<Uuid, Sender<DataflowMessage>>>>>,
pub queryables_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>,
}Expand description
Represents the collection of flows in the system (all the MPSC channels)
Fields§
§inputs_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>§outputs_senders: Arc<Mutex<HashMap<Uuid, Vec<Sender<DataflowMessage>>>>>§queries_senders: Arc<Mutex<HashMap<Uuid, Sender<DataflowMessage>>>>§queries_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>§queryables_senders: Arc<Mutex<HashMap<Uuid, HashMap<Uuid, Sender<DataflowMessage>>>>>§queryables_receivers: Arc<Mutex<HashMap<Uuid, Receiver<DataflowMessage>>>>Implementations§
Source§impl Flows
impl Flows
Sourcepub async fn new(
layout: Arc<DataflowLayout>,
flows: impl AsyncFnOnce(&mut FlowsBuilder) -> Result<(), Report>,
) -> Result<Flows, Report>
pub async fn new( layout: Arc<DataflowLayout>, flows: impl AsyncFnOnce(&mut FlowsBuilder) -> Result<(), Report>, ) -> Result<Flows, Report>
Creates a new Flows instance with a Building async closure
Examples found in repository?
examples/simple_runtime.rs (lines 29-34)
4async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (source, output) = layout
10 .node("source", async |builder: &mut NodeIOBuilder| {
11 builder.output("out")
12 })
13 .await;
14
15 let (operator, (op_in, op_out)) = layout
16 .node("operator", async |builder: &mut NodeIOBuilder| {
17 (builder.input("in"), builder.output("out"))
18 })
19 .await;
20
21 let (sink, input) = layout
22 .node("sink", async |builder: &mut NodeIOBuilder| {
23 builder.input("in")
24 })
25 .await;
26
27 let layout = layout.build();
28
29 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
30 builder.connect(op_in, output, None)?;
31 builder.connect(input, op_out, None)?;
32
33 Ok(())
34 })
35 .await?;
36
37 let runtime = Runtime::new(
38 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
39 Ok(())
40 },
41 )
42 .await?;
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 loader
47 .load::<Timer>(source, serde_yml::from_str("frequency: 1.0")?)
48 .await?;
49
50 loader
51 .load::<Transport>(operator, serde_yml::from_str("")?)
52 .await?;
53
54 loader
55 .load::<Printer>(sink, serde_yml::from_str("")?)
56 .await?;
57
58 Ok(())
59 })
60 .await
61}More examples
examples/service_runtime.rs (lines 26-31)
4async fn main() -> Result<()> {
5 tracing_subscriber::fmt::init();
6
7 let mut layout = DataflowLayout::new();
8
9 let (service, (compare_to_128, compare_to_64)) = layout
10 .node("service", async |builder: &mut NodeIOBuilder| {
11 (
12 builder.queryable("compare_to_128"),
13 builder.queryable("compare_to_64"),
14 )
15 })
16 .await;
17
18 let (client, (ask_128, ask_64)) = layout
19 .node("client", async |builder: &mut NodeIOBuilder| {
20 (builder.query("ask_128"), builder.query("ask_64"))
21 })
22 .await;
23
24 let layout = layout.build();
25
26 let flows = Flows::new(layout.clone(), async move |builder: &mut FlowsBuilder| {
27 builder.connect(ask_128, compare_to_128, None)?;
28 builder.connect(ask_64, compare_to_64, None)?;
29
30 Ok(())
31 })
32 .await?;
33
34 let runtime = Runtime::new(
35 async |_file_ext: &mut FileExtManagerBuilder, _url_scheme: &mut UrlSchemeManagerBuilder| {
36 Ok(())
37 },
38 )
39 .await?;
40
41 let path = std::env::var("CARGO_MANIFEST_DIR")?;
42 let examples = format!("file://{}/../../target/debug/examples", path);
43
44 runtime
45 .run(flows, async move |loader: &mut NodeLoader| {
46 let service_file = Url::parse(&format!("{}/libservice.so", examples))?;
47 let client_file = Url::parse(&format!("{}/libclient.so", examples))?;
48
49 loader
50 .load_url(service_file, service, serde_yml::from_str("")?)
51 .await?;
52
53 loader
54 .load_url(client_file, client, serde_yml::from_str("")?)
55 .await?;
56
57 Ok(())
58 })
59 .await
60}Sourcepub fn node_io(
&self,
clock: Arc<HLC>,
source: NodeLayout,
) -> (Inputs, Outputs, Queries, Queryables)
pub fn node_io( &self, clock: Arc<HLC>, source: NodeLayout, ) -> (Inputs, Outputs, Queries, Queryables)
This is intended to be used only by the Runtime when loading nodes
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Flows
impl !RefUnwindSafe for Flows
impl Send for Flows
impl Sync for Flows
impl Unpin for Flows
impl !UnwindSafe for Flows
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more