1use crate::{
2 DaemonCommunicationWrapper, EventStream,
3 daemon_connection::{DaemonChannel, IntegrationTestingEvents},
4 integration_testing::{
5 TestingCommunication, TestingInput, TestingOptions, TestingOutput,
6 take_testing_communication,
7 },
8};
9
10use self::{
11 arrow_utils::{copy_array_into_sample, required_data_size},
12 control_channel::ControlChannel,
13 drop_stream::DropStream,
14};
15use aligned_vec::{AVec, ConstAlign};
16use arrow::array::Array;
17use colored::Colorize;
18use dora_core::{
19 config::{DataId, NodeId, NodeRunConfig},
20 descriptor::Descriptor,
21 metadata::ArrowTypeInfoExt,
22 topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
23 uhlc,
24};
25use dora_message::{
26 DataflowId,
27 daemon_to_node::{DaemonCommunication, DaemonReply, NodeConfig},
28 metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
29 node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
30};
31use eyre::{WrapErr, bail};
32use is_terminal::IsTerminal;
33use shared_memory_extended::{Shmem, ShmemConf};
34
35use std::sync::Mutex;
36use std::{
37 collections::{BTreeSet, HashMap, VecDeque},
38 ops::{Deref, DerefMut},
39 path::PathBuf,
40 sync::Arc,
41 time::Duration,
42};
43use tokio::runtime::Handle;
44
45#[cfg(feature = "tracing")]
46use dora_tracing::{OtelGuard, TracingBuilder};
47use tracing::{info, warn};
48
49pub mod arrow_utils;
50mod control_channel;
51mod drop_stream;
52
/// Minimum payload size (in bytes) for which outputs are placed in shared
/// memory instead of an inline heap buffer.
///
/// Interactive/testing nodes never use shared memory, regardless of size.
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
68
/// Handle through which a node sends outputs to, and reports its state to,
/// the dora daemon.
///
/// Created together with an [`EventStream`] by the `init*` constructors.
pub struct DoraNode {
    /// ID of this node within the dataflow.
    id: NodeId,
    /// ID of the dataflow this node belongs to.
    dataflow_id: DataflowId,
    /// Declared inputs/outputs; closed outputs are removed from `outputs`.
    node_config: NodeRunConfig,
    /// Channel used to send outputs and to report closed/finished outputs.
    control_channel: ControlChannel,
    /// Hybrid logical clock used to timestamp outgoing messages.
    clock: Arc<uhlc::HLC>,

    /// Shared-memory regions that were sent out and whose drop token has not
    /// been received yet.
    sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Stream of drop tokens for regions in `sent_out_shared_memory`.
    drop_stream: DropStream,
    /// Released shared-memory regions kept for reuse (bounded in `add_to_cache`).
    cache: VecDeque<ShmemHandle>,

