typed_clickhouse/
inserter.rs

1use std::{future::Future, mem};
2
3use serde::Serialize;
4use tokio::time::{Duration, Instant};
5
6use crate::{error::Result, insert::Insert, introspection::Reflection, Client};
7
8const DEFAULT_MAX_ENTRIES: u64 = 250_000;
9const DEFAULT_MAX_DURATION: Duration = Duration::from_secs(10);
10const MAX_TIME_BIAS: f64 = 0.10; // % of `max_duration`
11
12pub struct Inserter<T> {
13    client: Client,
14    table: String,
15    max_entries: u64,
16    max_duration: Duration,
17    insert: Insert<T>,
18    next_insert_at: Instant,
19    committed: Quantities,
20    uncommitted_entries: u64,
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct Quantities {
25    pub entries: u64,
26    pub transactions: u64,
27}
28
29impl Quantities {
30    pub const ZERO: Quantities = Quantities {
31        entries: 0,
32        transactions: 0,
33    };
34}
35
36impl<T> Inserter<T>
37where
38    T: Reflection,
39{
40    pub fn new(client: &Client, table: &str) -> Result<Self> {
41        Ok(Self {
42            client: client.clone(),
43            table: table.into(),
44            max_entries: DEFAULT_MAX_ENTRIES,
45            max_duration: DEFAULT_MAX_DURATION,
46            insert: client.insert(table)?,
47            next_insert_at: Instant::now() + DEFAULT_MAX_DURATION,
48            committed: Quantities::ZERO,
49            uncommitted_entries: 0,
50        })
51    }
52
53    pub fn with_max_entries(mut self, threshold: u64) -> Self {
54        self.set_max_entries(threshold);
55        self
56    }
57
58    pub fn with_max_duration(mut self, threshold: Duration) -> Self {
59        self.set_max_duration(threshold);
60        self
61    }
62
63    pub fn set_max_entries(&mut self, threshold: u64) {
64        self.max_entries = threshold;
65    }
66
67    pub fn set_max_duration(&mut self, threshold: Duration) {
68        let prev_insert_at = self
69            .next_insert_at
70            .checked_sub(self.max_duration)
71            .unwrap_or_else(Instant::now);
72        self.next_insert_at = prev_insert_at + threshold;
73        self.max_duration = threshold;
74    }
75
76    #[inline]
77    pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
78    where
79        T: Serialize,
80    {
81        self.uncommitted_entries += 1;
82        let fut = self.insert.write(row);
83        async move { fut.await }
84    }
85
86    pub async fn commit(&mut self) -> Result<Quantities> {
87        if self.uncommitted_entries > 0 {
88            self.committed.entries += self.uncommitted_entries;
89            self.committed.transactions += 1;
90            self.uncommitted_entries = 0;
91        }
92
93        let now = Instant::now();
94
95        Ok(if self.is_threshold_reached(now) {
96            self.next_insert_at = shifted_next_time(now, self.next_insert_at, self.max_duration);
97            let new_insert = self.client.insert(&self.table)?; // Actually it mustn't fail.
98            let insert = mem::replace(&mut self.insert, new_insert);
99            insert.end().await?;
100            mem::replace(&mut self.committed, Quantities::ZERO)
101        } else {
102            Quantities::ZERO
103        })
104    }
105
106    pub async fn end(self) -> Result<Quantities> {
107        self.insert.end().await?;
108        Ok(self.committed)
109    }
110
111    fn is_threshold_reached(&self, now: Instant) -> bool {
112        self.committed.entries >= self.max_entries || now >= self.next_insert_at
113    }
114}
115
116fn shifted_next_time(now: Instant, prev: Instant, max_duration: Duration) -> Instant {
117    const MAX_TIME_BIAS_255: u32 = (MAX_TIME_BIAS * 255. + 0.5) as u32;
118
119    let coef = (now.max(prev) - now.min(prev)).subsec_nanos() & 0xff;
120    let max_bias = max_duration * MAX_TIME_BIAS_255 / 255;
121    let bias = max_bias * coef / 255;
122
123    prev + max_duration + 2 * bias - max_bias
124}