1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
use raphtory::core::Prop;
use raphtory::db::graph::Graph;
use raphtory::db::view_api::internal::GraphViewInternalOps;
use raphtory::db::view_api::GraphViewOps;
use crate::graph_loader::source::csv_loader::CsvLoader;
use chrono::NaiveDateTime;
use serde::Deserialize;
use std::collections::HashMap;
use std::fs::File;
use std::io::{copy, Cursor};
use std::path::PathBuf;
use std::time::Duration;
use std::{fs, time::Instant};

#[derive(Deserialize, std::fmt::Debug)]
pub struct StableCoin {
    block_number: String,
    transaction_index: u32,
    from_address: String,
    to_address: String,
    time_stamp: i64,
    contract_address: String,
    value: f64,
}

fn fetch_file(file_path: PathBuf, timeout: u64) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::blocking::Client::builder()
        .timeout(Duration::from_secs(timeout))
        .build()?;
    let response = client
        .get("https://raw.githubusercontent.com/Raphtory/Data/main/token_transfers.csv")
        .send()?;
    let mut content = Cursor::new(response.bytes()?);
    let mut file = File::create(&file_path)?;
    copy(&mut content, &mut file)?;

    Ok(())
}

pub fn stable_coin_graph(path: Option<String>, num_shards: usize) -> Graph {
    let default_data_dir: PathBuf = PathBuf::from("/tmp/stablecoin");

    let data_dir = match path {
        Some(path) => PathBuf::from(path),
        None => default_data_dir,
    };

    let dir_str = data_dir.to_str().unwrap();
    fs::create_dir_all(dir_str).expect(&format!("Failed to create directory {}", dir_str));

    if !data_dir.join("token_transfers.csv").exists() {
        fetch_file(data_dir.join("token_transfers.csv"), 10).expect("Failed to fetch stable coin data: https://raw.githubusercontent.com/Raphtory/Data/main/token_transfers.csv");
    }

    let encoded_data_dir = data_dir.join("graphdb.bincode");

    fn restore_from_bincode(encoded_data_dir: &PathBuf) -> Option<Graph> {
        if encoded_data_dir.exists() {
            let now = Instant::now();
            let g = Graph::load_from_file(encoded_data_dir.as_path())
                .map_err(|err| {
                    println!(
                        "Restoring from bincode failed with error: {}! Reloading file!",
                        err
                    )
                })
                .ok()?;

            println!(
                "Loaded graph with {} shards from encoded data files {} with {} vertices, {} edges which took {} seconds",
                g.num_shards(),
                encoded_data_dir.to_str().unwrap(),
                g.num_vertices(),
                g.num_edges(),
                now.elapsed().as_secs()
            );

            Some(g)
        } else {
            None
        }
    }

    let g = restore_from_bincode(&encoded_data_dir).unwrap_or_else(|| {
        let g = Graph::new(num_shards);
        let now = Instant::now();

        let contract_addr_labels = HashMap::from([
            ("0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48", "USD"),
            ("0xd2877702675e6ceb975b4a1dff9fb7baf4c91ea9", "LUNC"),
            ("0xa47c8bf37f92abed4a126bda807a7b7498661acd", "USTC"),
            ("0x6b175474e89094c44da98b954eedeac495271d0f", "Dai"),
            ("0xdac17f958d2ee523a2206206994597c13d831ec7", "USDT"),
            ("0x8e870d67f660d95d5be530380d0ec0bd388289e1", "USDP"),
        ]);

        CsvLoader::new(data_dir)
            .set_header(true)
            .set_delimiter(",")
            .load_into_graph(&g, |stablecoin: StableCoin, g: &Graph| {
                let s: &str = &stablecoin.contract_address;
                let label = *contract_addr_labels.get(s).unwrap();
                g.add_edge(
                    NaiveDateTime::from_timestamp_opt(stablecoin.time_stamp, 0).unwrap(),
                    stablecoin.from_address,
                    stablecoin.to_address,
                    &vec![("value".into(), Prop::F64(stablecoin.value.into()))],
                    Some(label),
                )
                .expect("Failed to add edge");
            })
            .expect("Failed to load graph from CSV data files");

        println!(
            "Loaded graph with {} shards from CSV data files {} with {} vertices, {} edges which took {} seconds",
            g.num_shards(),
            encoded_data_dir.to_str().unwrap(),
            g.num_vertices(),
            g.num_edges(),
            now.elapsed().as_secs()
        );

        g.save_to_file(encoded_data_dir)
            .expect("Failed to save graph");

        g
    });

    g
}