1use crate::{EventStream, daemon_connection::DaemonChannel};
2
3use self::{
4 arrow_utils::{copy_array_into_sample, required_data_size},
5 control_channel::ControlChannel,
6 drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11 config::{DataId, NodeId, NodeRunConfig},
12 descriptor::Descriptor,
13 metadata::ArrowTypeInfoExt,
14 topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15 uhlc,
16};
17
18use dora_message::{
19 DataflowId,
20 daemon_to_node::{DaemonReply, NodeConfig},
21 metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
22 node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
23};
24use eyre::{WrapErr, bail};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27 collections::{BTreeSet, HashMap, VecDeque},
28 ops::{Deref, DerefMut},
29 sync::Arc,
30 time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "metrics")]
35use dora_metrics::run_metrics_monitor;
36#[cfg(feature = "tracing")]
37use dora_tracing::TracingBuilder;
38
39use tokio::runtime::{Handle, Runtime};
40
41pub mod arrow_utils;
42mod control_channel;
43mod drop_stream;
44
/// Minimum sample size in bytes at which `DoraNode::allocate_data_sample` backs the
/// sample with shared memory; smaller samples use an aligned in-process buffer.
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
60
61#[allow(dead_code)]
62enum TokioRuntime {
63 Runtime(Runtime),
64 Handle(Handle),
65}
66
67pub struct DoraNode {
72 id: NodeId,
73 dataflow_id: DataflowId,
74 node_config: NodeRunConfig,
75 control_channel: ControlChannel,
76 clock: Arc<uhlc::HLC>,
77
78 sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
79 drop_stream: DropStream,
80 cache: VecDeque<ShmemHandle>,
81
82 dataflow_descriptor: serde_yaml::Result<Descriptor>,
83 warned_unknown_output: BTreeSet<DataId>,
84 _rt: TokioRuntime,
85}
86
87impl DoraNode {
88 pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
101 let node_config: NodeConfig = {
102 let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
103 "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
104 )?;
105 serde_yaml::from_str(&raw).context("failed to deserialize node config")?
106 };
107 #[cfg(feature = "tracing")]
108 {
109 TracingBuilder::new(node_config.node_id.as_ref())
110 .build()
111 .wrap_err("failed to set up tracing subscriber")?;
112 }
113
114 Self::init(node_config)
115 }
116
117 pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
129 let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
131
132 let mut channel =
133 DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
134 let clock = Arc::new(uhlc::HLC::default());
135
136 let reply = channel
137 .request(&Timestamped {
138 inner: DaemonRequest::NodeConfig { node_id },
139 timestamp: clock.new_timestamp(),
140 })
141 .wrap_err("failed to request node config from daemon")?;
142 match reply {
143 DaemonReply::NodeConfig {
144 result: Ok(node_config),
145 } => Self::init(node_config),
146 DaemonReply::NodeConfig { result: Err(error) } => {
147 bail!("failed to get node config from daemon: {error}")
148 }
149 _ => bail!("unexpected reply from daemon"),
150 }
151 }
152
153 pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
159 if std::env::var("DORA_NODE_CONFIG").is_ok() {
160 info!(
161 "Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"
162 );
163 Self::init_from_env()
164 } else {
165 Self::init_from_node_id(node_id)
166 }
167 }
168
169 #[doc(hidden)]
171 #[tracing::instrument]
172 pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
173 let NodeConfig {
174 dataflow_id,
175 node_id,
176 run_config,
177 daemon_communication,
178 dataflow_descriptor,
179 dynamic: _,
180 } = node_config;
181 let clock = Arc::new(uhlc::HLC::default());
182 let input_config = run_config.inputs.clone();
183
184 let rt = match Handle::try_current() {
185 Ok(handle) => TokioRuntime::Handle(handle),
186 Err(_) => TokioRuntime::Runtime(
187 tokio::runtime::Builder::new_multi_thread()
188 .worker_threads(2)
189 .enable_all()
190 .build()
191 .context("tokio runtime failed")?,
192 ),
193 };
194
195 #[cfg(feature = "metrics")]
196 {
197 let id = format!("{dataflow_id}/{node_id}");
198 let monitor_task = async move {
199 if let Err(e) = run_metrics_monitor(id.clone())
200 .await
201 .wrap_err("metrics monitor exited unexpectedly")
202 {
203 warn!("metrics monitor failed: {:#?}", e);
204 }
205 };
206 match &rt {
207 TokioRuntime::Runtime(rt) => rt.spawn(monitor_task),
208 TokioRuntime::Handle(handle) => handle.spawn(monitor_task),
209 };
210 }
211
212 let event_stream = EventStream::init(
213 dataflow_id,
214 &node_id,
215 &daemon_communication,
216 input_config,
217 clock.clone(),
218 )
219 .wrap_err("failed to init event stream")?;
220 let drop_stream =
221 DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
222 .wrap_err("failed to init drop stream")?;
223 let control_channel =
224 ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
225 .wrap_err("failed to init control channel")?;
226
227 let node = Self {
228 id: node_id,
229 dataflow_id,
230 node_config: run_config.clone(),
231 control_channel,
232 clock,
233 sent_out_shared_memory: HashMap::new(),
234 drop_stream,
235 cache: VecDeque::new(),
236 dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
237 warned_unknown_output: BTreeSet::new(),
238 _rt: rt,
239 };
240 Ok((node, event_stream))
241 }
242
243 fn validate_output(&mut self, output_id: &DataId) -> bool {
244 if !self.node_config.outputs.contains(output_id) {
245 if !self.warned_unknown_output.contains(output_id) {
246 warn!("Ignoring output `{output_id}` not in node's output list.");
247 self.warned_unknown_output.insert(output_id.clone());
248 }
249 false
250 } else {
251 true
252 }
253 }
254
255 pub fn send_output_raw<F>(
282 &mut self,
283 output_id: DataId,
284 parameters: MetadataParameters,
285 data_len: usize,
286 data: F,
287 ) -> eyre::Result<()>
288 where
289 F: FnOnce(&mut [u8]),
290 {
291 if !self.validate_output(&output_id) {
292 return Ok(());
293 };
294 let mut sample = self.allocate_data_sample(data_len)?;
295 data(&mut sample);
296
297 let type_info = ArrowTypeInfo::byte_array(data_len);
298
299 self.send_output_sample(output_id, type_info, parameters, Some(sample))
300 }
301
302 pub fn send_output(
311 &mut self,
312 output_id: DataId,
313 parameters: MetadataParameters,
314 data: impl Array,
315 ) -> eyre::Result<()> {
316 if !self.validate_output(&output_id) {
317 return Ok(());
318 };
319
320 let arrow_array = data.to_data();
321
322 let total_len = required_data_size(&arrow_array);
323
324 let mut sample = self.allocate_data_sample(total_len)?;
325 let type_info = copy_array_into_sample(&mut sample, &arrow_array);
326
327 self.send_output_sample(output_id, type_info, parameters, Some(sample))
328 .wrap_err("failed to send output")?;
329
330 Ok(())
331 }
332
333 pub fn send_output_bytes(
340 &mut self,
341 output_id: DataId,
342 parameters: MetadataParameters,
343 data_len: usize,
344 data: &[u8],
345 ) -> eyre::Result<()> {
346 if !self.validate_output(&output_id) {
347 return Ok(());
348 };
349 self.send_output_raw(output_id, parameters, data_len, |sample| {
350 sample.copy_from_slice(data)
351 })
352 }
353
354 pub fn send_typed_output<F>(
361 &mut self,
362 output_id: DataId,
363 type_info: ArrowTypeInfo,
364 parameters: MetadataParameters,
365 data_len: usize,
366 data: F,
367 ) -> eyre::Result<()>
368 where
369 F: FnOnce(&mut [u8]),
370 {
371 if !self.validate_output(&output_id) {
372 return Ok(());
373 };
374
375 let mut sample = self.allocate_data_sample(data_len)?;
376 data(&mut sample);
377
378 self.send_output_sample(output_id, type_info, parameters, Some(sample))
379 }
380
381 pub fn send_output_sample(
388 &mut self,
389 output_id: DataId,
390 type_info: ArrowTypeInfo,
391 parameters: MetadataParameters,
392 sample: Option<DataSample>,
393 ) -> eyre::Result<()> {
394 self.handle_finished_drop_tokens()?;
395
396 let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
397
398 let (data, shmem) = match sample {
399 Some(sample) => sample.finalize(),
400 None => (None, None),
401 };
402
403 self.control_channel
404 .send_message(output_id.clone(), metadata, data)
405 .wrap_err_with(|| format!("failed to send output {output_id}"))?;
406
407 if let Some((shared_memory, drop_token)) = shmem {
408 self.sent_out_shared_memory
409 .insert(drop_token, shared_memory);
410 }
411
412 Ok(())
413 }
414
415 pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
421 for output_id in &outputs_ids {
422 if !self.node_config.outputs.remove(output_id) {
423 eyre::bail!("unknown output {output_id}");
424 }
425 }
426
427 self.control_channel
428 .report_closed_outputs(outputs_ids)
429 .wrap_err("failed to report closed outputs to daemon")?;
430
431 Ok(())
432 }
433
434 pub fn id(&self) -> &NodeId {
436 &self.id
437 }
438
439 pub fn dataflow_id(&self) -> &DataflowId {
443 &self.dataflow_id
444 }
445
446 pub fn node_config(&self) -> &NodeRunConfig {
448 &self.node_config
449 }
450
451 pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
456 let data = if data_len >= ZERO_COPY_THRESHOLD {
457 let shared_memory = self.allocate_shared_memory(data_len)?;
459
460 DataSample {
461 inner: DataSampleInner::Shmem(shared_memory),
462 len: data_len,
463 }
464 } else {
465 let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
466
467 avec.into()
468 };
469
470 Ok(data)
471 }
472
473 fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
474 let cache_index = self
475 .cache
476 .iter()
477 .enumerate()
478 .rev()
479 .filter(|(_, s)| s.len() >= data_len)
480 .min_by_key(|(_, s)| s.len())
481 .map(|(i, _)| i);
482 let memory = match cache_index {
483 Some(i) => {
484 self.cache.remove(i).unwrap()
486 }
487 None => ShmemHandle(Box::new(
488 ShmemConf::new()
489 .size(data_len)
490 .writable(true)
491 .create()
492 .wrap_err("failed to allocate shared memory")?,
493 )),
494 };
495 assert!(memory.len() >= data_len);
496
497 Ok(memory)
498 }
499
500 fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
501 loop {
502 match self.drop_stream.try_recv() {
503 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
504 Some(region) => self.add_to_cache(region),
505 None => tracing::warn!("received unknown finished drop token `{token:?}`"),
506 },
507 Err(flume::TryRecvError::Empty) => break,
508 Err(flume::TryRecvError::Disconnected) => {
509 bail!("event stream was closed before sending all expected drop tokens")
510 }
511 }
512 }
513 Ok(())
514 }
515
516 fn add_to_cache(&mut self, memory: ShmemHandle) {
517 const MAX_CACHE_SIZE: usize = 20;
518
519 self.cache.push_back(memory);
520 while self.cache.len() > MAX_CACHE_SIZE {
521 self.cache.pop_front();
522 }
523 }
524
525 pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
529 match &self.dataflow_descriptor {
530 Ok(d) => Ok(d),
531 Err(err) => eyre::bail!(
532 "failed to parse dataflow descriptor: {err}\n\n
533 This might be caused by mismatched version numbers of dora \
534 daemon and the dora node API"
535 ),
536 }
537 }
538}
539
540impl Drop for DoraNode {
541 #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
542 fn drop(&mut self) {
543 if let Err(err) = self
545 .control_channel
546 .report_closed_outputs(
547 std::mem::take(&mut self.node_config.outputs)
548 .into_iter()
549 .collect(),
550 )
551 .context("failed to close outputs on drop")
552 {
553 tracing::warn!("{err:?}")
554 }
555
556 while !self.sent_out_shared_memory.is_empty() {
557 if self.drop_stream.is_empty() {
558 tracing::trace!(
559 "waiting for {} remaining drop tokens",
560 self.sent_out_shared_memory.len()
561 );
562 }
563
564 match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
565 Ok(token) => {
566 self.sent_out_shared_memory.remove(&token);
567 }
568 Err(flume::RecvTimeoutError::Disconnected) => {
569 tracing::warn!(
570 "finished_drop_tokens channel closed while still waiting for drop tokens; \
571 closing {} shared memory regions that might not yet been mapped.",
572 self.sent_out_shared_memory.len()
573 );
574 break;
575 }
576 Err(flume::RecvTimeoutError::Timeout) => {
577 tracing::warn!(
578 "timeout while waiting for drop tokens; \
579 closing {} shared memory regions that might not yet been mapped.",
580 self.sent_out_shared_memory.len()
581 );
582 break;
583 }
584 }
585 }
586
587 if let Err(err) = self.control_channel.report_outputs_done() {
588 tracing::warn!("{err:?}")
589 }
590 }
591}
592
593pub struct DataSample {
599 inner: DataSampleInner,
600 len: usize,
601}
602
603impl DataSample {
604 fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
605 match self.inner {
606 DataSampleInner::Shmem(shared_memory) => {
607 let drop_token = DropToken::generate();
608 let data = DataMessage::SharedMemory {
609 shared_memory_id: shared_memory.get_os_id().to_owned(),
610 len: self.len,
611 drop_token,
612 };
613 (Some(data), Some((shared_memory, drop_token)))
614 }
615 DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
616 }
617 }
618}
619
620impl Deref for DataSample {
621 type Target = [u8];
622
623 fn deref(&self) -> &Self::Target {
624 let slice = match &self.inner {
625 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
626 DataSampleInner::Vec(data) => data,
627 };
628 &slice[..self.len]
629 }
630}
631
632impl DerefMut for DataSample {
633 fn deref_mut(&mut self) -> &mut Self::Target {
634 let slice = match &mut self.inner {
635 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
636 DataSampleInner::Vec(data) => data,
637 };
638 &mut slice[..self.len]
639 }
640}
641
642impl From<AVec<u8, ConstAlign<128>>> for DataSample {
643 fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
644 Self {
645 len: value.len(),
646 inner: DataSampleInner::Vec(value),
647 }
648 }
649}
650
651impl std::fmt::Debug for DataSample {
652 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
653 let kind = match &self.inner {
654 DataSampleInner::Shmem(_) => "SharedMemory",
655 DataSampleInner::Vec(_) => "Vec",
656 };
657 f.debug_struct("DataSample")
658 .field("len", &self.len)
659 .field("kind", &kind)
660 .finish_non_exhaustive()
661 }
662}
663
664enum DataSampleInner {
665 Shmem(ShmemHandle),
666 Vec(AVec<u8, ConstAlign<128>>),
667}
668
669struct ShmemHandle(Box<Shmem>);
670
671impl Deref for ShmemHandle {
672 type Target = Shmem;
673
674 fn deref(&self) -> &Self::Target {
675 &self.0
676 }
677}
678
679impl DerefMut for ShmemHandle {
680 fn deref_mut(&mut self) -> &mut Self::Target {
681 &mut self.0
682 }
683}
684
685unsafe impl Send for ShmemHandle {}
686unsafe impl Sync for ShmemHandle {}