    /// Parsed dataflow descriptor; a parse error is stored and reported lazily.
    dataflow_descriptor: serde_yaml::Result<Descriptor>,
    /// Unknown output IDs that were already warned about (warn once per ID).
    warned_unknown_output: BTreeSet<DataId>,
    /// Interactive/testing mode: output IDs are not validated and shared
    /// memory is not used.
    interactive: bool,
}
88
89impl DoraNode {
90 pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
116 Self::init_from_env_inner(true)
117 }
118
119 pub fn init_from_env_force() -> eyre::Result<(Self, EventStream)> {
125 Self::init_from_env_inner(false)
126 }
127
128 fn init_from_env_inner(fallback_to_interactive: bool) -> eyre::Result<(Self, EventStream)> {
129 if let Some(testing_comm) = take_testing_communication() {
130 let TestingCommunication {
131 input,
132 output,
133 options,
134 } = *testing_comm;
135 return Self::init_testing(input, output, options);
136 }
137
138 match std::env::var("DORA_NODE_CONFIG") {
140 Ok(raw) => {
141 let node_config: NodeConfig =
142 serde_yaml::from_str(&raw).context("failed to deserialize node config")?;
143 return Self::init(node_config);
144 }
145 Err(std::env::VarError::NotUnicode(_)) => {
146 bail!("DORA_NODE_CONFIG env variable is not valid unicode")
147 }
148 Err(std::env::VarError::NotPresent) => {} };
150
151 match std::env::var("DORA_TEST_WITH_INPUTS") {
153 Ok(raw) => {
154 let input_file = PathBuf::from(raw);
155 let output_file = match std::env::var("DORA_TEST_WRITE_OUTPUTS_TO") {
156 Ok(raw) => PathBuf::from(raw),
157 Err(std::env::VarError::NotUnicode(_)) => {
158 bail!("DORA_TEST_WRITE_OUTPUTS_TO env variable is not valid unicode")
159 }
160 Err(std::env::VarError::NotPresent) => {
161 input_file.with_file_name("outputs.jsonl")
162 }
163 };
164 let skip_output_time_offsets =
165 std::env::var_os("DORA_TEST_NO_OUTPUT_TIME_OFFSET").is_some();
166
167 let input = TestingInput::FromJsonFile(input_file);
168 let output = TestingOutput::ToFile(output_file);
169 let options = TestingOptions {
170 skip_output_time_offsets,
171 };
172
173 return Self::init_testing(input, output, options);
174 }
175 Err(std::env::VarError::NotUnicode(_)) => {
176 bail!("DORA_TEST_WITH_INPUTS env variable is not valid unicode")
177 }
178 Err(std::env::VarError::NotPresent) => {} }
180
181 if fallback_to_interactive && std::io::stdin().is_terminal() {
183 println!(
184 "{}",
185 "Starting node in interactive mode as DORA_NODE_CONFIG env variable is not set"
186 .green()
187 );
188 return Self::init_interactive();
189 }
190
191 bail!("DORA_NODE_CONFIG env variable is not set")
193 }
194
195 pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
207 let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
209
210 let mut channel =
211 DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
212 let clock = Arc::new(uhlc::HLC::default());
213
214 let reply = channel
215 .request(&Timestamped {
216 inner: DaemonRequest::NodeConfig { node_id },
217 timestamp: clock.new_timestamp(),
218 })
219 .wrap_err("failed to request node config from daemon")?;
220
221 match reply {
222 DaemonReply::NodeConfig {
223 result: Ok(node_config),
224 } => Self::init(node_config),
225 DaemonReply::NodeConfig { result: Err(error) } => {
226 bail!("failed to get node config from daemon: {error}")
227 }
228 _ => bail!("unexpected reply from daemon"),
229 }
230 }
231
232 pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
238 if std::env::var("DORA_NODE_CONFIG").is_ok() {
239 info!(
240 "Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`"
241 );
242 Self::init_from_env()
243 } else {
244 Self::init_from_node_id(node_id)
245 }
246 }
247
248 pub fn init_interactive() -> eyre::Result<(Self, EventStream)> {
348 #[cfg(feature = "tracing")]
349 {
350 TracingBuilder::new("node")
351 .with_stdout("debug", false)
352 .build()
353 .wrap_err("failed to set up tracing subscriber")?;
354 }
355
356 let node_config = NodeConfig {
357 dataflow_id: DataflowId::new_v4(),
358 node_id: "".parse()?,
359 run_config: NodeRunConfig {
360 inputs: Default::default(),
361 outputs: Default::default(),
362 },
363 daemon_communication: Some(DaemonCommunication::Interactive),
364 dataflow_descriptor: serde_yaml::Value::Null,
365 dynamic: false,
366 write_events_to: None,
367 };
368 let (mut node, events) = Self::init(node_config)?;
369 node.interactive = true;
370 Ok((node, events))
371 }
372
373 pub fn init_testing(
381 input: TestingInput,
382 output: TestingOutput,
383 options: TestingOptions,
384 ) -> eyre::Result<(Self, EventStream)> {
385 let node_config = NodeConfig {
386 dataflow_id: DataflowId::new_v4(),
387 node_id: "".parse()?,
388 run_config: NodeRunConfig {
389 inputs: Default::default(),
390 outputs: Default::default(),
391 },
392 daemon_communication: None,
393 dataflow_descriptor: serde_yaml::Value::Null,
394 dynamic: false,
395 write_events_to: None,
396 };
397 let testing_comm = TestingCommunication {
398 input,
399 output,
400 options,
401 };
402 let (mut node, events) = Self::init_with_options(node_config, Some(testing_comm))?;
403 node.interactive = true;
404 Ok((node, events))
405 }
406
407 #[doc(hidden)]
409 #[tracing::instrument]
410 pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
411 Self::init_with_options(node_config, None)
412 }
413
414 #[tracing::instrument(skip(testing_communication))]
415 fn init_with_options(
416 node_config: NodeConfig,
417 testing_communication: Option<TestingCommunication>,
418 ) -> eyre::Result<(Self, EventStream)> {
419 let NodeConfig {
420 dataflow_id,
421 node_id,
422 run_config,
423 daemon_communication,
424 dataflow_descriptor,
425 dynamic,
426 write_events_to,
427 } = node_config;
428 let clock = Arc::new(uhlc::HLC::default());
429 let input_config = run_config.inputs.clone();
430
431 let daemon_communication = match daemon_communication {
432 Some(comm) => comm.into(),
433 None => match testing_communication {
434 Some(comm) => {
435 let TestingCommunication {
436 input,
437 output,
438 options,
439 } = comm;
440 let (sender, mut receiver) = tokio::sync::mpsc::channel(5);
441 let new_communication = DaemonCommunicationWrapper::Testing { channel: sender };
442 let mut events = IntegrationTestingEvents::new(input, output, options)?;
443 std::thread::spawn(move || {
444 while let Some((request, reply_sender)) = receiver.blocking_recv() {
445 let reply = events.request(&request);
446 if reply_sender
447 .send(reply.unwrap_or_else(|err| {
448 DaemonReply::Result(Err(format!("{err:?}")))
449 }))
450 .is_err()
451 {
452 eprintln!("failed to send reply");
453 }
454 }
455 });
456 new_communication
457 }
458 None => eyre::bail!("no daemon communication method specified"),
459 },
460 };
461
462 let event_stream = EventStream::init(
463 dataflow_id,
464 &node_id,
465 &daemon_communication,
466 input_config,
467 clock.clone(),
468 write_events_to,
469 )
470 .wrap_err("failed to init event stream")?;
471 let drop_stream =
472 DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
473 .wrap_err("failed to init drop stream")?;
474 let control_channel =
475 ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
476 .wrap_err("failed to init control channel")?;
477 let node = Self {
478 id: node_id,
479 dataflow_id,
480 node_config: run_config.clone(),
481 control_channel,
482 clock,
483 sent_out_shared_memory: HashMap::new(),
484 drop_stream,
485 cache: VecDeque::new(),
486 dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
487 warned_unknown_output: BTreeSet::new(),
488 interactive: false,
489 };
490
491 if dynamic {
492 match &node.dataflow_descriptor {
494 Ok(descriptor) => {
495 if let Some(env_vars) = descriptor
496 .nodes
497 .iter()
498 .find(|n| n.id == node.id)
499 .and_then(|n| n.env.as_ref())
500 {
501 for (key, value) in env_vars {
502 unsafe {
505 std::env::set_var(key, value.to_string());
506 }
507 }
508 }
509 }
510 Err(err) => {
511 warn!("Could not parse dataflow descriptor: {err:#}");
512 }
513 }
514 }
515 Ok((node, event_stream))
516 }
517
518 fn validate_output(&mut self, output_id: &DataId) -> bool {
519 if !self.node_config.outputs.contains(output_id) && !self.interactive {
520 if !self.warned_unknown_output.contains(output_id) {
521 warn!("Ignoring output `{output_id}` not in node's output list.");
522 self.warned_unknown_output.insert(output_id.clone());
523 }
524 false
525 } else {
526 true
527 }
528 }
529
530 pub fn send_output_raw<F>(
557 &mut self,
558 output_id: DataId,
559 parameters: MetadataParameters,
560 data_len: usize,
561 data: F,
562 ) -> eyre::Result<()>
563 where
564 F: FnOnce(&mut [u8]),
565 {
566 if !self.validate_output(&output_id) {
567 return Ok(());
568 };
569 let mut sample = self.allocate_data_sample(data_len)?;
570 data(&mut sample);
571
572 let type_info = ArrowTypeInfo::byte_array(data_len);
573
574 self.send_output_sample(output_id, type_info, parameters, Some(sample))
575 }
576
577 pub fn send_output(
586 &mut self,
587 output_id: DataId,
588 parameters: MetadataParameters,
589 data: impl Array,
590 ) -> eyre::Result<()> {
591 if !self.validate_output(&output_id) {
592 return Ok(());
593 };
594
595 let arrow_array = data.to_data();
596
597 let total_len = required_data_size(&arrow_array);
598
599 let mut sample = self.allocate_data_sample(total_len)?;
600 let type_info = copy_array_into_sample(&mut sample, &arrow_array);
601
602 self.send_output_sample(output_id, type_info, parameters, Some(sample))
603 .wrap_err("failed to send output")?;
604
605 Ok(())
606 }
607
608 pub fn send_output_bytes(
615 &mut self,
616 output_id: DataId,
617 parameters: MetadataParameters,
618 data_len: usize,
619 data: &[u8],
620 ) -> eyre::Result<()> {
621 if !self.validate_output(&output_id) {
622 return Ok(());
623 };
624 self.send_output_raw(output_id, parameters, data_len, |sample| {
625 sample.copy_from_slice(data)
626 })
627 }
628
629 pub fn send_typed_output<F>(
636 &mut self,
637 output_id: DataId,
638 type_info: ArrowTypeInfo,
639 parameters: MetadataParameters,
640 data_len: usize,
641 data: F,
642 ) -> eyre::Result<()>
643 where
644 F: FnOnce(&mut [u8]),
645 {
646 if !self.validate_output(&output_id) {
647 return Ok(());
648 };
649
650 let mut sample = self.allocate_data_sample(data_len)?;
651 data(&mut sample);
652
653 self.send_output_sample(output_id, type_info, parameters, Some(sample))
654 }
655
656 pub fn send_output_sample(
663 &mut self,
664 output_id: DataId,
665 type_info: ArrowTypeInfo,
666 parameters: MetadataParameters,
667 sample: Option<DataSample>,
668 ) -> eyre::Result<()> {
669 if !self.interactive {
670 self.handle_finished_drop_tokens()?;
671 }
672
673 let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
674
675 let (data, shmem) = match sample {
676 Some(sample) => sample.finalize(),
677 None => (None, None),
678 };
679
680 self.control_channel
681 .send_message(output_id.clone(), metadata, data)
682 .wrap_err_with(|| format!("failed to send output {output_id}"))?;
683
684 if let Some((shared_memory, drop_token)) = shmem {
685 self.sent_out_shared_memory
686 .insert(drop_token, shared_memory);
687 }
688
689 Ok(())
690 }
691
692 pub fn close_outputs(&mut self, outputs_ids: Vec<DataId>) -> eyre::Result<()> {
698 for output_id in &outputs_ids {
699 if !self.node_config.outputs.remove(output_id) {
700 eyre::bail!("unknown output {output_id}");
701 }
702 }
703
704 self.control_channel
705 .report_closed_outputs(outputs_ids)
706 .wrap_err("failed to report closed outputs to daemon")?;
707
708 Ok(())
709 }
710
711 pub fn id(&self) -> &NodeId {
713 &self.id
714 }
715
716 pub fn dataflow_id(&self) -> &DataflowId {
720 &self.dataflow_id
721 }
722
723 pub fn node_config(&self) -> &NodeRunConfig {
725 &self.node_config
726 }
727
728 pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
733 let data = if data_len >= ZERO_COPY_THRESHOLD && !self.interactive {
734 let shared_memory = self.allocate_shared_memory(data_len)?;
736
737 DataSample {
738 inner: DataSampleInner::Shmem(shared_memory),
739 len: data_len,
740 }
741 } else {
742 let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
743
744 avec.into()
745 };
746
747 Ok(data)
748 }
749
750 fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
751 let cache_index = self
752 .cache
753 .iter()
754 .enumerate()
755 .rev()
756 .filter(|(_, s)| s.len() >= data_len)
757 .min_by_key(|(_, s)| s.len())
758 .map(|(i, _)| i);
759 let memory = match cache_index {
760 Some(i) => {
761 self.cache.remove(i).unwrap()
763 }
764 None => ShmemHandle(Box::new(
765 ShmemConf::new()
766 .size(data_len)
767 .writable(true)
768 .create()
769 .wrap_err("failed to allocate shared memory")?,
770 )),
771 };
772 assert!(memory.len() >= data_len);
773
774 Ok(memory)
775 }
776
777 fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
778 loop {
779 match self.drop_stream.try_recv() {
780 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
781 Some(region) => self.add_to_cache(region),
782 None => tracing::warn!("received unknown finished drop token `{token:?}`"),
783 },
784 Err(flume::TryRecvError::Empty) => break,
785 Err(flume::TryRecvError::Disconnected) => {
786 bail!("event stream was closed before sending all expected drop tokens")
787 }
788 }
789 }
790 Ok(())
791 }
792
793 fn add_to_cache(&mut self, memory: ShmemHandle) {
794 const MAX_CACHE_SIZE: usize = 20;
795
796 self.cache.push_back(memory);
797 while self.cache.len() > MAX_CACHE_SIZE {
798 self.cache.pop_front();
799 }
800 }
801
802 pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
806 match &self.dataflow_descriptor {
807 Ok(d) => Ok(d),
808 Err(err) => eyre::bail!(
809 "failed to parse dataflow descriptor: {err}\n\n
810 This might be caused by mismatched version numbers of dora \
811 daemon and the dora node API"
812 ),
813 }
814 }
815}
816
impl Drop for DoraNode {
    /// Shuts the node down. Errors are only logged, since `drop` cannot
    /// return them. It:
    /// 1. reports all still-open outputs as closed,
    /// 2. waits for the drop tokens of shared-memory regions that were sent out,
    /// 3. reports to the daemon that outputs are done.
    fn drop(&mut self) {
        // Close every output not closed explicitly via `close_outputs`;
        // `mem::take` leaves `node_config.outputs` empty afterwards.
        if let Err(err) = self
            .control_channel
            .report_closed_outputs(
                std::mem::take(&mut self.node_config.outputs)
                    .into_iter()
                    .collect(),
            )
            .context("failed to close outputs on drop")
        {
            tracing::warn!("{err:?}")
        }

        // Wait until every sent-out region is acknowledged via its drop token.
        // NOTE(review): presumably this keeps regions mapped while receivers may
        // still read them. Acknowledged regions are dropped here, not cached.
        while !self.sent_out_shared_memory.is_empty() {
            // Only log when the next receive would actually have to wait.
            if self.drop_stream.is_empty() {
                tracing::trace!(
                    "waiting for {} remaining drop tokens",
                    self.sent_out_shared_memory.len()
                );
            }

            // Give up if no token arrives within 2 seconds or the stream is
            // closed; the remaining handles are dropped with `self`'s fields.
            match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
                Ok(token) => {
                    self.sent_out_shared_memory.remove(&token);
                }
                Err(flume::RecvTimeoutError::Disconnected) => {
                    tracing::warn!(
                        "finished_drop_tokens channel closed while still waiting for drop tokens; \
                        closing {} shared memory regions that might not yet been mapped.",
                        self.sent_out_shared_memory.len()
                    );
                    break;
                }
                Err(flume::RecvTimeoutError::Timeout) => {
                    tracing::warn!(
                        "timeout while waiting for drop tokens; \
                        closing {} shared memory regions that might not yet been mapped.",
                        self.sent_out_shared_memory.len()
                    );
                    break;
                }
            }
        }

