dora_node_api/node/
mod.rs

1use crate::{daemon_connection::DaemonChannel, EventStream};
2
3use self::{
4    arrow_utils::{copy_array_into_sample, required_data_size},
5    control_channel::ControlChannel,
6    drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11    config::{DataId, NodeId, NodeRunConfig},
12    descriptor::Descriptor,
13    metadata::ArrowTypeInfoExt,
14    topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15    uhlc,
16};
17
18use dora_message::{
19    daemon_to_node::{DaemonReply, NodeConfig},
20    metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
21    node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
22    DataflowId,
23};
24use eyre::{bail, WrapErr};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27    collections::{BTreeSet, HashMap, VecDeque},
28    ops::{Deref, DerefMut},
29    sync::Arc,
30    time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "tracing")]
35use dora_tracing::set_up_tracing;
36
37pub mod arrow_utils;
38mod control_channel;
39mod drop_stream;
40
/// Minimum payload size (in bytes) at which [`DoraNode::allocate_data_sample`]
/// backs a sample with shared memory instead of an in-process aligned buffer.
pub const ZERO_COPY_THRESHOLD: usize = 4096;
42
/// Handle through which a node sends outputs and manages the buffers backing
/// those outputs.
pub struct DoraNode {
    /// ID of this node, as received in the node config.
    id: NodeId,
    /// ID of the dataflow that this node belongs to.
    dataflow_id: DataflowId,
    /// Run configuration of the node; its `outputs` set is consulted before
    /// every send and shrinks when outputs are closed.
    node_config: NodeRunConfig,
    /// Channel used to send output messages and closed-output reports.
    control_channel: ControlChannel,
    /// Clock used to timestamp the metadata of outgoing messages.
    clock: Arc<uhlc::HLC>,

    /// Shared memory regions that were sent out, keyed by the drop token that
    /// is expected back on `drop_stream` for each of them.
    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Stream of drop tokens for previously sent shared memory regions.
    drop_stream: DropStream,
    /// Shared memory regions whose drop token came back; kept for reuse by
    /// later allocations.
    cache: VecDeque<ShmemHandle>,

