1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use duckdb::arrow::array::{Array, UInt32Array, UInt8Array};
use duckdb::arrow::record_batch::RecordBatch;
use duckdb::Connection;
use polars::frame::DataFrame;
use polars::prelude::NamedFrom;
use polars::series::Series;
use pregel_rs::pregel::Column;
use rayon::iter::ParallelIterator;
use rayon::prelude::IntoParallelIterator;
use std::path::Path;
use strum::IntoEnumIterator;
use wikidata_rs::dtype::DataType;

use super::Backend;

pub struct DuckDB;

/// The `DuckDB` block defines a Rust module that contains `import` and `export`.
impl Backend for DuckDB {
    /// This function retrieves data from a DuckDB database and returns it as a
    /// DataFrame.
    ///
    /// Arguments:
    ///
    /// * `path`: The path to the DuckDB database file.
    ///
    /// Returns:
    ///
    /// This function returns a `Result<DataFrame, String>`, where the `DataFrame`
    /// is the result of querying and processing data from a DuckDB database, and
    /// the `String` is an error message in case any error occurs during the
    /// execution of the function.
    fn import(path: &str) -> Result<DataFrame, String> {
        let stmt = DataType::iter()
            .map(|dtype| {
                format!(
                    "SELECT src_id, property_id, dst_id, CAST({:} AS UTINYINT) FROM {:}",
                    u8::from(&dtype),
                    dtype.as_ref()
                )
            })
            .collect::<Vec<String>>()
            .join(" UNION ");

        let connection: Connection = match Path::new(path).try_exists() {
            Ok(true) => match Connection::open(Path::new(path)) {
                Ok(connection) => connection,
                Err(_) => return Err(String::from("Cannot connect to the database")),
            },
            _ => return Err(String::from("Make sure you provide an existing path")),
        };

        let mut statement = match connection.prepare(stmt.as_ref()) {
            Ok(statement) => statement,
            Err(error) => return Err(format!("Cannot prepare the provided statement {}", error)),
        };

        let batches: Vec<RecordBatch> = match statement.query_arrow([]) {
            Ok(arrow) => arrow.collect(),
            Err(_) => return Err(String::from("Error executing the Arrow query")),
        };

        Ok(batches
            .into_par_iter()
            .map(|batch| {
                match DataFrame::new(vec![
                    Series::new(
                        Column::Src.as_ref(),
                        // because we know that the first column is the src_id
                        batch
                            .column(0)
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .unwrap()
                            .values(),
                    ),
                    Series::new(
                        Column::Custom("property_id").as_ref(),
                        // because we know that the second column is the property_id
                        batch
                            .column(1)
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .unwrap()
                            .values(),
                    ),
                    Series::new(
                        Column::Dst.as_ref(),
                        // because we know that the third column is the dst_id
                        batch
                            .column(2)
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .unwrap()
                            .values(),
                    ),
                    Series::new(
                        Column::Custom("dtype").as_ref(),
                        // because we know that the fourth column is the dtype
                        batch
                            .column(3)
                            .as_any()
                            .downcast_ref::<UInt8Array>()
                            .unwrap()
                            .values(),
                    ),
                ]) {
                    Ok(tmp_dataframe) => tmp_dataframe,
                    Err(_) => DataFrame::empty(),
                }
            })
            .reduce(DataFrame::empty, |acc, e| acc.vstack(&e).unwrap()))
    }

    fn export(_path: &str, _df: DataFrame) -> Result<(), String> {
        todo!()
    }
}