        // Final notification to the daemon (see `ControlChannel::report_outputs_done`).
        if let Err(err) = self.control_channel.report_outputs_done() {
            tracing::warn!("{err:?}")
        }
    }
}
868
869pub struct DataSample {
875 inner: DataSampleInner,
876 len: usize,
877}
878
879impl DataSample {
880 fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
881 match self.inner {
882 DataSampleInner::Shmem(shared_memory) => {
883 let drop_token = DropToken::generate();
884 let data = DataMessage::SharedMemory {
885 shared_memory_id: shared_memory.get_os_id().to_owned(),
886 len: self.len,
887 drop_token,
888 };
889 (Some(data), Some((shared_memory, drop_token)))
890 }
891 DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
892 }
893 }
894}
895
896impl Deref for DataSample {
897 type Target = [u8];
898
899 fn deref(&self) -> &Self::Target {
900 let slice = match &self.inner {
901 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
902 DataSampleInner::Vec(data) => data,
903 };
904 &slice[..self.len]
905 }
906}
907
908impl DerefMut for DataSample {
909 fn deref_mut(&mut self) -> &mut Self::Target {
910 let slice = match &mut self.inner {
911 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
912 DataSampleInner::Vec(data) => data,
913 };
914 &mut slice[..self.len]
915 }
916}
917
918impl From<AVec<u8, ConstAlign<128>>> for DataSample {
919 fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
920 Self {
921 len: value.len(),
922 inner: DataSampleInner::Vec(value),
923 }
924 }
925}
926
927impl std::fmt::Debug for DataSample {
928 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
929 let kind = match &self.inner {
930 DataSampleInner::Shmem(_) => "SharedMemory",
931 DataSampleInner::Vec(_) => "Vec",
932 };
933 f.debug_struct("DataSample")
934 .field("len", &self.len)
935 .field("kind", &kind)
936 .finish_non_exhaustive()
937 }
938}
939
/// Backing storage of a [`DataSample`].
enum DataSampleInner {
    /// Shared-memory region; sent to the daemon by its OS ID (see `finalize`).
    Shmem(ShmemHandle),
    /// 128-byte aligned heap buffer; sent inline as `DataMessage::Vec`.
    Vec(AVec<u8, ConstAlign<128>>),
}