    /// Descriptor of the dataflow, as received in the node config.
    dataflow_descriptor: Descriptor,
    /// Unknown output IDs for which a warning was already logged, so that each
    /// one is only reported once.
    warned_unknown_output: BTreeSet<DataId>,
}
57
58impl DoraNode {
59    /// Initiate a node from environment variables set by `dora-coordinator`
60    ///
61    /// ```no_run
62    /// use dora_node_api::DoraNode;
63    ///
64    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
65    /// ```
66    ///
67    pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
68        let node_config: NodeConfig = {
69            let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
70                "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
71            )?;
72            serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
73        };
74        #[cfg(feature = "tracing")]
75        set_up_tracing(node_config.node_id.as_ref())
76            .context("failed to set up tracing subscriber")?;
77        Self::init(node_config)
78    }
79
80    /// Initiate a node from a dataflow id and a node id.
81    ///
82    /// ```no_run
83    /// use dora_node_api::DoraNode;
84    /// use dora_node_api::dora_core::config::NodeId;
85    ///
86    /// let (mut node, mut events) = DoraNode::init_from_node_id(NodeId::from("plot".to_string())).expect("Could not init node plot");
87    /// ```
88    ///
89    pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
90        // Make sure that the node is initialized outside of dora start.
91        let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
92
93        let mut channel =
94            DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
95        let clock = Arc::new(uhlc::HLC::default());
96
97        let reply = channel
98            .request(&Timestamped {
99                inner: DaemonRequest::NodeConfig { node_id },
100                timestamp: clock.new_timestamp(),
101            })
102            .wrap_err("failed to request node config from daemon")?;
103        match reply {
104            DaemonReply::NodeConfig {
105                result: Ok(node_config),
106            } => Self::init(node_config),
107            DaemonReply::NodeConfig { result: Err(error) } => {
108                bail!("failed to get node config from daemon: {error}")
109            }
110            _ => bail!("unexpected reply from daemon"),
111        }
112    }
113
114    pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
115        if std::env::var("DORA_NODE_CONFIG").is_ok() {
116            info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
117            Self::init_from_env()
118        } else {
119            Self::init_from_node_id(node_id)
120        }
121    }
122
123    #[tracing::instrument]
124    pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
125        let NodeConfig {
126            dataflow_id,
127            node_id,
128            run_config,
129            daemon_communication,
130            dataflow_descriptor,
131            dynamic: _,
132        } = node_config;
133        let clock = Arc::new(uhlc::HLC::default());
134        let input_config = run_config.inputs.clone();
135
136        let event_stream = EventStream::init(
137            dataflow_id,
138            &node_id,
139            &daemon_communication,
140            input_config,
141            clock.clone(),
142        )
143        .wrap_err("failed to init event stream")?;
144        let drop_stream =
145            DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
146                .wrap_err("failed to init drop stream")?;
147        let control_channel =
148            ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
149                .wrap_err("failed to init control channel")?;
150
151        let node = Self {
152            id: node_id,
153            dataflow_id,
154            node_config: run_config.clone(),
155            control_channel,
156            clock,
157            sent_out_shared_memory: HashMap::new(),
158            drop_stream,
159            cache: VecDeque::new(),
160            dataflow_descriptor,
161            warned_unknown_output: BTreeSet::new(),
162        };
163        Ok((node, event_stream))
164    }
165
166    fn validate_output(&mut self, output_id: &DataId) -> bool {
167        if !self.node_config.outputs.contains(output_id) {
168            if !self.warned_unknown_output.contains(output_id) {
169                warn!("Ignoring output `{output_id}` not in node's output list.");
170                self.warned_unknown_output.insert(output_id.clone());
171            }
172            false
173        } else {
174            true
175        }
176    }
177
178    /// Send data from the node to the other nodes.
179    /// We take a closure as an input to enable zero copy on send.
180    ///
181    /// ```no_run
182    /// use dora_node_api::{DoraNode, MetadataParameters};
183    /// use dora_core::config::DataId;
184    ///
185    /// let (mut node, mut events) = DoraNode::init_from_env().expect("Could not init node.");
186    ///
187    /// let output = DataId::from("output_id".to_owned());
188    ///
189    /// let data: &[u8] = &[0, 1, 2, 3];
190    /// let parameters = MetadataParameters::default();
191    ///
192    /// node.send_output_raw(
193    ///    output,
194    ///    parameters,
195    ///    data.len(),
196    ///    |out| {
197    ///         out.copy_from_slice(data);
198    ///     }).expect("Could not send output");
199    /// ```
200    ///
201    pub fn send_output_raw<F>(
202        &mut self,
203        output_id: DataId,
204        parameters: MetadataParameters,
205        data_len: usize,
206        data: F,
207    ) -> eyre::Result<()>
208    where
209        F: FnOnce(&mut [u8]),
210    {
211        if !self.validate_output(&output_id) {
212            return Ok(());
213        };
214        let mut sample = self.allocate_data_sample(data_len)?;
215        data(&mut sample);
216
217        let type_info = ArrowTypeInfo::byte_array(data_len);
218
219        self.send_output_sample(output_id, type_info, parameters, Some(sample))
220    }
221
222    pub fn send_output(
223        &mut self,
224        output_id: DataId,
225        parameters: MetadataParameters,
226        data: impl Array,
227    ) -> eyre::Result<()> {
228        if !self.validate_output(&output_id) {
229            return Ok(());
230        };
231
232        let arrow_array = data.to_data();
233
234        let total_len = required_data_size(&arrow_array);
235
236        let mut sample = self.allocate_data_sample(total_len)?;
237        let type_info = copy_array_into_sample(&mut sample, &arrow_array);
238
239        self.send_output_sample(output_id, type_info, parameters, Some(sample))
240            .wrap_err("failed to send output")?;
241
242        Ok(())
243    }
244
245    pub fn send_output_bytes(
246        &mut self,
247        output_id: DataId,
248        parameters: MetadataParameters,
249        data_len: usize,
250        data: &[u8],
251    ) -> eyre::Result<()> {
252        if !self.validate_output(&output_id) {
253            return Ok(());
254        };
255        self.send_output_raw(output_id, parameters, data_len, |sample| {
256            sample.copy_from_slice(data)
257        })
258    }
259
260    pub fn send_typed_output<F>(
261        &mut self,
262        output_id: DataId,
263        type_info: ArrowTypeInfo,
264        parameters: MetadataParameters,
265        data_len: usize,
266        data: F,
267    ) -> eyre::Result<()>
268    where
269        F: FnOnce(&mut [u8]),
270    {
271        if !self.validate_output(&output_id) {
272            return Ok(());
273        };
274
275        let mut sample = self.allocate_data_sample(data_len)?;
276        data(&mut sample);
277
278        self.send_output_sample(output_id, type_info, parameters, Some(sample))
279    }
280
281    pub fn send_output_sample(
282        &mut self,
283        output_id: DataId,
284        type_info: ArrowTypeInfo,
285        parameters: MetadataParameters,
286        sample: Option<DataSample>,
287    ) -> eyre::Result<()> {
288        self.handle_finished_drop_tokens()?;
289
290        let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
291
292        let (data, shmem) = match sample {
293            Some(sample) => sample.finalize(),
294            None => (None, None),
295        };
296
297        self.control_channel
298            .send_message(output_id.clone(), metadata, data)
299            .wrap_err_with(|| format!("failed to send output {output_id}"))?;
300
301        if let Some((shared_memory, drop_token)) = shmem {
302            self.sent_out_shared_memory
303                .insert(drop_token, shared_memory);
304        }
305
306        Ok(())
307    }
308
309    pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
310        for output_id in &outputs {
311            if !self.node_config.outputs.remove(output_id) {
312                eyre::bail!("unknown output {output_id}");
313            }
314        }
315
316        self.control_channel
317            .report_closed_outputs(outputs)
318            .wrap_err("failed to report closed outputs to daemon")?;
319
320        Ok(())
321    }
322
    /// Returns the ID of this node.
    pub fn id(&self) -> &NodeId {
        &self.id
    }
