//! Graph representation backed by Polars `DataFrame`s.
//!
//! A `GraphFrame` holds a vertices frame and an edges frame. It can be built
//! directly from both frames, from an edges frame alone, or from a DuckDB
//! database file.

use crate::pregel::ColumnIdentifier::{Custom, Dst, Id, Src};
use duckdb::arrow::array::{Array, Int32Array};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::Connection;
use polars::prelude::*;
use polars::series::Series;
use std::fmt::{Debug, Display, Formatter};
use std::path::Path;
use std::{error, fmt};

/// A graph stored as two Polars `DataFrame`s: one describing the vertices and
/// one describing the edges.
///
/// Properties:
///
/// * `vertices`: The `DataFrame` holding the nodes of the graph.
/// `GraphFrame::new` rejects a vertices frame that lacks the `Id` column.
///
/// * `edges`: The `DataFrame` holding the edges of the graph.
/// `GraphFrame::new` rejects an edges frame that lacks either the `Src` or the
/// `Dst` column; any additional columns are kept as they are.
///
/// Both fields are public, so the column checks are enforced by the
/// constructors only, not on later modifications of the fields.
pub struct GraphFrame {
    pub vertices: DataFrame,
    pub edges: DataFrame,
}

/// Shorthand for results whose error type is `GraphFrameError`.
type Result<T> = std::result::Result<T, GraphFrameError>;

/// `GraphFrameError` is an enum that represents the different types of errors that
/// can occur when working with a `GraphFrame`. It has three variants: `DuckDbError`,
/// `FromPolars` and `MissingColumn`.
#[derive(Debug)]
pub enum GraphFrameError {
    /// Loading a graph from a DuckDB database failed; the payload is a fixed
    /// description of the step that failed.
    DuckDbError(&'static str),
    /// An error produced by Polars, kept so it can be reported as the source.
    FromPolars(PolarsError),
    /// A required column is absent from the vertices or edges `DataFrame`.
    MissingColumn(MissingColumnError),
}

/// Renders a `GraphFrameError` by delegating to the `Display` implementation
/// of whatever the variant wraps.
impl Display for GraphFrameError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        // Pick the wrapped value first, then forward to it exactly once so the
        // caller's formatter (and its flags) is handed over untouched.
        let inner: &dyn Display = match self {
            GraphFrameError::DuckDbError(message) => message,
            GraphFrameError::FromPolars(polars_error) => polars_error,
            GraphFrameError::MissingColumn(missing_column) => missing_column,
        };
        Display::fmt(inner, f)
    }
}

/// Exposes the underlying cause of a `GraphFrameError`, when there is one.
impl error::Error for GraphFrameError {
    /// Only the `FromPolars` variant reports a source (the wrapped
    /// `PolarsError`); the other variants return `None`.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            GraphFrameError::FromPolars(polars_error) => Some(polars_error),
            GraphFrameError::DuckDbError(_) | GraphFrameError::MissingColumn(_) => None,
        }
    }
}

/// `MissingColumnError` is an enum that represents errors that occur when a
/// required column is missing from a DataFrame. Each variant names the column
/// that was not found; its `Display` implementation also names the DataFrame
/// (vertices or edges) in which the column was expected.
#[derive(Debug)]
pub enum MissingColumnError {
    /// The vertices DataFrame has no `Id` column.
    Id,
    /// The edges DataFrame has no `Src` column.
    Src,
    /// The edges DataFrame has no `Dst` column.
    Dst,
}

/// Produces the user-facing message for a missing required column.
impl Display for MissingColumnError {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        // Writes the shared sentence straight into the formatter, filling in
        // which DataFrame was inspected and which column it lacks.
        fn report(f: &mut Formatter, frame: &str, column: &str) -> fmt::Result {
            write!(
                f,
                "The provided {} must contain a column named {} for the Graph to be created",
                frame, column
            )
        }

        match self {
            MissingColumnError::Id => report(f, "vertices", Id.as_ref()),
            MissingColumnError::Src => report(f, "edges", Src.as_ref()),
            MissingColumnError::Dst => report(f, "edges", Dst.as_ref()),
        }
    }
}