/// Owning handle to a shared-memory region; dereferences to the `Shmem`.
struct ShmemHandle(Box<Shmem>);
946
947impl Deref for ShmemHandle {
948 type Target = Shmem;
949
950 fn deref(&self) -> &Self::Target {
951 &self.0
952 }
953}
954
955impl DerefMut for ShmemHandle {
956 fn deref_mut(&mut self) -> &mut Self::Target {
957 &mut self.0
958 }
959}
960
// SAFETY: NOTE(review): these impls assert that the handle may be moved to and
// shared between threads, which `Shmem` presumably does not allow on its own
// (the impls would otherwise be unnecessary). Soundness relies on the mapped
// bytes only being accessed through `&`/`&mut` borrows (e.g. `DataSample`'s
// `Deref`/`DerefMut`). Confirm against `shared_memory_extended`'s guarantees.
unsafe impl Send for ShmemHandle {}
unsafe impl Sync for ShmemHandle {}
963
/// Sets up tracing for node `node_id` of dataflow `dataflow_id`, plus a
/// metrics monitor when the `metrics` feature is enabled.
///
/// The setup runs as a task on the current tokio runtime, so it may finish
/// after this function returns. If `DORA_OTLP_ENDPOINT` or
/// `DORA_JAEGER_TRACING` is set, OTLP tracing is enabled alongside stdout
/// logging at `info` level. In that case the OpenTelemetry guard is stored in
/// the returned handle once the task has run; until then it holds `None`.
/// NOTE(review): presumably the guard must be kept alive for traces to be
/// exported.
///
/// Failures inside the background setup are printed to stderr (the tracing
/// subscriber may not be installed yet) instead of panicking the task.
///
/// # Errors
/// Fails if called outside of a tokio runtime.
#[cfg(feature = "tracing")]
pub fn init_tracing(
    node_id: &NodeId,
    dataflow_id: &DataflowId,
) -> eyre::Result<Arc<Mutex<Option<OtelGuard>>>> {
    let rt = Handle::try_current().context("failed to get tokio runtime handle")?;

    let guard: Arc<Mutex<Option<OtelGuard>>> = Arc::new(Mutex::new(None));
    let guard_slot = Arc::clone(&guard);
    let node_id_str = node_id.to_string();
    let tracing_monitor = async move {
        let mut builder = TracingBuilder::new(node_id_str);
        let otlp_requested = std::env::var("DORA_OTLP_ENDPOINT").is_ok()
            || std::env::var("DORA_JAEGER_TRACING").is_ok();
        if otlp_requested {
            builder = match builder
                .with_otlp_tracing()
                .context("failed to set up OTLP tracing")
            {
                Ok(builder) => builder.with_stdout("info", true),
                Err(err) => {
                    eprintln!("{err:?}");
                    return;
                }
            };
            *guard_slot.lock().unwrap() = builder.guard.take();
        } else {
            builder = builder.with_stdout("info", true);
        }

        if let Err(err) = builder
            .build()
            .wrap_err("failed to set up tracing subscriber")
        {
            eprintln!("{err:?}");
        }
    };
    rt.spawn(tracing_monitor);

    #[cfg(feature = "metrics")]
    {
        let id = format!("{dataflow_id}/{node_id}");
        rt.spawn(async move {
            if let Err(e) = dora_metrics::run_metrics_monitor(id)
                .await
                .wrap_err("metrics monitor exited unexpectedly")
            {
                warn!("metrics monitor failed: {:#?}", e);
            }
        });
    }
    // Only needed for the metrics ID; avoid an unused-variable warning otherwise.
    #[cfg(not(feature = "metrics"))]
    let _ = dataflow_id;

    Ok(guard)
}