1use apache_arrow::datatypes::ArrowNativeType;
74use apache_arrow::{
75 array::{ArrayData, BufferBuilder, ListArray},
76 buffer::Buffer,
77 datatypes::{DataType, Field, ToByteSlice},
78};
79use interface::{Data, Read, UniqueIdentifier, Update};
80use std::sync::Arc;
81use std::{
82 any::{Any, type_name},
83 marker::PhantomData,
84 ops::{Deref, DerefMut},
85};
86
87#[derive(Debug, thiserror::Error)]
88pub enum ArrowError {
89 #[error("cannot open a parquet file")]
90 ArrowToFile(#[from] std::io::Error),
91 #[error("cannot build Arrow data")]
92 ArrowError(#[from] apache_arrow::error::ArrowError),
93 #[error("cannot save data to Parquet")]
94 ParquetError(#[from] parquet::errors::ParquetError),
95 #[error("no record available")]
96 NoRecord,
97 #[error("Field {0} not found")]
98 FieldNotFound(String),
99 #[error("Parsing field {0} failed")]
100 ParseField(String),
101 #[cfg(feature = "matio-rs")]
102 #[error("failed to save data to mat file")]
103 MatFile(#[from] matio_rs::MatioError),
104}
105
/// Local result type whose error is [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
107
/// Capacity limit in bytes: 2^30 bytes (1 GiB).
// NOTE(review): the consumer of this limit is outside this view; presumably it caps buffer size.
const MAX_CAPACITY_BYTE: usize = 1 << 30;
110
/// Output file format; [`FileFormat::Parquet`] is the default.
#[derive(Default)]
pub enum FileFormat {
    /// Apache Parquet file.
    #[default]
    Parquet,
    /// Matlab file with the given layout (only with the `matio-rs` feature).
    #[cfg(feature = "matio-rs")]
    Matlab(MatFormat),
}
/// Layout of Matlab output; [`MatFormat::SampleBased`] is the default.
#[derive(Default)]
pub enum MatFormat {
    /// Sample-based layout.
    #[default]
    SampleBased,
    /// Time-based layout.
    // NOTE(review): the `f64` is presumably a sampling period or rate — confirm with its users.
    TimeBased(f64),
}
138
/// Type-erased interface over the value buffer of one logged entry.
///
/// Implementors must be `Send + Sync`.
trait BufferObject: Send + Sync {
    /// Returns the name identifying the logged entry.
    fn who(&self) -> String;
    /// Upcasts to `&dyn Any`, allowing a downcast to the concrete buffer type.
    // The lint allowance suggests this method is not called anywhere yet.
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;
    /// Upcasts to `&mut dyn Any`, allowing a downcast to the concrete buffer type.
    fn as_mut_any(&mut self) -> &mut dyn Any;
    /// Converts the buffered values into a [`ListArray`] of `n_step` lists of `n` values,
    /// each value having the Arrow type `data_type`.
    // Takes `&mut self` (despite the `into_` name) so the buffer can be drained in place.
    fn into_list(&mut self, n_step: usize, n: usize, data_type: DataType) -> Result<ListArray>;
}
147
148struct ArrowBuffer<U: UniqueIdentifier>(PhantomData<U>);
150impl<T: ArrowNativeType, U: UniqueIdentifier<DataType = Vec<T>>> UniqueIdentifier
151 for ArrowBuffer<U>
152{
153 type DataType = BufferBuilder<T>;
154}
155struct LogData<U: UniqueIdentifier>(<U as UniqueIdentifier>::DataType, PhantomData<U>);
156impl<U: UniqueIdentifier> Deref for LogData<U> {
157 type Target = <U as UniqueIdentifier>::DataType;
158
159 fn deref(&self) -> &Self::Target {
160 &self.0
161 }
162}
163impl<U: UniqueIdentifier> DerefMut for LogData<U> {
164 fn deref_mut(&mut self) -> &mut Self::Target {
165 &mut self.0
166 }
167}
168impl<T, U: UniqueIdentifier<DataType = T>> LogData<U> {
169 pub fn new(data: T) -> Self {
170 Self(data, PhantomData)
171 }
172}
173impl<T, U> BufferObject for LogData<ArrowBuffer<U>>
174where
175 T: ArrowNativeType,
176 U: 'static + Send + Sync + UniqueIdentifier<DataType = Vec<T>>,
177{
178 fn who(&self) -> String {
179 type_name::<U>()
180 .split("<")
181 .map(|x| format!("{}", x.split("::").last().unwrap()))
182 .collect::<Vec<_>>()
183 .join("<")
184 }
185 fn as_any(&self) -> &dyn Any {
186 self
187 }
188 fn as_mut_any(&mut self) -> &mut dyn Any {
189 self
190 }
191 fn into_list(&mut self, n_step: usize, n: usize, data_type: DataType) -> Result<ListArray> {
192 let buffer = &mut *self;
193 let data = ArrayData::builder(data_type.clone())
194 .len(buffer.len())
195 .add_buffer(buffer.finish())
196 .build()?;
197 let offsets = (0..).step_by(n).take(n_step + 1).collect::<Vec<i32>>();
198 let list = ArrayData::builder(DataType::List(Arc::new(Field::new(
199 "values", data_type, false,
200 ))))
201 .len(n_step)
202 .add_buffer(Buffer::from(offsets.to_byte_slice()))
203 .add_child_data(data)
204 .build()?;
205 Ok(ListArray::from(list))
206 }
207}
208
/// Maps a Rust primitive to its Arrow counterparts.
///
/// Implemented below for the integer and float primitives by `impl_buffer_types!`.
#[doc(hidden)]
pub trait BufferDataType {
    /// Arrow primitive type matching `Self` (e.g. `Float64Type` for `f64`).
    type ArrayType;
    /// Returns the Arrow [`DataType`] matching `Self` (e.g. `DataType::Float64` for `f64`).
    fn buffer_data_type() -> DataType;
}
214use paste::paste;
215macro_rules! impl_buffer_types {
216 ( $( ($rs:ty,$arw:expr_2021) ),+ ) => {
217 $(
218 paste! {
219impl BufferDataType for $rs {
220 type ArrayType = apache_arrow::datatypes::[<$arw Type>];
221 fn buffer_data_type() -> DataType {
222 apache_arrow::datatypes::DataType::$arw
223 }
224}
225 }
226 )+
227 };
228}
229
230impl_buffer_types! {
231(f64,Float64),
232(f32,Float32),
233(i64,Int64),
234(i32,Int32),
235(i16,Int16),
236(i8 ,Int8),
237(u64,UInt64),
238(u32,UInt32),
239(u16,UInt16),
240(u8 ,UInt8)
241}
242
/// Whether buffered data should be saved.
// NOTE(review): going by the name, this is presumably consulted when the logger is dropped;
// confirm in the `arrow` module.
enum DropOption {
    /// Save the data. The optional `String` is presumably a file name or path — confirm.
    Save(Option<String>),
    /// Do not save the data.
    NoSave,
}
247
// `Arrow` and `ArrowBuilder` are defined in the `arrow` submodule and re-exported here.
mod arrow;
pub use arrow::{Arrow, ArrowBuilder};
250
251impl Update for Arrow {}
252impl<T, U> Read<U> for Arrow
253where
254 T: ArrowNativeType,
255 U: 'static + UniqueIdentifier<DataType = Vec<T>>,
256{
257 fn read(&mut self, data: Data<U>) {
258 let r = 1 + (self.step as f64 / self.n_entry as f64).floor() as usize;
259 self.step += 1;
260 if r % self.decimation > 0 {
261 return;
262 }
263 if let Some(buffer) = self.data::<T, U>() {
264 buffer.append_slice(&data);
265 self.count += 1;
266 match self.batch_size {
267 Some(batch_size) if self.count % (self.n_entry * batch_size) == 0 => {
268 self.save();
269 }
270 _ => (),
271 }
272 }
273 }
274}
275
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apache_arrow::datatypes::Schema;
    use interface::{Data, Entry, UID};

    use super::*;

    /// Pins the module-path stripping applied to a nested generic type name.
    #[test]
    fn who() {
        let exp = "gmt_dos_clients_io::optics::dispersed_fringe_sensor::DfsFftFrame<gmt_dos_clients_io::optics::Host<a::b::c>>";
        let q = exp
            .split('<')
            .map(|x| x.split("::").last().unwrap().to_string())
            .collect::<Vec<_>>()
            .join("<");
        assert_eq!(q, "DfsFftFrame<Host<c>>");
    }

    /// One entry of size 1 gives a single non-nullable `List<Float64>` column named "Data".
    #[test]
    fn arrow() {
        let mut arw = Arrow::builder(10).build();
        #[derive(UID)]
        pub enum Data {}
        <Arrow as Entry<Data>>::entry(&mut arw, 1);

        let field = Field::new(
            "Data",
            DataType::List(Arc::new(Field::new("values", DataType::Float64, false))),
            false,
        );
        let schema = Arc::new(Schema::new(vec![field]));
        assert_eq!(arw.record().unwrap().schema(), schema);
    }

    /// Smoke test: reads crossing a `batch_size` boundary with one entry must not panic.
    // NOTE(review): `save` is triggered here and may write to disk — confirm.
    #[test]
    fn batch() {
        let n_step = 8;
        let mut arw = Arrow::builder(n_step).batch_size(n_step / 2).build();
        #[derive(UID)]
        pub enum U {}
        <Arrow as Entry<U>>::entry(&mut arw, 1);
        for i in 0..n_step {
            arw.read(Data::<U>::new(vec![i as f64]));
        }
    }

    /// Smoke test: interleaved reads of two entries of different sizes across batches.
    #[test]
    fn batch2() {
        let n_step = 24;
        let mut arw = Arrow::builder(n_step).batch_size(4).build();
        #[derive(UID)]
        pub enum U {}
        <Arrow as Entry<U>>::entry(&mut arw, 1);
        #[derive(UID)]
        pub enum V {}
        <Arrow as Entry<V>>::entry(&mut arw, 3);
        for i in 0..n_step {
            arw.read(Data::<U>::new(vec![i as f64]));
            arw.read(Data::<V>::new(vec![(10 * i) as f64; 3]));
        }
    }
}