dora_node_api/node/mod.rs

1use crate::{daemon_connection::DaemonChannel, EventStream};
2
3use self::{
4    arrow_utils::{copy_array_into_sample, required_data_size},
5    control_channel::ControlChannel,
6    drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11    config::{DataId, NodeId, NodeRunConfig},
12    descriptor::Descriptor,
13    metadata::ArrowTypeInfoExt,
14    topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15    uhlc,
16};
17
18use dora_message::{
19    daemon_to_node::{DaemonReply, NodeConfig},
20    metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
21    node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
22    DataflowId,
23};
24use eyre::{bail, WrapErr};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27    collections::{BTreeSet, HashMap, VecDeque},
28    ops::{Deref, DerefMut},
29    sync::Arc,
30    time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "metrics")]
35use dora_metrics::run_metrics_monitor;
36#[cfg(feature = "tracing")]
37use dora_tracing::TracingBuilder;
38
39use tokio::runtime::{Handle, Runtime};
40
41pub mod arrow_utils;
42mod control_channel;
43mod drop_stream;
44
/// Payload size, in bytes, from which outputs are written into a shared-memory
/// region instead of an aligned heap buffer (see `DoraNode::allocate_data_sample`).
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
46
47enum TokioRuntime {
48    Runtime(Runtime),
49    Handle(Handle),
50}
51
/// Handle to a dora node, used to send outputs and to query the node's configuration.
///
/// Created by [`DoraNode::init`] (or one of the `init_*` helpers), which also returns
/// the node's [`EventStream`] for receiving inputs.
pub struct DoraNode {
    /// Id of this node.
    id: NodeId,
    /// Id of the dataflow this node belongs to.
    dataflow_id: DataflowId,
    /// Run config of this node; closed outputs are removed from `outputs`.
    node_config: NodeRunConfig,
    /// Channel to the daemon used to send outputs and report closed/finished outputs.
    control_channel: ControlChannel,
    /// Clock used to timestamp outgoing messages.
    clock: Arc<uhlc::HLC>,

    /// Shared-memory regions that were sent out and whose drop token has not come back yet.
    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Stream of drop tokens for regions that receivers are finished with.
    drop_stream: DropStream,
    /// Returned regions kept for reuse by `allocate_shared_memory` (size bounded in `add_to_cache`).
    cache: VecDeque<ShmemHandle>,

