1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use crate::runtime;
use crate::runtime::BlockDescription;
use crate::runtime::BlockId;
use crate::runtime::Error;
use crate::runtime::Flowgraph;
use crate::runtime::FlowgraphDescription;
use crate::runtime::FlowgraphHandle;
use crate::runtime::FlowgraphTask;
use crate::runtime::Pmt;
use crate::runtime::Result;
/// A running [`Flowgraph`] together with its control handle and completion task.
///
/// This value is returned by [`Runtime::start_async`](crate::runtime::Runtime::start_async)
/// and by `Runtime::start` on native targets.
/// It can be split into a [`FlowgraphHandle`] and [`FlowgraphTask`], or used
/// directly to post messages, request descriptions, stop the flowgraph, and
/// wait for its finished [`Flowgraph`].
///
/// Waiting consumes `RunningFlowgraph` because the finished flowgraph is
/// returned to the caller. Clone [`RunningFlowgraph::handle`] first when other
/// tasks need to keep sending control messages while one task waits.
pub struct RunningFlowgraph {
// Control side: every post/call/describe/stop method on this type forwards to it.
handle: FlowgraphHandle,
// Completion side: awaited by the waiting methods to obtain the finished flowgraph.
task: FlowgraphTask,
}
impl RunningFlowgraph {
pub(crate) fn new(handle: FlowgraphHandle, task: FlowgraphTask) -> Self {
Self { handle, task }
}
/// Get a clonable handle to the running [`Flowgraph`].
pub fn handle(&self) -> FlowgraphHandle {
self.handle.clone()
}
/// Get a control handle scoped to one block in the running flowgraph.
pub fn block(&self, block_id: impl Into<BlockId>) -> runtime::FlowgraphBlockHandle {
self.handle.block(block_id)
}
/// Split the running flowgraph into its completion task and control handle.
///
/// This is useful when one task should own the wait path while other code
/// keeps a handle for control messages.
pub fn split(self) -> (FlowgraphTask, FlowgraphHandle) {
(self.task, self.handle)
}
/// Await flowgraph termination and return the finished [`Flowgraph`].
pub async fn wait_async(self) -> Result<Flowgraph, Error> {
self.task.await
}
/// Block until the flowgraph terminates and return the finished [`Flowgraph`].
#[cfg(not(target_arch = "wasm32"))]
pub fn wait(self) -> Result<Flowgraph, Error> {
crate::runtime::block_on(self.wait_async())
}
/// Post a message to a block without waiting for handler completion.
pub async fn post(
&self,
block_id: impl Into<BlockId>,
port_id: impl Into<crate::runtime::PortId>,
data: Pmt,
) -> Result<(), Error> {
self.handle.post(block_id, port_id, data).await
}
/// Call a message handler on a block and return its result.
pub async fn call(
&self,
block_id: impl Into<BlockId>,
port_id: impl Into<crate::runtime::PortId>,
data: Pmt,
) -> Result<Pmt, Error> {
self.handle.call(block_id, port_id, data).await
}
/// Describe the running flowgraph.
pub async fn describe(&self) -> Result<FlowgraphDescription, Error> {
self.handle.describe().await
}
/// Describe a block in the running flowgraph.
pub async fn describe_block(
&self,
block_id: impl Into<BlockId>,
) -> Result<BlockDescription, Error> {
self.handle.describe_block(block_id).await
}
/// Stop the running flowgraph.
pub async fn stop(&self) -> Result<(), Error> {
self.handle.stop().await
}
/// Stop the running flowgraph and wait until it terminates.
///
/// Returns the finished [`Flowgraph`] after all block tasks have stopped and
/// their block state has been restored into the graph.
pub async fn stop_and_wait(self) -> Result<Flowgraph, Error> {
self.handle.stop().await?;
self.wait_async().await
}
}