impl From<PolarsError> for GraphFrameError {
    fn from(err: PolarsError) -> GraphFrameError {
        GraphFrameError::FromPolars(err)
    }
}

impl GraphFrame {
    /// Builds a `GraphFrame` from a vertices and an edges `DataFrame`, after
    /// verifying that the required columns are present.
    ///
    /// Arguments:
    ///
    /// * `vertices`: A DataFrame describing the vertices of the graph. It must
    /// contain the `Id` column.
    ///
    /// * `edges`: A DataFrame describing the edges of the graph. It must contain
    /// the `Src` and `Dst` columns.
    ///
    /// Returns:
    ///
    /// `Ok(GraphFrame)` holding both DataFrames unchanged when every required
    /// column is present. Otherwise `Err(GraphFrameError::MissingColumn(..))`
    /// naming the first missing column, checked in the order `Id`, `Src`, `Dst`.
    pub fn new(vertices: DataFrame, edges: DataFrame) -> Result<Self> {
        // Resolve the first missing column (if any) inside its own scope so the
        // borrowed column names are gone before the frames are moved below.
        let missing = {
            let vertex_columns = vertices.get_column_names();
            let edge_columns = edges.get_column_names();

            if !vertex_columns.contains(&Id.as_ref()) {
                Some(MissingColumnError::Id)
            } else if !edge_columns.contains(&Src.as_ref()) {
                Some(MissingColumnError::Src)
            } else if !edge_columns.contains(&Dst.as_ref()) {
                Some(MissingColumnError::Dst)
            } else {
                None
            }
        };

        match missing {
            Some(column) => Err(GraphFrameError::MissingColumn(column)),
            None => Ok(GraphFrame { vertices, edges }),
        }
    }

    /// Builds a `GraphFrame` from an edges `DataFrame` alone, deriving the
    /// vertices from the edge endpoints.
    ///
    /// The `Src` and `Dst` columns are each projected and renamed to `Id`, the
    /// two projections are concatenated, and duplicate ids are dropped (keeping
    /// the first occurrence). The result becomes the vertices frame.
    ///
    /// Arguments:
    ///
    /// * `edges`: A DataFrame containing the edges of a graph, with at least the
    /// `Src` and `Dst` columns.
    ///
    /// Returns:
    ///
    /// A `Result<Self>`: the `GraphFrame` made of the derived vertices and the
    /// given edges, or the error raised by Polars while building the vertices
    /// or by `GraphFrame::new` while validating the columns.
    pub fn from_edges(edges: DataFrame) -> Result<Self> {
        // Projects one endpoint column of the edges, renamed to the id column.
        let endpoint_ids = |endpoint: &str| {
            edges
                .clone()
                .lazy()
                .select([col(endpoint).alias(Id.as_ref())])
        };
        let sources = endpoint_ids(Src.as_ref());
        let destinations = endpoint_ids(Dst.as_ref());

        let id_column = Id.as_ref().to_string();
        let vertices = concat([sources, destinations], false, true)?
            .unique(Some(vec![id_column]), UniqueKeepStrategy::First)
            .collect()?;

        GraphFrame::new(vertices, edges)
    }