    /// Parsed dataflow descriptor; a parse error is kept and only reported when
    /// `dataflow_descriptor()` is called.
    dataflow_descriptor: serde_yaml::Result<Descriptor>,
    /// Unknown output ids that were already warned about, so each is logged only once.
    warned_unknown_output: BTreeSet<DataId>,
    /// Keeps the node's tokio runtime (or handle) alive. Being the last field, it is
    /// dropped after all other fields (NOTE(review): possibly intentional — keep it last).
    _rt: TokioRuntime,
}
67
68impl DoraNode {
69    /// Initiate a node from environment variables set by `dora-coordinator`
70    ///
71    /// ```no_run
72    /// use dora_node_api::DoraNode;
73    ///
74    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
75    /// ```
76    ///
77    pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
78        let node_config: NodeConfig = {
79            let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
80                "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
81            )?;
82            serde_yaml::from_str(&raw).context("failed to deserialize node config")?
83        };
84        #[cfg(feature = "tracing")]
85        {
86            TracingBuilder::new(node_config.node_id.as_ref())
87                .build()
88                .wrap_err("failed to set up tracing subscriber")?;
89        }
90
91        Self::init(node_config)
92    }
93
94    /// Initiate a node from a dataflow id and a node id.
95    ///
96    /// ```no_run
97    /// use dora_node_api::DoraNode;
98    /// use dora_node_api::dora_core::config::NodeId;
99    ///
100    /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
101    /// ```
102    ///
103    pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
104        // Make sure that the node is initialized outside of dora start.
105        let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
106
107        let mut channel =
108            DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
109        let clock = Arc::new(uhlc::HLC::default());
110
111        let reply = channel
112            .request(&Timestamped {
113                inner: DaemonRequest::NodeConfig { node_id },
114                timestamp: clock.new_timestamp(),
115            })
116            .wrap_err("failed to request node config from daemon")?;
117        match reply {
118            DaemonReply::NodeConfig {
119                result: Ok(node_config),
120            } => Self::init(node_config),
121            DaemonReply::NodeConfig { result: Err(error) } => {
122                bail!("failed to get node config from daemon: {error}")
123            }
124            _ => bail!("unexpected reply from daemon"),
125        }
126    }
127
128    pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
129        if std::env::var("DORA_NODE_CONFIG").is_ok() {
130            info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
131            Self::init_from_env()
132        } else {
133            Self::init_from_node_id(node_id)
134        }
135    }
136
137    #[tracing::instrument]
138    pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
139        let NodeConfig {
140            dataflow_id,
141            node_id,
142            run_config,
143            daemon_communication,
144            dataflow_descriptor,
145            dynamic: _,
146        } = node_config;
147        let clock = Arc::new(uhlc::HLC::default());
148        let input_config = run_config.inputs.clone();
149
150        let rt = match Handle::try_current() {
151            Ok(handle) => TokioRuntime::Handle(handle),
152            Err(_) => TokioRuntime::Runtime(
153                tokio::runtime::Builder::new_multi_thread()
154                    .worker_threads(2)
155                    .enable_all()
156                    .build()
157                    .context("tokio runtime failed")?,
158            ),
159        };
160
161        #[cfg(feature = "metrics")]
162        {
163            let id = format!("{}/{}", dataflow_id, node_id);
164            let monitor_task = async move {
165                if let Err(e) = run_metrics_monitor(id.clone())
166                    .await
167                    .wrap_err("metrics monitor exited unexpectedly")
168                {
169                    warn!("metrics monitor failed: {:#?}", e);
170                }
171            };
172            match &rt {
173                TokioRuntime::Runtime(rt) => rt.spawn(monitor_task),
174                TokioRuntime::Handle(handle) => handle.spawn(monitor_task),
175            };
176        }
177
178        let event_stream = EventStream::init(
179            dataflow_id,
180            &node_id,
181            &daemon_communication,
182            input_config,
183            clock.clone(),
184        )
185        .wrap_err("failed to init event stream")?;
186        let drop_stream =
187            DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
188                .wrap_err("failed to init drop stream")?;
189        let control_channel =
190            ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
191                .wrap_err("failed to init control channel")?;
192
193        let node = Self {
194            id: node_id,
195            dataflow_id,
196            node_config: run_config.clone(),
197            control_channel,
198            clock,
199            sent_out_shared_memory: HashMap::new(),
200            drop_stream,
201            cache: VecDeque::new(),
202            dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
203            warned_unknown_output: BTreeSet::new(),
204            _rt: rt,
205        };
206        Ok((node, event_stream))
207    }
208
209    fn validate_output(&mut self, output_id: &DataId) -> bool {
210        if !self.node_config.outputs.contains(output_id) {
211            if !self.warned_unknown_output.contains(output_id) {
212                warn!("Ignoring output `{output_id}` not in node's output list.");
213                self.warned_unknown_output.insert(output_id.clone());
214            }
215            false
216        } else {
217            true
218        }
219    }
220
221    /// Send data from the node to the other nodes.
222    /// We take a closure as an input to enable zero copy on send.
223    ///
224    /// ```no_run
225    /// use dora_node_api::{DoraNode, MetadataParameters};
226    /// use dora_core::config::DataId;
227    ///
228    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
229    ///
230    /// let output = DataId::from("output_id".to_owned());
231    ///
232    /// let data: &[u8] = &[0, 1, 2, 3];
233    /// let parameters = MetadataParameters::default();
234    ///
235    /// node.send_output_raw(
236    ///    output,
237    ///    parameters,
238    ///    data.len(),
239    ///    |out| {
240    ///         out.copy_from_slice(data);
241    ///     }).expect("Could not send output");
242    /// ```
243    ///
244    pub fn send_output_raw<F>(
245        &mut self,
246        output_id: DataId,
247        parameters: MetadataParameters,
248        data_len: usize,
249        data: F,
250    ) -> eyre::Result<()>
251    where
252        F: FnOnce(&mut [u8]),
253    {
254        if !self.validate_output(&output_id) {
255            return Ok(());
256        };
257        let mut sample = self.allocate_data_sample(data_len)?;
258        data(&mut sample);
259
260        let type_info = ArrowTypeInfo::byte_array(data_len);
261
262        self.send_output_sample(output_id, type_info, parameters, Some(sample))
263    }
264
265    pub fn send_output(
266        &mut self,
267        output_id: DataId,
268        parameters: MetadataParameters,
269        data: impl Array,
270    ) -> eyre::Result<()> {
271        if !self.validate_output(&output_id) {
272            return Ok(());
273        };
274
275        let arrow_array = data.to_data();
276
277        let total_len = required_data_size(&arrow_array);
278
279        let mut sample = self.allocate_data_sample(total_len)?;
280        let type_info = copy_array_into_sample(&mut sample, &arrow_array);
281
282        self.send_output_sample(output_id, type_info, parameters, Some(sample))
283            .wrap_err("failed to send output")?;
284
285        Ok(())
286    }
287
288    pub fn send_output_bytes(
289        &mut self,
290        output_id: DataId,
291        parameters: MetadataParameters,
292        data_len: usize,
293        data: &[u8],
294    ) -> eyre::Result<()> {
295        if !self.validate_output(&output_id) {
296            return Ok(());
297        };
298        self.send_output_raw(output_id, parameters, data_len, |sample| {
299            sample.copy_from_slice(data)
300        })
301    }
302
303    pub fn send_typed_output<F>(
304        &mut self,
305        output_id: DataId,
306        type_info: ArrowTypeInfo,
307        parameters: MetadataParameters,
308        data_len: usize,
309        data: F,
310    ) -> eyre::Result<()>
311    where
312        F: FnOnce(&mut [u8]),
313    {
314        if !self.validate_output(&output_id) {
315            return Ok(());
316        };
317
318        let mut sample = self.allocate_data_sample(data_len)?;
319        data(&mut sample);
320
321        self.send_output_sample(output_id, type_info, parameters, Some(sample))
322    }
323
324    pub fn send_output_sample(
325        &mut self,
326        output_id: DataId,
327        type_info: ArrowTypeInfo,
328        parameters: MetadataParameters,
329        sample: Option<DataSample>,
330    ) -> eyre::Result<()> {
331        self.handle_finished_drop_tokens()?;
332
333        let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
334
335        let (data, shmem) = match sample {
336            Some(sample) => sample.finalize(),
337            None => (None, None),
338        };
339
340        self.control_channel
341            .send_message(output_id.clone(), metadata, data)
342            .wrap_err_with(|| format!("failed to send output {output_id}"))?;
343
344        if let Some((shared_memory, drop_token)) = shmem {
345            self.sent_out_shared_memory
346                .insert(drop_token, shared_memory);
347        }
348
349        Ok(())
350    }
351
352    pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
353        for output_id in &outputs {
354            if !self.node_config.outputs.remove(output_id) {
355                eyre::bail!("unknown output {output_id}");
356            }
357        }
358
359        self.control_channel
360            .report_closed_outputs(outputs)
361            .wrap_err("failed to report closed outputs to daemon")?;
362
363        Ok(())
364    }
365
366    pub fn id(&self) -> &NodeId {
367        &self.id
368    }
369
370    pub fn dataflow_id(&self) -> &DataflowId {
371        &self.dataflow_id
372    }
373
374    pub fn node_config(&self) -> &NodeRunConfig {
375        &self.node_config
376    }
377
378    pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
379        let data = if data_len >= ZERO_COPY_THRESHOLD {
380            // create shared memory region
381            let shared_memory = self.allocate_shared_memory(data_len)?;
382
383            DataSample {
384                inner: DataSampleInner::Shmem(shared_memory),
385                len: data_len,
386            }
387        } else {
388            let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
389
390            avec.into()
391        };
392
393        Ok(data)
394    }
395
396    fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
397        let cache_index = self
398            .cache
399            .iter()
400            .enumerate()
401            .rev()
402            .filter(|(_, s)| s.len() >= data_len)
403            .min_by_key(|(_, s)| s.len())
404            .map(|(i, _)| i);
405        let memory = match cache_index {
406            Some(i) => {
407                // we know that this index exists, so we can safely unwrap here
408                self.cache.remove(i).unwrap()
409            }
410            None => ShmemHandle(Box::new(
411                ShmemConf::new()
412                    .size(data_len)
413                    .writable(true)
414                    .create()
415                    .wrap_err("failed to allocate shared memory")?,
416            )),
417        };
418        assert!(memory.len() >= data_len);
419
420        Ok(memory)
421    }
422
423    fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
424        loop {
425            match self.drop_stream.try_recv() {
426                Ok(token) => match self.sent_out_shared_memory.remove(&token) {
427                    Some(region) => self.add_to_cache(region),
428                    None => tracing::warn!("received unknown finished drop token `{token:?}`"),
429                },
430                Err(flume::TryRecvError::Empty) => break,
431                Err(flume::TryRecvError::Disconnected) => {
432                    bail!("event stream was closed before sending all expected drop tokens")
433                }
434            }
435        }
436        Ok(())
437    }
438
439    fn add_to_cache(&mut self, memory: ShmemHandle) {
440        const MAX_CACHE_SIZE: usize = 20;
441
442        self.cache.push_back(memory);
443        while self.cache.len() > MAX_CACHE_SIZE {
444            self.cache.pop_front();
445        }
446    }
447
448    /// Returns the full dataflow descriptor that this node is part of.
449    ///
450    /// This method returns the parsed dataflow YAML file.
451    pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
452        match &self.dataflow_descriptor {
453            Ok(d) => Ok(d),
454            Err(err) => eyre::bail!(
455                "failed to parse dataflow descriptor: {err}\n\n
456                This might be caused by mismatched version numbers of dora \
457                daemon and the dora node API"
458            ),
459        }
460    }
461}
462
463impl Drop for DoraNode {
464    #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
465    fn drop(&mut self) {
466        // close all outputs first to notify subscribers as early as possible
467        if let Err(err) = self
468            .control_channel
469            .report_closed_outputs(
470                std::mem::take(&mut self.node_config.outputs)
471                    .into_iter()
472                    .collect(),
473            )
474            .context("failed to close outputs on drop")
475        {
476            tracing::warn!("{err:?}")
477        }
478
479        while !self.sent_out_shared_memory.is_empty() {
480            if self.drop_stream.len() == 0 {
481                tracing::trace!(
482                    "waiting for {} remaining drop tokens",
483                    self.sent_out_shared_memory.len()
484                );
485            }
486
487            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
488                Ok(token) => {
489                    self.sent_out_shared_memory.remove(&token);
490                }
491                Err(flume::RecvTimeoutError::Disconnected) => {
492                    tracing::warn!(
493                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
494                        closing {} shared memory regions that might not yet been mapped.",
495                        self.sent_out_shared_memory.len()
496                    );
497                    break;
498                }
499                Err(flume::RecvTimeoutError::Timeout) => {
500                    tracing::warn!(
501                        "timeout while waiting for drop tokens; \
502                        closing {} shared memory regions that might not yet been mapped.",
503                        self.sent_out_shared_memory.len()
504                    );
505                    break;
506                }
507            }
508        }
509
510        if let Err(err) = self.control_channel.report_outputs_done() {
511            tracing::warn!("{err:?}")
512        }
513    }
514}
515
516pub struct DataSample {
517    inner: DataSampleInner,
518    len: usize,
519}
520
521impl DataSample {
522    fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
523        match self.inner {
524            DataSampleInner::Shmem(shared_memory) => {
525                let drop_token = DropToken::generate();
526                let data = DataMessage::SharedMemory {
527                    shared_memory_id: shared_memory.get_os_id().to_owned(),
528                    len: self.len,
529                    drop_token,
530                };
531                (Some(data), Some((shared_memory, drop_token)))
532            }
533            DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
534        }
535    }
536}
537
538impl Deref for DataSample {
539    type Target = [u8];
540
541    fn deref(&self) -> &Self::Target {
542        let slice = match &self.inner {
543            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
544            DataSampleInner::Vec(data) => data,
545        };
546        &slice[..self.len]
547    }
548}
549
550impl DerefMut for DataSample {
551    fn deref_mut(&mut self) -> &mut Self::Target {
552        let slice = match &mut self.inner {
553            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
554            DataSampleInner::Vec(data) => data,
555        };
556        &mut slice[..self.len]
557    }
558}
559
560impl From<AVec<u8, ConstAlign<128>>> for DataSample {
561    fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
562        Self {
563            len: value.len(),
564            inner: DataSampleInner::Vec(value),
565        }
566    }
567}
568
569impl std::fmt::Debug for DataSample {
570    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571        let kind = match &self.inner {
572            DataSampleInner::Shmem(_) => "SharedMemory",
573            DataSampleInner::Vec(_) => "Vec",
574        };
575        f.debug_struct("DataSample")
576            .field("len", &self.len)
577            .field("kind", &kind)
578            .finish_non_exhaustive()
579    }
580}
581
582enum DataSampleInner {
583    Shmem(ShmemHandle),
584    Vec(AVec<u8, ConstAlign<128>>),
585}
586
/// Owned, boxed shared-memory region; dereferences to the inner [`Shmem`].
///
/// NOTE(review): the reason for boxing `Shmem` is not visible here (possibly to keep
/// `DataSampleInner` small or to give the mapping a stable address) — confirm before changing.
struct ShmemHandle(Box<Shmem>);
588
589impl Deref for ShmemHandle {
590    type Target = Shmem;
591
592    fn deref(&self) -> &Self::Target {
593        &self.0
594    }
595}
596
597impl DerefMut for ShmemHandle {
598    fn deref_mut(&mut self) -> &mut Self::Target {
599        &mut self.0
600    }
601}
602
// SAFETY — NOTE(review): these impls assert that a `ShmemHandle` may be moved to and
// shared between threads, which `Shmem` presumably does not allow by itself (e.g.
// because it holds a raw pointer to the mapping) — confirm against
// `shared_memory_extended`. In this module, the mapping's bytes are written only
// through `&mut` access (`DataSample::deref_mut`). Soundness of `Sync` relies on no
// code mutating the region through a shared `&ShmemHandle`.
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}