dora_node_api/node/
mod.rs

1use crate::{EventStream, daemon_connection::DaemonChannel};
2
3use self::{
4    arrow_utils::{copy_array_into_sample, required_data_size},
5    control_channel::ControlChannel,
6    drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11    config::{DataId, NodeId, NodeRunConfig},
12    descriptor::Descriptor,
13    metadata::ArrowTypeInfoExt,
14    topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15    uhlc,
16};
17
18use dora_message::{
19    DataflowId,
20    daemon_to_node::{DaemonReply, NodeConfig},
21    metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
22    node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
23};
24use eyre::{WrapErr, bail};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27    collections::{BTreeSet, HashMap, VecDeque},
28    ops::{Deref, DerefMut},
29    sync::Arc,
30    time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "metrics")]
35use dora_metrics::run_metrics_monitor;
36#[cfg(feature = "tracing")]
37use dora_tracing::TracingBuilder;
38
39use tokio::runtime::{Handle, Runtime};
40
41pub mod arrow_utils;
42mod control_channel;
43mod drop_stream;
44
/// Minimum message size (in bytes) from which outputs are sent through shared memory.
///
/// Memory is shared at the granularity of whole pages, and a page is typically 4 KiB.
/// A message smaller than one page therefore still occupies a full shared page, which
/// wastes memory. Setting up a shared region also costs several syscalls. For small
/// messages it is cheaper to send the bytes over a traditional TCP stream (or a similar
/// channel).
///
/// Messages of at least this many bytes are placed in shared memory; smaller messages
/// are sent through TCP.
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
60
61#[allow(dead_code)]
62enum TokioRuntime {
63    Runtime(Runtime),
64    Handle(Handle),
65}
66
/// Allows sending outputs and retrieving node information.
///
/// The main purpose of this struct is to send outputs via Dora. There are also functions available
/// for retrieving the node configuration.
pub struct DoraNode {
    /// ID of this node, as specified in the dataflow configuration.
    id: NodeId,
    /// ID of the running dataflow instance this node is part of.
    dataflow_id: DataflowId,
    /// Input/output configuration of this node; closed outputs are removed from `outputs`.
    node_config: NodeRunConfig,
    /// Channel used to send output messages and output status reports to the daemon.
    control_channel: ControlChannel,
    /// Hybrid logical clock used to timestamp outgoing messages and requests.
    clock: Arc<uhlc::HLC>,

    /// Shared-memory regions that were sent out and whose drop token has not been received
    /// back yet, keyed by drop token.
    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Stream of finished drop tokens for regions in `sent_out_shared_memory`.
    drop_stream: DropStream,
    /// Released shared-memory regions that can be reused for new outputs (bounded size).
    cache: VecDeque<ShmemHandle>,

