raphtory_storage/graph/nodes/
row.rs

1use raphtory_api::core::entities::properties::prop::Prop;
2use raphtory_core::storage::node_entry::MemRow;
3
4#[cfg(feature = "storage")]
5use {
6    polars_arrow::datatypes::ArrowDataType,
7    pometry_storage::{
8        graph::TemporalGraph, properties::TemporalProps, timestamps::TimeStamps, tprops::DiskTProp,
9    },
10    raphtory_api::core::{entities::VID, storage::timeindex::TimeIndexEntry},
11};
12
13#[derive(Debug, Copy, Clone)]
14pub enum Row<'a> {
15    Mem(MemRow<'a>),
16    #[cfg(feature = "storage")]
17    Disk(DiskRow<'a>),
18}
19
20impl<'a> IntoIterator for Row<'a> {
21    type Item = (usize, Option<Prop>);
22
23    type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
24
25    fn into_iter(self) -> Self::IntoIter {
26        match self {
27            Row::Mem(mem_row) => mem_row.into_iter(),
28            #[cfg(feature = "storage")]
29            Row::Disk(disk_row) => disk_row.into_iter(),
30        }
31    }
32}
33
34#[cfg(feature = "storage")]
35#[derive(Debug, Copy, Clone)]
36pub struct DiskRow<'a> {
37    graph: &'a TemporalGraph,
38    ts: TimeStamps<'a, TimeIndexEntry>,
39    layer: usize,
40    row: usize,
41}
42
43#[cfg(feature = "storage")]
44impl<'a> DiskRow<'a> {
45    pub fn new(
46        graph: &'a TemporalGraph,
47        ts: TimeStamps<'a, TimeIndexEntry>,
48        row: usize,
49        layer: usize,
50    ) -> Self {
51        Self {
52            graph,
53            ts,
54            row,
55            layer,
56        }
57    }
58
59    pub fn temporal_props(&'a self) -> &'a TemporalProps<VID> {
60        &self.graph.node_properties().temporal_props()[self.layer]
61    }
62}
63
64#[cfg(feature = "storage")]
65impl<'a> IntoIterator for DiskRow<'a> {
66    type Item = (usize, Option<Prop>);
67
68    type IntoIter = Box<dyn Iterator<Item = Self::Item> + 'a>;
69
70    fn into_iter(self) -> Self::IntoIter {
71        let props = self.temporal_props();
72        let iter = (0..props.prop_dtypes().len()).filter_map(move |prop_id| {
73            let global_prop = self
74                .graph
75                .prop_mapping()
76                .globalise_node_prop_id(self.layer, prop_id)?;
77            let props = self.temporal_props();
78            Some((
79                global_prop,
80                get(
81                    &props.prop_for_ts::<TimeIndexEntry>(self.ts, prop_id),
82                    self.row,
83                ),
84            ))
85        });
86        Box::new(iter)
87    }
88}
89
90#[cfg(feature = "storage")]
91fn get<'a>(disk_col: &DiskTProp<'a, TimeIndexEntry>, row: usize) -> Option<Prop> {
92    use bigdecimal::BigDecimal;
93    use num_traits::FromPrimitive;
94
95    match disk_col {
96        DiskTProp::Empty(_) => None,
97        DiskTProp::Bool(tprop_column) => tprop_column.get(row).map(|p| p.into()),
98        DiskTProp::Str64(tprop_column) => tprop_column.get(row).map(|p| p.into()),
99        DiskTProp::Str32(tprop_column) => tprop_column.get(row).map(|p| p.into()),
100        DiskTProp::Str(tprop_column) => tprop_column.get(row).map(|p| p.into()),
101        DiskTProp::I32(tprop_column) => tprop_column.get(row).map(|p| p.into()),
102        DiskTProp::I64(tprop_column) => tprop_column.get(row).map(|p| p.into()),
103        DiskTProp::U8(tprop_column) => tprop_column.get(row).map(|p| p.into()),
104        DiskTProp::U16(tprop_column) => tprop_column.get(row).map(|p| p.into()),
105        DiskTProp::U32(tprop_column) => tprop_column.get(row).map(|p| p.into()),
106        DiskTProp::U64(tprop_column) => tprop_column.get(row).map(|p| p.into()),
107        DiskTProp::F32(tprop_column) => tprop_column.get(row).map(|p| p.into()),
108        DiskTProp::F64(tprop_column) => tprop_column.get(row).map(|p| p.into()),
109        DiskTProp::I128(tprop_column) => {
110            let d_type = tprop_column.data_type()?;
111            match d_type {
112                ArrowDataType::Decimal(_, scale) => tprop_column.get(row).map(|p| {
113                    BigDecimal::from_i128(p)
114                        .unwrap()
115                        .with_scale(*scale as i64)
116                        .into()
117                }),
118                _ => unimplemented!("{d_type:?} not supported as disk_graph property"),
119            }
120        }
121    }
122}