dora_node_api/event_stream/
event.rs1use std::{ptr::NonNull, sync::Arc};
2
3use aligned_vec::{AVec, ConstAlign};
4use dora_arrow_convert::{ArrowData, IntoArrow};
5use dora_core::config::{DataId, OperatorId};
6use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
7use eyre::{Context, Result};
8use shared_memory_extended::{Shmem, ShmemConf};
9
10#[derive(Debug)]
11#[non_exhaustive]
12pub enum Event {
13 Stop(StopCause),
14 Reload {
15 operator_id: Option<OperatorId>,
16 },
17 Input {
18 id: DataId,
19 metadata: Metadata,
20 data: ArrowData,
21 },
22 InputClosed {
23 id: DataId,
24 },
25 Error(String),
26}
27
28#[derive(Debug, Clone)]
29#[non_exhaustive]
30pub enum StopCause {
31 Manual,
32 AllInputsClosed,
33}
34
35pub enum RawData {
36 Empty,
37 Vec(AVec<u8, ConstAlign<128>>),
38 SharedMemory(SharedMemoryData),
39}
40
41impl RawData {
42 pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
43 let raw_buffer = match self {
44 RawData::Empty => return Ok(().into_arrow().into()),
45 RawData::Vec(data) => {
46 let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
47 let len = data.len();
48
49 unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
50 }
51 RawData::SharedMemory(data) => {
52 let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
53 let len = data.data.len();
54
55 unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
56 }
57 };
58
59 buffer_into_arrow_array(&raw_buffer, type_info)
60 }
61}
62
63pub struct SharedMemoryData {
64 pub data: MappedInputData,
65 pub _drop: flume::Sender<()>,
66}
67
68fn buffer_into_arrow_array(
69 raw_buffer: &arrow::buffer::Buffer,
70 type_info: &ArrowTypeInfo,
71) -> eyre::Result<arrow::array::ArrayData> {
72 if raw_buffer.is_empty() {
73 return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
74 }
75
76 let mut buffers = Vec::new();
77 for BufferOffset { offset, len } in &type_info.buffer_offsets {
78 buffers.push(raw_buffer.slice_with_length(*offset, *len));
79 }
80
81 let mut child_data = Vec::new();
82 for child_type_info in &type_info.child_data {
83 child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
84 }
85
86 arrow::array::ArrayData::try_new(
87 type_info.data_type.clone(),
88 type_info.len,
89 type_info
90 .validity
91 .clone()
92 .map(arrow::buffer::Buffer::from_vec),
93 type_info.offset,
94 buffers,
95 child_data,
96 )
97 .context("Error creating Arrow array")
98}
99
100impl std::fmt::Debug for RawData {
101 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102 f.debug_struct("Data").finish_non_exhaustive()
103 }
104}
105
106pub struct MappedInputData {
107 memory: Box<Shmem>,
108 len: usize,
109}
110
111impl MappedInputData {
112 pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
113 let memory = Box::new(
114 ShmemConf::new()
115 .os_id(shared_memory_id)
116 .writable(false)
117 .open()
118 .wrap_err("failed to map shared memory input")?,
119 );
120 Ok(MappedInputData { memory, len })
121 }
122}
123
124impl std::ops::Deref for MappedInputData {
125 type Target = [u8];
126
127 fn deref(&self) -> &Self::Target {
128 unsafe { &self.memory.as_slice()[..self.len] }
129 }
130}
131
132unsafe impl Send for MappedInputData {}
133unsafe impl Sync for MappedInputData {}