// dora_node_api/event_stream/event.rs

1use std::{ptr::NonNull, sync::Arc};
2
3use aligned_vec::{AVec, ConstAlign};
4use dora_arrow_convert::{ArrowData, IntoArrow};
5use dora_core::config::{DataId, OperatorId};
6use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
7use eyre::{Context, Result};
8use shared_memory_extended::{Shmem, ShmemConf};
9
10#[derive(Debug)]
11#[non_exhaustive]
12pub enum Event {
13    Stop,
14    Reload {
15        operator_id: Option<OperatorId>,
16    },
17    Input {
18        id: DataId,
19        metadata: Metadata,
20        data: ArrowData,
21    },
22    InputClosed {
23        id: DataId,
24    },
25    Error(String),
26}
27
28pub enum RawData {
29    Empty,
30    Vec(AVec<u8, ConstAlign<128>>),
31    SharedMemory(SharedMemoryData),
32}
33
34impl RawData {
35    pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
36        let raw_buffer = match self {
37            RawData::Empty => return Ok(().into_arrow().into()),
38            RawData::Vec(data) => {
39                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
40                let len = data.len();
41
42                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
43            }
44            RawData::SharedMemory(data) => {
45                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
46                let len = data.data.len();
47
48                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
49            }
50        };
51
52        buffer_into_arrow_array(&raw_buffer, type_info)
53    }
54}
55
56pub struct SharedMemoryData {
57    pub data: MappedInputData,
58    pub _drop: flume::Sender<()>,
59}
60
61fn buffer_into_arrow_array(
62    raw_buffer: &arrow::buffer::Buffer,
63    type_info: &ArrowTypeInfo,
64) -> eyre::Result<arrow::array::ArrayData> {
65    if raw_buffer.is_empty() {
66        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
67    }
68
69    let mut buffers = Vec::new();
70    for BufferOffset { offset, len } in &type_info.buffer_offsets {
71        buffers.push(raw_buffer.slice_with_length(*offset, *len));
72    }
73
74    let mut child_data = Vec::new();
75    for child_type_info in &type_info.child_data {
76        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
77    }
78
79    arrow::array::ArrayData::try_new(
80        type_info.data_type.clone(),
81        type_info.len,
82        type_info
83            .validity
84            .clone()
85            .map(arrow::buffer::Buffer::from_vec),
86        type_info.offset,
87        buffers,
88        child_data,
89    )
90    .context("Error creating Arrow array")
91}
92
93impl std::fmt::Debug for RawData {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        f.debug_struct("Data").finish_non_exhaustive()
96    }
97}
98
99pub struct MappedInputData {
100    memory: Box<Shmem>,
101    len: usize,
102}
103
104impl MappedInputData {
105    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
106        let memory = Box::new(
107            ShmemConf::new()
108                .os_id(shared_memory_id)
109                .writable(false)
110                .open()
111                .wrap_err("failed to map shared memory input")?,
112        );
113        Ok(MappedInputData { memory, len })
114    }
115}
116
117impl std::ops::Deref for MappedInputData {
118    type Target = [u8];
119
120    fn deref(&self) -> &Self::Target {
121        unsafe { &self.memory.as_slice()[..self.len] }
122    }
123}
124
125unsafe impl Send for MappedInputData {}
126unsafe impl Sync for MappedInputData {}