dora_node_api/event_stream/
event.rs1use std::{ptr::NonNull, sync::Arc};
2
3use aligned_vec::{AVec, ConstAlign};
4use dora_arrow_convert::{ArrowData, IntoArrow};
5use dora_core::config::{DataId, OperatorId};
6use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
7use eyre::{Context, Result};
8use shared_memory_extended::{Shmem, ShmemConf};
9
10#[derive(Debug)]
11#[non_exhaustive]
12pub enum Event {
13 Stop,
14 Reload {
15 operator_id: Option<OperatorId>,
16 },
17 Input {
18 id: DataId,
19 metadata: Metadata,
20 data: ArrowData,
21 },
22 InputClosed {
23 id: DataId,
24 },
25 Error(String),
26}
27
28pub enum RawData {
29 Empty,
30 Vec(AVec<u8, ConstAlign<128>>),
31 SharedMemory(SharedMemoryData),
32}
33
34impl RawData {
35 pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
36 let raw_buffer = match self {
37 RawData::Empty => return Ok(().into_arrow().into()),
38 RawData::Vec(data) => {
39 let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
40 let len = data.len();
41
42 unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
43 }
44 RawData::SharedMemory(data) => {
45 let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
46 let len = data.data.len();
47
48 unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
49 }
50 };
51
52 buffer_into_arrow_array(&raw_buffer, type_info)
53 }
54}
55
56pub struct SharedMemoryData {
57 pub data: MappedInputData,
58 pub _drop: flume::Sender<()>,
59}
60
61fn buffer_into_arrow_array(
62 raw_buffer: &arrow::buffer::Buffer,
63 type_info: &ArrowTypeInfo,
64) -> eyre::Result<arrow::array::ArrayData> {
65 if raw_buffer.is_empty() {
66 return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
67 }
68
69 let mut buffers = Vec::new();
70 for BufferOffset { offset, len } in &type_info.buffer_offsets {
71 buffers.push(raw_buffer.slice_with_length(*offset, *len));
72 }
73
74 let mut child_data = Vec::new();
75 for child_type_info in &type_info.child_data {
76 child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
77 }
78
79 arrow::array::ArrayData::try_new(
80 type_info.data_type.clone(),
81 type_info.len,
82 type_info
83 .validity
84 .clone()
85 .map(arrow::buffer::Buffer::from_vec),
86 type_info.offset,
87 buffers,
88 child_data,
89 )
90 .context("Error creating Arrow array")
91}
92
93impl std::fmt::Debug for RawData {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 f.debug_struct("Data").finish_non_exhaustive()
96 }
97}
98
99pub struct MappedInputData {
100 memory: Box<Shmem>,
101 len: usize,
102}
103
104impl MappedInputData {
105 pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
106 let memory = Box::new(
107 ShmemConf::new()
108 .os_id(shared_memory_id)
109 .writable(false)
110 .open()
111 .wrap_err("failed to map shared memory input")?,
112 );
113 Ok(MappedInputData { memory, len })
114 }
115}
116
117impl std::ops::Deref for MappedInputData {
118 type Target = [u8];
119
120 fn deref(&self) -> &Self::Target {
121 unsafe { &self.memory.as_slice()[..self.len] }
122 }
123}
124
125unsafe impl Send for MappedInputData {}
126unsafe impl Sync for MappedInputData {}