gmt_dos_clients_arrow/
lib.rs

1/*!
2# Actor client for Apache Arrow
3
4A simulation data logger that records the data in the [Apache Arrow] format and
5automatically saves the data into a [Parquet] file (`data.parquet`) at the end of a simulation.
6
7[Apache Arrow]: https://docs.rs/arrow
8[Parquet]: https://docs.rs/parquet
9
10# Example
11
12An Arrow logger setup for 1000 time steps
13```
14use gmt_dos_clients_arrow::Arrow;
15use gmt_dos_actors::prelude::*;
16let logging = Arrow::builder(1000).build();
17```
18setting the name of the Parquet file
19```
20# use gmt_dos_clients_arrow::Arrow;
21# use gmt_dos_actors::prelude::*;
22
23let logging = Arrow::builder(1000)
24                       .filename("my_data.parquet")
25                       .build();
26```
27opting out of saving the data to the Parquet file
28```
29# use gmt_dos_clients_arrow::Arrow;
30# use gmt_dos_actors::prelude::*;
31
32let logging = Arrow::builder(1000)
33                       .no_save()
34                       .build();
35```
36Logging an output into an [Arrow] logger:
37```
38use gmt_dos_actors::prelude::*;
39use gmt_dos_clients::signals::Signals;
40use gmt_dos_clients_arrow::Arrow;
41use interface::UID;
42
43let logging = Arrow::builder(1000).build().into_arcx();
44let mut sink = Terminator::<_>::new(logging);
45let mut source: Initiator<_> = Signals::new(1, 100).into();
46#[derive(UID)]
47enum Source {};
48source.add_output().build::<Source>().logn(&mut sink, 42);
49# Ok::<(), gmt_dos_actors::model::ModelError>(())
50```
or, if `Signals` implements the trait `Size<Source>`, without passing the size explicitly:
52```
53use gmt_dos_actors::prelude::*;
54use gmt_dos_clients::signals::Signals;
55use gmt_dos_clients_arrow::Arrow;
56use interface::{Size, UID};
57
58let logging = Arrow::builder(1000).build().into_arcx();
59let mut sink = Terminator::<_>::new(logging);
60let mut source: Initiator<_> = Signals::new(1, 100).into();
61#[derive(UID)]
62enum Source {};
63impl Size<Source> for Signals {
64    fn len(&self) -> usize {
65        42
66    }
67}
68source.add_output().build::<Source>().log(&mut sink);
69# Ok::<(), gmt_dos_actors::model::ModelError>(())
70```
71*/
72
73use apache_arrow::datatypes::ArrowNativeType;
74use apache_arrow::{
75    array::{ArrayData, BufferBuilder, ListArray},
76    buffer::Buffer,
77    datatypes::{DataType, Field, ToByteSlice},
78};
79use interface::{Data, Read, UniqueIdentifier, Update};
80use std::sync::Arc;
81use std::{
82    any::{Any, type_name},
83    marker::PhantomData,
84    ops::{Deref, DerefMut},
85};
86
/// Errors reported by the Arrow logger
///
/// The variants holding a `#[from]` field wrap the error of another library;
/// `thiserror` derives the matching `From` implementation so that the `?`
/// operator converts these errors into an [`ArrowError`].
#[derive(Debug, thiserror::Error)]
pub enum ArrowError {
    /// An I/O error, reported as a failure to open a Parquet file
    #[error("cannot open a parquet file")]
    ArrowToFile(#[from] std::io::Error),
    /// An error raised by the Apache Arrow library
    #[error("cannot build Arrow data")]
    ArrowError(#[from] apache_arrow::error::ArrowError),
    /// An error raised by the Parquet library
    #[error("cannot save data to Parquet")]
    ParquetError(#[from] parquet::errors::ParquetError),
    /// No record is available
    #[error("no record available")]
    NoRecord,
    /// The field with the given name was not found
    #[error("Field {0} not found")]
    FieldNotFound(String),
    /// The field with the given name could not be parsed
    #[error("Parsing field {0} failed")]
    ParseField(String),
    /// An error raised by `matio_rs`; only exists with the `matio-rs` feature
    #[cfg(feature = "matio-rs")]
    #[error("failed to save data to mat file")]
    MatFile(#[from] matio_rs::MatioError),
}
105
/// Shorthand for a [`std::result::Result`] whose error type is [`ArrowError`]
type Result<T> = std::result::Result<T, ArrowError>;
107
// Maximum capacity of a buffer: 1 GiB, i.e. 2^30 bytes
const MAX_CAPACITY_BYTE: usize = 1 << 30;
110
/// Format to write data to file
///
/// Use parquet as the default file format
#[derive(Default)]
pub enum FileFormat {
    /// Parquet file, this is the default format
    #[default]
    Parquet,
    /// Matlab file written according to the given [`MatFormat`];
    /// only exists with the `matio-rs` feature
    #[cfg(feature = "matio-rs")]
    Matlab(MatFormat),
}
/// Matlab data format
///
/// The Matlab data format is either `SampleBased` and does not include the time vector
/// or is `TimeBased` and does include a time vector.
/// The default format is `SampleBased`
#[derive(Default)]
pub enum MatFormat {
    /// Data only, without a time vector (the default)
    #[default]
    SampleBased,
    /// Data together with a time vector
    // NOTE(review): the `f64` payload presumably sets the time sampling
    // (rate or period) used to build the time vector; confirm where it is consumed
    TimeBased(f64),
}
138
/// Buffers generic interface
///
/// Common interface to log buffers regardless of the type of the data they hold;
/// implementors must be `Send` and `Sync`.
trait BufferObject: Send + Sync {
    /// Returns a name identifying the data logged in the buffer
    fn who(&self) -> String;
    /// Returns the buffer as a `&dyn Any`, e.g. to downcast it to its concrete type
    #[allow(dead_code)]
    fn as_any(&self) -> &dyn Any;
    /// Returns the buffer as a `&mut dyn Any`, e.g. to downcast it to its concrete type
    fn as_mut_any(&mut self) -> &mut dyn Any;
    /// Converts the content of the buffer into an Arrow [`ListArray`]
    ///
    /// In the implementation for `LogData<ArrowBuffer<U>>`, `n_step` is the number
    /// of entries in the list, `n` is the number of values per entry and
    /// `data_type` is the Arrow type of the values.
    fn into_list(&mut self, n_step: usize, n: usize, data_type: DataType) -> Result<ListArray>;
}
147
/// Arrow buffer type match to a dos-actors Data type
///
/// Marker type that holds no data: for an identifier `U` whose data type is
/// `Vec<T>`, `ArrowBuffer<U>` is an identifier whose data type is the Arrow
/// `BufferBuilder<T>`.
struct ArrowBuffer<U: UniqueIdentifier>(PhantomData<U>);
impl<T: ArrowNativeType, U: UniqueIdentifier<DataType = Vec<T>>> UniqueIdentifier
    for ArrowBuffer<U>
{
    // The buffer builder accumulates values of the same native type `T` as `U`'s vector
    type DataType = BufferBuilder<T>;
}
155struct LogData<U: UniqueIdentifier>(<U as UniqueIdentifier>::DataType, PhantomData<U>);
156impl<U: UniqueIdentifier> Deref for LogData<U> {
157    type Target = <U as UniqueIdentifier>::DataType;
158
159    fn deref(&self) -> &Self::Target {
160        &self.0
161    }
162}
163impl<U: UniqueIdentifier> DerefMut for LogData<U> {
164    fn deref_mut(&mut self) -> &mut Self::Target {
165        &mut self.0
166    }
167}
168impl<T, U: UniqueIdentifier<DataType = T>> LogData<U> {
169    pub fn new(data: T) -> Self {
170        Self(data, PhantomData)
171    }
172}
173impl<T, U> BufferObject for LogData<ArrowBuffer<U>>
174where
175    T: ArrowNativeType,
176    U: 'static + Send + Sync + UniqueIdentifier<DataType = Vec<T>>,
177{
178    fn who(&self) -> String {
179        type_name::<U>()
180            .split("<")
181            .map(|x| format!("{}", x.split("::").last().unwrap()))
182            .collect::<Vec<_>>()
183            .join("<")
184    }
185    fn as_any(&self) -> &dyn Any {
186        self
187    }
188    fn as_mut_any(&mut self) -> &mut dyn Any {
189        self
190    }
191    fn into_list(&mut self, n_step: usize, n: usize, data_type: DataType) -> Result<ListArray> {
192        let buffer = &mut *self;
193        let data = ArrayData::builder(data_type.clone())
194            .len(buffer.len())
195            .add_buffer(buffer.finish())
196            .build()?;
197        let offsets = (0..).step_by(n).take(n_step + 1).collect::<Vec<i32>>();
198        let list = ArrayData::builder(DataType::List(Arc::new(Field::new(
199            "values", data_type, false,
200        ))))
201        .len(n_step)
202        .add_buffer(Buffer::from(offsets.to_byte_slice()))
203        .add_child_data(data)
204        .build()?;
205        Ok(ListArray::from(list))
206    }
207}
208
/// Association between a Rust native type and its Apache Arrow counterparts
///
/// Implemented in this file for the primitive float and integer types,
/// e.g. `f64` is associated with `Float64Type` and `DataType::Float64`.
#[doc(hidden)]
pub trait BufferDataType {
    /// The Arrow primitive type, e.g. `apache_arrow::datatypes::Float64Type`
    type ArrayType;
    /// Returns the Arrow [`DataType`] variant, e.g. `DataType::Float64`
    fn buffer_data_type() -> DataType;
}
214use paste::paste;
215macro_rules! impl_buffer_types {
216    ( $( ($rs:ty,$arw:expr_2021) ),+ ) => {
217	    $(
218        paste! {
219impl BufferDataType for $rs {
220    type ArrayType = apache_arrow::datatypes::[<$arw Type>];
221    fn buffer_data_type() -> DataType {
222        apache_arrow::datatypes::DataType::$arw
223    }
224}
225        }
226		)+
227    };
228}
229
230impl_buffer_types! {
231(f64,Float64),
232(f32,Float32),
233(i64,Int64),
234(i32,Int32),
235(i16,Int16),
236(i8 ,Int8),
237(u64,UInt64),
238(u32,UInt32),
239(u16,UInt16),
240(u8 ,UInt8)
241}
242
/// Choice between saving and not saving the data
// NOTE(review): the name suggests that this selects what the logger does when it
// is dropped and that the optional `String` is the name of the file to write;
// the type is only defined here, confirm against its usage in the `arrow` module
enum DropOption {
    /// Save the data
    Save(Option<String>),
    /// Do not save the data
    NoSave,
}
247
248mod arrow;
249pub use arrow::{Arrow, ArrowBuilder};
250
251impl Update for Arrow {}
252impl<T, U> Read<U> for Arrow
253where
254    T: ArrowNativeType,
255    U: 'static + UniqueIdentifier<DataType = Vec<T>>,
256{
257    fn read(&mut self, data: Data<U>) {
258        let r = 1 + (self.step as f64 / self.n_entry as f64).floor() as usize;
259        self.step += 1;
260        if r % self.decimation > 0 {
261            return;
262        }
263        if let Some(buffer) = self.data::<T, U>() {
264            buffer.append_slice(&data);
265            self.count += 1;
266            match self.batch_size {
267                Some(batch_size) if self.count % (self.n_entry * batch_size) == 0 => {
268                    self.save();
269                }
270                _ => (),
271            }
272        }
273    }
274}
275
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use apache_arrow::datatypes::Schema;
    use interface::{Data, Entry, UID};

    use super::*;

    /// Checks the removal of the module paths from a type name,
    /// the same transformation that `BufferObject::who` applies
    #[test]
    fn who() {
        let exp = "gmt_dos_clients_io::optics::dispersed_fringe_sensor::DfsFftFrame<gmt_dos_clients_io::optics::Host<a::b::c>>";
        let q = exp
            .split('<')
            .map(|x| x.rsplit("::").next().unwrap_or(x))
            .collect::<Vec<_>>()
            .join("<");
        assert_eq!(q, "DfsFftFrame<Host<c>>");
    }

    /// Checks the schema of the record of a logger with a single entry:
    /// one non-nullable field named after the identifier and holding a list of `f64`
    #[test]
    fn arrow() {
        let mut arw = Arrow::builder(10).build();
        #[derive(UID)]
        pub enum Data {}
        <Arrow as Entry<Data>>::entry(&mut arw, 1);

        let field = Field::new(
            "Data",
            DataType::List(Arc::new(Field::new("values", DataType::Float64, false))),
            false,
        );
        let schema = Arc::new(Schema::new(vec![field]));
        assert_eq!(arw.record().unwrap().schema(), schema);
    }

    /// Logs one entry for 8 steps with a batch size of 4 steps;
    /// the test only checks that logging runs to completion
    #[test]
    fn batch() {
        //env_logger::init();
        let n_step = 8;
        let mut arw = Arrow::builder(n_step).batch_size(n_step / 2).build();
        #[derive(UID)]
        pub enum U {}
        <Arrow as Entry<U>>::entry(&mut arw, 1);
        for i in 0..n_step {
            arw.read(Data::<U>::new(vec![i as f64]));
        }
    }

    /// Logs two entries, of 1 and 3 values, for 24 steps with a batch size of 4;
    /// the test only checks that logging runs to completion
    #[test]
    fn batch2() {
        //env_logger::init();
        let n_step = 24;
        let mut arw = Arrow::builder(n_step).batch_size(4).build();
        #[derive(UID)]
        pub enum U {}
        <Arrow as Entry<U>>::entry(&mut arw, 1);
        #[derive(UID)]
        pub enum V {}
        <Arrow as Entry<V>>::entry(&mut arw, 3);
        for i in 0..n_step {
            arw.read(Data::<U>::new(vec![i as f64]));
            arw.read(Data::<V>::new(vec![(10 * i) as f64; 3]));
        }
    }
}
339        }
340    }
341}