typed_clickhouse/
inserter.rs1use std::{future::Future, mem};
2
3use serde::Serialize;
4use tokio::time::{Duration, Instant};
5
6use crate::{error::Result, insert::Insert, introspection::Reflection, Client};
7
8const DEFAULT_MAX_ENTRIES: u64 = 250_000;
9const DEFAULT_MAX_DURATION: Duration = Duration::from_secs(10);
10const MAX_TIME_BIAS: f64 = 0.10; pub struct Inserter<T> {
13 client: Client,
14 table: String,
15 max_entries: u64,
16 max_duration: Duration,
17 insert: Insert<T>,
18 next_insert_at: Instant,
19 committed: Quantities,
20 uncommitted_entries: u64,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct Quantities {
25 pub entries: u64,
26 pub transactions: u64,
27}
28
29impl Quantities {
30 pub const ZERO: Quantities = Quantities {
31 entries: 0,
32 transactions: 0,
33 };
34}
35
36impl<T> Inserter<T>
37where
38 T: Reflection,
39{
40 pub fn new(client: &Client, table: &str) -> Result<Self> {
41 Ok(Self {
42 client: client.clone(),
43 table: table.into(),
44 max_entries: DEFAULT_MAX_ENTRIES,
45 max_duration: DEFAULT_MAX_DURATION,
46 insert: client.insert(table)?,
47 next_insert_at: Instant::now() + DEFAULT_MAX_DURATION,
48 committed: Quantities::ZERO,
49 uncommitted_entries: 0,
50 })
51 }
52
53 pub fn with_max_entries(mut self, threshold: u64) -> Self {
54 self.set_max_entries(threshold);
55 self
56 }
57
58 pub fn with_max_duration(mut self, threshold: Duration) -> Self {
59 self.set_max_duration(threshold);
60 self
61 }
62
63 pub fn set_max_entries(&mut self, threshold: u64) {
64 self.max_entries = threshold;
65 }
66
67 pub fn set_max_duration(&mut self, threshold: Duration) {
68 let prev_insert_at = self
69 .next_insert_at
70 .checked_sub(self.max_duration)
71 .unwrap_or_else(Instant::now);
72 self.next_insert_at = prev_insert_at + threshold;
73 self.max_duration = threshold;
74 }
75
76 #[inline]
77 pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
78 where
79 T: Serialize,
80 {
81 self.uncommitted_entries += 1;
82 let fut = self.insert.write(row);
83 async move { fut.await }
84 }
85
86 pub async fn commit(&mut self) -> Result<Quantities> {
87 if self.uncommitted_entries > 0 {
88 self.committed.entries += self.uncommitted_entries;
89 self.committed.transactions += 1;
90 self.uncommitted_entries = 0;
91 }
92
93 let now = Instant::now();
94
95 Ok(if self.is_threshold_reached(now) {
96 self.next_insert_at = shifted_next_time(now, self.next_insert_at, self.max_duration);
97 let new_insert = self.client.insert(&self.table)?; let insert = mem::replace(&mut self.insert, new_insert);
99 insert.end().await?;
100 mem::replace(&mut self.committed, Quantities::ZERO)
101 } else {
102 Quantities::ZERO
103 })
104 }
105
106 pub async fn end(self) -> Result<Quantities> {
107 self.insert.end().await?;
108 Ok(self.committed)
109 }
110
111 fn is_threshold_reached(&self, now: Instant) -> bool {
112 self.committed.entries >= self.max_entries || now >= self.next_insert_at
113 }
114}
115
116fn shifted_next_time(now: Instant, prev: Instant, max_duration: Duration) -> Instant {
117 const MAX_TIME_BIAS_255: u32 = (MAX_TIME_BIAS * 255. + 0.5) as u32;
118
119 let coef = (now.max(prev) - now.min(prev)).subsec_nanos() & 0xff;
120 let max_bias = max_duration * MAX_TIME_BIAS_255 / 255;
121 let bias = max_bias * coef / 255;
122
123 prev + max_duration + 2 * bias - max_bias
124}