Skip to main content

insert_select/
insert-select.rs

1extern crate chrono;
2extern crate clickhouse_driver;
3extern crate tokio;
4extern crate uuid;
5
6use std::net::Ipv4Addr;
7use std::{env, io, time};
8
9use uuid::Uuid;
10
11use clickhouse_driver::prelude::types::Decimal32;
12use clickhouse_driver::prelude::*;
13
14type ServerDate = chrono::DateTime<chrono::Utc>;
15
16#[derive(Debug)]
17struct Blob {
18    id: u64,
19    url: String,
20    date: ServerDate,
21    client: Uuid,
22    ip: Ipv4Addr,
23    value: Decimal32,
24}
25
26impl Deserialize for Blob {
27    fn deserialize(row: Row) -> errors::Result<Self> {
28        let err = || errors::ConversionError::UnsupportedConversion;
29
30        let id: u64 = row.value(0)?.ok_or_else(err)?;
31        let url: &str = row.value(1)?.ok_or_else(err)?;
32        let date: ServerDate = row.value(2)?.ok_or_else(err)?;
33        let client: Uuid = row.value(3)?.ok_or_else(err)?;
34        let ip = row.value(4)?.ok_or_else(err)?;
35        let value: Decimal32 = row.value(5)?.ok_or_else(err)?;
36
37        Ok(Blob {
38            id,
39            date,
40            client,
41            value,
42            url: url.to_string(),
43            ip,
44        })
45    }
46}
47const C: u64 = 10000;
48
49#[tokio::main]
50async fn main() -> Result<(), io::Error> {
51    let ddl = "
52        CREATE TABLE IF NOT EXISTS blob (
53            id          UInt64,
54            url         String,
55            date        DateTime,
56            client      UUID,
57            ip          IPv4,
58            value       Decimal32(2)
59        ) ENGINE=MergeTree PARTITION BY id ORDER BY date";
60
61    let uuid = Uuid::new_v4();
62    let ip: Ipv4Addr = "127.0.0.1".parse().unwrap();
63    let value = Decimal32::from(4000_i32, 2);
64    let now = chrono::offset::Utc::now();
65    //let today = chrono::offset::Utc::today();
66
67    let id = vec![0u64, 159, 146, 150];
68    let url = vec![
69        "https://www.rust-lang.org/",
70        "https://tokio.rs/",
71        "https://github.com/ddulesov/",
72        "https://internals.rust-lang.org/",
73    ];
74    let date = vec![now; 4];
75    let client = vec![uuid; 4];
76    let ip = vec![ip; 4];
77    let value = vec![value; 4];
78
79    let block = {
80        Block::new("blob")
81            .add("id", id.clone())
82            .add("url", url.clone())
83            .add("date", date.clone())
84            .add("client", client.clone())
85            .add("ip", ip.clone())
86            .add("value", value.clone())
87    };
88
89    let database_url =
90        env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
91
92    let pool = Pool::create(database_url.as_str())?;
93    {
94        let mut start = time::Instant::now();
95        let mut conn = pool.connection().await?;
96        eprintln!("connection establish {} msec", start.elapsed().as_millis());
97        start = time::Instant::now();
98        conn.execute("DROP TABLE IF EXISTS blob").await?;
99        conn.execute(ddl).await?;
100        eprintln!("drop and create table {} msec", start.elapsed().as_millis());
101        start = time::Instant::now();
102        let mut insert = conn.insert(&block).await?;
103        eprintln!("first block insert {} msec", start.elapsed().as_millis());
104        eprintln!("INSERT...");
105        start = time::Instant::now();
106        for _ in 1u64..C {
107            // we can use  the same block repeatedly
108            // let block = {
109            //     Block::new("")
110            //         .add("id", id.clone())
111            //         .add("url", url.clone())
112            //         .add("date", date.clone())
113            //         .add("client", client.clone())
114            //         .add("ip", ip.clone())
115            //         .add("value", value.clone())
116            // };
117            insert.next(&block).await?;
118        }
119
120        insert.commit().await?;
121        eprintln!(
122            "{} block insert {} msec",
123            C - 1,
124            start.elapsed().as_millis()
125        );
126        // Stop inserting pipeline before  next query be called
127        drop(insert);
128
129        eprintln!("SELECT...");
130        start = time::Instant::now();
131        let mut result = conn
132            .query("SELECT id, url, date, client, ip, value FROM blob  LIMIT 30000")
133            .await?;
134
135        while let Some(block) = result.next().await? {
136            eprintln!("fetch block {} msec", start.elapsed().as_millis());
137            for (i, row) in block.iter::<Blob>().enumerate() {
138                if i % 1000 == 0 {
139                    println!("{:5} {:?}", i, row);
140                }
141            }
142            start = time::Instant::now();
143        }
144    }
145
146    Ok(())
147}