tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
use super::metrics::{
    CompactionObservabilityCounters, FlushObservabilityCounters, QueryObservabilityCounters,
    RollupObservabilityCounters, WalObservabilityCounters,
};
use super::*;
use crate::storage::StorageHealthSnapshot;
use crate::{
    CompactionObservabilitySnapshot, FlushObservabilitySnapshot, QueryObservabilitySnapshot,
    RetentionObservabilitySnapshot, RollupObservabilitySnapshot, WalObservabilitySnapshot,
};

#[derive(Debug, Clone, Copy)]
struct WalRuntimeSnapshot {
    enabled: bool,
    sync_mode: &'static str,
    acknowledged_writes_durable: bool,
    size_bytes: u64,
    segment_count: u64,
    active_segment: u64,
    highwater_segment: u64,
    highwater_frame: u64,
    durable_highwater_segment: u64,
    durable_highwater_frame: u64,
}

impl WalRuntimeSnapshot {
    fn from_wal(wal: Option<&FramedWal>) -> Self {
        match wal {
            Some(wal) => {
                let highwater = wal.current_appended_highwater();
                let durable_highwater = wal.current_durable_highwater();
                Self {
                    enabled: true,
                    sync_mode: wal.sync_mode().as_str(),
                    acknowledged_writes_durable: wal.sync_mode().acknowledged_writes_are_durable(),
                    size_bytes: wal.total_size_bytes().unwrap_or(0),
                    segment_count: wal.segment_count().unwrap_or(0),
                    active_segment: wal.active_segment(),
                    highwater_segment: highwater.segment,
                    highwater_frame: highwater.frame,
                    durable_highwater_segment: durable_highwater.segment,
                    durable_highwater_frame: durable_highwater.frame,
                }
            }
            None => Self {
                enabled: false,
                sync_mode: "disabled",
                acknowledged_writes_durable: false,
                size_bytes: 0,
                segment_count: 0,
                active_segment: 0,
                highwater_segment: 0,
                highwater_frame: 0,
                durable_highwater_segment: 0,
                durable_highwater_frame: 0,
            },
        }
    }
}

#[derive(Clone, Copy)]
struct WalSnapshotView<'a> {
    counters: &'a WalObservabilityCounters,
    runtime: WalRuntimeSnapshot,
}

#[derive(Clone, Copy)]
struct RetentionSnapshotView<'a> {
    counters: &'a super::metrics::RetentionObservabilityCounters,
    max_observed_timestamp: Option<i64>,
    recency_reference_timestamp: Option<i64>,
    future_skew_window: i64,
}

impl From<WalSnapshotView<'_>> for WalObservabilitySnapshot {
    fn from(value: WalSnapshotView<'_>) -> Self {
        Self {
            enabled: value.runtime.enabled,
            sync_mode: value.runtime.sync_mode.to_string(),
            acknowledged_writes_durable: value.runtime.acknowledged_writes_durable,
            size_bytes: value.runtime.size_bytes,
            segment_count: value.runtime.segment_count,
            active_segment: value.runtime.active_segment,
            highwater_segment: value.runtime.highwater_segment,
            highwater_frame: value.runtime.highwater_frame,
            durable_highwater_segment: value.runtime.durable_highwater_segment,
            durable_highwater_frame: value.runtime.durable_highwater_frame,
            replay_runs_total: value.counters.replay_runs_total.load(Ordering::Relaxed),
            replay_frames_total: value.counters.replay_frames_total.load(Ordering::Relaxed),
            replay_series_definitions_total: value
                .counters
                .replay_series_definitions_total
                .load(Ordering::Relaxed),
            replay_sample_batches_total: value
                .counters
                .replay_sample_batches_total
                .load(Ordering::Relaxed),
            replay_points_total: value.counters.replay_points_total.load(Ordering::Relaxed),
            replay_errors_total: value.counters.replay_errors_total.load(Ordering::Relaxed),
            replay_duration_nanos_total: value
                .counters
                .replay_duration_nanos_total
                .load(Ordering::Relaxed),
            append_series_definitions_total: value
                .counters
                .append_series_definitions_total
                .load(Ordering::Relaxed),
            append_sample_batches_total: value
                .counters
                .append_sample_batches_total
                .load(Ordering::Relaxed),
            append_points_total: value.counters.append_points_total.load(Ordering::Relaxed),
            append_bytes_total: value.counters.append_bytes_total.load(Ordering::Relaxed),
            append_errors_total: value.counters.append_errors_total.load(Ordering::Relaxed),
            resets_total: value.counters.resets_total.load(Ordering::Relaxed),
            reset_errors_total: value.counters.reset_errors_total.load(Ordering::Relaxed),
        }
    }
}

