1use crate::{daemon_connection::DaemonChannel, EventStream};
2
3use self::{
4 arrow_utils::{copy_array_into_sample, required_data_size},
5 control_channel::ControlChannel,
6 drop_stream::DropStream,
7};
8use aligned_vec::{AVec, ConstAlign};
9use arrow::array::Array;
10use dora_core::{
11 config::{DataId, NodeId, NodeRunConfig},
12 descriptor::Descriptor,
13 metadata::ArrowTypeInfoExt,
14 topics::{DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT, LOCALHOST},
15 uhlc,
16};
17
18use dora_message::{
19 daemon_to_node::{DaemonReply, NodeConfig},
20 metadata::{ArrowTypeInfo, Metadata, MetadataParameters},
21 node_to_daemon::{DaemonRequest, DataMessage, DropToken, Timestamped},
22 DataflowId,
23};
24use eyre::{bail, WrapErr};
25use shared_memory_extended::{Shmem, ShmemConf};
26use std::{
27 collections::{BTreeSet, HashMap, VecDeque},
28 ops::{Deref, DerefMut},
29 sync::Arc,
30 time::Duration,
31};
32use tracing::{info, warn};
33
34#[cfg(feature = "tracing")]
35use dora_tracing::set_up_tracing;
36
37pub mod arrow_utils;
38mod control_channel;
39mod drop_stream;
40
/// Minimum sample size in bytes at which `DoraNode::allocate_data_sample` backs the
/// sample with shared memory; smaller samples use a 128-byte-aligned in-process buffer.
41pub const ZERO_COPY_THRESHOLD: usize = 4096;
42
/// Handle through which a node sends outputs to the dora daemon.
///
/// Created together with an `EventStream` by the `init*` constructors. On drop it
/// reports all remaining outputs as closed and waits for outstanding drop tokens.
43pub struct DoraNode {
    /// ID of this node, as given in the node config.
44 id: NodeId,
    /// ID of the dataflow this node belongs to.
45 dataflow_id: DataflowId,
    /// Run configuration; its `outputs` set is checked before sending and shrinks
    /// when outputs are closed.
46 node_config: NodeRunConfig,
    /// Channel used to send output messages and closed-output reports to the daemon.
47 control_channel: ControlChannel,
    /// Hybrid logical clock used to timestamp outgoing metadata.
48 clock: Arc<uhlc::HLC>,
49
    /// Shared memory regions that were sent out, keyed by their drop token, kept
    /// until the token is received again on `drop_stream`.
50 sent_out_shared_memory: HashMap<DropToken, ShmemHandle>,
    /// Stream of drop tokens for shared memory regions that are finished.
51 drop_stream: DropStream,
    /// Reclaimed shared memory regions kept for reuse by later allocations.
52 cache: VecDeque<ShmemHandle>,
53
    /// Descriptor of the dataflow, as received in the node config.
54 dataflow_descriptor: Descriptor,
    /// Unknown output IDs for which a warning was already logged.
55 warned_unknown_output: BTreeSet<DataId>,
56}
57
58impl DoraNode {
59 pub fn init_from_env() -> eyre::Result<(Self, EventStream)> {
68 let node_config: NodeConfig = {
69 let raw = std::env::var("DORA_NODE_CONFIG").wrap_err(
70 "env variable DORA_NODE_CONFIG must be set. Are you sure your using `dora start`?",
71 )?;
72 serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
73 };
74 #[cfg(feature = "tracing")]
75 set_up_tracing(node_config.node_id.as_ref())
76 .context("failed to set up tracing subscriber")?;
77 Self::init(node_config)
78 }
79
80 pub fn init_from_node_id(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
90 let daemon_address = (LOCALHOST, DORA_DAEMON_LOCAL_LISTEN_PORT_DEFAULT).into();
92
93 let mut channel =
94 DaemonChannel::new_tcp(daemon_address).context("Could not connect to the daemon")?;
95 let clock = Arc::new(uhlc::HLC::default());
96
97 let reply = channel
98 .request(&Timestamped {
99 inner: DaemonRequest::NodeConfig { node_id },
100 timestamp: clock.new_timestamp(),
101 })
102 .wrap_err("failed to request node config from daemon")?;
103 match reply {
104 DaemonReply::NodeConfig {
105 result: Ok(node_config),
106 } => Self::init(node_config),
107 DaemonReply::NodeConfig { result: Err(error) } => {
108 bail!("failed to get node config from daemon: {error}")
109 }
110 _ => bail!("unexpected reply from daemon"),
111 }
112 }
113
114 pub fn init_flexible(node_id: NodeId) -> eyre::Result<(Self, EventStream)> {
115 if std::env::var("DORA_NODE_CONFIG").is_ok() {
116 info!("Skipping {node_id} specified within the node initialization in favor of `DORA_NODE_CONFIG` specified by `dora start`");
117 Self::init_from_env()
118 } else {
119 Self::init_from_node_id(node_id)
120 }
121 }
122
123 #[tracing::instrument]
124 pub fn init(node_config: NodeConfig) -> eyre::Result<(Self, EventStream)> {
125 let NodeConfig {
126 dataflow_id,
127 node_id,
128 run_config,
129 daemon_communication,
130 dataflow_descriptor,
131 dynamic: _,
132 } = node_config;
133 let clock = Arc::new(uhlc::HLC::default());
134 let input_config = run_config.inputs.clone();
135
136 let event_stream = EventStream::init(
137 dataflow_id,
138 &node_id,
139 &daemon_communication,
140 input_config,
141 clock.clone(),
142 )
143 .wrap_err("failed to init event stream")?;
144 let drop_stream =
145 DropStream::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
146 .wrap_err("failed to init drop stream")?;
147 let control_channel =
148 ControlChannel::init(dataflow_id, &node_id, &daemon_communication, clock.clone())
149 .wrap_err("failed to init control channel")?;
150
151 let node = Self {
152 id: node_id,
153 dataflow_id,
154 node_config: run_config.clone(),
155 control_channel,
156 clock,
157 sent_out_shared_memory: HashMap::new(),
158 drop_stream,
159 cache: VecDeque::new(),
160 dataflow_descriptor,
161 warned_unknown_output: BTreeSet::new(),
162 };
163 Ok((node, event_stream))
164 }
165
166 fn validate_output(&mut self, output_id: &DataId) -> bool {
167 if !self.node_config.outputs.contains(output_id) {
168 if !self.warned_unknown_output.contains(output_id) {
169 warn!("Ignoring output `{output_id}` not in node's output list.");
170 self.warned_unknown_output.insert(output_id.clone());
171 }
172 false
173 } else {
174 true
175 }
176 }
177
178 pub fn send_output_raw<F>(
202 &mut self,
203 output_id: DataId,
204 parameters: MetadataParameters,
205 data_len: usize,
206 data: F,
207 ) -> eyre::Result<()>
208 where
209 F: FnOnce(&mut [u8]),
210 {
211 if !self.validate_output(&output_id) {
212 return Ok(());
213 };
214 let mut sample = self.allocate_data_sample(data_len)?;
215 data(&mut sample);
216
217 let type_info = ArrowTypeInfo::byte_array(data_len);
218
219 self.send_output_sample(output_id, type_info, parameters, Some(sample))
220 }
221
222 pub fn send_output(
223 &mut self,
224 output_id: DataId,
225 parameters: MetadataParameters,
226 data: impl Array,
227 ) -> eyre::Result<()> {
228 if !self.validate_output(&output_id) {
229 return Ok(());
230 };
231
232 let arrow_array = data.to_data();
233
234 let total_len = required_data_size(&arrow_array);
235
236 let mut sample = self.allocate_data_sample(total_len)?;
237 let type_info = copy_array_into_sample(&mut sample, &arrow_array);
238
239 self.send_output_sample(output_id, type_info, parameters, Some(sample))
240 .wrap_err("failed to send output")?;
241
242 Ok(())
243 }
244
245 pub fn send_output_bytes(
246 &mut self,
247 output_id: DataId,
248 parameters: MetadataParameters,
249 data_len: usize,
250 data: &[u8],
251 ) -> eyre::Result<()> {
252 if !self.validate_output(&output_id) {
253 return Ok(());
254 };
255 self.send_output_raw(output_id, parameters, data_len, |sample| {
256 sample.copy_from_slice(data)
257 })
258 }
259
260 pub fn send_typed_output<F>(
261 &mut self,
262 output_id: DataId,
263 type_info: ArrowTypeInfo,
264 parameters: MetadataParameters,
265 data_len: usize,
266 data: F,
267 ) -> eyre::Result<()>
268 where
269 F: FnOnce(&mut [u8]),
270 {
271 if !self.validate_output(&output_id) {
272 return Ok(());
273 };
274
275 let mut sample = self.allocate_data_sample(data_len)?;
276 data(&mut sample);
277
278 self.send_output_sample(output_id, type_info, parameters, Some(sample))
279 }
280
281 pub fn send_output_sample(
282 &mut self,
283 output_id: DataId,
284 type_info: ArrowTypeInfo,
285 parameters: MetadataParameters,
286 sample: Option<DataSample>,
287 ) -> eyre::Result<()> {
288 self.handle_finished_drop_tokens()?;
289
290 let metadata = Metadata::from_parameters(self.clock.new_timestamp(), type_info, parameters);
291
292 let (data, shmem) = match sample {
293 Some(sample) => sample.finalize(),
294 None => (None, None),
295 };
296
297 self.control_channel
298 .send_message(output_id.clone(), metadata, data)
299 .wrap_err_with(|| format!("failed to send output {output_id}"))?;
300
301 if let Some((shared_memory, drop_token)) = shmem {
302 self.sent_out_shared_memory
303 .insert(drop_token, shared_memory);
304 }
305
306 Ok(())
307 }
308
309 pub fn close_outputs(&mut self, outputs: Vec<DataId>) -> eyre::Result<()> {
310 for output_id in &outputs {
311 if !self.node_config.outputs.remove(output_id) {
312 eyre::bail!("unknown output {output_id}");
313 }
314 }
315
316 self.control_channel
317 .report_closed_outputs(outputs)
318 .wrap_err("failed to report closed outputs to daemon")?;
319
320 Ok(())
321 }
322
/// Returns the ID of this node.
323 pub fn id(&self) -> &NodeId {
324 &self.id
325 }
326
/// Returns the ID of the dataflow this node belongs to.
327 pub fn dataflow_id(&self) -> &DataflowId {
328 &self.dataflow_id
329 }
330
/// Returns the node's run configuration; its output list no longer contains outputs
/// that were closed via `close_outputs`.
331 pub fn node_config(&self) -> &NodeRunConfig {
332 &self.node_config
333 }
334
335 pub fn allocate_data_sample(&mut self, data_len: usize) -> eyre::Result<DataSample> {
336 let data = if data_len >= ZERO_COPY_THRESHOLD {
337 let shared_memory = self.allocate_shared_memory(data_len)?;
339
340 DataSample {
341 inner: DataSampleInner::Shmem(shared_memory),
342 len: data_len,
343 }
344 } else {
345 let avec: AVec<u8, ConstAlign<128>> = AVec::__from_elem(128, 0, data_len);
346
347 avec.into()
348 };
349
350 Ok(data)
351 }
352
353 fn allocate_shared_memory(&mut self, data_len: usize) -> eyre::Result<ShmemHandle> {
354 let cache_index = self
355 .cache
356 .iter()
357 .enumerate()
358 .rev()
359 .filter(|(_, s)| s.len() >= data_len)
360 .min_by_key(|(_, s)| s.len())
361 .map(|(i, _)| i);
362 let memory = match cache_index {
363 Some(i) => {
364 self.cache.remove(i).unwrap()
366 }
367 None => ShmemHandle(Box::new(
368 ShmemConf::new()
369 .size(data_len)
370 .writable(true)
371 .create()
372 .wrap_err("failed to allocate shared memory")?,
373 )),
374 };
375 assert!(memory.len() >= data_len);
376
377 Ok(memory)
378 }
379
380 fn handle_finished_drop_tokens(&mut self) -> eyre::Result<()> {
381 loop {
382 match self.drop_stream.try_recv() {
383 Ok(token) => match self.sent_out_shared_memory.remove(&token) {
384 Some(region) => self.add_to_cache(region),
385 None => tracing::warn!("received unknown finished drop token `{token:?}`"),
386 },
387 Err(flume::TryRecvError::Empty) => break,
388 Err(flume::TryRecvError::Disconnected) => {
389 bail!("event stream was closed before sending all expected drop tokens")
390 }
391 }
392 }
393 Ok(())
394 }
395
396 fn add_to_cache(&mut self, memory: ShmemHandle) {
397 const MAX_CACHE_SIZE: usize = 20;
398
399 self.cache.push_back(memory);
400 while self.cache.len() > MAX_CACHE_SIZE {
401 self.cache.pop_front();
402 }
403 }
404
/// Returns the descriptor of the dataflow, as received in the node config.
405 pub fn dataflow_descriptor(&self) -> &Descriptor {
409 &self.dataflow_descriptor
410 }
411}
412
413impl Drop for DoraNode {
414 #[tracing::instrument(skip(self), fields(self.id = %self.id), level = "trace")]
415 fn drop(&mut self) {
416 if let Err(err) = self
418 .control_channel
419 .report_closed_outputs(
420 std::mem::take(&mut self.node_config.outputs)
421 .into_iter()
422 .collect(),
423 )
424 .context("failed to close outputs on drop")
425 {
426 tracing::warn!("{err:?}")
427 }
428
429 while !self.sent_out_shared_memory.is_empty() {
430 if self.drop_stream.len() == 0 {
431 tracing::trace!(
432 "waiting for {} remaining drop tokens",
433 self.sent_out_shared_memory.len()
434 );
435 }
436
437 match self.drop_stream.recv_timeout(Duration::from_secs(2)) {
438 Ok(token) => {
439 self.sent_out_shared_memory.remove(&token);
440 }
441 Err(flume::RecvTimeoutError::Disconnected) => {
442 tracing::warn!(
443 "finished_drop_tokens channel closed while still waiting for drop tokens; \
444 closing {} shared memory regions that might not yet been mapped.",
445 self.sent_out_shared_memory.len()
446 );
447 break;
448 }
449 Err(flume::RecvTimeoutError::Timeout) => {
450 tracing::warn!(
451 "timeout while waiting for drop tokens; \
452 closing {} shared memory regions that might not yet been mapped.",
453 self.sent_out_shared_memory.len()
454 );
455 break;
456 }
457 }
458 }
459
460 if let Err(err) = self.control_channel.report_outputs_done() {
461 tracing::warn!("{err:?}")
462 }
463 }
464}
465
/// Writable byte buffer for one output message, backed either by shared memory or by
/// an aligned vector. Dereferences to its first `len` bytes.
466pub struct DataSample {
    /// Backing storage of the sample.
467 inner: DataSampleInner,
    /// Number of bytes exposed by the sample; a reused shared memory region may be
    /// larger than this.
468 len: usize,
469}
470
471impl DataSample {
472 fn finalize(self) -> (Option<DataMessage>, Option<(ShmemHandle, DropToken)>) {
473 match self.inner {
474 DataSampleInner::Shmem(shared_memory) => {
475 let drop_token = DropToken::generate();
476 let data = DataMessage::SharedMemory {
477 shared_memory_id: shared_memory.get_os_id().to_owned(),
478 len: self.len,
479 drop_token,
480 };
481 (Some(data), Some((shared_memory, drop_token)))
482 }
483 DataSampleInner::Vec(buffer) => (Some(DataMessage::Vec(buffer)), None),
484 }
485 }
486}
487
488impl Deref for DataSample {
489 type Target = [u8];
490
491 fn deref(&self) -> &Self::Target {
492 let slice = match &self.inner {
493 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice() },
494 DataSampleInner::Vec(data) => data,
495 };
496 &slice[..self.len]
497 }
498}
499
500impl DerefMut for DataSample {
501 fn deref_mut(&mut self) -> &mut Self::Target {
502 let slice = match &mut self.inner {
503 DataSampleInner::Shmem(handle) => unsafe { handle.as_slice_mut() },
504 DataSampleInner::Vec(data) => data,
505 };
506 &mut slice[..self.len]
507 }
508}
509
510impl From<AVec<u8, ConstAlign<128>>> for DataSample {
511 fn from(value: AVec<u8, ConstAlign<128>>) -> Self {
512 Self {
513 len: value.len(),
514 inner: DataSampleInner::Vec(value),
515 }
516 }
517}
518
519impl std::fmt::Debug for DataSample {
520 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
521 let kind = match &self.inner {
522 DataSampleInner::Shmem(_) => "SharedMemory",
523 DataSampleInner::Vec(_) => "Vec",
524 };
525 f.debug_struct("DataSample")
526 .field("len", &self.len)
527 .field("kind", &kind)
528 .finish_non_exhaustive()
529 }
530}
531
/// Backing storage of a `DataSample`.
532enum DataSampleInner {
    /// Sample stored in a shared memory region.
533 Shmem(ShmemHandle),
    /// Sample stored in a 128-byte-aligned vector.
534 Vec(AVec<u8, ConstAlign<128>>),
535}
536
/// Owning wrapper around a boxed `Shmem` region; exists so that `Send` and `Sync`
/// can be implemented for it in this module.
537struct ShmemHandle(Box<Shmem>);
538
/// Gives shared access to the wrapped `Shmem`.
539impl Deref for ShmemHandle {
540 type Target = Shmem;
541
542 fn deref(&self) -> &Self::Target {
543 &self.0
544 }
545}
546
/// Gives mutable access to the wrapped `Shmem`.
547impl DerefMut for ShmemHandle {
548 fn deref_mut(&mut self) -> &mut Self::Target {
549 &mut self.0
550 }
551}
552
// NOTE(review): these impls assert that a `ShmemHandle` may be moved to and shared
// between threads. Nothing in this file shows why that is sound for `Shmem` —
// confirm against the `shared_memory_extended` documentation and record the
// argument here as a `SAFETY:` comment.
553unsafe impl Send for ShmemHandle {}
554unsafe impl Sync for ShmemHandle {}