1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
use emath::History;
use parking_lot::Mutex;
use re_chunk_store::{ChunkStoreDiff, ChunkStoreDiffKind, ChunkStoreEvent};
use re_sorbet::TimestampMetadata;
use saturating_cast::SaturatingCast as _;
/// Statistics about the latency of incoming data to a store.
///
/// Wraps [`LatencyStats`] in a mutex so that both recording (`on_events`)
/// and reading (`latency_snapshot`) can go through a shared `&self`.
#[derive(Default)]
pub struct IngestionStatistics {
// Mutex-protected so methods can take `&self` (see `on_events`/`latency_snapshot`).
stats: Mutex<LatencyStats>,
}
impl Clone for IngestionStatistics {
    /// Deep-copies the current stats.
    ///
    /// Hand-written because `Mutex<T>` does not implement `Clone`.
    fn clone(&self) -> Self {
        let stats_copy = self.stats.lock().clone();
        Self {
            stats: Mutex::new(stats_copy),
        }
    }
}
impl IngestionStatistics {
    /// Record latency samples for a batch of incoming chunk-store events.
    ///
    /// All events in the batch are stamped with the same "now", taken once up front.
    #[inline]
    pub fn on_events(&self, chunk_timestamps: &TimestampMetadata, events: &[ChunkStoreEvent]) {
        re_tracing::profile_function!();
        let now_nanos = nanos_since_epoch();

        // Take the lock once for the whole batch.
        let mut stats = self.stats.lock();
        events
            .iter()
            .for_each(|event| stats.on_new_chunk(now_nanos, chunk_timestamps, &event.diff));
    }
}
impl IngestionStatistics {
/// The latest (smoothed) reading of the latency of the ingestion pipeline.
///
/// Briefly locks the internal stats and delegates to [`LatencyStats::snapshot`].
pub fn latency_snapshot(&self) -> LatencySnapshot {
self.stats.lock().snapshot()
}
}
/// Statistics about the latency of incoming data to a store.
///
/// Each field is a rolling [`History`] of latency samples, measured in seconds,
/// keyed by the wall-clock time (seconds since epoch) at which they were recorded.
#[derive(Clone, Debug)]
pub struct LatencyStats {
// All latencies measured in seconds.
/// Full end-to-end latency, from the time the data was logged in the SDK,
/// up until it was added to the store.
e2e: History<f32>,
// All the individual parts:
/// Delay between the time `RowId` was created and the `ChunkId` was created,
/// i.e. the time it took to get the data from the `log` call to be batched by the batcher.
log2chunk: History<f32>,
/// Time between chunk creation and IPC encoding (start of gRPC transmission).
chunk2encode: History<f32>,
/// Time between encoding to IPC and decoding again,
/// e.g. the time it takes to send the data over the network.
transmission: History<f32>,
/// Time from the incoming IPC data being decoded to it being ingested into the store.
decode2ingest: History<f32>,
}
impl Default for LatencyStats {
    fn default() -> Self {
        // min samples = 0: we stop displaying e2e latency if input stops.
        // max samples = 8 * 1024: don't waste too much memory on this -
        //   we just need enough to get a good average.
        // max age = 1.0s: don't keep too long of a rolling average,
        //   or the stats get outdated.
        let fresh_history = || History::new(0..8 * 1024, 1.0);

        Self {
            e2e: fresh_history(),
            log2chunk: fresh_history(),
            chunk2encode: fresh_history(),
            transmission: fresh_history(),
            decode2ingest: fresh_history(),
        }
    }
}
impl LatencyStats {
/// Record latency samples for a single chunk-store diff.
///
/// Only additions are measured; other diff kinds are ignored.
/// `now_nanos` is the ingestion time, in nanoseconds since the Unix epoch.
fn on_new_chunk(
&mut self,
now_nanos: i64,
chunk_timestamps: &TimestampMetadata,
diff: &ChunkStoreDiff,
) {
if diff.kind != ChunkStoreDiffKind::Addition {
return;
}
// Destructure so that adding a field to `Self` forces this function to be updated.
let Self {
e2e,
log2chunk,
chunk2encode,
transmission,
decode2ingest,
} = self;
// `History` samples are keyed by time in (fractional) seconds.
let now = now_nanos as f64 / 1e9;
// We use the chunk id for timing, so we need to get the _original_ id:
// if this chunk was split off of another one, `split_source` presumably
// carries the id (and thus creation time) of the original chunk.
let original_chunk_id = diff.split_source.unwrap_or_else(|| diff.chunk.id());
let chunk_creation_nanos = original_chunk_id
.nanos_since_epoch()
.saturating_cast::<i64>();
let TimestampMetadata {
grpc_encoded_at,
grpc_decoded_at,
} = chunk_timestamps;
// These are `None` if the data never went over gRPC (or the clock readings failed).
let grpc_encoded_at_nanos = grpc_encoded_at.and_then(system_time_to_nanos);
let grpc_decoded_at_nanos = grpc_decoded_at.and_then(system_time_to_nanos);
for row_id in diff.chunk.row_ids() {
let row_creation_nanos = row_id.nanos_since_epoch().saturating_cast::<i64>();
// Total:
if let Some(e2e_nanos) = now_nanos.checked_sub(row_creation_nanos) {
e2e.add(now, e2e_nanos as f32 / 1e9);
}
// First step: log() call to chunk creation (batcher latency):
if let Some(log2chunk_nanos) = chunk_creation_nanos.checked_sub(row_creation_nanos) {
log2chunk.add(now, log2chunk_nanos as f32 / 1e9);
}
}
if let Some(grpc_encoded_at_nanos) = grpc_encoded_at_nanos {
chunk2encode.add(
now,
(grpc_encoded_at_nanos - chunk_creation_nanos) as f32 / 1e9,
);
// Transmission time is only meaningful if we know both the encode and decode times.
if let Some(grpc_decoded_at_nanos) = grpc_decoded_at_nanos {
transmission.add(
now,
(grpc_decoded_at_nanos - grpc_encoded_at_nanos) as f32 / 1e9,
);
}
}
if let Some(grpc_decoded_at_nanos) = grpc_decoded_at_nanos {
decode2ingest.add(now, (now_nanos - grpc_decoded_at_nanos) as f32 / 1e9);
}
}
/// What is the mean latency between the time data was logged in the SDK and the time it was ingested?
///
/// This is based on the clocks of the viewer and the SDK being in sync,
/// so if the recording was done on another machine, this is likely very inaccurate.
pub fn snapshot(&mut self) -> LatencySnapshot {
let Self {
e2e,
log2chunk,
chunk2encode,
transmission,
decode2ingest,
} = self;
{
// Make sure the averages are up-to-date before we read them:
let now_nanos = nanos_since_epoch();
let now = now_nanos as f64 / 1e9;
e2e.flush(now);
log2chunk.flush(now);
chunk2encode.flush(now);
transmission.flush(now);
decode2ingest.flush(now);
}
LatencySnapshot {
e2e: e2e.average(),
log2chunk: log2chunk.average(),
chunk2encode: chunk2encode.average(),
transmission: transmission.average(),
decode2ingest: decode2ingest.average(),
}
}
}
/// Current wall-clock time, in nanoseconds since the Unix epoch.
///
/// Returns 0 (and warns once) if the system clock is unreadable.
fn nanos_since_epoch() -> i64 {
    if let Ok(duration_since_epoch) = web_time::SystemTime::UNIX_EPOCH.elapsed() {
        // `as_nanos()` is `u128`; saturate instead of a plain `as` cast,
        // which would silently truncate. This matches the other timestamp
        // casts in this file, which all use `saturating_cast`.
        duration_since_epoch.as_nanos().saturating_cast::<i64>()
    } else {
        re_log::warn_once!("Broken system clock: unable to get current time since epoch.");
        0
    }
}
/// Converts a [`web_time::SystemTime`] to nanoseconds since the Unix epoch.
///
/// Returns `None` for times before the epoch.
fn system_time_to_nanos(system_time: web_time::SystemTime) -> Option<i64> {
    system_time
        .duration_since(web_time::SystemTime::UNIX_EPOCH)
        .ok()
        // `as_nanos()` is `u128`; saturate instead of a plain `as` cast,
        // which would silently truncate. This matches the other timestamp
        // casts in this file, which all use `saturating_cast`.
        .map(|d| d.as_nanos().saturating_cast::<i64>())
}
/// The latest (smoothed) reading of the latency of the ingestion pipeline.
///
/// All measurements are in seconds, and are only valid if the clocks of the viewer and the SDK are in sync.
///
/// They are `None` if unknown.
#[derive(Clone, Copy, Debug)]
pub struct LatencySnapshot {
/// From the time the data was logged in the SDK, up until it was added to the store.
pub e2e: Option<f32>,
/// Delay between the time `RowId` was created and the `ChunkId` was created,
/// i.e. the time it took to get the data from the `log` call to be batched by the batcher.
pub log2chunk: Option<f32>,
/// Time between chunk creation and IPC encoding (start of gRPC transmission).
pub chunk2encode: Option<f32>,
/// Time between encoding to IPC and decoding again,
/// e.g. the time it takes to send the data over the network.
pub transmission: Option<f32>,
/// Time from the incoming IPC data being decoded to it being ingested into the store.
pub decode2ingest: Option<f32>,
}