326
    /// Returns the ID of the dataflow that this node is part of.
    pub fn dataflow_id(&self) -> &DataflowId {
        &self.dataflow_id
    }
330
    /// Returns the run configuration of this node.
    ///
    /// The `outputs` set reflects the current state: outputs closed through
    /// `close_outputs` are no longer part of it.
    pub fn node_config(&self) -> &NodeRunConfig {
        &self.node_config
    }
334
335    pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
336        let data = if data_len >= ZERO_COPY_THRESHOLD {
337            // create shared memory region
338            let shared_memory = self.allocate_shared_memory(data_len)?;
339
340            DataSample {
341                inner: DataSampleInner::Shmem(shared_memory),
342                len: data_len,
343            }
344        } else {
345            let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
346
347            avec.into()
348        };
349
350        Ok(data)
351    }
352
353    fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
354        let cache_index = self
355            .cache
356            .iter()
357            .enumerate()
358            .rev()
359            .filter(|(_, s)| s.len() >= data_len)
360            .min_by_key(|(_, s)| s.len())
361            .map(|(i, _)| i);
362        let memory = match cache_index {
363            Some(i) => {
364                // we know that this index exists, so we can safely unwrap here
365                self.cache.remove(i).unwrap()
366            }
367            None => ShmemHandle(Box::new(
368                ShmemConf::new()
369                    .size(data_len)
370                    .writable(true)
371                    .create()
372                    .wrap_err("failed to allocate shared memory")?,
373            )),
374        };
375        assert!(memory.len() >= data_len);
376
377        Ok(memory)
378    }
379
380    fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
381        loop {
382            match self.drop_stream.try_recv() {
383                Ok(token) => match self.sent_out_shared_memory.remove(&token) {
384                    Some(region) => self.add_to_cache(region),
385                    None => tracing::warn!("received unknown finished drop token `{token:?}`"),
386                },
387                Err(flume::TryRecvError::Empty) => break,
388                Err(flume::TryRecvError::Disconnected) => {
389                    bail!("event stream was closed before sending all expected drop tokens")
390                }
391            }
392        }
393        Ok(())
394    }
395
396    fn add_to_cache(&mut self, memory: ShmemHandle) {
397        const MAX_CACHE_SIZE: usize = 20;
398
399        self.cache.push_back(memory);
400        while self.cache.len() > MAX_CACHE_SIZE {
401            self.cache.pop_front();
402        }
403    }
404
    /// Returns the full dataflow descriptor that this node is part of.
    ///
    /// This method returns the parsed dataflow YAML file. The descriptor is
    /// the one that was part of the `NodeConfig` the node was initialized
    /// with.
    pub fn dataflow_descriptor(&self) -> &Descriptor {
        &self.dataflow_descriptor
    }