    /// Parsed dataflow descriptor. A parse error is stored instead of failing initialization
    /// and is only reported when the descriptor is requested.
    dataflow_descriptor: serde_yaml::Result<Descriptor>,
    /// Unknown output IDs that were already warned about, so that each warning is logged once.
    warned_unknown_output: BTreeSet<DataId>,
    /// Keeps the tokio runtime (or runtime handle) alive for the lifetime of the node.
    ///
    /// NOTE(review): struct fields are dropped in declaration order, so this field is dropped
    /// after the channels above. This looks intentional; keep it as the last field.
    _rt: TokioRuntime,
}
86
87impl DoraNode {
88    /// Initiate a node from environment variables set by the Dora daemon.
89    ///
90    /// This is the recommended initialization function for Dora nodes, which are spawned by
91    /// Dora daemon instances.
92    ///
93    ///
94    /// ```no_run
95    /// use dora_node_api::DoraNode;
96    ///
97    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
98    /// ```
99    ///
100    pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
101        let node_config: NodeConfig = {
102            let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
103                "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
104            )?;
105            serde_yaml::from_str(&raw).context("failed to deserialize node config")?
106        };
107        #[cfg(feature = "tracing")]
108        {
109            TracingBuilder::new(node_config.node_id.as_ref())
110                .build()
111                .wrap_err("failed to set up tracing subscriber")?;
112        }
113
114        Self::init(node_config)
115    }
116
117    /// Initiate a node from a dataflow id and a node id.
118    ///
119    /// This initialization function should be used for [_dynamic nodes_](index.html#dynamic-nodes).
120    ///
121    /// ```no_run
122    /// use dora_node_api::DoraNode;
123    /// use dora_node_api::dora_core::config::NodeId;
124    ///
125    /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
126    /// ```
127    ///
128    pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
129        // Make sure that the node is initialized outside of dora start.
130        let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
131
132        let mut channel =
133            DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
134        let clock = Arc::new(uhlc::HLC::default());
135
136        let reply = channel
137            .request(&Timestamped {
138                inner: DaemonRequest::NodeConfig { node_id },
139                timestamp: clock.new_timestamp(),
140            })
141            .wrap_err("failed to request node config from daemon")?;
142        match reply {
143            DaemonReply::NodeConfig {
144                result: Ok(node_config),
145            } => Self::init(node_config),
146            DaemonReply::NodeConfig { result: Err(error) } => {
147                bail!("failed to get node config from daemon: {error}")
148            }
149            _ => bail!("unexpected reply from daemon"),
150        }
151    }
152
153    /// Dynamic initialization function for nodes that are sometimes used as dynamic nodes.
154    ///
155    /// This function first tries initializing the traditional way through
156    /// [`init_from_env`][Self::init_from_env]. If this fails, it falls back to
157    /// [`init_from_node_id`][Self::init_from_node_id].
158    pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
159        if std::env::var("DORA_NODE_CONFIG").is_ok() {
160            info!(
161                "Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"
162            );
163            Self::init_from_env()
164        } else {
165            Self::init_from_node_id(node_id)
166        }
167    }
168
169    /// Internal initialization routine that should not be used outside of Dora.
170    #[doc(hidden)]
171    #[tracing::instrument]
172    pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
173        let NodeConfig {
174            dataflow_id,
175            node_id,
176            run_config,
177            daemon_communication,
178            dataflow_descriptor,
179            dynamic: _,
180        } = node_config;
181        let clock = Arc::new(uhlc::HLC::default());
182        let input_config = run_config.inputs.clone();
183
184        let rt = match Handle::try_current() {
185            Ok(handle) => TokioRuntime::Handle(handle),
186            Err(_) => TokioRuntime::Runtime(
187                tokio::runtime::Builder::new_multi_thread()
188                    .worker_threads(2)
189                    .enable_all()
190                    .build()
191                    .context("tokio runtime failed")?,
192            ),
193        };
194
195        #[cfg(feature = "metrics")]
196        {
197            let id = format!("{dataflow_id}/{node_id}");
198            let monitor_task = async move {
199                if let Err(e) = run_metrics_monitor(id.clone())
200                    .await
201                    .wrap_err("metrics monitor exited unexpectedly")
202                {
203                    warn!("metrics monitor failed: {:#?}", e);
204                }
205            };
206            match &rt {
207                TokioRuntime::Runtime(rt) => rt.spawn(monitor_task),
208                TokioRuntime::Handle(handle) => handle.spawn(monitor_task),
209            };
210        }
211
212        let event_stream = EventStream::init(
213            dataflow_id,
214            &node_id,
215            &daemon_communication,
216            input_config,
217            clock.clone(),
218        )
219        .wrap_err("failed to init event stream")?;
220        let drop_stream =
221            DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
222                .wrap_err("failed to init drop stream")?;
223        let control_channel =
224            ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
225                .wrap_err("failed to init control channel")?;
226
227        let node = Self {
228            id: node_id,
229            dataflow_id,
230            node_config: run_config.clone(),
231            control_channel,
232            clock,
233            sent_out_shared_memory: HashMap::new(),
234            drop_stream,
235            cache: VecDeque::new(),
236            dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
237            warned_unknown_output: BTreeSet::new(),
238            _rt: rt,
239        };
240        Ok((node, event_stream))
241    }
242
243    fn validate_output(&mut self, output_id: &DataId) -> bool {
244        if !self.node_config.outputs.contains(output_id) {
245            if !self.warned_unknown_output.contains(output_id) {
246                warn!("Ignoring output `{output_id}` not in node's output list.");
247                self.warned_unknown_output.insert(output_id.clone());
248            }
249            false
250        } else {
251            true
252        }
253    }
254
255    /// Send raw data from the node to the other nodes.
256    ///
257    /// We take a closure as an input to enable zero copy on send.
258    ///
259    /// ```no_run
260    /// use dora_node_api::{DoraNode, MetadataParameters};
261    /// use dora_core::config::DataId;
262    ///
263    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
264    ///
265    /// let output = DataId::from("output_id".to_owned());
266    ///
267    /// let data: &[u8] = &[0, 1, 2, 3];
268    /// let parameters = MetadataParameters::default();
269    ///
270    /// node.send_output_raw(
271    ///    output,
272    ///    parameters,
273    ///    data.len(),
274    ///    |out| {
275    ///         out.copy_from_slice(data);
276    ///     }).expect("Could not send output");
277    /// ```
278    ///
279    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
280    /// configuration file.
281    pub fn send_output_raw<F>(
282        &mut self,
283        output_id: DataId,
284        parameters: MetadataParameters,
285        data_len: usize,
286        data: F,
287    ) -> eyre::Result<()>
288    where
289        F: FnOnce(&mut [u8]),
290    {
291        if !self.validate_output(&output_id) {
292            return Ok(());
293        };
294        let mut sample = self.allocate_data_sample(data_len)?;
295        data(&mut sample);
296
297        let type_info = ArrowTypeInfo::byte_array(data_len);
298
299        self.send_output_sample(output_id, type_info, parameters, Some(sample))
300    }
301
302    /// Sends the give Arrow array as an output message.
303    ///
304    /// Uses shared memory for efficient data transfer if suitable.
305    ///
306    /// This method might copy the message once to move it to shared memory.
307    ///    
308    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
309    /// configuration file.
310    pub fn send_output(
311        &mut self,
312        output_id: DataId,
313        parameters: MetadataParameters,
314        data: impl Array,
315    ) -> eyre::Result<()> {
316        if !self.validate_output(&output_id) {
317            return Ok(());
318        };
319
320        let arrow_array = data.to_data();
321
322        let total_len = required_data_size(&arrow_array);
323
324        let mut sample = self.allocate_data_sample(total_len)?;
325        let type_info = copy_array_into_sample(&mut sample, &arrow_array);
326
327        self.send_output_sample(output_id, type_info, parameters, Some(sample))
328            .wrap_err("failed to send output")?;
329
330        Ok(())
331    }
332
333    /// Send the given raw byte data as output.
334    ///
335    /// Might copy the data once to move it into shared memory.
336    ///
337    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
338    /// configuration file.
339    pub fn send_output_bytes(
340        &mut self,
341        output_id: DataId,
342        parameters: MetadataParameters,
343        data_len: usize,
344        data: &[u8],
345    ) -> eyre::Result<()> {
346        if !self.validate_output(&output_id) {
347            return Ok(());
348        };
349        self.send_output_raw(output_id, parameters, data_len, |sample| {
350            sample.copy_from_slice(data)
351        })
352    }
353
354    /// Send the give raw byte data with the provided type information.
355    ///
356    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
357    ///
358    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
359    /// configuration file.
360    pub fn send_typed_output<F>(
361        &mut self,
362        output_id: DataId,
363        type_info: ArrowTypeInfo,
364        parameters: MetadataParameters,
365        data_len: usize,
366        data: F,
367    ) -> eyre::Result<()>
368    where
369        F: FnOnce(&mut [u8]),
370    {
371        if !self.validate_output(&output_id) {
372            return Ok(());
373        };
374
375        let mut sample = self.allocate_data_sample(data_len)?;
376        data(&mut sample);
377
378        self.send_output_sample(output_id, type_info, parameters, Some(sample))
379    }
380
381    /// Sends the given [`DataSample`] as output, combined with the given type information.
382    ///
383    /// It is recommended to use a function like [`send_output`][Self::send_output] instead.
384    ///
385    /// Ignores the output if the given `output_id` is not specified as node output in the dataflow
386    /// configuration file.
387    pub fn send_output_sample(
388        &mut self,
389        output_id: DataId,
390        type_info: ArrowTypeInfo,
391        parameters: MetadataParameters,
392        sample: Option<DataSample>,
393    ) -> eyre::Result<()> {
394        self.handle_finished_drop_tokens()?;
395
396        let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
397
398        let (data, shmem) = match sample {
399            Some(sample) => sample.finalize(),
400            None => (None, None),
401        };
402
403        self.control_channel
404            .send_message(output_id.clone(), metadata, data)
405            .wrap_err_with(|| format!("failed to send output {output_id}"))?;
406
407        if let Some((shared_memory, drop_token)) = shmem {
408            self.sent_out_shared_memory
409                .insert(drop_token, shared_memory);
410        }
411
412        Ok(())
413    }
414
415    /// Report the given outputs IDs as closed.
416    ///
417    /// The node is not allowed to send more outputs with the closed IDs.
418    ///
419    /// Closing outputs early can be helpful to receivers.
420    pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
421        for output_id in &outputs_ids {
422            if !self.node_config.outputs.remove(output_id) {
423                eyre::bail!("unknown output {output_id}");
424            }
425        }
426
427        self.control_channel
428            .report_closed_outputs(outputs_ids)
429            .wrap_err("failed to report closed outputs to daemon")?;
430
431        Ok(())
432    }
433
434    /// Returns the ID of the node as specified in the dataflow configuration file.
435    pub fn id(&self) -> &NodeId {
436        &self.id
437    }
438
439    /// Returns the unique identifier for the running dataflow instance.
440    ///
441    /// Dora assigns each dataflow instance a random identifier when started.
442    pub fn dataflow_id(&self) -> &DataflowId {
443        &self.dataflow_id
444    }
445
446    /// Returns the input and output configuration of this node.
447    pub fn node_config(&self) -> &NodeRunConfig {
448        &self.node_config
449    }
450
451    /// Allocates a [`DataSample`] of the specified size.
452    ///
453    /// The data sample will use shared memory when suitable to enable efficient data transfer
454    /// when sending an output message.
455    pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
456        let data = if data_len >= ZERO_COPY_THRESHOLD {
457            // create shared memory region
458            let shared_memory = self.allocate_shared_memory(data_len)?;
459
460            DataSample {
461                inner: DataSampleInner::Shmem(shared_memory),
462                len: data_len,
463            }
464        } else {
465            let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
466
467            avec.into()
468        };
469
470        Ok(data)
471    }
472
473    fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
474        let cache_index = self
475            .cache
476            .iter()
477            .enumerate()
478            .rev()
479            .filter(|(_, s)| s.len() >= data_len)
480            .min_by_key(|(_, s)| s.len())
481            .map(|(i, _)| i);
482        let memory = match cache_index {
483            Some(i) => {
484                // we know that this index exists, so we can safely unwrap here
485                self.cache.remove(i).unwrap()
486            }
487            None => ShmemHandle(Box::new(
488                ShmemConf::new()
489                    .size(data_len)
490                    .writable(true)
491                    .create()
492                    .wrap_err("failed to allocate shared memory")?,
493            )),
494        };
495        assert!(memory.len() >= data_len);
496
497        Ok(memory)
498    }
499
500    fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
501        loop {
502            match self.drop_stream.try_recv() {
503                Ok(token) => match self.sent_out_shared_memory.remove(&token) {
504                    Some(region) => self.add_to_cache(region),
505                    None => tracing::warn!("received unknown finished drop token `{token:?}`"),
506                },
507                Err(flume::TryRecvError::Empty) => break,
508                Err(flume::TryRecvError::Disconnected) => {
509                    bail!("event stream was closed before sending all expected drop tokens")
510                }
511            }
512        }
513        Ok(())
514    }
515
516    fn add_to_cache(&mut self, memory: ShmemHandle) {
517        const MAX_CACHE_SIZE: usize = 20;
518
519        self.cache.push_back(memory);
520        while self.cache.len() > MAX_CACHE_SIZE {
521            self.cache.pop_front();
522        }
523    }
524
525    /// Returns the full dataflow descriptor that this node is part of.
526    ///
527    /// This method returns the parsed dataflow YAML file.
528    pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
529        match &self.dataflow_descriptor {
530            Ok(d) => Ok(d),
531            Err(err) => eyre::bail!(
532                "failed to parse dataflow descriptor: {err}\n\n
533                This might be caused by mismatched version numbers of dora \
534                daemon and the dora node API"
535            ),
536        }
537    }
538}
539
540impl Drop for DoraNode {
541    #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
542    fn drop(&mut self) {
543        // close all outputs first to notify subscribers as early as possible
544        if let Err(err) = self
545            .control_channel
546            .report_closed_outputs(
547                std::mem::take(&mut self.node_config.outputs)
548                    .into_iter()
549                    .collect(),
550            )
551            .context("failed to close outputs on drop")
552        {
553            tracing::warn!("{err:?}")
554        }
555
556        while !self.sent_out_shared_memory.is_empty() {
557            if self.drop_stream.is_empty() {
558                tracing::trace!(
559                    "waiting for {} remaining drop tokens",
560                    self.sent_out_shared_memory.len()
561                );
562            }
563
564            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
565                Ok(token) => {
566                    self.sent_out_shared_memory.remove(&token);
567                }
568                Err(flume::RecvTimeoutError::Disconnected) => {
569                    tracing::warn!(
570                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
571                        closing {} shared memory regions that might not yet been mapped.",
572                        self.sent_out_shared_memory.len()
573                    );
574                    break;
575                }
576                Err(flume::RecvTimeoutError::Timeout) => {
577                    tracing::warn!(
578                        "timeout while waiting for drop tokens; \
579                        closing {} shared memory regions that might not yet been mapped.",
580                        self.sent_out_shared_memory.len()
581                    );
582                    break;
583                }
584            }
585        }
586
587        if let Err(err) = self.control_channel.report_outputs_done() {
588            tracing::warn!("{err:?}")
589        }
590    }
591}
592
/// A data region suitable for sending as an output message.
///
/// The region is stored in shared memory when suitable to enable efficient data transfer.
///
/// `DataSample` implements the [`Deref`] and [`DerefMut`] traits to read and write the mapped data.
pub struct DataSample {
    /// Backing storage: a shared-memory region or an aligned heap buffer.
    inner: DataSampleInner,
    /// Number of usable bytes. The backing storage may be larger (e.g. a reused cached
    /// shared-memory region), so only the first `len` bytes are exposed and sent.
    len: usize,
}
602
603impl DataSample {
604    fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
605        match self.inner {
606            DataSampleInner::Shmem(shared_memory) => {
607                let drop_token = DropToken::generate();
608                let data = DataMessage::SharedMemory {
609                    shared_memory_id: shared_memory.get_os_id().to_owned(),
610                    len: self.len,
611                    drop_token,
612                };
613                (Some(data), Some((shared_memory, drop_token)))
614            }
615            DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
616        }
617    }
618}
619
620impl Deref for DataSample {
621    type Target = [u8];
622
623    fn deref(&self) -> &Self::Target {
624        let slice = match &self.inner {
625            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
626            DataSampleInner::Vec(data) => data,
627        };
628        &slice[..self.len]
629    }
630}
631
632impl DerefMut for DataSample {
633    fn deref_mut(&mut self) -> &mut Self::Target {
634        let slice = match &mut self.inner {
635            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
636            DataSampleInner::Vec(data) => data,
637        };
638        &mut slice[..self.len]
639    }
640}
641
642impl From<AVec<u8, ConstAlign<128>>> for DataSample {
643    fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
644        Self {
645            len: value.len(),
646            inner: DataSampleInner::Vec(value),
647        }
648    }
649}
650
651impl std::fmt::Debug for DataSample {
652    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653        let kind = match &self.inner {
654            DataSampleInner::Shmem(_) => "SharedMemory",
655            DataSampleInner::Vec(_) => "Vec",
656        };
657        f.debug_struct("DataSample")
658            .field("len", &self.len)
659            .field("kind", &kind)
660            .finish_non_exhaustive()
661    }
662}
663
664enum DataSampleInner {
665    Shmem(ShmemHandle),
666    Vec(AVec<u8, ConstAlign<128>>),
667}
668
/// Owning wrapper around a (boxed) shared-memory region.
///
/// Being a local type, it can carry the `unsafe impl Send`/`Sync` declared for it in this
/// module, and it exposes the wrapped `Shmem` through `Deref`/`DerefMut`.
struct ShmemHandle(Box<Shmem>);
670
671impl Deref for ShmemHandle {
672    type Target = Shmem;
673
674    fn deref(&self) -> &Self::Target {
675        &self.0
676    }
677}
678
679impl DerefMut for ShmemHandle {
680    fn deref_mut(&mut self) -> &mut Self::Target {
681        &mut self.0
682    }
683}
684
// SAFETY: NOTE(review): `Shmem` is presumably `!Send` because it stores a raw pointer to the
// mapping. Moving the handle to another thread only transfers ownership of that mapping. This
// assumes `shared_memory_extended` keeps no thread-affine state for it — confirm.
unsafe impl Send for ShmemHandle {}
// SAFETY: NOTE(review): a shared reference only exposes `&Shmem`, and reading the mapped bytes
// through it requires `unsafe` (`as_slice`), which moves the synchronization obligation to those
// call sites. Confirm that no safe `&self` method of `Shmem` mutates shared state.
unsafe impl Sync for ShmemHandle {}