dora_node_api/event_stream/
event.rs

1use std::{ptr::NonNull, sync::Arc};
2
3use aligned_vec::{AVec, ConstAlign};
4use dora_arrow_convert::{ArrowData, IntoArrow};
5use dora_core::config::{DataId, OperatorId};
6use dora_message::metadata::{ArrowTypeInfo, BufferOffset, Metadata};
7use eyre::{Context, Result};
8use shared_memory_extended::{Shmem, ShmemConf};
9
10#[derive(Debug)]
11#[non_exhaustive]
12pub enum Event {
13    Stop(StopCause),
14    Reload {
15        operator_id: Option<OperatorId>,
16    },
17    Input {
18        id: DataId,
19        metadata: Metadata,
20        data: ArrowData,
21    },
22    InputClosed {
23        id: DataId,
24    },
25    Error(String),
26}
27
/// Reason carried by a stop event.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate that matches on it needs a
/// wildcard arm.
#[non_exhaustive]
#[derive(Clone, Debug)]
pub enum StopCause {
    /// NOTE(review): presumably the stop was requested explicitly — confirm against the sender.
    Manual,
    /// NOTE(review): presumably emitted once every input has been closed — confirm.
    AllInputsClosed,
}
34
35pub enum RawData {
36    Empty,
37    Vec(AVec<u8, ConstAlign<128>>),
38    SharedMemory(SharedMemoryData),
39}
40
41impl RawData {
42    pub fn into_arrow_array(self, type_info: &ArrowTypeInfo) -> Result<arrow::array::ArrayData> {
43        let raw_buffer = match self {
44            RawData::Empty => return Ok(().into_arrow().into()),
45            RawData::Vec(data) => {
46                let ptr = NonNull::new(data.as_ptr() as *mut _).unwrap();
47                let len = data.len();
48
49                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
50            }
51            RawData::SharedMemory(data) => {
52                let ptr = NonNull::new(data.data.as_ptr() as *mut _).unwrap();
53                let len = data.data.len();
54
55                unsafe { arrow::buffer::Buffer::from_custom_allocation(ptr, len, Arc::new(data)) }
56            }
57        };
58
59        buffer_into_arrow_array(&raw_buffer, type_info)
60    }
61}
62
63pub struct SharedMemoryData {
64    pub data: MappedInputData,
65    pub _drop: flume::Sender<()>,
66}
67
68fn buffer_into_arrow_array(
69    raw_buffer: &arrow::buffer::Buffer,
70    type_info: &ArrowTypeInfo,
71) -> eyre::Result<arrow::array::ArrayData> {
72    if raw_buffer.is_empty() {
73        return Ok(arrow::array::ArrayData::new_empty(&type_info.data_type));
74    }
75
76    let mut buffers = Vec::new();
77    for BufferOffset { offset, len } in &type_info.buffer_offsets {
78        buffers.push(raw_buffer.slice_with_length(*offset, *len));
79    }
80
81    let mut child_data = Vec::new();
82    for child_type_info in &type_info.child_data {
83        child_data.push(buffer_into_arrow_array(raw_buffer, child_type_info)?)
84    }
85
86    arrow::array::ArrayData::try_new(
87        type_info.data_type.clone(),
88        type_info.len,
89        type_info
90            .validity
91            .clone()
92            .map(arrow::buffer::Buffer::from_vec),
93        type_info.offset,
94        buffers,
95        child_data,
96    )
97    .context("Error creating Arrow array")
98}
99
100impl std::fmt::Debug for RawData {
101    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
102        f.debug_struct("Data").finish_non_exhaustive()
103    }
104}
105
106pub struct MappedInputData {
107    memory: Box<Shmem>,
108    len: usize,
109}
110
111impl MappedInputData {
112    pub(crate) unsafe fn map(shared_memory_id: &str, len: usize) -> eyre::Result<Self> {
113        let memory = Box::new(
114            ShmemConf::new()
115                .os_id(shared_memory_id)
116                .writable(false)
117                .open()
118                .wrap_err("failed to map shared memory input")?,
119        );
120        Ok(MappedInputData { memory, len })
121    }
122}
123
124impl std::ops::Deref for MappedInputData {
125    type Target = [u8];
126
127    fn deref(&self) -> &Self::Target {
128        unsafe { &self.memory.as_slice()[..self.len] }
129    }
130}
131
132unsafe impl Send for MappedInputData {}
133unsafe impl Sync for MappedInputData {}