// cfpyo3_core/df/io/buffer.rs

1use crate::df::meta::align_nbytes;
2use crate::df::DataFrame;
3use crate::df::{COLUMNS_NBYTES, INDEX_NBYTES};
4use crate::toolkit::array::AFloat;
5use crate::toolkit::convert::{from_vec, to_nbytes};
6use anyhow::Result;
7use bytes::Buf;
8
9fn extract_bytes<T: Sized>(buf: &mut impl Buf, nbytes: usize) -> Vec<T> {
10    // `advance` will happen inside `copy_to_bytes`
11    let mut vec_u8 = vec![0; nbytes];
12    buf.copy_to_slice(&mut vec_u8);
13    let remainder = align_nbytes(nbytes) - nbytes;
14    if remainder != 0 {
15        buf.advance(remainder);
16    }
17    unsafe { from_vec(vec_u8) }
18}
19
20impl<'a, T: AFloat> DataFrame<'a, T> {
21    /// Create an **owned** [`DataFrame`] from a [`Buf`], whose underlying bytes are returned from the [`DataFrame::to_bytes`] method.
22    ///
23    /// Since the returned [`DataFrame`] is owned:
24    /// - it is safe to use it anyway you like.
25    /// - copying is occurred during this method.
26    ///
27    /// If you want a zero-copy loading, you can try to use the [`DataFrame::from_bytes`] method with your [`Buf`].
28    ///
29    /// # Panics
30    ///
31    /// This method will panic if:
32    /// - the bytes behind the `buf` is not of the desired memory layout.
33    /// - the `buf` is not fully consumed after loading.
34    pub fn from_buffer(mut buf: impl Buf) -> Result<Self> {
35        let index_nbytes = buf.get_i64_le() as usize;
36        let columns_nbytes = buf.get_i64_le() as usize;
37
38        let index_shape = index_nbytes / INDEX_NBYTES;
39        let columns_shape = columns_nbytes / COLUMNS_NBYTES;
40
41        let index_vec = extract_bytes(&mut buf, index_nbytes);
42        let columns_vec = extract_bytes(&mut buf, columns_nbytes);
43        let values_nbytes = to_nbytes::<T>(index_shape * columns_shape);
44        let values_vec = extract_bytes(&mut buf, values_nbytes);
45
46        assert!(
47            buf.remaining() == 0,
48            "internal error: buffer not fully consumed"
49        );
50
51        DataFrame::from_vec(index_vec, columns_vec, values_vec)
52    }
53}
54
#[cfg(test)]
mod tests {
    use super::*;
    use crate::df::io::bytes::tests::get_test_df;

    /// Round-trips the shared test frame through `to_bytes` -> `from_buffer` and checks
    /// that index, columns and values all survive unchanged.
    #[test]
    fn test_buffer_io() {
        let original = get_test_df();
        let encoded = original.to_bytes().unwrap();
        let decoded = DataFrame::<f32>::from_buffer(encoded.as_slice()).unwrap();
        assert_eq!(original.index(), decoded.index());
        assert_eq!(original.columns(), decoded.columns());
        assert_eq!(original.values(), decoded.values());
    }
}