dora_node_api/node/
mod.rs

use crate::{daemon_connection::DaemonChannel, EventStream};

use self::{
    arrow_utils::{copy_array_into_sample, required_data_size},
    control_channel::ControlChannel,
    drop_stream::DropStream,
};
use aligned_vec::{AVec, ConstAlign};
use arrow::array::Array;
use dora_core::{
    config::{DataId, NodeId, NodeRunConfig},
    descriptor::Descriptor,
    metadata::ArrowTypeInfoExt,
    topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
    uhlc,
};

use dora_message::{
    daemon_to_node::{DaemonReply, NodeConfig},
    metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
    node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
    DataflowId,
};
use eyre::{bail, WrapErr};
use shared_memory_extended::{Shmem, ShmemConf};
use std::{
    collections::{BTreeSet, HashMap, VecDeque},
    ops::{Deref, DerefMut},
    sync::Arc,
    time::Duration,
};
use tracing::{info, warn};

#[cfg(feature = "metrics")]
use dora_metrics::init_meter_provider;
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use tokio::runtime::{Handle, Runtime};

pub mod arrow_utils;
mod control_channel;
mod drop_stream;

/// Outputs of at least this size (in bytes) are sent through shared memory instead of being
/// copied into the message sent over the daemon channel.
pub const ZERO_COPY_THRESHOLD: usize = 4096;

enum TokioRuntime {
    Runtime(Runtime),
    Handle(Handle),
}

pub struct DoraNode {
    id: NodeId,
    dataflow_id: DataflowId,
    node_config: NodeRunConfig,
    control_channel: ControlChannel,
    clock: Arc<uhlc::HLC>,

    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    drop_stream: DropStream,
    cache: VecDeque<ShmemHandle>,

    dataflow_descriptor: Descriptor,
    warned_unknown_output: BTreeSet<DataId>,
    _rt: TokioRuntime,
}

impl DoraNode {
    /// Initialize a node from the environment variables set by `dora-coordinator`.
    ///
    /// ```no_run
    /// use dora_node_api::DoraNode;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    /// ```
    ///
    pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
        let node_config: NodeConfig = {
            let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
                "env variable DORA_NODE_CONFIG must be set. Are you sure you're using `dora start`?",
            )?;
            serde_yaml::from_str(&raw).context("failed to deserialize node config")?
        };
        #[cfg(feature = "tracing")]
        set_up_tracing(node_config.node_id.as_ref())
            .context("failed to set up tracing subscriber")?;
        Self::init(node_config)
    }

    /// Initialize a node from its node id.
    ///
    /// ```no_run
    /// use dora_node_api::DoraNode;
    /// use dora_node_api::dora_core::config::NodeId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
    /// ```
    ///
    pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
        // This entry point is used when the node is started outside of `dora start`,
        // so we connect directly to the daemon's default local TCP port.
        let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();

        let mut channel =
            DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
        let clock = Arc::new(uhlc::HLC::default());

        let reply = channel
            .request(&Timestamped {
                inner: DaemonRequest::NodeConfig { node_id },
                timestamp: clock.new_timestamp(),
            })
            .wrap_err("failed to request node config from daemon")?;
        match reply {
            DaemonReply::NodeConfig {
                result: Ok(node_config),
            } => Self::init(node_config),
            DaemonReply::NodeConfig { result: Err(error) } => {
                bail!("failed to get node config from daemon: {error}")
            }
            _ => bail!("unexpected reply from daemon"),
        }
    }

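    /// Initialize a node either from the environment or from the given node id.
    ///
    /// If the `DORA_NODE_CONFIG` environment variable is set (i.e. the node was spawned by
    /// `dora start`), this behaves like [`DoraNode::init_from_env`] and the given node id is
    /// ignored. Otherwise, it falls back to [`DoraNode::init_from_node_id`].
    ///
    /// A minimal sketch (the node id `"plot"` is a placeholder and must match a node declared
    /// in the dataflow YAML):
    ///
    /// ```no_run
    /// use dora_node_api::DoraNode;
    /// use dora_node_api::dora_core::config::NodeId;
    ///
    /// let (mut node, mut events) =
    ///     DoraNode::init_flexible(NodeId::from("plot".to_string())).expect("Could not init node.");
    /// ```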
    pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
        if std::env::var("DORA_NODE_CONFIG").is_ok() {
            info!("Ignoring the node id `{node_id}` passed at initialization in favor of the `DORA_NODE_CONFIG` environment variable set by `dora start`");
            Self::init_from_env()
        } else {
            Self::init_from_node_id(node_id)
        }
    }

    #[tracing::instrument]
    pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
        let NodeConfig {
            dataflow_id,
            node_id,
            run_config,
            daemon_communication,
            dataflow_descriptor,
            dynamic: _,
        } = node_config;
        let clock = Arc::new(uhlc::HLC::default());
        let input_config = run_config.inputs.clone();

        let rt = match Handle::try_current() {
            Ok(handle) => TokioRuntime::Handle(handle),
            Err(_) => TokioRuntime::Runtime(
                tokio::runtime::Builder::new_multi_thread()
                    .worker_threads(2)
                    .enable_all()
                    .build()
                    .context("failed to build tokio runtime")?,
            ),
        };

        let id = format!("{}/{}", dataflow_id, node_id);

        #[cfg(feature = "metrics")]
        match &rt {
            TokioRuntime::Runtime(rt) => rt.spawn(async {
                if let Err(e) = init_meter_provider(id)
                    .await
                    .context("failed to init metrics provider")
                {
                    warn!("could not create metric provider with err: {:#?}", e);
                }
            }),
            TokioRuntime::Handle(handle) => handle.spawn(async {
                if let Err(e) = init_meter_provider(id)
                    .await
                    .context("failed to init metrics provider")
                {
                    warn!("could not create metric provider with err: {:#?}", e);
                }
            }),
        };

        let event_stream = EventStream::init(
            dataflow_id,
            &node_id,
            &daemon_communication,
            input_config,
            clock.clone(),
        )
        .wrap_err("failed to init event stream")?;
        let drop_stream =
            DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
                .wrap_err("failed to init drop stream")?;
        let control_channel =
            ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
                .wrap_err("failed to init control channel")?;

        let node = Self {
            id: node_id,
            dataflow_id,
            node_config: run_config.clone(),
            control_channel,
            clock,
            sent_out_shared_memory: HashMap::new(),
            drop_stream,
            cache: VecDeque::new(),
            dataflow_descriptor,
            warned_unknown_output: BTreeSet::new(),
            _rt: rt,
        };
        Ok((node, event_stream))
    }

    fn validate_output(&mut self, output_id: &DataId) -> bool {
        if !self.node_config.outputs.contains(output_id) {
            if !self.warned_unknown_output.contains(output_id) {
                warn!("Ignoring output `{output_id}` not in node's output list.");
                self.warned_unknown_output.insert(output_id.clone());
            }
            false
        } else {
            true
        }
    }

    /// Sends raw data from this node to the other nodes.
    ///
    /// The output buffer is filled in place by the given closure, which enables zero-copy
    /// sends for large outputs.
    ///
    /// ```no_run
    /// use dora_node_api::{DoraNode, MetadataParameters};
    /// use dora_core::config::DataId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    ///
    /// let output = DataId::from("output_id".to_owned());
    ///
    /// let data: &[u8] = &[0, 1, 2, 3];
    /// let parameters = MetadataParameters::default();
    ///
    /// node.send_output_raw(
    ///    output,
    ///    parameters,
    ///    data.len(),
    ///    |out| {
    ///         out.copy_from_slice(data);
    ///     }).expect("Could not send output");
    /// ```
    ///
    pub fn send_output_raw<F>(
        &mut self,
        output_id: DataId,
        parameters: MetadataParameters,
        data_len: usize,
        data: F,
    ) -> eyre::Result<()>
    where
        F: FnOnce(&mut [u8]),
    {
        if !self.validate_output(&output_id) {
            return Ok(());
        };
        let mut sample = self.allocate_data_sample(data_len)?;
        data(&mut sample);

        let type_info = ArrowTypeInfo::byte_array(data_len);

        self.send_output_sample(output_id, type_info, parameters, Some(sample))
    }

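    /// Sends the given Arrow array as an output of this node.
    ///
    /// The array is copied into a data sample (backed by shared memory for large payloads)
    /// and published under the given output id.
    ///
    /// A minimal sketch (the output id `"counter"` is a placeholder and must be listed under
    /// this node's outputs in the dataflow YAML):
    ///
    /// ```no_run
    /// use arrow::array::UInt8Array;
    /// use dora_node_api::{DoraNode, MetadataParameters};
    /// use dora_node_api::dora_core::config::DataId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    ///
    /// let output = DataId::from("counter".to_owned());
    /// let data = UInt8Array::from(vec![0u8, 1, 2, 3]);
    ///
    /// node.send_output(output, MetadataParameters::default(), data)
    ///     .expect("Could not send output");
    /// ```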
    pub fn send_output(
        &mut self,
        output_id: DataId,
        parameters: MetadataParameters,
        data: impl Array,
    ) -> eyre::Result<()> {
        if !self.validate_output(&output_id) {
            return Ok(());
        };

        let arrow_array = data.to_data();

        let total_len = required_data_size(&arrow_array);

        let mut sample = self.allocate_data_sample(total_len)?;
        let type_info = copy_array_into_sample(&mut sample, &arrow_array);

        self.send_output_sample(output_id, type_info, parameters, Some(sample))
            .wrap_err("failed to send output")?;

        Ok(())
    }

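    /// Sends the given byte slice as an output of this node.
    ///
    /// This is a convenience wrapper around [`DoraNode::send_output_raw`] that copies `data`
    /// into the allocated sample.
    ///
    /// A minimal sketch (the output id `"bytes"` is a placeholder):
    ///
    /// ```no_run
    /// use dora_node_api::{DoraNode, MetadataParameters};
    /// use dora_node_api::dora_core::config::DataId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    ///
    /// let output = DataId::from("bytes".to_owned());
    /// let data: &[u8] = &[0, 1, 2, 3];
    ///
    /// node.send_output_bytes(output, MetadataParameters::default(), data.len(), data)
    ///     .expect("Could not send output");
    /// ```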
    pub fn send_output_bytes(
        &mut self,
        output_id: DataId,
        parameters: MetadataParameters,
        data_len: usize,
        data: &[u8],
    ) -> eyre::Result<()> {
        if !self.validate_output(&output_id) {
            return Ok(());
        };
        self.send_output_raw(output_id, parameters, data_len, |sample| {
            sample.copy_from_slice(data)
        })
    }

    /// Like [`DoraNode::send_output_raw`], but with a caller-provided [`ArrowTypeInfo`]
    /// instead of the default byte-array type.
    pub fn send_typed_output<F>(
        &mut self,
        output_id: DataId,
        type_info: ArrowTypeInfo,
        parameters: MetadataParameters,
        data_len: usize,
        data: F,
    ) -> eyre::Result<()>
    where
        F: FnOnce(&mut [u8]),
    {
        if !self.validate_output(&output_id) {
            return Ok(());
        };

        let mut sample = self.allocate_data_sample(data_len)?;
        data(&mut sample);

        self.send_output_sample(output_id, type_info, parameters, Some(sample))
    }

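    /// Sends an already allocated [`DataSample`] as an output of this node.
    ///
    /// This is the lower-level building block used by the other `send_output_*` methods:
    /// the caller allocates a sample via [`DoraNode::allocate_data_sample`], fills it, and
    /// provides the matching [`ArrowTypeInfo`].
    ///
    /// A minimal sketch (the output id `"raw"` is a placeholder; the import paths for
    /// `ArrowTypeInfo` and `ArrowTypeInfoExt` mirror this module's own imports and may differ
    /// depending on the crate's re-exports):
    ///
    /// ```no_run
    /// use dora_core::metadata::ArrowTypeInfoExt;
    /// use dora_message::metadata::ArrowTypeInfo;
    /// use dora_node_api::{DoraNode, MetadataParameters};
    /// use dora_node_api::dora_core::config::DataId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    ///
    /// let output = DataId::from("raw".to_owned());
    /// let data: &[u8] = &[0, 1, 2, 3];
    ///
    /// let mut sample = node.allocate_data_sample(data.len()).expect("allocation failed");
    /// sample.copy_from_slice(data);
    /// let type_info = ArrowTypeInfo::byte_array(data.len());
    ///
    /// node.send_output_sample(output, type_info, MetadataParameters::default(), Some(sample))
    ///     .expect("Could not send output");
    /// ```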
    pub fn send_output_sample(
        &mut self,
        output_id: DataId,
        type_info: ArrowTypeInfo,
        parameters: MetadataParameters,
        sample: Option<DataSample>,
    ) -> eyre::Result<()> {
        self.handle_finished_drop_tokens()?;

        let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);

        let (data, shmem) = match sample {
            Some(sample) => sample.finalize(),
            None => (None, None),
        };

        self.control_channel
            .send_message(output_id.clone(), metadata, data)
            .wrap_err_with(|| format!("failed to send output {output_id}"))?;

        if let Some((shared_memory, drop_token)) = shmem {
            self.sent_out_shared_memory
                .insert(drop_token, shared_memory);
        }

        Ok(())
    }

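    /// Closes the given outputs early, before the node finishes.
    ///
    /// The daemon is notified that no further data will arrive on these outputs. Returns an
    /// error if one of the given ids is not an open output of this node.
    ///
    /// A minimal sketch (the output id `"output_id"` is a placeholder):
    ///
    /// ```no_run
    /// use dora_node_api::DoraNode;
    /// use dora_node_api::dora_core::config::DataId;
    ///
    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
    ///
    /// node.close_outputs(vec![DataId::from("output_id".to_owned())])
    ///     .expect("Could not close output");
    /// ```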
    pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
        for output_id in &outputs {
            if !self.node_config.outputs.remove(output_id) {
                eyre::bail!("unknown output {output_id}");
            }
        }

        self.control_channel
            .report_closed_outputs(outputs)
            .wrap_err("failed to report closed outputs to daemon")?;

        Ok(())
    }

    pub fn id(&self) -> &NodeId {
        &self.id
    }

    pub fn dataflow_id(&self) -> &DataflowId {
        &self.dataflow_id
    }

    pub fn node_config(&self) -> &NodeRunConfig {
        &self.node_config
    }

    /// Allocates a [`DataSample`] of the given size.
    ///
    /// Samples of at least [`ZERO_COPY_THRESHOLD`] bytes are backed by a shared memory
    /// region; smaller samples use a heap-allocated, 128-byte-aligned buffer.
    pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
        let data = if data_len >= ZERO_COPY_THRESHOLD {
            // create shared memory region
            let shared_memory = self.allocate_shared_memory(data_len)?;

            DataSample {
                inner: DataSampleInner::Shmem(shared_memory),
                len: data_len,
            }
        } else {
            let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);

            avec.into()
        };

        Ok(data)
    }

    fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
        let cache_index = self
            .cache
            .iter()
            .enumerate()
            .rev()
            .filter(|(_, s)| s.len() >= data_len)
            .min_by_key(|(_, s)| s.len())
            .map(|(i, _)| i);
        let memory = match cache_index {
            Some(i) => {
                // we know that this index exists, so we can safely unwrap here
                self.cache.remove(i).unwrap()
            }
            None => ShmemHandle(Box::new(
                ShmemConf::new()
                    .size(data_len)
                    .writable(true)
                    .create()
                    .wrap_err("failed to allocate shared memory")?,
            )),
        };
        assert!(memory.len() >= data_len);

        Ok(memory)
    }

    fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
        loop {
            match self.drop_stream.try_recv() {
                Ok(token) => match self.sent_out_shared_memory.remove(&token) {
                    Some(region) => self.add_to_cache(region),
                    None => tracing::warn!("received unknown finished drop token `{token:?}`"),
                },
                Err(flume::TryRecvError::Empty) => break,
                Err(flume::TryRecvError::Disconnected) => {
                    bail!("event stream was closed before sending all expected drop tokens")
                }
            }
        }
        Ok(())
    }

    fn add_to_cache(&mut self, memory: ShmemHandle) {
        const MAX_CACHE_SIZE: usize = 20;

        self.cache.push_back(memory);
        while self.cache.len() > MAX_CACHE_SIZE {
            self.cache.pop_front();
        }
    }

    /// Returns the full dataflow descriptor that this node is part of.
    ///
    /// This method returns the parsed dataflow YAML file.
    pub fn dataflow_descriptor(&self) -> &Descriptor {
        &self.dataflow_descriptor
    }
}

impl Drop for DoraNode {
    #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
    fn drop(&mut self) {
        // close all outputs first to notify subscribers as early as possible
        if let Err(err) = self
            .control_channel
            .report_closed_outputs(
                std::mem::take(&mut self.node_config.outputs)
                    .into_iter()
                    .collect(),
            )
            .context("failed to close outputs on drop")
        {
            tracing::warn!("{err:?}")
        }

        while !self.sent_out_shared_memory.is_empty() {
            if self.drop_stream.len() == 0 {
                tracing::trace!(
                    "waiting for {} remaining drop tokens",
                    self.sent_out_shared_memory.len()
                );
            }

            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
                Ok(token) => {
                    self.sent_out_shared_memory.remove(&token);
                }
                Err(flume::RecvTimeoutError::Disconnected) => {
                    tracing::warn!(
                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
                        closing {} shared memory regions that might not have been mapped yet.",
                        self.sent_out_shared_memory.len()
                    );
                    break;
                }
                Err(flume::RecvTimeoutError::Timeout) => {
                    tracing::warn!(
                        "timeout while waiting for drop tokens; \
                        closing {} shared memory regions that might not have been mapped yet.",
                        self.sent_out_shared_memory.len()
                    );
                    break;
                }
            }
        }

        if let Err(err) = self.control_channel.report_outputs_done() {
            tracing::warn!("{err:?}")
        }
    }
}

/// A data buffer that can be sent as a node output, backed either by a shared memory region
/// or by a heap-allocated, aligned vector.
pub struct DataSample {
    inner: DataSampleInner,
    len: usize,
}

impl DataSample {
    fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
        match self.inner {
            DataSampleInner::Shmem(shared_memory) => {
                let drop_token = DropToken::generate();
                let data = DataMessage::SharedMemory {
                    shared_memory_id: shared_memory.get_os_id().to_owned(),
                    len: self.len,
                    drop_token,
                };
                (Some(data), Some((shared_memory, drop_token)))
            }
            DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
        }
    }
}

impl Deref for DataSample {
    type Target = [u8];

    fn deref(&self) -> &Self::Target {
        let slice = match &self.inner {
            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
            DataSampleInner::Vec(data) => data,
        };
        &slice[..self.len]
    }
}

impl DerefMut for DataSample {
    fn deref_mut(&mut self) -> &mut Self::Target {
        let slice = match &mut self.inner {
            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
            DataSampleInner::Vec(data) => data,
        };
        &mut slice[..self.len]
    }
}

impl From<AVec<u8, ConstAlign<128>>> for DataSample {
    fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
        Self {
            len: value.len(),
            inner: DataSampleInner::Vec(value),
        }
    }
}

impl std::fmt::Debug for DataSample {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let kind = match &self.inner {
            DataSampleInner::Shmem(_) => "SharedMemory",
            DataSampleInner::Vec(_) => "Vec",
        };
        f.debug_struct("DataSample")
            .field("len", &self.len)
            .field("kind", &kind)
            .finish_non_exhaustive()
    }
}

enum DataSampleInner {
    Shmem(ShmemHandle),
    Vec(AVec<u8, ConstAlign<128>>),
}

struct ShmemHandle(Box<Shmem>);

impl Deref for ShmemHandle {
    type Target = Shmem;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}

impl DerefMut for ShmemHandle {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.0
    }
}

unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}