    /// This function creates a `GraphFrame` from data stored in a DuckDB database.
    ///
    /// Arguments:
    ///
    /// * `path`: A string representing the path to a DuckDB database file.
    ///
    /// Returns:
    ///
    /// a `Result<Self>` where `Self` is the `GraphFrame` struct.
    pub fn from_duckdb(path: &str) -> Result<Self> {
        let database_path = match Path::new(path).try_exists() {
            Ok(true) => Path::new(path),
            Ok(false) => {
                return Err(GraphFrameError::DuckDbError(
                    "The provided path does not exist",
                ))
            }
            _ => {
                return Err(GraphFrameError::DuckDbError(
                    "Cannot open a connection with the provided Database",
                ))
            }
        };
        let connection = match Connection::open(database_path) {
            Ok(connection) => connection,
            Err(_) => {
                return Err(GraphFrameError::DuckDbError(
                    "Cannot connect to the provided Database",
                ))
            }
        };

        let mut statement = match connection.prepare(
            // TODO: include the rest of the entities
            "select src_id, property_id, dst_id from edge
            union
            select src_id, property_id, dst_id from coordinate
            union
            select src_id, property_id, dst_id from quantity
            union
            select src_id, property_id, dst_id from string
            union
            select src_id, property_id, dst_id from time",
        ) {
            Ok(statement) => statement,
            Err(_) => {
                return Err(GraphFrameError::DuckDbError(
                    "Cannot prepare the provided statement",
                ))
            }
        };

        let batches: Vec<RecordBatch> = match statement.query_arrow([]) {
            Ok(arrow) => arrow.collect(),
            Err(_) => {
                return Err(GraphFrameError::DuckDbError(
                    "Error executing the Arrow query",
                ))
            }
        };

        let mut dataframe = DataFrame::default();
        for batch in batches {
            let src_id = batch.column(0); // TODO: by name?
            let property_id = batch.column(1);
            let src_dst = batch.column(2);

            let srcs = Series::new(
                Src.as_ref(),
                src_id
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .to_vec(),
            );

            let properties = Series::new(
                Custom("property_id".to_string()).as_ref(),
                property_id
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .to_vec(),
            );

            let dsts = Series::new(
                Dst.as_ref(),
                src_dst
                    .as_any()
                    .downcast_ref::<Int32Array>()
                    .unwrap()
                    .values()
                    .to_vec(),
            );

            let tmp_dataframe = match DataFrame::new(vec![srcs, properties, dsts]) {
                Ok(tmp_dataframe) => tmp_dataframe,
                Err(_) => return Err(GraphFrameError::DuckDbError("Error creating the DataFrame")),
            };

            dataframe = match dataframe.vstack(&tmp_dataframe) {
                Ok(dataframe) => dataframe,
                Err(_) => {
                    return Err(GraphFrameError::DuckDbError(
                        "Error stacking the DataFrames",
                    ))
                }
            };
        }

        GraphFrame::from_edges(dataframe)
    }

    /// Computes the out-degree of the nodes of the graph from its edges. The
    /// out-degree of a node is the number of out-going edges; that is, edges
    /// that have the node as their source in a directed graph.
    ///
    /// The `GraphFrame` is consumed by this call. Only the edges are inspected,
    /// so a node that is never the source of an edge has no row in the result.
    ///
    /// Returns:
    ///
    /// A `PolarsResult` containing a new `DataFrame` with one row per distinct
    /// source node: the source column renamed to `Id`, and an `out_degree`
    /// column holding the number of edges leaving that node.
    pub fn out_degrees(self) -> PolarsResult<DataFrame> {
        let degree_column = Custom("out_degree".to_owned());
        let source_as_id = col(Src.as_ref()).alias(Id.as_ref());

        let per_source = self.edges.lazy().groupby([source_as_id]);
        per_source
            .agg([count().alias(degree_column.as_ref())])
            .collect()
    }

    /// Computes the in-degree of the nodes of the graph from its edges. The
    /// in-degree of a node is the number of incoming edges; that is, edges that
    /// have the node as their destination in a directed graph.
    ///
    /// The `GraphFrame` is consumed by this call. Only the edges are inspected,
    /// so a node that is never the destination of an edge has no row in the
    /// result.
    ///
    /// Returns:
    ///
    /// A `PolarsResult` containing a new `DataFrame` with one row per distinct
    /// destination node: the `Dst` column (kept under its own name, it is not
    /// renamed to `Id`), and an `in_degree` column holding the number of edges
    /// arriving at that node.
    pub fn in_degrees(self) -> PolarsResult<DataFrame> {
        let degree_column = Custom("in_degree".to_owned());
        let destination = col(Dst.as_ref());

        let per_destination = self.edges.lazy().groupby([destination]);
        per_destination
            .agg([count().alias(degree_column.as_ref())])
            .collect()
    }
}

/// Prints the graph as a `GraphFrame:` header followed by the vertices and the
/// edges DataFrames, each under its own label.
impl Display for GraphFrame {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let Self { vertices, edges } = self;
        f.write_fmt(format_args!(
            "GraphFrame:\nVertices:\n{}\nEdges:\n{}",
            vertices, edges
        ))
    }
}