impl From<RetentionSnapshotView<'_>> for RetentionObservabilitySnapshot {
    fn from(value: RetentionSnapshotView<'_>) -> Self {
        let future_skew_max_timestamp = value
            .counters
            .future_skew_max_timestamp
            .load(Ordering::Relaxed);
        Self {
            max_observed_timestamp: value.max_observed_timestamp,
            recency_reference_timestamp: value.recency_reference_timestamp,
            future_skew_window: value.future_skew_window,
            future_skew_points_total: value
                .counters
                .future_skew_points_total
                .load(Ordering::Relaxed),
            future_skew_max_timestamp: (future_skew_max_timestamp != i64::MIN)
                .then_some(future_skew_max_timestamp),
        }
    }
}

impl From<&FlushObservabilityCounters> for FlushObservabilitySnapshot {
    fn from(counters: &FlushObservabilityCounters) -> Self {
        Self {
            pipeline_runs_total: counters.pipeline_runs_total.load(Ordering::Relaxed),
            pipeline_success_total: counters.pipeline_success_total.load(Ordering::Relaxed),
            pipeline_timeout_total: counters.pipeline_timeout_total.load(Ordering::Relaxed),
            pipeline_errors_total: counters.pipeline_errors_total.load(Ordering::Relaxed),
            pipeline_duration_nanos_total: counters
                .pipeline_duration_nanos_total
                .load(Ordering::Relaxed),
            admission_backpressure_delays_total: counters
                .admission_backpressure_delays_total
                .load(Ordering::Relaxed),
            admission_backpressure_delay_nanos_total: counters
                .admission_backpressure_delay_nanos_total
                .load(Ordering::Relaxed),
            admission_pressure_relief_requests_total: counters
                .admission_pressure_relief_requests_total
                .load(Ordering::Relaxed),
            admission_pressure_relief_observed_total: counters
                .admission_pressure_relief_observed_total
                .load(Ordering::Relaxed),
            active_flush_runs_total: counters.active_flush_runs_total.load(Ordering::Relaxed),
            active_flush_errors_total: counters.active_flush_errors_total.load(Ordering::Relaxed),
            active_flushed_series_total: counters
                .active_flushed_series_total
                .load(Ordering::Relaxed),
            active_flushed_chunks_total: counters
                .active_flushed_chunks_total
                .load(Ordering::Relaxed),
            active_flushed_points_total: counters
                .active_flushed_points_total
                .load(Ordering::Relaxed),
            persist_runs_total: counters.persist_runs_total.load(Ordering::Relaxed),
            persist_success_total: counters.persist_success_total.load(Ordering::Relaxed),
            persist_noop_total: counters.persist_noop_total.load(Ordering::Relaxed),
            persist_errors_total: counters.persist_errors_total.load(Ordering::Relaxed),
            persisted_series_total: counters.persisted_series_total.load(Ordering::Relaxed),
            persisted_chunks_total: counters.persisted_chunks_total.load(Ordering::Relaxed),
            persisted_points_total: counters.persisted_points_total.load(Ordering::Relaxed),
            persisted_segments_total: counters.persisted_segments_total.load(Ordering::Relaxed),
            persist_duration_nanos_total: counters
                .persist_duration_nanos_total
                .load(Ordering::Relaxed),
            evicted_sealed_chunks_total: counters
                .evicted_sealed_chunks_total
                .load(Ordering::Relaxed),
            tier_moves_total: counters.tier_moves_total.load(Ordering::Relaxed),
            tier_move_errors_total: counters.tier_move_errors_total.load(Ordering::Relaxed),
            expired_segments_total: counters.expired_segments_total.load(Ordering::Relaxed),
            hot_segments_visible: counters.hot_segments_visible.load(Ordering::Relaxed),
            warm_segments_visible: counters.warm_segments_visible.load(Ordering::Relaxed),
            cold_segments_visible: counters.cold_segments_visible.load(Ordering::Relaxed),
        }
    }
}

impl From<&CompactionObservabilityCounters> for CompactionObservabilitySnapshot {
    fn from(counters: &CompactionObservabilityCounters) -> Self {
        Self {
            runs_total: counters.runs_total.load(Ordering::Relaxed),
            success_total: counters.success_total.load(Ordering::Relaxed),
            noop_total: counters.noop_total.load(Ordering::Relaxed),
            errors_total: counters.errors_total.load(Ordering::Relaxed),
            source_segments_total: counters.source_segments_total.load(Ordering::Relaxed),
            output_segments_total: counters.output_segments_total.load(Ordering::Relaxed),
            source_chunks_total: counters.source_chunks_total.load(Ordering::Relaxed),
            output_chunks_total: counters.output_chunks_total.load(Ordering::Relaxed),
            source_points_total: counters.source_points_total.load(Ordering::Relaxed),
            output_points_total: counters.output_points_total.load(Ordering::Relaxed),
            duration_nanos_total: counters.duration_nanos_total.load(Ordering::Relaxed),
        }
    }
}

