DoraNode

Struct DoraNode 

pub struct DoraNode { /* private fields */ }

Allows sending outputs and retrieving node information.

The main purpose of this struct is to send outputs via Dora. There are also functions available for retrieving the node configuration.

Implementations

impl DoraNode

pub fn init_from_env() -> Result<(Self, EventStream)>

Initialize a node from environment variables set by the Dora daemon, or fall back to interactive mode.

This is the recommended initialization function for Dora nodes, which are spawned by Dora daemon instances. The daemon will set a DORA_NODE_CONFIG environment variable to configure the node.

When the node is started manually without the DORA_NODE_CONFIG environment variable set, the initialization will fall back to init_interactive if stdin is a terminal (detected through isatty).

If the DORA_NODE_CONFIG environment variable is not set and DORA_TEST_WITH_INPUTS is set, the node will be initialized in integration test mode. See the integration testing module for details.

This function also initializes the node in integration test mode if the setup_integration_testing function was called beforehand. That setup takes precedence over all environment variables.

use dora_node_api::DoraNode;

let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
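
The selection rules described above can be summarized in a small standalone sketch. It uses only the standard library and does not call into dora: `InitMode`, `Env`, and `select_mode` are hypothetical names, and the environment is passed in as plain booleans. Two details are assumptions, because the text above does not state them: that the DORA_TEST_WITH_INPUTS check runs before the terminal check, and that a missing configuration with a non-terminal stdin results in an error.

```rust
/// Hypothetical summary of the initialization modes described above.
#[derive(Debug, PartialEq)]
enum InitMode {
    Daemon,
    IntegrationTest,
    Interactive,
}

/// Illustrative inputs; a real node reads these from its process environment.
#[derive(Clone, Copy)]
struct Env {
    /// `setup_integration_testing` was called before initialization.
    testing_setup_called: bool,
    /// `DORA_NODE_CONFIG` is set.
    has_node_config: bool,
    /// `DORA_TEST_WITH_INPUTS` is set.
    has_test_inputs: bool,
    /// stdin is a terminal (the isatty check).
    stdin_is_terminal: bool,
}

fn select_mode(env: &Env) -> Result<InitMode, String> {
    // Documented: an earlier testing setup wins over all environment variables.
    if env.testing_setup_called {
        return Ok(InitMode::IntegrationTest);
    }
    // Documented: the daemon configures spawned nodes through DORA_NODE_CONFIG.
    if env.has_node_config {
        return Ok(InitMode::Daemon);
    }
    // Documented condition; its order relative to the terminal check is assumed.
    if env.has_test_inputs {
        return Ok(InitMode::IntegrationTest);
    }
    // Documented: interactive fallback only when stdin is a terminal.
    if env.stdin_is_terminal {
        return Ok(InitMode::Interactive);
    }
    // Assumption: nothing else applies, so initialization fails.
    Err("no node configuration and stdin is not a terminal".to_string())
}
```

For example, a node started by hand from a shell, with neither variable set, maps to `InitMode::Interactive` in this sketch.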

pub fn init_from_env_force() -> Result<(Self, EventStream)>

Initialize the node from environment variables set by the Dora daemon; error if not set.

This function behaves the same as init_from_env, but it does not fall back to init_interactive. Instead, an error is returned when the DORA_NODE_CONFIG environment variable is missing.

pub fn init_from_node_id(node_id: NodeId) -> Result<(Self, EventStream)>

Initialize a node from the given node ID.

This initialization function should be used for dynamic nodes.

use dora_node_api::DoraNode;
use dora_node_api::dora_core::config::NodeId;

let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");

pub fn init_flexible(node_id: NodeId) -> Result<(Self, EventStream)>

Dynamic initialization function for nodes that are sometimes used as dynamic nodes.

This function first tries initializing the traditional way through init_from_env. If this fails, it falls back to init_from_node_id.
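
The try-then-fall-back behaviour can be illustrated with a generic helper. This is a sketch of the pattern only, not dora's implementation: `Init` and `init_with_fallback` are hypothetical names, and the sketch assumes that the error of the first attempt is simply discarded when the fallback runs.

```rust
/// Hypothetical stand-in for the result of a successful initialization.
#[derive(Debug, PartialEq)]
enum Init {
    FromEnv,
    FromNodeId(String),
}

/// Runs `primary` first and calls `fallback` only if `primary` fails.
/// The primary error is dropped here (an assumption of this sketch).
fn init_with_fallback<E>(
    primary: impl FnOnce() -> Result<Init, E>,
    fallback: impl FnOnce() -> Result<Init, E>,
) -> Result<Init, E> {
    primary().or_else(|_primary_error| fallback())
}
```

Because the fallback is passed as a closure, it is never executed when the first attempt succeeds.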

pub fn init_interactive() -> Result<(Self, EventStream)>

Initialize the node in a standalone mode that prompts for inputs on the terminal.

Instead of connecting to a dora daemon, this interactive mode prompts for node inputs on the terminal. In this mode, the node is completely isolated from the dora daemon and other nodes, so it cannot be part of a dataflow.

Note that this function blocks indefinitely if no input is supplied to the interactive prompt, so it should only be used from a terminal.

Because of the above limitations, it is not recommended to use this function directly. Use init_from_env instead, which supports both normal daemon connections and manual interactive runs.

Example

Run any node that uses init_interactive or init_from_env directly from a terminal. The node will then start in “interactive mode” and prompt you for the next input:

> cargo build -p rust-dataflow-example-node
> target/debug/rust-dataflow-example-node
hello
Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set
Node asks for next input
? Input ID
[empty input ID to stop]

The rust-dataflow-example-node expects a tick input, so let’s set the input ID to tick. Tick messages don’t have any data, so we leave the “Data” empty when prompted:

Node asks for next input
> Input ID tick
> Data
tick 0, sending 0x943ed1be20c711a4
node sends output random with data: PrimitiveArray<UInt64>
[
  10682205980693303716,
]
Node asks for next input
? Input ID
[empty input ID to stop]

Both the node's stdout and the output messages that it sends are printed to the terminal. Then we get another prompt for the next input.

If you want to send an input with data, you can either send it as text (for string data) or as a JSON object (for struct, string, or array data). Other data types are not supported currently.

Empty input IDs are interpreted as stop instructions:

> Input ID
given input ID is empty -> stopping
Received stop
Node asks for next input
event channel was stopped -> returning empty event list
node reports EventStreamDropped
node reports closed outputs []
node reports OutputsDone

In addition to the node output, we see log messages for the different events that the node reports. After OutputsDone, the node should exit.

JSON data

In addition to text input, the Data prompt also supports JSON objects, which will be converted to Apache Arrow struct arrays:

Node asks for next input
> Input ID some_input
> Data { "field_1": 42, "field_2": { "inner": "foo" } }

This JSON data is converted to the following Arrow array:

StructArray
-- validity: [valid, ]
[
  -- child 0: "field_1" (Int64)
     PrimitiveArray<Int64>
     [42,]
  -- child 1: "field_2" (Struct([Field { name: "inner", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]))
     StructArray
     -- validity: [valid,]
     [
       -- child 0: "inner" (Utf8)
       StringArray
       ["foo",]
     ]
]

pub fn init_testing( input: TestingInput, output: TestingOutput, options: TestingOptions, ) -> Result<(Self, EventStream)>

Initializes a node in integration test mode.

No connection to a dora daemon is made in this mode. Instead, inputs are read from the specified TestingInput, and outputs are written to the specified TestingOutput. Additional options for the testing mode can be specified through TestingOptions.

It is recommended to use this function only within test functions.

pub fn send_output_raw<F>( &mut self, output_id: DataId, parameters: MetadataParameters, data_len: usize, data: F, ) -> Result<()>
where F: FnOnce(&mut [u8]),

Send raw data from the node to the other nodes.

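The data is provided through a closure that receives a mutable byte buffer of data_len bytes. Writing the payload directly into that buffer enables zero-copy sending.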

use dora_node_api::{DoraNode, MetadataParameters};
use dora_node_api::dora_core::config::DataId;

let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");

let output = DataId::from("output_id".to_owned());

let data: &[u8] = &[0, 1, 2, 3];
let parameters = MetadataParameters::default();

// The closure receives the output buffer and fills it in place.
node.send_output_raw(output, parameters, data.len(), |out| {
    out.copy_from_slice(data);
})
.expect("Could not send output");

Ignores the output if the given output_id is not specified as node output in the dataflow configuration file.
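
To show why a closure parameter helps here, the following standalone sketch mimics the calling convention with a mock sender. It is not the dora implementation: `MockSender` and `send_raw` are hypothetical, a plain `Vec<u8>` stands in for the real output buffer, and outputs are identified by strings. The point is that the receiver allocates the destination first and the caller writes into it directly, so no intermediate buffer has to be built and copied.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for the sending side of a node.
struct MockSender {
    /// Output IDs declared for this node in the dataflow configuration.
    declared_outputs: HashSet<String>,
    /// Stand-in for the transport: every sent message ends up here.
    sent: Vec<(String, Vec<u8>)>,
}

impl MockSender {
    fn send_raw<F>(&mut self, output_id: &str, data_len: usize, fill: F)
    where
        F: FnOnce(&mut [u8]),
    {
        // Mirrors the documented behaviour: undeclared outputs are ignored.
        if !self.declared_outputs.contains(output_id) {
            return;
        }
        // Allocate the destination first, then let the caller fill it in place.
        let mut buffer = vec![0u8; data_len];
        fill(&mut buffer);
        self.sent.push((output_id.to_string(), buffer));
    }
}
```

The closure is only invoked for declared outputs, so the caller does no serialization work for messages that would be dropped anyway.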

pub fn send_output( &mut self, output_id: DataId, parameters: MetadataParameters, data: impl Array, ) -> Result<()>

Sends the given Arrow array as an output message.

Uses shared memory for efficient data transfer if suitable.

This method might copy the message once to move it to shared memory.

Ignores the output if the given output_id is not specified as node output in the dataflow configuration file.

pub fn send_output_bytes( &mut self, output_id: DataId, parameters: MetadataParameters, data_len: usize, data: &[u8], ) -> Result<()>

Send the given raw byte data as output.

Might copy the data once to move it into shared memory.

Ignores the output if the given output_id is not specified as node output in the dataflow configuration file.

pub fn send_typed_output<F>( &mut self, output_id: DataId, type_info: ArrowTypeInfo, parameters: MetadataParameters, data_len: usize, data: F, ) -> Result<()>
where F: FnOnce(&mut [u8]),

Send the given raw byte data with the provided type information.

It is recommended to use a function like send_output instead.

Ignores the output if the given output_id is not specified as node output in the dataflow configuration file.

pub fn send_output_sample( &mut self, output_id: DataId, type_info: ArrowTypeInfo, parameters: MetadataParameters, sample: Option<DataSample>, ) -> Result<()>

Sends the given DataSample as output, combined with the given type information.

It is recommended to use a function like send_output instead.

Ignores the output if the given output_id is not specified as node output in the dataflow configuration file.

pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> Result<()>

Report the given output IDs as closed.

The node is not allowed to send more outputs with the closed IDs.

Closing outputs early can be helpful to receivers.
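
The rule that closed IDs must not be used again can be pictured with a small bookkeeping sketch. It is purely illustrative: `OutputState`, `close`, and `check_send` are hypothetical names, and returning an error for a send on a closed output is an assumption, since the text above only states that such sends are not allowed.

```rust
use std::collections::HashSet;

/// Hypothetical bookkeeping for closed outputs.
#[derive(Default)]
struct OutputState {
    closed: HashSet<String>,
}

impl OutputState {
    /// Marks the given output IDs as closed.
    fn close(&mut self, output_ids: &[&str]) {
        for id in output_ids {
            self.closed.insert(id.to_string());
        }
    }

    /// Rejects sends on closed outputs (assumed behaviour for this sketch).
    fn check_send(&self, output_id: &str) -> Result<(), String> {
        if self.closed.contains(output_id) {
            Err(format!("output `{output_id}` was already closed"))
        } else {
            Ok(())
        }
    }
}
```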

pub fn id(&self) -> &NodeId

Returns the ID of the node as specified in the dataflow configuration file.

pub fn dataflow_id(&self) -> &DataflowId

Returns the unique identifier for the running dataflow instance.

Dora assigns each dataflow instance a random identifier when started.

pub fn node_config(&self) -> &NodeRunConfig

Returns the input and output configuration of this node.

pub fn allocate_data_sample(&mut self, data_len: usize) -> Result<DataSample>

Allocates a DataSample of the specified size.

The data sample will use shared memory when suitable to enable efficient data transfer when sending an output message.

pub fn dataflow_descriptor(&self) -> Result<&Descriptor>

Returns the full dataflow descriptor that this node is part of.

This method returns the parsed dataflow YAML file.

Trait Implementations

impl Drop for DoraNode

fn drop(&mut self)

Executes the destructor for this type.

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> FutureExt for T

fn with_context(self, otel_cx: Context) -> WithContext<Self>

Attaches the provided Context to this type, returning a WithContext wrapper.

fn with_current_context(self) -> WithContext<Self>

Attaches the current Context to this type, returning a WithContext wrapper.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<L> LayerExt<L> for L

fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>
where L: Layer<S>,

Applies the layer to a service and wraps it in Layered.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.