cfpyo3_core/df/io/
buffer.rs1use crate::df::meta::align_nbytes;
2use crate::df::DataFrame;
3use crate::df::{COLUMNS_NBYTES, INDEX_NBYTES};
4use crate::toolkit::array::AFloat;
5use crate::toolkit::convert::{from_vec, to_nbytes};
6use anyhow::Result;
7use bytes::Buf;
8
9fn extract_bytes<T: Sized>(buf: &mut impl Buf, nbytes: usize) -> Vec<T> {
10 let mut vec_u8 = vec![0; nbytes];
12 buf.copy_to_slice(&mut vec_u8);
13 let remainder = align_nbytes(nbytes) - nbytes;
14 if remainder != 0 {
15 buf.advance(remainder);
16 }
17 unsafe { from_vec(vec_u8) }
18}
19
20impl<'a, T: AFloat> DataFrame<'a, T> {
21 pub fn from_buffer(mut buf: impl Buf) -> Result<Self> {
35 let index_nbytes = buf.get_i64_le() as usize;
36 let columns_nbytes = buf.get_i64_le() as usize;
37
38 let index_shape = index_nbytes / INDEX_NBYTES;
39 let columns_shape = columns_nbytes / COLUMNS_NBYTES;
40
41 let index_vec = extract_bytes(&mut buf, index_nbytes);
42 let columns_vec = extract_bytes(&mut buf, columns_nbytes);
43 let values_nbytes = to_nbytes::<T>(index_shape * columns_shape);
44 let values_vec = extract_bytes(&mut buf, values_nbytes);
45
46 assert!(
47 buf.remaining() == 0,
48 "internal error: buffer not fully consumed"
49 );
50
51 DataFrame::from_vec(index_vec, columns_vec, values_vec)
52 }
53}
54
55#[cfg(test)]
56mod tests {
57 use super::*;
58 use crate::df::io::bytes::tests::get_test_df;
59
60 #[test]
61 fn test_buffer_io() {
62 let df = get_test_df();
63 let bytes = df.to_bytes().unwrap();
64 let loaded = DataFrame::<f32>::from_buffer(bytes.as_slice()).unwrap();
65 assert_eq!(df.index(), loaded.index());
66 assert_eq!(df.columns(), loaded.columns());
67 assert_eq!(df.values(), loaded.values());
68 }
69}