dora_node_api/node/
mod.rs

1use crate::{
2    DaemonCommunicationWrapper, EventStream,
3    daemon_connection::{DaemonChannel, IntegrationTestingEvents},
4    integration_testing::{
5        TestingCommunication, TestingInput, TestingOptions, TestingOutput,
6        take_testing_communication,
7    },
8};
9
10use self::{
11    arrow_utils::{copy_array_into_sample, required_data_size},
12    control_channel::ControlChannel,
13    drop_stream::DropStream,
14};
15use aligned_vec::{AVec, ConstAlign};
16use arrow::array::Array;
17use colored::Colorize;
18use dora_core::{
19    config::{DataId, NodeId, NodeRunConfig},
20    descriptor::Descriptor,
21    metadata::ArrowTypeInfoExt,
22    topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
23    uhlc,
24};
25use dora_message::{
26    DataflowId,
27    daemon_to_node::{DaemonCommunication, DaemonReply, NodeConfig},
28    metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
29    node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
30};
31use eyre::{WrapErr, bail};
32use is_terminal::IsTerminal;
33use shared_memory_extended::{Shmem, ShmemConf};
34
35use std::sync::Mutex;
36use std::{
37    collections::{BTreeSet, HashMap, VecDeque},
38    ops::{Deref, DerefMut},
39    path::PathBuf,
40    sync::Arc,
41    time::Duration,
42};
43use tokio::runtime::Handle;
44
45#[cfg(feature = "tracing")]
46use dora_tracing::{OtelGuard, TracingBuilder};
47use tracing::{info, warn};
48
49pub mod arrow_utils;
50mod control_channel;
51mod drop_stream;
52
/// Minimum message size, in bytes, from which outputs are transferred through shared memory.
///
/// Shared memory is shared at the granularity of memory pages, which are typically 4 KiB
/// large. A message smaller than one page would still occupy a full page, wasting memory, and
/// setting up the shared region requires several syscalls. For small messages it is therefore
/// faster to use a traditional TCP stream (or similar channel).
///
/// Messages of at least this size are placed in shared memory; smaller messages are sent
/// through TCP.
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
68
/// Allows sending outputs and retrieving node information.
///
/// The main purpose of this struct is to send outputs via Dora. There are also functions available
/// for retrieving the node configuration.
pub struct DoraNode {
    // NOTE: Rust drops struct fields in declaration order (after `Drop::drop` has run), so
    // reordering the fields below changes the teardown order of the channels and regions.
    /// ID of this node, as taken from the node config.
    id: NodeId,
    /// ID of the dataflow instance this node belongs to.
    dataflow_id: DataflowId,
    /// Inputs and outputs of this node. Outputs are removed from here when closed through
    /// `close_outputs`; the remaining ones are reported as closed when the node is dropped.
    node_config: NodeRunConfig,
    /// Channel used to send output messages and to report closed outputs.
    control_channel: ControlChannel,
    /// Hybrid logical clock used to timestamp the metadata of sent outputs.
    clock: Arc<uhlc::HLC>,

    /// Shared memory regions that were sent out, keyed by the drop token of their message.
    /// They are kept alive until that drop token arrives through `drop_stream`.
    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Receives the drop tokens of previously sent shared memory messages.
    drop_stream: DropStream,
    /// Released shared memory regions kept for reuse by later allocations (size-bounded).
    cache: VecDeque<ShmemHandle>,