impl From<&QueryObservabilityCounters> for QueryObservabilitySnapshot {
    fn from(counters: &QueryObservabilityCounters) -> Self {
        Self {
            select_calls_total: counters.select_calls_total.load(Ordering::Relaxed),
            select_errors_total: counters.select_errors_total.load(Ordering::Relaxed),
            select_duration_nanos_total: counters
                .select_duration_nanos_total
                .load(Ordering::Relaxed),
            select_points_returned_total: counters
                .select_points_returned_total
                .load(Ordering::Relaxed),
            select_with_options_calls_total: counters
                .select_with_options_calls_total
                .load(Ordering::Relaxed),
            select_with_options_errors_total: counters
                .select_with_options_errors_total
                .load(Ordering::Relaxed),
            select_with_options_duration_nanos_total: counters
                .select_with_options_duration_nanos_total
                .load(Ordering::Relaxed),
            select_with_options_points_returned_total: counters
                .select_with_options_points_returned_total
                .load(Ordering::Relaxed),
            select_all_calls_total: counters.select_all_calls_total.load(Ordering::Relaxed),
            select_all_errors_total: counters.select_all_errors_total.load(Ordering::Relaxed),
            select_all_duration_nanos_total: counters
                .select_all_duration_nanos_total
                .load(Ordering::Relaxed),
            select_all_series_returned_total: counters
                .select_all_series_returned_total
                .load(Ordering::Relaxed),
            select_all_points_returned_total: counters
                .select_all_points_returned_total
                .load(Ordering::Relaxed),
            select_series_calls_total: counters.select_series_calls_total.load(Ordering::Relaxed),
            select_series_errors_total: counters.select_series_errors_total.load(Ordering::Relaxed),
            select_series_duration_nanos_total: counters
                .select_series_duration_nanos_total
                .load(Ordering::Relaxed),
            select_series_returned_total: counters
                .select_series_returned_total
                .load(Ordering::Relaxed),
            merge_path_queries_total: counters.merge_path_queries_total.load(Ordering::Relaxed),
            merge_path_shard_snapshots_total: counters
                .merge_path_shard_snapshots_total
                .load(Ordering::Relaxed),
            merge_path_shard_snapshot_wait_nanos_total: counters
                .merge_path_shard_snapshot_wait_nanos_total
                .load(Ordering::Relaxed),
            merge_path_shard_snapshot_hold_nanos_total: counters
                .merge_path_shard_snapshot_hold_nanos_total
                .load(Ordering::Relaxed),
            append_sort_path_queries_total: counters
                .append_sort_path_queries_total
                .load(Ordering::Relaxed),
            hot_only_query_plans_total: counters.hot_only_query_plans_total.load(Ordering::Relaxed),
            warm_tier_query_plans_total: counters
                .warm_tier_query_plans_total
                .load(Ordering::Relaxed),
            cold_tier_query_plans_total: counters
                .cold_tier_query_plans_total
                .load(Ordering::Relaxed),
            hot_tier_persisted_chunks_read_total: counters
                .hot_tier_persisted_chunks_read_total
                .load(Ordering::Relaxed),
            warm_tier_persisted_chunks_read_total: counters
                .warm_tier_persisted_chunks_read_total
                .load(Ordering::Relaxed),
            cold_tier_persisted_chunks_read_total: counters
                .cold_tier_persisted_chunks_read_total
                .load(Ordering::Relaxed),
            warm_tier_fetch_duration_nanos_total: counters
                .warm_tier_fetch_duration_nanos_total
                .load(Ordering::Relaxed),
            cold_tier_fetch_duration_nanos_total: counters
                .cold_tier_fetch_duration_nanos_total
                .load(Ordering::Relaxed),
            rollup_query_plans_total: counters.rollup_query_plans_total.load(Ordering::Relaxed),
            partial_rollup_query_plans_total: counters
                .partial_rollup_query_plans_total
                .load(Ordering::Relaxed),
            rollup_points_read_total: counters.rollup_points_read_total.load(Ordering::Relaxed),
        }
    }
}

impl From<&RollupObservabilityCounters> for RollupObservabilitySnapshot {
    fn from(counters: &RollupObservabilityCounters) -> Self {
        Self {
            worker_runs_total: counters.worker_runs_total.load(Ordering::Relaxed),
            worker_success_total: counters.worker_success_total.load(Ordering::Relaxed),
            worker_errors_total: counters.worker_errors_total.load(Ordering::Relaxed),
            policy_runs_total: counters.policy_runs_total.load(Ordering::Relaxed),
            buckets_materialized_total: counters.buckets_materialized_total.load(Ordering::Relaxed),
            points_materialized_total: counters.points_materialized_total.load(Ordering::Relaxed),
            last_run_duration_nanos: counters.last_run_duration_nanos.load(Ordering::Relaxed),
            policies: Vec::new(),
        }
    }
}

