1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::{future::Future, mem};
use serde::Serialize;
use tokio::time::{Duration, Instant};
use crate::{error::Result, insert::Insert, introspection::Reflection, Client};
const DEFAULT_MAX_ENTRIES: u64 = 250_000;
const DEFAULT_MAX_DURATION: Duration = Duration::from_secs(10);
const MAX_TIME_BIAS: f64 = 0.10;
pub struct Inserter<T> {
client: Client,
table: String,
max_entries: u64,
max_duration: Duration,
insert: Insert<T>,
next_insert_at: Instant,
committed: Quantities,
uncommitted_entries: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Quantities {
pub entries: u64,
pub transactions: u64,
}
impl Quantities {
pub const ZERO: Quantities = Quantities {
entries: 0,
transactions: 0,
};
}
impl<T> Inserter<T>
where
T: Reflection,
{
pub fn new(client: &Client, table: &str) -> Result<Self> {
Ok(Self {
client: client.clone(),
table: table.into(),
max_entries: DEFAULT_MAX_ENTRIES,
max_duration: DEFAULT_MAX_DURATION,
insert: client.insert(table)?,
next_insert_at: Instant::now() + DEFAULT_MAX_DURATION,
committed: Quantities::ZERO,
uncommitted_entries: 0,
})
}
pub fn with_max_entries(mut self, threshold: u64) -> Self {
self.set_max_entries(threshold);
self
}
pub fn with_max_duration(mut self, threshold: Duration) -> Self {
self.set_max_duration(threshold);
self
}
pub fn set_max_entries(&mut self, threshold: u64) {
self.max_entries = threshold;
}
pub fn set_max_duration(&mut self, threshold: Duration) {
let prev_insert_at = self
.next_insert_at
.checked_sub(self.max_duration)
.unwrap_or_else(Instant::now);
self.next_insert_at = prev_insert_at + threshold;
self.max_duration = threshold;
}
#[inline]
pub fn write<'a>(&'a mut self, row: &T) -> impl Future<Output = Result<()>> + 'a + Send
where
T: Serialize,
{
self.uncommitted_entries += 1;
let fut = self.insert.write(row);
async move { fut.await }
}
pub async fn commit(&mut self) -> Result<Quantities> {
if self.uncommitted_entries > 0 {
self.committed.entries += self.uncommitted_entries;
self.committed.transactions += 1;
self.uncommitted_entries = 0;
}
let now = Instant::now();
Ok(if self.is_threshold_reached(now) {
self.next_insert_at = shifted_next_time(now, self.next_insert_at, self.max_duration);
let new_insert = self.client.insert(&self.table)?;
let insert = mem::replace(&mut self.insert, new_insert);
insert.end().await?;
mem::replace(&mut self.committed, Quantities::ZERO)
} else {
Quantities::ZERO
})
}
pub async fn end(self) -> Result<Quantities> {
self.insert.end().await?;
Ok(self.committed)
}
fn is_threshold_reached(&self, now: Instant) -> bool {
self.committed.entries >= self.max_entries || now >= self.next_insert_at
}
}
fn shifted_next_time(now: Instant, prev: Instant, max_duration: Duration) -> Instant {
const MAX_TIME_BIAS_255: u32 = (MAX_TIME_BIAS * 255. + 0.5) as u32;
let coef = (now.max(prev) - now.min(prev)).subsec_nanos() & 0xff;
let max_bias = max_duration * MAX_TIME_BIAS_255 / 255;
let bias = max_bias * coef / 255;
prev + max_duration + 2 * bias - max_bias
}