411}
412
413impl Drop for DoraNode {
414    #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
415    fn drop(&mut self) {
416        // close all outputs first to notify subscribers as early as possible
417        if let Err(err) = self
418            .control_channel
419            .report_closed_outputs(
420                std::mem::take(&mut self.node_config.outputs)
421                    .into_iter()
422                    .collect(),
423            )
424            .context("failed to close outputs on drop")
425        {
426            tracing::warn!("{err:?}")
427        }
428
429        while !self.sent_out_shared_memory.is_empty() {
430            if self.drop_stream.len() == 0 {
431                tracing::trace!(
432                    "waiting for {} remaining drop tokens",
433                    self.sent_out_shared_memory.len()
434                );
435            }
436
437            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
438                Ok(token) => {
439                    self.sent_out_shared_memory.remove(&token);
440                }
441                Err(flume::RecvTimeoutError::Disconnected) => {
442                    tracing::warn!(
443                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
444                        closing {} shared memory regions that might not yet been mapped.",
445                        self.sent_out_shared_memory.len()
446                    );
447                    break;
448                }
449                Err(flume::RecvTimeoutError::Timeout) => {
450                    tracing::warn!(
451                        "timeout while waiting for drop tokens; \
452                        closing {} shared memory regions that might not yet been mapped.",
453                        self.sent_out_shared_memory.len()
454                    );
455                    break;
456                }
457            }
458        }
459
460        if let Err(err) = self.control_channel.report_outputs_done() {
461            tracing::warn!("{err:?}")
462        }
463    }
464}
465
/// A writable byte buffer for an output message, backed by either a shared
/// memory region or an aligned in-process vector.
///
/// Dereferences to the first `len` bytes of the backing storage.
pub struct DataSample {
    /// Backing storage of the sample.
    inner: DataSampleInner,
    /// Number of bytes exposed through `Deref`/`DerefMut`; a reused shared
    /// memory region may be larger than this.
    len: usize,
}
470
471impl DataSample {
472    fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
473        match self.inner {
474            DataSampleInner::Shmem(shared_memory) => {
475                let drop_token = DropToken::generate();
476                let data = DataMessage::SharedMemory {
477                    shared_memory_id: shared_memory.get_os_id().to_owned(),
478                    len: self.len,
479                    drop_token,
480                };
481                (Some(data), Some((shared_memory, drop_token)))
482            }
483            DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
484        }
485    }
486}
487
488impl Deref for DataSample {
489    type Target = [u8];
490
491    fn deref(&self) -> &Self::Target {
492        let slice = match &self.inner {
493            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
494            DataSampleInner::Vec(data) => data,
495        };
496        &slice[..self.len]
497    }
498}
499
500impl DerefMut for DataSample {
501    fn deref_mut(&mut self) -> &mut Self::Target {
502        let slice = match &mut self.inner {
503            DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
504            DataSampleInner::Vec(data) => data,
505        };
506        &mut slice[..self.len]
507    }
508}
509
510impl From<AVec<u8, ConstAlign<128>>> for DataSample {
511    fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
512        Self {
513            len: value.len(),
514            inner: DataSampleInner::Vec(value),
515        }
516    }
517}
518
519impl std::fmt::Debug for DataSample {
520    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521        let kind = match &self.inner {
522            DataSampleInner::Shmem(_) => "SharedMemory",
523            DataSampleInner::Vec(_) => "Vec",
524        };
525        f.debug_struct("DataSample")
526            .field("len", &self.len)
527            .field("kind", &kind)
528            .finish_non_exhaustive()
529    }
530}
531
/// Backing storage of a [`DataSample`].
enum DataSampleInner {
    /// Shared memory region; used for samples of at least
    /// `ZERO_COPY_THRESHOLD` bytes.
    Shmem(ShmemHandle),
    /// 128-byte aligned in-process buffer; used for smaller samples.
    Vec(AVec<u8, ConstAlign<128>>),
}
536
/// Owning wrapper around a boxed shared memory region.
///
/// Dereferences to the wrapped `Shmem`; `Send` and `Sync` are implemented for
/// it manually in this file.
struct ShmemHandle(Box<Shmem>);
538
539impl Deref for ShmemHandle {
540    type Target = Shmem;
541
542    fn deref(&self) -> &Self::Target {
543        &self.0
544    }
545}
546
547impl DerefMut for ShmemHandle {
548    fn deref_mut(&mut self) -> &mut Self::Target {
549        &mut self.0
550    }
551}
552
// Manually mark the handle as transferable and shareable across threads.
// NOTE(review): presumably sound because a region is only accessed through
// the `DataSample`/`DoraNode` that owns its handle — TODO confirm and turn
// this into a proper `SAFETY` justification.
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}