insert_select/
insert-select.rs1extern crate chrono;
2extern crate clickhouse_driver;
3extern crate tokio;
4extern crate uuid;
5
6use std::net::Ipv4Addr;
7use std::{env, io, time};
8
9use uuid::Uuid;
10
11use clickhouse_driver::prelude::types::Decimal32;
12use clickhouse_driver::prelude::*;
13
14type ServerDate = chrono::DateTime<chrono::Utc>;
15
16#[derive(Debug)]
17struct Blob {
18 id: u64,
19 url: String,
20 date: ServerDate,
21 client: Uuid,
22 ip: Ipv4Addr,
23 value: Decimal32,
24}
25
26impl Deserialize for Blob {
27 fn deserialize(row: Row) -> errors::Result<Self> {
28 let err = || errors::ConversionError::UnsupportedConversion;
29
30 let id: u64 = row.value(0)?.ok_or_else(err)?;
31 let url: &str = row.value(1)?.ok_or_else(err)?;
32 let date: ServerDate = row.value(2)?.ok_or_else(err)?;
33 let client: Uuid = row.value(3)?.ok_or_else(err)?;
34 let ip = row.value(4)?.ok_or_else(err)?;
35 let value: Decimal32 = row.value(5)?.ok_or_else(err)?;
36
37 Ok(Blob {
38 id,
39 date,
40 client,
41 value,
42 url: url.to_string(),
43 ip,
44 })
45 }
46}
47const C: u64 = 10000;
48
49#[tokio::main]
50async fn main() -> Result<(), io::Error> {
51 let ddl = "
52 CREATE TABLE IF NOT EXISTS blob (
53 id UInt64,
54 url String,
55 date DateTime,
56 client UUID,
57 ip IPv4,
58 value Decimal32(2)
59 ) ENGINE=MergeTree PARTITION BY id ORDER BY date";
60
61 let uuid = Uuid::new_v4();
62 let ip: Ipv4Addr = "127.0.0.1".parse().unwrap();
63 let value = Decimal32::from(4000_i32, 2);
64 let now = chrono::offset::Utc::now();
65 let id = vec![0u64, 159, 146, 150];
68 let url = vec![
69 "https://www.rust-lang.org/",
70 "https://tokio.rs/",
71 "https://github.com/ddulesov/",
72 "https://internals.rust-lang.org/",
73 ];
74 let date = vec![now; 4];
75 let client = vec![uuid; 4];
76 let ip = vec![ip; 4];
77 let value = vec![value; 4];
78
79 let block = {
80 Block::new("blob")
81 .add("id", id.clone())
82 .add("url", url.clone())
83 .add("date", date.clone())
84 .add("client", client.clone())
85 .add("ip", ip.clone())
86 .add("value", value.clone())
87 };
88
89 let database_url =
90 env::var("DATABASE_URL").unwrap_or_else(|_| "tcp://localhost:9000?compression=lz4".into());
91
92 let pool = Pool::create(database_url.as_str())?;
93 {
94 let mut start = time::Instant::now();
95 let mut conn = pool.connection().await?;
96 eprintln!("connection establish {} msec", start.elapsed().as_millis());
97 start = time::Instant::now();
98 conn.execute("DROP TABLE IF EXISTS blob").await?;
99 conn.execute(ddl).await?;
100 eprintln!("drop and create table {} msec", start.elapsed().as_millis());
101 start = time::Instant::now();
102 let mut insert = conn.insert(&block).await?;
103 eprintln!("first block insert {} msec", start.elapsed().as_millis());
104 eprintln!("INSERT...");
105 start = time::Instant::now();
106 for _ in 1u64..C {
107 insert.next(&block).await?;
118 }
119
120 insert.commit().await?;
121 eprintln!(
122 "{} block insert {} msec",
123 C - 1,
124 start.elapsed().as_millis()
125 );
126 drop(insert);
128
129 eprintln!("SELECT...");
130 start = time::Instant::now();
131 let mut result = conn
132 .query("SELECT id, url, date, client, ip, value FROM blob LIMIT 30000")
133 .await?;
134
135 while let Some(block) = result.next().await? {
136 eprintln!("fetch block {} msec", start.elapsed().as_millis());
137 for (i, row) in block.iter::<Blob>().enumerate() {
138 if i % 1000 == 0 {
139 println!("{:5} {:?}", i, row);
140 }
141 }
142 start = time::Instant::now();
143 }
144 }
145
146 Ok(())
147}