    /// Result of parsing the descriptor from the node config; a parse error is only reported
    /// when the descriptor is requested.
    dataflow_descriptor: serde_yaml::Result<Descriptor>,
    /// Unknown output IDs that were already warned about, so each is only warned once.
    warned_unknown_output: BTreeSet<DataId>,
    /// Set for nodes created via `init_interactive` / `init_testing`. Disables output
    /// validation, shared memory allocation, and drop token processing.
    interactive: bool,
}
88
89impl DoraNode {
90    /// Initiate a node from environment variables set by the Dora daemon or fall back to
91    /// interactive mode.
92    ///
93    /// This is the recommended initialization function for Dora nodes, which are spawned by
94    /// Dora daemon instances. The daemon will set a `DORA_NODE_CONFIG` environment variable to
95    /// configure the node.
96    ///
97    /// When the node is started manually without the `DORA_NODE_CONFIG` environment variable set,
98    /// the initialization will fall back to [`init_interactive`](Self::init_interactive) if `stdin`
99    /// is a terminal (detected through
100    /// [`isatty`](https://www.man7.org/linux/man-pages/man3/isatty.3.html)).
101    ///
102    /// If the `DORA_NODE_CONFIG` environment variable is not set and `DORA_TEST_WITH_INPUTS` is
103    /// set, the node will be initialized in integration test mode. See the
104    /// [integration testing](crate::integration_testing) module for details.
105    ///
106    /// This function will also initialize the node in integration test mode when the
107    /// [`setup_integration_testing`](crate::integration_testing::setup_integration_testing)
108    /// function was called before. This takes precedence over all environment variables.
109    ///
110    /// ```no_run
111    /// use dora_node_api::DoraNode;
112    ///
113    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
114    /// ```
115    pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
116        Self::init_from_env_inner(true)
117    }
118
119    /// Initialize the node from environment variables set by the Dora daemon; error if not set.
120    ///
121    /// This function behaves the same as [`init_from_env`](Self::init_from_env), but it does _not_
122    /// fall back to [`init_interactive`](Self::init_interactive). Instead, an error is returned
123    /// when the `DORA_NODE_CONFIG` environment variable is missing.
124    pub fn init_from_env_force() -> eyre::Result<(Self, EventStream)> {
125        Self::init_from_env_inner(false)
126    }
127
128    fn init_from_env_inner(fallback_to_interactive: bool) -> eyre::Result<(Self, EventStream)> {
129        if let Some(testing_comm) = take_testing_communication() {
130            let TestingCommunication {
131                input,
132                output,
133                options,
134            } = *testing_comm;
135            return Self::init_testing(input, output, options);
136        }
137
138        // normal execution (started by dora daemon)
139        match std::env::var("DORA_NODE_CONFIG") {
140            Ok(raw) => {
141                let node_config: NodeConfig =
142                    serde_yaml::from_str(&raw).context("failed to deserialize node config")?;
143                return Self::init(node_config);
144            }
145            Err(std::env::VarError::NotUnicode(_)) => {
146                bail!("DORA_NODE_CONFIG env variable is not valid unicode")
147            }
148            Err(std::env::VarError::NotPresent) => {} // continue trying other init methods
149        };
150
151        // node integration test mode
152        match std::env::var("DORA_TEST_WITH_INPUTS") {
153            Ok(raw) => {
154                let input_file = PathBuf::from(raw);
155                let output_file = match std::env::var("DORA_TEST_WRITE_OUTPUTS_TO") {
156                    Ok(raw) => PathBuf::from(raw),
157                    Err(std::env::VarError::NotUnicode(_)) => {
158                        bail!("DORA_TEST_WRITE_OUTPUTS_TO env variable is not valid unicode")
159                    }
160                    Err(std::env::VarError::NotPresent) => {
161                        input_file.with_file_name("outputs.jsonl")
162                    }
163                };
164                let skip_output_time_offsets =
165                    std::env::var_os("DORA_TEST_NO_OUTPUT_TIME_OFFSET").is_some();
166
167                let input = TestingInput::FromJsonFile(input_file);
168                let output = TestingOutput::ToFile(output_file);
169                let options = TestingOptions {
170                    skip_output_time_offsets,
171                };
172
173                return Self::init_testing(input, output, options);
174            }
175            Err(std::env::VarError::NotUnicode(_)) => {
176                bail!("DORA_TEST_WITH_INPUTS env variable is not valid unicode")
177            }
178            Err(std::env::VarError::NotPresent) => {} // continue trying other init methods
179        }
180
181        // interactive mode
182        if fallback_to_interactive && std::io::stdin().is_terminal() {
183            println!(
184                "{}",
185                "Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set"
186                    .green()
187            );
188            return Self::init_interactive();
189        }
190
191        // no run mode applicable
192        bail!("DORA_NODE_CONFIG env variable is not set")
193    }
194
195    /// Initiate a node from a dataflow id and a node id.
196    ///
197    /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes).
198    ///
199    /// ```no_run
200    /// use dora_node_api::DoraNode;
201    /// use dora_node_api::dora_core::config::NodeId;
202    ///
203    /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
204    /// ```
205    ///
206    pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
207        // Make sure that the node is initialized outside of dora start.
208        let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
209
210        let mut channel =
211            DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
212        let clock = Arc::new(uhlc::HLC::default());
213
214        let reply = channel
215            .request(&Timestamped {
216                inner: DaemonRequest::NodeConfig { node_id },
217                timestamp: clock.new_timestamp(),
218            })
219            .wrap_err("failed to request node config from daemon")?;
220
221        match reply {
222            DaemonReply::NodeConfig {
223                result: Ok(node_config),
224            } => Self::init(node_config),
225            DaemonReply::NodeConfig { result: Err(error) } => {
226                bail!("failed to get node config from daemon: {error}")
227            }
228            _ => bail!("unexpected reply from daemon"),
229        }
230    }
231
232    /// Dynamic initialization function for nodes that are sometimes used as dynamic nodes.
233    ///
234    /// This function first tries initializing the traditional way through
235    /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to
236    /// [`init_from_node_id`][Self::init_from_node_id].
237    pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
238        if std::env::var("DORA_NODE_CONFIG").is_ok() {
239            info!(
240                "Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"
241            );
242            Self::init_from_env()
243        } else {
244            Self::init_from_node_id(node_id)
245        }
246    }
247
248    /// Initialize the node in a standalone mode that prompts for inputs on the terminal.
249    ///
250    /// Instead of connecting to a `dora daemon`, this interactive mode prompts for node inputs
251    /// on the terminal. In this mode, the node is completely isolated from the dora daemon and
252    /// other nodes, so it cannot be part of a dataflow.
253    ///
254    /// Note that this function will hang indefinitely if no input is supplied to the interactive
255    /// prompt. So it should be only used through a terminal.
256    ///
257    /// Because of the above limitations, it is not recommended to use this function directly.
258    /// Use [**`init_from_env`**](Self::init_from_env) instead, which supports both normal daemon
259    /// connections and manual interactive runs.
260    ///
261    /// ## Example
262    ///
263    /// Run any node that uses `init_interactive` or [`init_from_env`](Self::init_from_env) directly
264    /// from a terminal. The node will then start in "interactive mode" and prompt you for the next
265    /// input:
266    ///
267    /// ```bash
268    /// > cargo build -p rust-dataflow-example-node
269    /// > target/debug/rust-dataflow-example-node
270    /// hello
271    /// Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set
272    /// Node asks for next input
273    /// ? Input ID
274    /// [empty input ID to stop]
275    /// ```
276    ///
277    /// The `rust-dataflow-example-node` expects a `tick` input, so let's set the input ID to
278    /// `tick`. Tick messages don't have any data, so we leave the "Data" empty when prompted:
279    ///
280    /// ```bash
281    /// Node asks for next input
282    /// > Input ID tick
283    /// > Data
284    /// tick 0, sending 0x943ed1be20c711a4
285    /// node sends output random with data: PrimitiveArray<UInt64>
286    /// [
287    ///   10682205980693303716,
288    /// ]
289    /// Node asks for next input
290    /// ? Input ID
291    /// [empty input ID to stop]
292    /// ```
293    ///
294    /// We see that both the `stdout` output of the node and also the output messages that it sends
295    /// are printed to the terminal. Then we get another prompt for the next input.
296    ///
297    /// If you want to send an input with data, you can either send it as text (for string data)
298    /// or as a JSON object (for struct, string, or array data). Other data types are not supported
299    /// currently.
300    ///
301    /// Empty input IDs are interpreted as stop instructions:
302    ///
303    /// ```bash
304    /// > Input ID
305    /// given input ID is empty -> stopping
306    /// Received stop
307    /// Node asks for next input
308    /// event channel was stopped -> returning empty event list
309    /// node reports EventStreamDropped
310    /// node reports closed outputs []
311    /// node reports OutputsDone
312    /// ```
313    ///
314    /// In addition to the node output, we see log messages for the different events that the node
315    /// reports. After `OutputsDone`, the node should exit.
316    ///
317    /// ### JSON data
318    ///
319    /// In addition to text input, the `Data` prompt also supports JSON objects, which will be
320    /// converted to Apache Arrow struct arrays:
321    ///
322    /// ```bash
323    /// Node asks for next input
324    /// > Input ID some_input
325    /// > Data { "field_1": 42, "field_2": { "inner": "foo" } }
326    /// ```
327    ///
328    /// This JSON data is converted to the following Arrow array:
329    ///
330    /// ```text
331    /// StructArray
332    /// -- validity: [valid, ]
333    /// [
334    ///   -- child 0: "field_1" (Int64)
335    ///      PrimitiveArray<Int64>
336    ///      [42,]
337    ///   -- child 1: "field_2" (Struct([Field { name: "inner", data_type: Utf8, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }]))
338    ///      StructArray
339    ///      -- validity: [valid,]
340    ///      [
341    ///        -- child 0: "inner" (Utf8)
342    ///        StringArray
343    ///        ["foo",]
344    ///      ]
345    /// ]
346    /// ```
347    pub fn init_interactive() -> eyre::Result<(Self, EventStream)> {
348        #[cfg(feature = "tracing")]
349        {
350            TracingBuilder::new("node")
351                .with_stdout("debug", false)
352                .build()
353                .wrap_err("failed to set up tracing subscriber")?;
354        }
355
356        let node_config = NodeConfig {
357            dataflow_id: DataflowId::new_v4(),
358            node_id: "".parse()?,
359            run_config: NodeRunConfig {
360                inputs: Default::default(),
361                outputs: Default::default(),
362            },
363            daemon_communication: Some(DaemonCommunication::Interactive),
364            dataflow_descriptor: serde_yaml::Value::Null,
365            dynamic: false,
366            write_events_to: None,
367        };
368        let (mut node, events) = Self::init(node_config)?;
369        node.interactive = true;
370        Ok((node, events))
371    }
372
373    /// Initializes a node in integration test mode.
374    ///
375    /// No connection to a dora daemon is made in this mode. Instead, inputs are read from the
376    /// specified `TestingInput`, and outputs are written to the specified `TestingOutput`.
377    /// Additional options for the testing mode can be specified through `TestingOptions`.
378    ///
379    /// It is recommended to use this function only within test functions.
380    pub fn init_testing(
381        input: TestingInput,
382        output: TestingOutput,
383        options: TestingOptions,
384    ) -> eyre::Result<(Self, EventStream)> {
385        let node_config = NodeConfig {
386            dataflow_id: DataflowId::new_v4(),
387            node_id: "".parse()?,
388            run_config: NodeRunConfig {
389                inputs: Default::default(),
390                outputs: Default::default(),
391            },
392            daemon_communication: None,
393            dataflow_descriptor: serde_yaml::Value::Null,
394            dynamic: false,
395            write_events_to: None,
396        };
397        let testing_comm = TestingCommunication {
398            input,
399            output,
400            options,
401        };
402        let (mut node, events) = Self::init_with_options(node_config, Some(testing_comm))?;
403        node.interactive = true;
404        Ok((node, events))
405    }
406
407    /// Internal initialization routine that should not be used outside of Dora.
408    #[doc(hidden)]
409    #[tracing::instrument]
410    pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
411        Self::init_with_options(node_config, None)
412    }
413
414    #[tracing::instrument(skip(testing_communication))]
415    fn init_with_options(
416        node_config: NodeConfig,
417        testing_communication: Option<TestingCommunication>,
418    ) -> eyre::Result<(Self, EventStream)> {
419        let NodeConfig {
420            dataflow_id,
421            node_id,
422            run_config,
423            daemon_communication,
424            dataflow_descriptor,
425            dynamic,
426            write_events_to,
427        } = node_config;
428        let clock = Arc::new(uhlc::HLC::default());
429        let input_config = run_config.inputs.clone();
430
431        let daemon_communication = match daemon_communication {
432            Some(comm) => comm.into(),
433            None => match testing_communication {
434                Some(comm) => {
435                    let TestingCommunication {
436                        input,
437                        output,
438                        options,
439                    } = comm;
440                    let (sender, mut receiver) = tokio::sync::mpsc::channel(5);
441                    let new_communication = DaemonCommunicationWrapper::Testing { channel: sender };
442                    let mut events = IntegrationTestingEvents::new(input, output, options)?;
443                    std::thread::spawn(move || {
444                        while let Some((request, reply_sender)) = receiver.blocking_recv() {
445                            let reply = events.request(&request);
446                            if reply_sender
447                                .send(reply.unwrap_or_else(|err| {
448                                    DaemonReply::Result(Err(format!("{err:?}")))
449                                }))
450                                .is_err()
451                            {
452                                eprintln!("failed to send reply");
453                            }
454                        }
455                    });
456                    new_communication
457                }
458                None => eyre::bail!("no daemon communication method specified"),
459            },
460        };
461
462        let event_stream = EventStream::init(
463            dataflow_id,
464            &node_id,
465            &daemon_communication,
466            input_config,
467            clock.clone(),
468            write_events_to,
469        )
470        .wrap_err("failed to init event stream")?;
471        let drop_stream =
472            DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
473                .wrap_err("failed to init drop stream")?;
474        let control_channel =
475            ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
476                .wrap_err("failed to init control channel")?;
477        let node = Self {
478            id: node_id,
479            dataflow_id,
480            node_config: run_config.clone(),
481            control_channel,
482            clock,
483            sent_out_shared_memory: HashMap::new(),
484            drop_stream,
485            cache: VecDeque::new(),
486            dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
487            warned_unknown_output: BTreeSet::new(),
488            interactive: false,
489        };
490
491        if dynamic {
492            // Inject env variable from dataflow descriptor.
493            match &node.dataflow_descriptor {
494                Ok(descriptor) => {
495                    if let Some(env_vars) = descriptor
496                        .nodes
497                        .iter()
498                        .find(|n| n.id == node.id)
499                        .and_then(|n| n.env.as_ref())
500                    {
501                        for (key, value) in env_vars {
502                            // SAFETY: setting env variable is safe as long as we don't
503                            // have multiple threads doing it at the same time.
504                            unsafe {
505                                std::env::set_var(key, value.to_string());
506                            }
507                        }
508                    }
509                }
510                Err(err) => {
511                    warn!("Could not parse dataflow descriptor: {err:#}");
512                }
513            }
514        }
515        Ok((node, event_stream))
516    }
517
518    fn validate_output(&mut self, output_id: &DataId) -> bool {
519        if !self.node_config.outputs.contains(output_id) && !self.interactive {
520            if !self.warned_unknown_output.contains(output_id) {
521                warn!("Ignoring output `{output_id}` not in node's output list.");
522                self.warned_unknown_output.insert(output_id.clone());
523            }
524            false
525        } else {
526            true
527        }
528    }
529
530    /// Send raw data from the node to the other nodes.
531    ///
532    /// We take a closure as an input to enable zero copy on send.
533    ///
534    /// ```no_run
535    /// use dora_node_api::{DoraNode, MetadataParameters};
536    /// use dora_core::config::DataId;
537    ///
538    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
539    ///
540    /// let output = DataId::from("output_id".to_owned());
541    ///
542    /// let data: &[u8] = &[0, 1, 2, 3];
543    /// let parameters = MetadataParameters::default();
544    ///
545    /// node.send_output_raw(
546    ///    output,
547    ///    parameters,
548    ///    data.len(),
549    ///    |out| {
550    ///         out.copy_from_slice(data);
551    ///     }).expect("Could not send output");
552    /// ```
553    ///
554    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
555    /// configuration file.
556    pub fn send_output_raw<F>(
557        &mut self,
558        output_id: DataId,
559        parameters: MetadataParameters,
560        data_len: usize,
561        data: F,
562    ) -> eyre::Result<()>
563    where
564        F: FnOnce(&mut [u8]),
565    {
566        if !self.validate_output(&output_id) {
567            return Ok(());
568        };
569        let mut sample = self.allocate_data_sample(data_len)?;
570        data(&mut sample);
571
572        let type_info = ArrowTypeInfo::byte_array(data_len);
573
574        self.send_output_sample(output_id, type_info, parameters, Some(sample))
575    }
576
577    /// Sends the give Arrow array as an output message.
578    ///
579    /// Uses shared memory for efficient data transfer if suitable.
580    ///
581    /// This method might copy the message once to move it to shared memory.
582    ///
583    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
584    /// configuration file.
585    pub fn send_output(
586        &mut self,
587        output_id: DataId,
588        parameters: MetadataParameters,
589        data: impl Array,
590    ) -> eyre::Result<()> {
591        if !self.validate_output(&output_id) {
592            return Ok(());
593        };
594
595        let arrow_array = data.to_data();
596
597        let total_len = required_data_size(&arrow_array);
598
599        let mut sample = self.allocate_data_sample(total_len)?;
600        let type_info = copy_array_into_sample(&mut sample, &arrow_array);
601
602        self.send_output_sample(output_id, type_info, parameters, Some(sample))
603            .wrap_err("failed to send output")?;
604
605        Ok(())
606    }
607
608    /// Send the given raw byte data as output.
609    ///
610    /// Might copy the data once to move it into shared memory.
611    ///
612    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
613    /// configuration file.
614    pub fn send_output_bytes(
615        &mut self,
616        output_id: DataId,
617        parameters: MetadataParameters,
618        data_len: usize,
619        data: &[u8],
620    ) -> eyre::Result<()> {
621        if !self.validate_output(&output_id) {
622            return Ok(());
623        };
624        self.send_output_raw(output_id, parameters, data_len, |sample| {
625            sample.copy_from_slice(data)
626        })
627    }
628
629    /// Send the give raw byte data with the provided type information.
630    ///
631    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
632    ///
633    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
634    /// configuration file.
635    pub fn send_typed_output<F>(
636        &mut self,
637        output_id: DataId,
638        type_info: ArrowTypeInfo,
639        parameters: MetadataParameters,
640        data_len: usize,
641        data: F,
642    ) -> eyre::Result<()>
643    where
644        F: FnOnce(&mut [u8]),
645    {
646        if !self.validate_output(&output_id) {
647            return Ok(());
648        };
649
650        let mut sample = self.allocate_data_sample(data_len)?;
651        data(&mut sample);
652
653        self.send_output_sample(output_id, type_info, parameters, Some(sample))
654    }
655
656    /// Sends the given [`DataSample`] as output, combined with the given type information.
657    ///
658    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
659    ///
660    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
661    /// configuration file.
662    pub fn send_output_sample(
663        &mut self,
664        output_id: DataId,
665        type_info: ArrowTypeInfo,
666        parameters: MetadataParameters,
667        sample: Option<DataSample>,
668    ) -> eyre::Result<()> {
669        if !self.interactive {
670            self.handle_finished_drop_tokens()?;
671        }
672
673        let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
674
675        let (data, shmem) = match sample {
676            Some(sample) => sample.finalize(),
677            None => (None, None),
678        };
679
680        self.control_channel
681            .send_message(output_id.clone(), metadata, data)
682            .wrap_err_with(|| format!("failed to send output {output_id}"))?;
683
684        if let Some((shared_memory, drop_token)) = shmem {
685            self.sent_out_shared_memory
686                .insert(drop_token, shared_memory);
687        }
688
689        Ok(())
690    }
691
692    /// Report the given outputs IDs as closed.
693    ///
694    /// The node is not allowed to send more outputs with the closed IDs.
695    ///
696    /// Closing outputs early can be helpful to receivers.
697    pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
698        for output_id in &outputs_ids {
699            if !self.node_config.outputs.remove(output_id) {
700                eyre::bail!("unknown output {output_id}");
701            }
702        }
703
704        self.control_channel
705            .report_closed_outputs(outputs_ids)
706            .wrap_err("failed to report closed outputs to daemon")?;
707
708        Ok(())
709    }
710
711    /// Returns the ID of the node as specified in the dataflow configuration file.
712    pub fn id(&self) -> &NodeId {
713        &self.id
714    }
715
716    /// Returns the unique identifier for the running dataflow instance.
717    ///
718    /// Dora assigns each dataflow instance a random identifier when started.
719    pub fn dataflow_id(&self) -> &DataflowId {
720        &self.dataflow_id
721    }
722
723    /// Returns the input and output configuration of this node.
724    pub fn node_config(&self) -> &NodeRunConfig {
725        &self.node_config
726    }
727
728    /// Allocates a [`DataSample`] of the specified size.
729    ///
730    /// The data sample will use shared memory when suitable to enable efficient data transfer
731    /// when sending an output message.
732    pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
733        let data = if data_len >= ZERO_COPY_THRESHOLD && !self.interactive {
734            // create shared memory region
735            let shared_memory = self.allocate_shared_memory(data_len)?;
736
737            DataSample {
738                inner: DataSampleInner::Shmem(shared_memory),
739                len: data_len,
740            }
741        } else {
742            let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
743
744            avec.into()
745        };
746
747        Ok(data)
748    }
749
750    fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
751        let cache_index = self
752            .cache
753            .iter()
754            .enumerate()
755            .rev()
756            .filter(|(_, s)| s.len() >= data_len)
757            .min_by_key(|(_, s)| s.len())
758            .map(|(i, _)| i);
759        let memory = match cache_index {
760            Some(i) => {
761                // we know that this index exists, so we can safely unwrap here
762                self.cache.remove(i).unwrap()
763            }
764            None => ShmemHandle(Box::new(
765                ShmemConf::new()
766                    .size(data_len)
767                    .writable(true)
768                    .create()
769                    .wrap_err("failed to allocate shared memory")?,
770            )),
771        };
772        assert!(memory.len() >= data_len);
773
774        Ok(memory)
775    }
776
777    fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
778        loop {
779            match self.drop_stream.try_recv() {
780                Ok(token) => match self.sent_out_shared_memory.remove(&token) {
781                    Some(region) => self.add_to_cache(region),
782                    None => tracing::warn!("received unknown finished drop token `{token:?}`"),
783                },
784                Err(flume::TryRecvError::Empty) => break,
785                Err(flume::TryRecvError::Disconnected) => {
786                    bail!("event stream was closed before sending all expected drop tokens")
787                }
788            }
789        }
790        Ok(())
791    }
792
793    fn add_to_cache(&mut self, memory: ShmemHandle) {
794        const MAX_CACHE_SIZE: usize = 20;
795
796        self.cache.push_back(memory);
797        while self.cache.len() > MAX_CACHE_SIZE {
798            self.cache.pop_front();
799        }
800    }
801
802    /// Returns the full dataflow descriptor that this node is part of.
803    ///
804    /// This method returns the parsed dataflow YAML file.
805    pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
806        match &self.dataflow_descriptor {
807            Ok(d) => Ok(d),
808            Err(err) => eyre::bail!(
809                "failed to parse dataflow descriptor: {err}\n\n
810                This might be caused by mismatched version numbers of dora \
811                daemon and the dora node API"
812            ),
813        }
814    }
815}
816
817impl Drop for DoraNode {
818    fn drop(&mut self) {
819        // close all outputs first to notify subscribers as early as possible
820        if let Err(err) = self
821            .control_channel
822            .report_closed_outputs(
823                std::mem::take(&mut self.node_config.outputs)
824                    .into_iter()
825                    .collect(),
826            )
827            .context("failed to close outputs on drop")
828        {
829            tracing::warn!("{err:?}")
830        }
831
832        while !self.sent_out_shared_memory.is_empty() {
833            if self.drop_stream.is_empty() {
834                tracing::trace!(
835                    "waiting for {} remaining drop tokens",
836                    self.sent_out_shared_memory.len()
837                );
838            }
839
840            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
841                Ok(token) => {
842                    self.sent_out_shared_memory.remove(&token);
843                }
844                Err(flume::RecvTimeoutError::Disconnected) => {
845                    tracing::warn!(
846                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
847                        closing {} shared memory regions that might not yet been mapped.",
848                        self.sent_out_shared_memory.len()
849                    );
850                    break;
851                }
852                Err(flume::RecvTimeoutError::Timeout) => {
853                    tracing::warn!(
854                        "timeout while waiting for drop tokens; \
855                        closing {} shared memory regions that might not yet been mapped.",
856                        self.sent_out_shared_memory.len()
857                    );
858                    break;
859                }
860            }
861        }
862
863        if let Err(err) = self.control_channel.report_outputs_done() {
864            tracing::warn!("{err:?}")
865        }
866    }
867}
868
869/// A data region suitable for sending as an output message.
870///
871/// The region is stored in shared memory when suitable to enable efficient data transfer.
872///
873/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data.
874pub struct DataSample {
875    inner: DataSampleInner,
876    len: usize,
877}
878
879impl DataSample {
880    fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
881        match self.inner {
882            DataSampleInner::Shmem(shared_memory) => {
883                let drop_token = DropToken::generate();
884                let data = DataMessage::SharedMemory {
885                    shared_memory_id: shared_memory.get_os_id().to_owned(),
886                    len: self.len,
887                    drop_token,
888                };
889                (Some(data), Some((shared_memory, drop_token)))
890            }
891            DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
892        }
893    }
894}
895
896impl Deref for DataSample {
897    type Target = [u8];
898
899    fn deref(&self) -> &Self::Target {
900        let slice = match &self.inner {
901            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
902            DataSampleInner::Vec(data) => data,
903        };
904        &slice[..self.len]
905    }
906}
907
908impl DerefMut for DataSample {
909    fn deref_mut(&mut self) -> &mut Self::Target {
910        let slice = match &mut self.inner {
911            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
912            DataSampleInner::Vec(data) => data,
913        };
914        &mut slice[..self.len]
915    }
916}
917
918impl From<AVec<u8, ConstAlign<128>>> for DataSample {
919    fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
920        Self {
921            len: value.len(),
922            inner: DataSampleInner::Vec(value),
923        }
924    }
925}
926
927impl std::fmt::Debug for DataSample {
928    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
929        let kind = match &self.inner {
930            DataSampleInner::Shmem(_) => "SharedMemory",
931            DataSampleInner::Vec(_) => "Vec",
932        };
933        f.debug_struct("DataSample")
934            .field("len", &self.len)
935            .field("kind", &kind)
936            .finish_non_exhaustive()
937    }
938}
939
940enum DataSampleInner {
941    Shmem(ShmemHandle),
942    Vec(AVec<u8, ConstAlign<128>>),
943}
944
/// Owned handle to a shared memory region, usable like a `Shmem` through `Deref`.
///
/// NOTE(review): the reason for boxing the `Shmem` is not visible here. Presumably it
/// keeps the handle pointer-sized when it is moved around or stored in the cache and
/// the `sent_out_shared_memory` map; confirm.
struct ShmemHandle(Box<Shmem>);
946
947impl Deref for ShmemHandle {
948    type Target = Shmem;
949
950    fn deref(&self) -> &Self::Target {
951        &self.0
952    }
953}
954
955impl DerefMut for ShmemHandle {
956    fn deref_mut(&mut self) -> &mut Self::Target {
957        &mut self.0
958    }
959}
960
// SAFETY: NOTE(review) no justification is recorded in this file. These impls are
// presumably needed because `Shmem` is neither `Send` nor `Sync` by itself (e.g. because
// it holds raw pointers to the mapping). In the code visible here, the mapped bytes are
// only accessed through the `unsafe` `as_slice` / `as_slice_mut` accessors. Confirm that
// moving and sharing a handle across threads is sound for
// `shared_memory_extended::Shmem`.
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}
963
/// Initializes tracing for this node and, with the `metrics` feature, a metrics monitor.
///
/// Stdout logging at the `info` level is always configured. OTLP trace export is also
/// enabled when the `DORA_OTLP_ENDPOINT` or `DORA_JAEGER_TRACING` environment variable
/// is set.
///
/// The setup runs as a task spawned on the current tokio runtime, so this function must
/// be called from within a tokio runtime context. Because the setup is asynchronous, the
/// returned slot may still be `None` right after this function returns. It is filled with
/// the OTLP guard once the spawned task has set up OTLP tracing, and stays `None` when
/// OTLP is disabled.
///
/// Setup failures inside the spawned task are printed to stderr. Panicking there would
/// only unwind the detached task, and nobody observes its `JoinHandle`.
///
/// # Errors
///
/// Returns an error if no tokio runtime is available on the current thread.
#[cfg(feature = "tracing")]
pub fn init_tracing(
    node_id: &NodeId,
    dataflow_id: &DataflowId,
) -> eyre::Result<Arc<Mutex<Option<OtelGuard>>>> {
    // `dataflow_id` is only needed for the metrics monitor; silence the unused warning.
    #[cfg(not(feature = "metrics"))]
    let _ = dataflow_id;

    let rt = Handle::try_current().context("failed to get tokio runtime handle")?;

    let guard: Arc<Mutex<Option<OtelGuard>>> = Arc::new(Mutex::new(None));
    let guard_slot = Arc::clone(&guard);
    let node_id_str = node_id.to_string();
    rt.spawn(async move {
        let mut builder = TracingBuilder::new(node_id_str);
        // Only enable OTLP if one of the environment variables is set
        if std::env::var("DORA_OTLP_ENDPOINT").is_ok()
            || std::env::var("DORA_JAEGER_TRACING").is_ok()
        {
            builder = match builder
                .with_otlp_tracing()
                .context("failed to set up OTLP tracing")
            {
                Ok(builder) => builder.with_stdout("info", true),
                Err(err) => {
                    // tracing is not set up at this point, so report on stderr
                    eprintln!("{err:?}");
                    return;
                }
            };
            *guard_slot
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner) = builder.guard.take();
        } else {
            builder = builder.with_stdout("info", true);
        }

        if let Err(err) = builder.build().wrap_err("failed to set up tracing subscriber") {
            eprintln!("{err:?}");
        }
    });

    #[cfg(feature = "metrics")]
    {
        let id = format!("{dataflow_id}/{node_id}");
        rt.spawn(async move {
            use dora_metrics::run_metrics_monitor;

            if let Err(e) = run_metrics_monitor(id)
                .await
                .wrap_err("metrics monitor exited unexpectedly")
            {
                warn!("metrics monitor failed: {:#?}", e);
            }
        });
    }

    Ok(guard)
}