1use crate::{daemon_connection::DaemonChannel, EventStream};
2
3use self::{
4 arrow_utils::{copy_array_into_sample, required_data_size},
5 control_channel::ControlChannel,
6 drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11 config::{DataId, NodeId, NodeRunConfig},
12 descriptor::Descriptor,
13 metadata::ArrowTypeInfoExt,
14 topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15 uhlc,
16};
17
18use dora_message::{
19 daemon_to_node::{DaemonReply, NodeConfig},
20 metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
21 node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
22 DataflowId,
23};
24use eyre::{bail, WrapErr};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27 collections::{BTreeSet, HashMap, VecDeque},
28 ops::{Deref, DerefMut},
29 sync::Arc,
30 time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "metrics")]
35use dora_metrics::init_meter_provider;
36#[cfg(feature = "tracing")]
37use dora_tracing::set_up_tracing;
38use tokio::runtime::{Handle, Runtime};
39
40pub mod arrow_utils;
41mod control_channel;
42mod drop_stream;
43
/// Payload size (in bytes) at or above which outputs are placed in shared memory.
///
/// Smaller payloads are copied into a heap-allocated, 128-byte-aligned buffer instead
/// (see `DoraNode::allocate_data_sample`).
pub const ZERO_COPY_THRESHOLD: usize = 4 * 1024;
45
46enum TokioRuntime {
47 Runtime(Runtime),
48 Handle(Handle),
49}
50
51pub struct DoraNode {
52 id: NodeId,
53 dataflow_id: DataflowId,
54 node_config: NodeRunConfig,
55 control_channel: ControlChannel,
56 clock: Arc<uhlc::HLC>,
57
58 sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
59 drop_stream: DropStream,
60 cache: VecDeque<ShmemHandle>,
61
62 dataflow_descriptor: Descriptor,
63 warned_unknown_output: BTreeSet<DataId>,
64 _rt: TokioRuntime,
65}
66
67impl DoraNode {
68 pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
77 let node_config: NodeConfig = {
78 let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
79 "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
80 )?;
81 serde_yaml::from_str(&raw).context("failed to deserialize node config")?
82 };
83 #[cfg(feature = "tracing")]
84 set_up_tracing(node_config.node_id.as_ref())
85 .context("failed to set up tracing subscriber")?;
86 Self::init(node_config)
87 }
88
89 pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
99 let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
101
102 let mut channel =
103 DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
104 let clock = Arc::new(uhlc::HLC::default());
105
106 let reply = channel
107 .request(&Timestamped {
108 inner: DaemonRequest::NodeConfig { node_id },
109 timestamp: clock.new_timestamp(),
110 })
111 .wrap_err("failed to request node config from daemon")?;
112 match reply {
113 DaemonReply::NodeConfig {
114 result: Ok(node_config),
115 } => Self::init(node_config),
116 DaemonReply::NodeConfig { result: Err(error) } => {
117 bail!("failed to get node config from daemon: {error}")
118 }
119 _ => bail!("unexpected reply from daemon"),
120 }
121 }
122
123 pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
124 if std::env::var("DORA_NODE_CONFIG").is_ok() {
125 info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
126 Self::init_from_env()
127 } else {
128 Self::init_from_node_id(node_id)
129 }
130 }
131
132 #[tracing::instrument]
133 pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
134 let NodeConfig {
135 dataflow_id,
136 node_id,
137 run_config,
138 daemon_communication,
139 dataflow_descriptor,
140 dynamic: _,
141 } = node_config;
142 let clock = Arc::new(uhlc::HLC::default());
143 let input_config = run_config.inputs.clone();
144
145 let rt = match Handle::try_current() {
146 Ok(handle) => TokioRuntime::Handle(handle),
147 Err(_) => TokioRuntime::Runtime(
148 tokio::runtime::Builder::new_multi_thread()
149 .worker_threads(2)
150 .enable_all()
151 .build()
152 .context("tokio runtime failed")?,
153 ),
154 };
155
156 let id = format!("{}/{}", dataflow_id, node_id);
157
158 #[cfg(feature = "metrics")]
159 match &rt {
160 TokioRuntime::Runtime(rt) => rt.spawn(async {
161 if let Err(e) = init_meter_provider(id)
162 .await
163 .context("failed to init metrics provider")
164 {
165 warn!("could not create metric provider with err: {:#?}", e);
166 }
167 }),
168 TokioRuntime::Handle(handle) => handle.spawn(async {
169 if let Err(e) = init_meter_provider(id)
170 .await
171 .context("failed to init metrics provider")
172 {
173 warn!("could not create metric provider with err: {:#?}", e);
174 }
175 }),
176 };
177
178 let event_stream = EventStream::init(
179 dataflow_id,
180 &node_id,
181 &daemon_communication,
182 input_config,
183 clock.clone(),
184 )
185 .wrap_err("failed to init event stream")?;
186 let drop_stream =
187 DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
188 .wrap_err("failed to init drop stream")?;
189 let control_channel =
190 ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
191 .wrap_err("failed to init control channel")?;
192
193 let node = Self {
194 id: node_id,
195 dataflow_id,
196 node_config: run_config.clone(),
197 control_channel,
198 clock,
199 sent_out_shared_memory: HashMap::new(),
200 drop_stream,
201 cache: VecDeque::new(),
202 dataflow_descriptor,
203 warned_unknown_output: BTreeSet::new(),
204 _rt: rt,
205 };
206 Ok((node, event_stream))
207 }
208
209 fn validate_output(&mut self, output_id: &DataId) -> bool {
210 if !self.node_config.outputs.contains(output_id) {
211 if !self.warned_unknown_output.contains(output_id) {
212 warn!("Ignoring output `{output_id}` not in node's output list.");
213 self.warned_unknown_output.insert(output_id.clone());
214 }
215 false
216 } else {
217 true
218 }
219 }
220
221 pub fn send_output_raw<F>(
245 &mut self,
246 output_id: DataId,
247 parameters: MetadataParameters,
248 data_len: usize,
249 data: F,
250 ) -> eyre::Result<()>
251 where
252 F: FnOnce(&mut [u8]),
253 {
254 if !self.validate_output(&output_id) {
255 return Ok(());
256 };
257 let mut sample = self.allocate_data_sample(data_len)?;
258 data(&mut sample);
259
260 let type_info = ArrowTypeInfo::byte_array(data_len);
261
262 self.send_output_sample(output_id, type_info, parameters, Some(sample))
263 }
264
265 pub fn send_output(
266 &mut self,
267 output_id: DataId,
268 parameters: MetadataParameters,
269 data: impl Array,
270 ) -> eyre::Result<()> {
271 if !self.validate_output(&output_id) {
272 return Ok(());
273 };
274
275 let arrow_array = data.to_data();
276
277 let total_len = required_data_size(&arrow_array);
278
279 let mut sample = self.allocate_data_sample(total_len)?;
280 let type_info = copy_array_into_sample(&mut sample, &arrow_array);
281
282 self.send_output_sample(output_id, type_info, parameters, Some(sample))
283 .wrap_err("failed to send output")?;
284
285 Ok(())
286 }
287
288 pub fn send_output_bytes(
289 &mut self,
290 output_id: DataId,
291 parameters: MetadataParameters,
292 data_len: usize,
293 data: &[u8],
294 ) -> eyre::Result<()> {
295 if !self.validate_output(&output_id) {
296 return Ok(());
297 };
298 self.send_output_raw(output_id, parameters, data_len, |sample| {
299 sample.copy_from_slice(data)
300 })
301 }
302
303 pub fn send_typed_output<F>(
304 &mut self,
305 output_id: DataId,
306 type_info: ArrowTypeInfo,
307 parameters: MetadataParameters,
308 data_len: usize,
309 data: F,
310 ) -> eyre::Result<()>
311 where
312 F: FnOnce(&mut [u8]),
313 {
314 if !self.validate_output(&output_id) {
315 return Ok(());
316 };
317
318 let mut sample = self.allocate_data_sample(data_len)?;
319 data(&mut sample);
320
321 self.send_output_sample(output_id, type_info, parameters, Some(sample))
322 }
323
324 pub fn send_output_sample(
325 &mut self,
326 output_id: DataId,
327 type_info: ArrowTypeInfo,
328 parameters: MetadataParameters,
329 sample: Option<DataSample>,
330 ) -> eyre::Result<()> {
331 self.handle_finished_drop_tokens()?;
332
333 let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
334
335 let (data, shmem) = match sample {
336 Some(sample) => sample.finalize(),
337 None => (None, None),
338 };
339
340 self.control_channel
341 .send_message(output_id.clone(), metadata, data)
342 .wrap_err_with(|| format!("failed to send output {output_id}"))?;
343
344 if let Some((shared_memory, drop_token)) = shmem {
345 self.sent_out_shared_memory
346 .insert(drop_token, shared_memory);
347 }
348
349 Ok(())
350 }
351
352 pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
353 for output_id in &outputs {
354 if !self.node_config.outputs.remove(output_id) {
355 eyre::bail!("unknown output {output_id}");
356 }
357 }
358
359 self.control_channel
360 .report_closed_outputs(outputs)
361 .wrap_err("failed to report closed outputs to daemon")?;
362
363 Ok(())
364 }
365
366 pub fn id(&self) -> &NodeId {
367 &self.id
368 }
369
370 pub fn dataflow_id(&self) -> &DataflowId {
371 &self.dataflow_id
372 }
373
374 pub fn node_config(&self) -> &NodeRunConfig {
375 &self.node_config
376 }
377
378 pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
379 let data = if data_len >= ZERO_COPY_THRESHOLD {
380 let shared_memory = self.allocate_shared_memory(data_len)?;
382
383 DataSample {
384 inner: DataSampleInner::Shmem(shared_memory),
385 len: data_len,
386 }
387 } else {
388 let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
389
390 avec.into()
391 };
392
393 Ok(data)
394 }
395
396 fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
397 let cache_index = self
398 .cache
399 .iter()
400 .enumerate()
401 .rev()
402 .filter(|(_, s)| s.len() >= data_len)
403 .min_by_key(|(_, s)| s.len())
404 .map(|(i, _)| i);
405 let memory = match cache_index {
406 Some(i) => {
407 self.cache.remove(i).unwrap()
409 }
410 None => ShmemHandle(Box::new(
411 ShmemConf::new()
412 .size(data_len)
413 .writable(true)
414 .create()
415 .wrap_err("failed to allocate shared memory")?,
416 )),
417 };
418 assert!(memory.len() >= data_len);
419
420 Ok(memory)
421 }
422
423 fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
424 loop {
425 match self.drop_stream.try_recv() {
426 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
427 Some(region) => self.add_to_cache(region),
428 None => tracing::warn!("received unknown finished drop token `{token:?}`"),
429 },
430 Err(flume::TryRecvError::Empty) => break,
431 Err(flume::TryRecvError::Disconnected) => {
432 bail!("event stream was closed before sending all expected drop tokens")
433 }
434 }
435 }
436 Ok(())
437 }
438
439 fn add_to_cache(&mut self, memory: ShmemHandle) {
440 const MAX_CACHE_SIZE: usize = 20;
441
442 self.cache.push_back(memory);
443 while self.cache.len() > MAX_CACHE_SIZE {
444 self.cache.pop_front();
445 }
446 }
447
448 pub fn dataflow_descriptor(&self) -> &Descriptor {
452 &self.dataflow_descriptor
453 }
454}
455
456impl Drop for DoraNode {
457 #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
458 fn drop(&mut self) {
459 if let Err(err) = self
461 .control_channel
462 .report_closed_outputs(
463 std::mem::take(&mut self.node_config.outputs)
464 .into_iter()
465 .collect(),
466 )
467 .context("failed to close outputs on drop")
468 {
469 tracing::warn!("{err:?}")
470 }
471
472 while !self.sent_out_shared_memory.is_empty() {
473 if self.drop_stream.len() == 0 {
474 tracing::trace!(
475 "waiting for {} remaining drop tokens",
476 self.sent_out_shared_memory.len()
477 );
478 }
479
480 match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
481 Ok(token) => {
482 self.sent_out_shared_memory.remove(&token);
483 }
484 Err(flume::RecvTimeoutError::Disconnected) => {
485 tracing::warn!(
486 "finished_drop_tokens channel closed while still waiting for drop tokens; \
487 closing {} shared memory regions that might not yet been mapped.",
488 self.sent_out_shared_memory.len()
489 );
490 break;
491 }
492 Err(flume::RecvTimeoutError::Timeout) => {
493 tracing::warn!(
494 "timeout while waiting for drop tokens; \
495 closing {} shared memory regions that might not yet been mapped.",
496 self.sent_out_shared_memory.len()
497 );
498 break;
499 }
500 }
501 }
502
503 if let Err(err) = self.control_channel.report_outputs_done() {
504 tracing::warn!("{err:?}")
505 }
506 }
507}
508
509pub struct DataSample {
510 inner: DataSampleInner,
511 len: usize,
512}
513
514impl DataSample {
515 fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
516 match self.inner {
517 DataSampleInner::Shmem(shared_memory) => {
518 let drop_token = DropToken::generate();
519 let data = DataMessage::SharedMemory {
520 shared_memory_id: shared_memory.get_os_id().to_owned(),
521 len: self.len,
522 drop_token,
523 };
524 (Some(data), Some((shared_memory, drop_token)))
525 }
526 DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
527 }
528 }
529}
530
531impl Deref for DataSample {
532 type Target = [u8];
533
534 fn deref(&self) -> &Self::Target {
535 let slice = match &self.inner {
536 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
537 DataSampleInner::Vec(data) => data,
538 };
539 &slice[..self.len]
540 }
541}
542
543impl DerefMut for DataSample {
544 fn deref_mut(&mut self) -> &mut Self::Target {
545 let slice = match &mut self.inner {
546 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
547 DataSampleInner::Vec(data) => data,
548 };
549 &mut slice[..self.len]
550 }
551}
552
553impl From<AVec<u8, ConstAlign<128>>> for DataSample {
554 fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
555 Self {
556 len: value.len(),
557 inner: DataSampleInner::Vec(value),
558 }
559 }
560}
561
562impl std::fmt::Debug for DataSample {
563 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
564 let kind = match &self.inner {
565 DataSampleInner::Shmem(_) => "SharedMemory",
566 DataSampleInner::Vec(_) => "Vec",
567 };
568 f.debug_struct("DataSample")
569 .field("len", &self.len)
570 .field("kind", &kind)
571 .finish_non_exhaustive()
572 }
573}
574
575enum DataSampleInner {
576 Shmem(ShmemHandle),
577 Vec(AVec<u8, ConstAlign<128>>),
578}
579
580struct ShmemHandle(Box<Shmem>);
581
582impl Deref for ShmemHandle {
583 type Target = Shmem;
584
585 fn deref(&self) -> &Self::Target {
586 &self.0
587 }
588}
589
590impl DerefMut for ShmemHandle {
591 fn deref_mut(&mut self) -> &mut Self::Target {
592 &mut self.0
593 }
594}
595
596unsafe impl Send for ShmemHandle {}
597unsafe impl Sync for ShmemHandle {}