impl ChunkStorage {
    pub(super) fn observability_snapshot_impl(&self) -> StorageObservabilitySnapshot {
        let now_unix_ms = current_unix_millis_u64();
        let last_refresh_attempt_unix_ms = {
            let ts = self
                .observability
                .remote
                .last_refresh_attempt_unix_ms
                .load(Ordering::Relaxed);
            (ts > 0).then_some(ts)
        };
        let last_successful_refresh_unix_ms = {
            let ts = self
                .observability
                .remote
                .last_successful_refresh_unix_ms
                .load(Ordering::Relaxed);
            (ts > 0).then_some(ts)
        };
        let next_refresh_retry_unix_ms = {
            let ts = self
                .observability
                .remote
                .next_refresh_retry_unix_ms
                .load(Ordering::Relaxed);
            (ts > 0).then_some(ts)
        };
        StorageObservabilitySnapshot {
            memory: self.memory_observability_snapshot(),
            wal: WalObservabilitySnapshot::from(WalSnapshotView {
                counters: &self.observability.wal,
                runtime: WalRuntimeSnapshot::from_wal(self.persisted.wal.as_ref()),
            }),
            retention: RetentionObservabilitySnapshot::from(RetentionSnapshotView {
                counters: &self.observability.retention,
                max_observed_timestamp: self.max_observed_timestamp(),
                recency_reference_timestamp: self.retention_recency_reference_timestamp(),
                future_skew_window: self.runtime.future_skew_window,
            }),
            flush: FlushObservabilitySnapshot::from(&self.observability.flush),
            compaction: CompactionObservabilitySnapshot::from(&self.observability.compaction),
            query: QueryObservabilitySnapshot::from(&self.observability.query),
            rollups: self.rollup_observability_snapshot(),
            remote: RemoteStorageObservabilitySnapshot {
                enabled: self.persisted.tiered_storage.is_some(),
                runtime_mode: self.runtime.runtime_mode,
                cache_policy: self.persisted.remote_segment_cache_policy,
                metadata_refresh_interval_ms: u64::try_from(
                    self.persisted.remote_segment_refresh_interval.as_millis(),
                )
                .unwrap_or(u64::MAX),
                mirror_hot_segments: self
                    .persisted
                    .tiered_storage
                    .as_ref()
                    .is_some_and(|config| config.mirror_hot_segments),
                catalog_refreshes_total: self
                    .observability
                    .remote
                    .catalog_refreshes_total
                    .load(Ordering::Relaxed),
                catalog_refresh_errors_total: self
                    .observability
                    .remote
                    .catalog_refresh_errors_total
                    .load(Ordering::Relaxed),
                accessible: self
                    .observability
                    .remote
                    .accessible_or_default(self.persisted.tiered_storage.is_some()),
                last_refresh_attempt_unix_ms,
                last_successful_refresh_unix_ms,
                consecutive_refresh_failures: self
                    .observability
                    .remote
                    .consecutive_refresh_failures
                    .load(Ordering::Relaxed),
                next_refresh_retry_unix_ms,
                backoff_active: next_refresh_retry_unix_ms
                    .is_some_and(|retry_at| retry_at > now_unix_ms),
                last_refresh_error: self.observability.remote.last_refresh_error.read().clone(),
            },
            health: StorageHealthSnapshot {
                background_errors_total: self
                    .observability
                    .health
                    .background_errors_total
                    .load(Ordering::Relaxed),
                maintenance_errors_total: self
                    .observability
                    .health
                    .maintenance_errors_total
                    .load(Ordering::Relaxed),
                degraded: self
                    .observability
                    .health
                    .background_errors_total
                    .load(Ordering::Relaxed)
                    > 0
                    || self
                        .observability
                        .health
                        .maintenance_errors_total
                        .load(Ordering::Relaxed)
                        > 0
                    || (self.persisted.tiered_storage.is_some()
                        && !self.observability.remote.accessible_or_default(true)),
                fail_fast_enabled: self.background.fail_fast_enabled,
                fail_fast_triggered: self
                    .observability
                    .health
                    .fail_fast_triggered
                    .load(Ordering::SeqCst),
                last_background_error: self
                    .observability
                    .health
                    .last_background_error
                    .read()
                    .clone(),
                last_maintenance_error: self
                    .observability
                    .health
                    .last_maintenance_error
                    .read()
                    .clone(),
            },
        }
    }
}