1use crate::{daemon_connection::DaemonChannel, EventStream};
2
3use self::{
4 arrow_utils::{copy_array_into_sample, required_data_size},
5 control_channel::ControlChannel,
6 drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11 config::{DataId, NodeId, NodeRunConfig},
12 descriptor::Descriptor,
13 metadata::ArrowTypeInfoExt,
14 topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15 uhlc,
16};
17
18use dora_message::{
19 daemon_to_node::{DaemonReply, NodeConfig},
20 metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
21 node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
22 DataflowId,
23};
24use eyre::{bail, WrapErr};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27 collections::{BTreeSet, HashMap, VecDeque},
28 ops::{Deref, DerefMut},
29 sync::Arc,
30 time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "metrics")]
35use dora_metrics::run_metrics_monitor;
36#[cfg(feature = "tracing")]
37use dora_tracing::TracingBuilder;
38
39use tokio::runtime::{Handle, Runtime};
40
41pub mod arrow_utils;
42mod control_channel;
43mod drop_stream;
44
/// Minimum payload size, in bytes, at which [`DoraNode::allocate_data_sample`]
/// backs a sample with a shared memory region; smaller samples use an aligned
/// in-process buffer instead.
pub const ZERO_COPY_THRESHOLD: usize = 4096;
46
47enum TokioRuntime {
48 Runtime(Runtime),
49 Handle(Handle),
50}
51
52pub struct DoraNode {
53 id: NodeId,
54 dataflow_id: DataflowId,
55 node_config: NodeRunConfig,
56 control_channel: ControlChannel,
57 clock: Arc<uhlc::HLC>,
58
59 sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
60 drop_stream: DropStream,
61 cache: VecDeque<ShmemHandle>,
62
63 dataflow_descriptor: serde_yaml::Result<Descriptor>,
64 warned_unknown_output: BTreeSet<DataId>,
65 _rt: TokioRuntime,
66}
67
68impl DoraNode {
69 pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
78 let node_config: NodeConfig = {
79 let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
80 "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
81 )?;
82 serde_yaml::from_str(&raw).context("failed to deserialize node config")?
83 };
84 #[cfg(feature = "tracing")]
85 {
86 TracingBuilder::new(node_config.node_id.as_ref())
87 .build()
88 .wrap_err("failed to set up tracing subscriber")?;
89 }
90
91 Self::init(node_config)
92 }
93
94 pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
104 let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
106
107 let mut channel =
108 DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
109 let clock = Arc::new(uhlc::HLC::default());
110
111 let reply = channel
112 .request(&Timestamped {
113 inner: DaemonRequest::NodeConfig { node_id },
114 timestamp: clock.new_timestamp(),
115 })
116 .wrap_err("failed to request node config from daemon")?;
117 match reply {
118 DaemonReply::NodeConfig {
119 result: Ok(node_config),
120 } => Self::init(node_config),
121 DaemonReply::NodeConfig { result: Err(error) } => {
122 bail!("failed to get node config from daemon: {error}")
123 }
124 _ => bail!("unexpected reply from daemon"),
125 }
126 }
127
128 pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
129 if std::env::var("DORA_NODE_CONFIG").is_ok() {
130 info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
131 Self::init_from_env()
132 } else {
133 Self::init_from_node_id(node_id)
134 }
135 }
136
137 #[tracing::instrument]
138 pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
139 let NodeConfig {
140 dataflow_id,
141 node_id,
142 run_config,
143 daemon_communication,
144 dataflow_descriptor,
145 dynamic: _,
146 } = node_config;
147 let clock = Arc::new(uhlc::HLC::default());
148 let input_config = run_config.inputs.clone();
149
150 let rt = match Handle::try_current() {
151 Ok(handle) => TokioRuntime::Handle(handle),
152 Err(_) => TokioRuntime::Runtime(
153 tokio::runtime::Builder::new_multi_thread()
154 .worker_threads(2)
155 .enable_all()
156 .build()
157 .context("tokio runtime failed")?,
158 ),
159 };
160
161 #[cfg(feature = "metrics")]
162 {
163 let id = format!("{}/{}", dataflow_id, node_id);
164 let monitor_task = async move {
165 if let Err(e) = run_metrics_monitor(id.clone())
166 .await
167 .wrap_err("metrics monitor exited unexpectedly")
168 {
169 warn!("metrics monitor failed: {:#?}", e);
170 }
171 };
172 match &rt {
173 TokioRuntime::Runtime(rt) => rt.spawn(monitor_task),
174 TokioRuntime::Handle(handle) => handle.spawn(monitor_task),
175 };
176 }
177
178 let event_stream = EventStream::init(
179 dataflow_id,
180 &node_id,
181 &daemon_communication,
182 input_config,
183 clock.clone(),
184 )
185 .wrap_err("failed to init event stream")?;
186 let drop_stream =
187 DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
188 .wrap_err("failed to init drop stream")?;
189 let control_channel =
190 ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
191 .wrap_err("failed to init control channel")?;
192
193 let node = Self {
194 id: node_id,
195 dataflow_id,
196 node_config: run_config.clone(),
197 control_channel,
198 clock,
199 sent_out_shared_memory: HashMap::new(),
200 drop_stream,
201 cache: VecDeque::new(),
202 dataflow_descriptor: serde_yaml::from_value(dataflow_descriptor),
203 warned_unknown_output: BTreeSet::new(),
204 _rt: rt,
205 };
206 Ok((node, event_stream))
207 }
208
209 fn validate_output(&mut self, output_id: &DataId) -> bool {
210 if !self.node_config.outputs.contains(output_id) {
211 if !self.warned_unknown_output.contains(output_id) {
212 warn!("Ignoring output `{output_id}` not in node's output list.");
213 self.warned_unknown_output.insert(output_id.clone());
214 }
215 false
216 } else {
217 true
218 }
219 }
220
221 pub fn send_output_raw<F>(
245 &mut self,
246 output_id: DataId,
247 parameters: MetadataParameters,
248 data_len: usize,
249 data: F,
250 ) -> eyre::Result<()>
251 where
252 F: FnOnce(&mut [u8]),
253 {
254 if !self.validate_output(&output_id) {
255 return Ok(());
256 };
257 let mut sample = self.allocate_data_sample(data_len)?;
258 data(&mut sample);
259
260 let type_info = ArrowTypeInfo::byte_array(data_len);
261
262 self.send_output_sample(output_id, type_info, parameters, Some(sample))
263 }
264
265 pub fn send_output(
266 &mut self,
267 output_id: DataId,
268 parameters: MetadataParameters,
269 data: impl Array,
270 ) -> eyre::Result<()> {
271 if !self.validate_output(&output_id) {
272 return Ok(());
273 };
274
275 let arrow_array = data.to_data();
276
277 let total_len = required_data_size(&arrow_array);
278
279 let mut sample = self.allocate_data_sample(total_len)?;
280 let type_info = copy_array_into_sample(&mut sample, &arrow_array);
281
282 self.send_output_sample(output_id, type_info, parameters, Some(sample))
283 .wrap_err("failed to send output")?;
284
285 Ok(())
286 }
287
288 pub fn send_output_bytes(
289 &mut self,
290 output_id: DataId,
291 parameters: MetadataParameters,
292 data_len: usize,
293 data: &[u8],
294 ) -> eyre::Result<()> {
295 if !self.validate_output(&output_id) {
296 return Ok(());
297 };
298 self.send_output_raw(output_id, parameters, data_len, |sample| {
299 sample.copy_from_slice(data)
300 })
301 }
302
303 pub fn send_typed_output<F>(
304 &mut self,
305 output_id: DataId,
306 type_info: ArrowTypeInfo,
307 parameters: MetadataParameters,
308 data_len: usize,
309 data: F,
310 ) -> eyre::Result<()>
311 where
312 F: FnOnce(&mut [u8]),
313 {
314 if !self.validate_output(&output_id) {
315 return Ok(());
316 };
317
318 let mut sample = self.allocate_data_sample(data_len)?;
319 data(&mut sample);
320
321 self.send_output_sample(output_id, type_info, parameters, Some(sample))
322 }
323
324 pub fn send_output_sample(
325 &mut self,
326 output_id: DataId,
327 type_info: ArrowTypeInfo,
328 parameters: MetadataParameters,
329 sample: Option<DataSample>,
330 ) -> eyre::Result<()> {
331 self.handle_finished_drop_tokens()?;
332
333 let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
334
335 let (data, shmem) = match sample {
336 Some(sample) => sample.finalize(),
337 None => (None, None),
338 };
339
340 self.control_channel
341 .send_message(output_id.clone(), metadata, data)
342 .wrap_err_with(|| format!("failed to send output {output_id}"))?;
343
344 if let Some((shared_memory, drop_token)) = shmem {
345 self.sent_out_shared_memory
346 .insert(drop_token, shared_memory);
347 }
348
349 Ok(())
350 }
351
352 pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
353 for output_id in &outputs {
354 if !self.node_config.outputs.remove(output_id) {
355 eyre::bail!("unknown output {output_id}");
356 }
357 }
358
359 self.control_channel
360 .report_closed_outputs(outputs)
361 .wrap_err("failed to report closed outputs to daemon")?;
362
363 Ok(())
364 }
365
366 pub fn id(&self) -> &NodeId {
367 &self.id
368 }
369
370 pub fn dataflow_id(&self) -> &DataflowId {
371 &self.dataflow_id
372 }
373
374 pub fn node_config(&self) -> &NodeRunConfig {
375 &self.node_config
376 }
377
378 pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
379 let data = if data_len >= ZERO_COPY_THRESHOLD {
380 let shared_memory = self.allocate_shared_memory(data_len)?;
382
383 DataSample {
384 inner: DataSampleInner::Shmem(shared_memory),
385 len: data_len,
386 }
387 } else {
388 let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
389
390 avec.into()
391 };
392
393 Ok(data)
394 }
395
396 fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
397 let cache_index = self
398 .cache
399 .iter()
400 .enumerate()
401 .rev()
402 .filter(|(_, s)| s.len() >= data_len)
403 .min_by_key(|(_, s)| s.len())
404 .map(|(i, _)| i);
405 let memory = match cache_index {
406 Some(i) => {
407 self.cache.remove(i).unwrap()
409 }
410 None => ShmemHandle(Box::new(
411 ShmemConf::new()
412 .size(data_len)
413 .writable(true)
414 .create()
415 .wrap_err("failed to allocate shared memory")?,
416 )),
417 };
418 assert!(memory.len() >= data_len);
419
420 Ok(memory)
421 }
422
423 fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
424 loop {
425 match self.drop_stream.try_recv() {
426 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
427 Some(region) => self.add_to_cache(region),
428 None => tracing::warn!("received unknown finished drop token `{token:?}`"),
429 },
430 Err(flume::TryRecvError::Empty) => break,
431 Err(flume::TryRecvError::Disconnected) => {
432 bail!("event stream was closed before sending all expected drop tokens")
433 }
434 }
435 }
436 Ok(())
437 }
438
439 fn add_to_cache(&mut self, memory: ShmemHandle) {
440 const MAX_CACHE_SIZE: usize = 20;
441
442 self.cache.push_back(memory);
443 while self.cache.len() > MAX_CACHE_SIZE {
444 self.cache.pop_front();
445 }
446 }
447
448 pub fn dataflow_descriptor(&self) -> eyre::Result<&Descriptor> {
452 match &self.dataflow_descriptor {
453 Ok(d) => Ok(d),
454 Err(err) => eyre::bail!(
455 "failed to parse dataflow descriptor: {err}\n\n
456 This might be caused by mismatched version numbers of dora \
457 daemon and the dora node API"
458 ),
459 }
460 }
461}
462
463impl Drop for DoraNode {
464 #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
465 fn drop(&mut self) {
466 if let Err(err) = self
468 .control_channel
469 .report_closed_outputs(
470 std::mem::take(&mut self.node_config.outputs)
471 .into_iter()
472 .collect(),
473 )
474 .context("failed to close outputs on drop")
475 {
476 tracing::warn!("{err:?}")
477 }
478
479 while !self.sent_out_shared_memory.is_empty() {
480 if self.drop_stream.len() == 0 {
481 tracing::trace!(
482 "waiting for {} remaining drop tokens",
483 self.sent_out_shared_memory.len()
484 );
485 }
486
487 match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
488 Ok(token) => {
489 self.sent_out_shared_memory.remove(&token);
490 }
491 Err(flume::RecvTimeoutError::Disconnected) => {
492 tracing::warn!(
493 "finished_drop_tokens channel closed while still waiting for drop tokens; \
494 closing {} shared memory regions that might not yet been mapped.",
495 self.sent_out_shared_memory.len()
496 );
497 break;
498 }
499 Err(flume::RecvTimeoutError::Timeout) => {
500 tracing::warn!(
501 "timeout while waiting for drop tokens; \
502 closing {} shared memory regions that might not yet been mapped.",
503 self.sent_out_shared_memory.len()
504 );
505 break;
506 }
507 }
508 }
509
510 if let Err(err) = self.control_channel.report_outputs_done() {
511 tracing::warn!("{err:?}")
512 }
513 }
514}
515
516pub struct DataSample {
517 inner: DataSampleInner,
518 len: usize,
519}
520
521impl DataSample {
522 fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
523 match self.inner {
524 DataSampleInner::Shmem(shared_memory) => {
525 let drop_token = DropToken::generate();
526 let data = DataMessage::SharedMemory {
527 shared_memory_id: shared_memory.get_os_id().to_owned(),
528 len: self.len,
529 drop_token,
530 };
531 (Some(data), Some((shared_memory, drop_token)))
532 }
533 DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
534 }
535 }
536}
537
538impl Deref for DataSample {
539 type Target = [u8];
540
541 fn deref(&self) -> &Self::Target {
542 let slice = match &self.inner {
543 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
544 DataSampleInner::Vec(data) => data,
545 };
546 &slice[..self.len]
547 }
548}
549
550impl DerefMut for DataSample {
551 fn deref_mut(&mut self) -> &mut Self::Target {
552 let slice = match &mut self.inner {
553 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
554 DataSampleInner::Vec(data) => data,
555 };
556 &mut slice[..self.len]
557 }
558}
559
560impl From<AVec<u8, ConstAlign<128>>> for DataSample {
561 fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
562 Self {
563 len: value.len(),
564 inner: DataSampleInner::Vec(value),
565 }
566 }
567}
568
569impl std::fmt::Debug for DataSample {
570 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
571 let kind = match &self.inner {
572 DataSampleInner::Shmem(_) => "SharedMemory",
573 DataSampleInner::Vec(_) => "Vec",
574 };
575 f.debug_struct("DataSample")
576 .field("len", &self.len)
577 .field("kind", &kind)
578 .finish_non_exhaustive()
579 }
580}
581
582enum DataSampleInner {
583 Shmem(ShmemHandle),
584 Vec(AVec<u8, ConstAlign<128>>),
585}
586
587struct ShmemHandle(Box<Shmem>);
588
589impl Deref for ShmemHandle {
590 type Target = Shmem;
591
592 fn deref(&self) -> &Self::Target {
593 &self.0
594 }
595}
596
597impl DerefMut for ShmemHandle {
598 fn deref_mut(&mut self) -> &mut Self::Target {
599 &mut self.0
600 }
601}
602
603unsafe impl Send for ShmemHandle {}
604unsafe impl Sync for ShmemHandle {}