pub struct DoraNode { /* private fields */ }
Allows sending outputs and retrieving node information.
The main purpose of this struct is to send outputs via Dora. There are also functions available for retrieving the node configuration.
Implementations§
impl DoraNode
pub fn init_from_env() -> Result<(Self, EventStream)>
Initializes a node from environment variables set by the Dora daemon, or falls back to interactive mode.
This is the recommended initialization function for Dora nodes, which are spawned by
Dora daemon instances. The daemon will set a DORA_NODE_CONFIG environment variable to
configure the node.
When the node is started manually without the DORA_NODE_CONFIG environment variable set,
the initialization will fall back to init_interactive if stdin
is a terminal (detected through
isatty).
If the DORA_NODE_CONFIG environment variable is not set and DORA_TEST_WITH_INPUTS is
set, the node will be initialized in integration test mode. See the
integration testing module for details.
This function also initializes the node in integration test mode if
setup_integration_testing
was called beforehand. This takes precedence over all environment variables.
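The selection rules described above can be summarized as a small decision function. This is only a sketch of the documented precedence, not the crate's implementation: `InitConditions`, `InitMode`, and `select_init_mode` are hypothetical names. Two branches are assumptions, because the text does not cover them: that `DORA_TEST_WITH_INPUTS` is checked before the terminal fallback, and that initialization fails when no mode applies.

```rust
/// Hypothetical snapshot of the conditions the text mentions (not a dora type).
#[derive(Debug, Clone, Copy, Default)]
pub struct InitConditions {
    /// `setup_integration_testing` was called earlier in this process.
    pub testing_setup_called: bool,
    /// The `DORA_NODE_CONFIG` environment variable is set.
    pub node_config_set: bool,
    /// The `DORA_TEST_WITH_INPUTS` environment variable is set.
    pub test_inputs_set: bool,
    /// stdin is a terminal.
    pub stdin_is_terminal: bool,
}

/// Hypothetical label for the resulting initialization mode.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InitMode {
    IntegrationTest,
    Daemon,
    Interactive,
    /// Assumption: no usable mode, so initialization returns an error.
    Unavailable,
}

/// Applies the precedence described in the text, top to bottom.
pub fn select_init_mode(c: InitConditions) -> InitMode {
    if c.testing_setup_called {
        // Takes precedence over all environment variables.
        InitMode::IntegrationTest
    } else if c.node_config_set {
        // Normal case: the node was spawned by a daemon.
        InitMode::Daemon
    } else if c.test_inputs_set {
        InitMode::IntegrationTest
    } else if c.stdin_is_terminal {
        // Manual start from a terminal.
        InitMode::Interactive
    } else {
        InitMode::Unavailable
    }
}
```

For example, a node started by hand in a terminal with no environment variables set would map to `InitMode::Interactive` in this sketch.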
use dora_node_api::DoraNode;
let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
pub fn init_from_env_force() -> Result<(Self, EventStream)>
Initialize the node from environment variables set by the Dora daemon; error if not set.
This function behaves the same as init_from_env, but it does not
fall back to init_interactive. Instead, an error is returned
when the DORA_NODE_CONFIG environment variable is missing.
pub fn init_from_node_id(node_id: NodeId) -> Result<(Self, EventStream)>
Initializes a node from a node ID.
This initialization function should be used for dynamic nodes.
use dora_node_api::DoraNode;
use dora_node_api::dora_core::config::NodeId;
let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
pub fn init_flexible(node_id: NodeId) -> Result<(Self, EventStream)>
Dynamic initialization function for nodes that are sometimes used as dynamic nodes.
This function first tries initializing the traditional way through
init_from_env. If this fails, it falls back to
init_from_node_id.
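The try-then-fall-back behavior can be pictured with a generic helper. This is a minimal sketch of the pattern only: `init_flexible_sketch` and its two closure parameters are hypothetical stand-ins for the two initialization paths, and the crate may combine or report errors differently.

```rust
/// Hypothetical helper: try the environment-based path first and use the
/// node-ID-based path only if the first attempt fails.
pub fn init_flexible_sketch<T, E>(
    from_env: impl FnOnce() -> Result<T, E>,
    from_node_id: impl FnOnce() -> Result<T, E>,
) -> Result<T, E> {
    // `or_else` runs the fallback only on `Err`. The first error is dropped
    // here for brevity, which is an assumption of this sketch.
    from_env().or_else(|_env_error| from_node_id())
}
```

In this sketch the second closure is never invoked when the first one succeeds, so a node spawned normally would not attempt the dynamic path.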
pub fn init_interactive() -> Result<(Self, EventStream)>
Initialize the node in a standalone mode that prompts for inputs on the terminal.
Instead of connecting to a dora daemon, this interactive mode prompts for node inputs
on the terminal. In this mode, the node is completely isolated from the dora daemon and
other nodes, so it cannot be part of a dataflow.
Note that this function hangs indefinitely if no input is supplied to the interactive prompt, so it should only be used from a terminal.
Because of the above limitations, it is not recommended to use this function directly.
Use init_from_env instead, which supports both normal daemon
connections and manual interactive runs.
§Example
Run any node that uses init_interactive or init_from_env directly
from a terminal. The node will then start in “interactive mode” and prompt you for the next
input:
> cargo build -p rust-dataflow-example-node
> target/debug/rust-dataflow-example-node
hello
Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set
Node asks for next input
? Input ID
[empty input ID to stop]
The rust-dataflow-example-node expects a tick input, so let’s set the input ID to
tick. Tick messages don’t have any data, so we leave the “Data” empty when prompted:
Node asks for next input
> Input ID tick
> Data
tick 0, sending 0x943ed1be20c711a4
node sends output random with data: PrimitiveArray<UInt64>
[
10682205980693303716,
]
Node asks for next input
? Input ID
[empty input ID to stop]
We see that both the stdout output of the node and the output messages that it sends
are printed to the terminal. Then we get another prompt for the next input.
If you want to send an input with data, you can either send it as text (for string data) or as a JSON object (for struct, string, or array data). Other data types are not supported currently.
Empty input IDs are interpreted as stop instructions:
> Input ID
given input ID is empty -> stopping
Received stop
Node asks for next input
event channel was stopped -> returning empty event list
node reports EventStreamDropped
node reports closed outputs []
node reports OutputsDone
In addition to the node output, we see log messages for the different events that the node
reports. After OutputsDone, the node should exit.
§JSON data
In addition to text input, the Data prompt also supports JSON objects, which will be
converted to Apache Arrow struct arrays:
Node asks for next input
> Input ID some_input
> Data { "field_1": 42, "field_2": { "inner": "foo" } }
This JSON data is converted to the following Arrow array:
StructArray
-- validity: [valid, ]
[
-- child 0: "field_1" (Int64)
PrimitiveArray<Int64>
[42,]
-- child 1: "field_2" (Struct([Field { name: "inner", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]))
StructArray
-- validity: [valid,]
[
-- child 0: "inner" (Utf8)
StringArray
["foo",]
]
]
pub fn init_testing( input: TestingInput, output: TestingOutput, options: TestingOptions, ) -> Result<(Self, EventStream)>
Initializes a node in integration test mode.
No connection to a dora daemon is made in this mode. Instead, inputs are read from the
specified TestingInput, and outputs are written to the specified TestingOutput.
Additional options for the testing mode can be specified through TestingOptions.
It is recommended to use this function only within test functions.
pub fn send_output_raw<F>( &mut self, output_id: DataId, parameters: MetadataParameters, data_len: usize, data: F, ) -> Result<()>
Sends raw data from this node to other nodes.
The data is written by a closure, which enables zero-copy sending.
use dora_node_api::{DoraNode, MetadataParameters};
use dora_core::config::DataId;
let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
let output = DataId::from("output_id".to_owned());
let data: &[u8] = &[0, 1, 2, 3];
let parameters = MetadataParameters::default();
node.send_output_raw(
    output,
    parameters,
    data.len(),
    |out| {
        out.copy_from_slice(data);
    }).expect("Could not send output");
Ignores the output if the given output_id is not specified as node output in the dataflow
configuration file.
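To illustrate why a closure helps here, the following sketch models the idea with the standard library only. The sending side owns the destination buffer and lends it to the caller, so the payload is written once, directly into its final location. `send_raw_sketch` is a hypothetical name, and the plain `Vec<u8>` is an assumption standing in for whatever region the real transport uses.

```rust
/// Hypothetical stand-in for the send path: the "transport" allocates the
/// destination and the caller's closure fills it in place.
pub fn send_raw_sketch<F>(data_len: usize, fill: F) -> Vec<u8>
where
    F: FnOnce(&mut [u8]),
{
    // A plain Vec stands in for the destination region (an assumption here).
    let mut destination = vec![0u8; data_len];
    // The caller writes straight into the destination, so there is no
    // intermediate buffer that would need to be copied afterwards.
    fill(&mut destination);
    destination
}
```

A caller that produces its bytes incrementally (for example, a serializer) could write them directly inside the closure instead of building a temporary buffer first.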
pub fn send_output( &mut self, output_id: DataId, parameters: MetadataParameters, data: impl Array, ) -> Result<()>
Sends the given Arrow array as an output message.
Uses shared memory for efficient data transfer if suitable.
This method might copy the message once to move it to shared memory.
Ignores the output if the given output_id is not specified as node output in the dataflow
configuration file.
pub fn send_output_bytes( &mut self, output_id: DataId, parameters: MetadataParameters, data_len: usize, data: &[u8], ) -> Result<()>
Send the given raw byte data as output.
Might copy the data once to move it into shared memory.
Ignores the output if the given output_id is not specified as node output in the dataflow
configuration file.
pub fn send_typed_output<F>( &mut self, output_id: DataId, type_info: ArrowTypeInfo, parameters: MetadataParameters, data_len: usize, data: F, ) -> Result<()>
Sends the given raw byte data with the provided type information.
It is recommended to use a function like send_output instead.
Ignores the output if the given output_id is not specified as node output in the dataflow
configuration file.
pub fn send_output_sample( &mut self, output_id: DataId, type_info: ArrowTypeInfo, parameters: MetadataParameters, sample: Option<DataSample>, ) -> Result<()>
Sends the given DataSample as output, combined with the given type information.
It is recommended to use a function like send_output instead.
Ignores the output if the given output_id is not specified as node output in the dataflow
configuration file.
pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> Result<()>
Reports the given output IDs as closed.
The node is not allowed to send more outputs with the closed IDs.
Closing outputs early can be helpful to receivers.
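The two rules that govern output IDs (undeclared outputs are ignored, closed outputs may no longer be used) can be modeled with a small bookkeeping type. This is a sketch under stated assumptions rather than the crate's logic: `OutputTracker` and `SendOutcome` are hypothetical, and returning an error for a closed ID is an assumption, since the text only says that sending is not allowed.

```rust
use std::collections::HashSet;

/// Hypothetical result of a send attempt in this sketch.
#[derive(Debug, PartialEq, Eq)]
pub enum SendOutcome {
    Sent,
    /// The ID is not declared as a node output, so the message is dropped.
    IgnoredUndeclared,
}

/// Hypothetical bookkeeping for declared and closed output IDs.
pub struct OutputTracker {
    declared: HashSet<String>,
    closed: HashSet<String>,
}

impl OutputTracker {
    pub fn new(declared: &[&str]) -> Self {
        Self {
            declared: declared.iter().map(|id| id.to_string()).collect(),
            closed: HashSet::new(),
        }
    }

    /// Marks the given IDs as closed.
    pub fn close_outputs(&mut self, ids: &[&str]) {
        for id in ids {
            self.closed.insert(id.to_string());
        }
    }

    /// Checks whether a message with this ID would be sent.
    pub fn send(&self, id: &str) -> Result<SendOutcome, String> {
        if !self.declared.contains(id) {
            return Ok(SendOutcome::IgnoredUndeclared);
        }
        if self.closed.contains(id) {
            // Assumption: a closed ID is reported as an error.
            return Err(format!("output `{id}` was already closed"));
        }
        Ok(SendOutcome::Sent)
    }
}
```

In this model, a node declaring only `random` would have `send("random")` succeed until `close_outputs(&["random"])` is called, while any other ID is silently ignored.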
pub fn id(&self) -> &NodeId
Returns the ID of the node as specified in the dataflow configuration file.
pub fn dataflow_id(&self) -> &DataflowId
Returns the unique identifier for the running dataflow instance.
Dora assigns each dataflow instance a random identifier when started.
pub fn node_config(&self) -> &NodeRunConfig
Returns the input and output configuration of this node.
pub fn allocate_data_sample(&mut self, data_len: usize) -> Result<DataSample>
Allocates a DataSample of the specified size.
The data sample will use shared memory when suitable to enable efficient data transfer when sending an output message.
pub fn dataflow_descriptor(&self) -> Result<&Descriptor>
Returns the full dataflow descriptor that this node is part of.
This method returns the parsed dataflow YAML file.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for DoraNode
impl !RefUnwindSafe for DoraNode
impl Send for DoraNode
impl Sync for DoraNode
impl Unpin for DoraNode
impl !UnwindSafe for DoraNode
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized
fn borrow_mut(&mut self) -> &mut T
impl<T> FutureExt for T
fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wraps the input message T in a tonic::Request