tsink 0.10.2

A lightweight embedded time-series database with a straightforward API
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
use super::*;
/// Copyable bundle of shared references into the catalog state.
///
/// Visible only inside `crate::engine::storage_engine`.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct CatalogContext<'a> {
    /// Metadata shard index, or `None` when no index is available.
    pub(in crate::engine::storage_engine) metadata_shard_index: Option<&'a MetadataShardIndex>,
    /// Fixed-size array of `REGISTRY_TXN_SHARD_COUNT` mutexes.
    // NOTE(review): presumably one lock per registry write-transaction shard — confirm at call sites.
    pub(in crate::engine::storage_engine) write_txn_shards:
        &'a [Mutex<()>; REGISTRY_TXN_SHARD_COUNT],
    /// Series registry behind its read-write lock.
    pub(in crate::engine::storage_engine) registry: &'a RwLock<SeriesRegistry>,
}

/// Copyable bundle of shared references to the in-memory chunk shards.
///
/// Both arrays hold exactly `IN_MEMORY_SHARD_COUNT` entries.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct ChunkContext<'a> {
    /// Per-shard active builder state.
    pub(in crate::engine::storage_engine) active_builders:
        &'a [ActiveBuilderShard; IN_MEMORY_SHARD_COUNT],
    /// Per-shard sealed chunk state.
    pub(in crate::engine::storage_engine) sealed_chunks:
        &'a [SealedChunkShard; IN_MEMORY_SHARD_COUNT],
}

/// Copyable view of the coordination state: the lifecycle atomic plus a
/// fail-fast flag.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct CoordinationContext<'a> {
    /// Lifecycle value stored as an atomic byte.
    // NOTE(review): the meaning of individual byte values is defined elsewhere — confirm.
    pub(in crate::engine::storage_engine) lifecycle: &'a AtomicU8,
    /// Snapshot of the background subsystem's `fail_fast_enabled` flag, copied
    /// when the context is built by `ChunkStorage::coordination_context`.
    pub(in crate::engine::storage_engine) background_fail_fast: bool,
}

/// Copyable view exposing only the persisted-refresh progress flag.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct PersistedRefreshContext<'a> {
    /// Atomic boolean borrowed from the persisted subsystem.
    // NOTE(review): presumably `true` while a persisted refresh is running — confirm.
    pub(in crate::engine::storage_engine) persisted_refresh_in_progress: &'a AtomicBool,
}

/// Copyable bundle of the state referenced when validating series on the
/// write path: in-memory chunk shards, the series registry, and the persisted
/// index.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct WriteSeriesValidationContext<'a> {
    /// In-memory chunk shards (active builders and sealed chunks).
    pub(in crate::engine::storage_engine) chunks: ChunkContext<'a>,
    /// Series registry behind its read-write lock.
    pub(in crate::engine::storage_engine) registry: &'a RwLock<SeriesRegistry>,
    /// Persisted index state behind its read-write lock.
    pub(in crate::engine::storage_engine) persisted_index: &'a RwLock<PersistedIndexState>,
}

/// Copyable context for the write "resolve" stage.
///
/// Built by `ChunkStorage::write_resolve_context`.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct WriteResolveContext<'a> {
    /// Catalog state (metadata shard index, transaction shard locks, registry).
    pub(in crate::engine::storage_engine) catalog: CatalogContext<'a>,
    /// Supporting state; populated with the registry memory context.
    pub(in crate::engine::storage_engine) support: WriteResolveSupportContext<'a>,
    /// Copied from the runtime configuration's `cardinality_limit`.
    pub(in crate::engine::storage_engine) cardinality_limit: usize,
    /// Atomic counter borrowed from the memory subsystem's `used_bytes`.
    pub(in crate::engine::storage_engine) used_bytes: &'a AtomicU64,
    /// Atomic value borrowed from the memory subsystem's `budget_bytes`.
    pub(in crate::engine::storage_engine) budget_bytes: &'a AtomicU64,
}

/// Copyable context for the write "prepare" stage, grouped into one
/// sub-context per concern.
///
/// Built by `ChunkStorage::write_prepare_context`.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct WritePrepareContext<'a> {
    /// Chunk shards, registry, and persisted index used for series validation.
    pub(in crate::engine::storage_engine) series_validation: WriteSeriesValidationContext<'a>,
    /// Plain configuration values (chunk point cap, partition window, active
    /// partition head limit).
    pub(in crate::engine::storage_engine) config: WritePrepareConfigContext,
    /// Clock, pending series ids, visibility caches, and retention settings.
    pub(in crate::engine::storage_engine) visibility: WritePrepareVisibilityContext<'a>,
    /// WAL handle and WAL size limit.
    pub(in crate::engine::storage_engine) wal: WritePrepareWalContext<'a>,
    /// Used/budget byte counters.
    pub(in crate::engine::storage_engine) memory_budget: WritePrepareMemoryBudgetContext<'a>,
    /// Admission-control state (budget context, worker wake-up, timeouts, lock).
    pub(in crate::engine::storage_engine) admission: WriteAdmissionControlContext<'a>,
}

/// Copyable context for the write "apply" stage, grouped into one
/// sub-context per concern.
///
/// Built by `ChunkStorage::write_apply_context`.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct WriteApplyContext<'a> {
    /// Chunk shards, registry, and persisted index used for series validation.
    pub(in crate::engine::storage_engine) series_validation: WriteSeriesValidationContext<'a>,
    /// Registry-side state: a copy of the series validation context plus the
    /// registry memory context.
    pub(in crate::engine::storage_engine) registry: WriteApplyRegistryContext<'a>,
    /// WAL-side state; carries test hooks in test builds and only a marker
    /// otherwise.
    pub(in crate::engine::storage_engine) wal: WriteApplyWalContext<'a>,
    /// Chunk shards, timestamp bookkeeping, and chunk/partition limits.
    pub(in crate::engine::storage_engine) shard_mutation: WriteApplyShardMutationContext<'a>,
    /// Per-shard memory accounting.
    pub(in crate::engine::storage_engine) memory: WriteApplyMemoryAccountingContext<'a>,
    /// State used to publish results (registry bookkeeping, materialized
    /// series, runtime metadata delta, metadata shards, sealed chunks).
    pub(in crate::engine::storage_engine) publication: WriteApplyPublicationContext<'a>,
}

/// Copyable context for the write "commit" stage.
///
/// Built by `ChunkStorage::write_commit_context`.
#[derive(Clone, Copy)]
pub(in crate::engine::storage_engine) struct WriteCommitContext<'a> {
    /// Staging state: chunk shards, WAL handle, and WAL metrics.
    pub(in crate::engine::storage_engine) stage: WriteCommitStageContext<'a>,
    /// WAL completion state: observability, WAL handle, and WAL metrics.
    pub(in crate::engine::storage_engine) wal_completion: WriteCommitWalCompletionContext<'a>,
}

impl ChunkStorage {
    /// Returns a [`CatalogContext`] borrowing the catalog's metadata shard
    /// index (if any), write-transaction shard locks, and series registry.
    pub(in crate::engine::storage_engine) fn catalog_context(&self) -> CatalogContext<'_> {
        let catalog = &self.catalog;
        let metadata_shard_index = catalog.metadata_shard_index.as_ref();
        let write_txn_shards = &catalog.write_txn_shards;
        let registry = &catalog.registry;
        CatalogContext {
            metadata_shard_index,
            write_txn_shards,
            registry,
        }
    }

    /// Returns a [`ChunkContext`] borrowing the active-builder and
    /// sealed-chunk shard arrays.
    pub(in crate::engine::storage_engine) fn chunk_context(&self) -> ChunkContext<'_> {
        let chunks = &self.chunks;
        let active_builders = &chunks.active_builders;
        let sealed_chunks = &chunks.sealed_chunks;
        ChunkContext {
            active_builders,
            sealed_chunks,
        }
    }

    pub(in crate::engine::storage_engine) fn coordination_context(
        &self,
    ) -> CoordinationContext<'_> {
        CoordinationContext {
            lifecycle: self.coordination.lifecycle.as_ref(),
            background_fail_fast: self.background.fail_fast_enabled,
        }
    }

    /// Returns a [`PersistedRefreshContext`] borrowing the
    /// `persisted_refresh_in_progress` flag.
    pub(in crate::engine::storage_engine) fn persisted_refresh_context(
        &self,
    ) -> PersistedRefreshContext<'_> {
        let persisted_refresh_in_progress = &self.persisted.persisted_refresh_in_progress;
        PersistedRefreshContext {
            persisted_refresh_in_progress,
        }
    }

    /// Returns a [`WriteSeriesValidationContext`]: the chunk context plus
    /// borrows of the series registry and the persisted index.
    pub(in crate::engine::storage_engine) fn write_series_validation_context(
        &self,
    ) -> WriteSeriesValidationContext<'_> {
        let chunks = self.chunk_context();
        let registry = &self.catalog.registry;
        let persisted_index = &self.persisted.persisted_index;
        WriteSeriesValidationContext {
            chunks,
            registry,
            persisted_index,
        }
    }

    /// Returns a [`RegistryMemoryContext`] pairing the series registry with
    /// the memory counters it is accounted against.
    pub(in crate::engine::storage_engine) fn registry_memory_context(
        &self,
    ) -> RegistryMemoryContext<'_> {
        let memory = &self.memory;
        RegistryMemoryContext {
            accounting_enabled: memory.accounting_enabled,
            registry: &self.catalog.registry,
            registry_used_bytes: &memory.registry_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            used_bytes: &memory.used_bytes,
        }
    }

    /// Returns a [`ClockContext`] carrying the runtime timestamp precision and
    /// future-skew window.
    ///
    /// Test builds additionally borrow `current_time_override`; other builds
    /// fill the `marker` field with `PhantomData` instead.
    pub(in crate::engine::storage_engine) fn clock_context(&self) -> ClockContext<'_> {
        let runtime = &self.runtime;
        let timestamp_precision = runtime.timestamp_precision;
        let future_skew_window = runtime.future_skew_window;
        ClockContext {
            timestamp_precision,
            future_skew_window,
            #[cfg(test)]
            current_time_override: &self.current_time_override,
            #[cfg(not(test))]
            marker: std::marker::PhantomData,
        }
    }

    /// Returns a [`MaterializedSeriesReadContext`] borrowing the
    /// `materialized_series` state from the visibility subsystem.
    pub(in crate::engine::storage_engine) fn materialized_series_read_context(
        &self,
    ) -> MaterializedSeriesReadContext<'_> {
        let materialized_series = &self.visibility.materialized_series;
        MaterializedSeriesReadContext {
            materialized_series,
        }
    }

    /// Returns a [`VisibilityCacheReadContext`] borrowing the three
    /// per-series visibility caches (summaries, max timestamps, bounded max
    /// timestamps).
    pub(in crate::engine::storage_engine) fn visibility_cache_read_context(
        &self,
    ) -> VisibilityCacheReadContext<'_> {
        let visibility = &self.visibility;
        let series_visibility_summaries = &visibility.series_visibility_summaries;
        let series_visible_max_timestamps = &visibility.series_visible_max_timestamps;
        let series_visible_bounded_max_timestamps =
            &visibility.series_visible_bounded_max_timestamps;
        VisibilityCacheReadContext {
            series_visibility_summaries,
            series_visible_max_timestamps,
            series_visible_bounded_max_timestamps,
        }
    }

    /// Returns a [`WorkerWakeContext`] borrowing the background subsystem.
    pub(in crate::engine::storage_engine) fn worker_wake_context(&self) -> WorkerWakeContext<'_> {
        let background = &self.background;
        WorkerWakeContext { background }
    }

    /// Returns a [`RegistryBookkeepingContext`] pairing the catalog's pending
    /// series ids with the metadata/shared/total memory counters.
    pub(in crate::engine::storage_engine) fn registry_bookkeeping_context(
        &self,
    ) -> RegistryBookkeepingContext<'_> {
        let memory = &self.memory;
        RegistryBookkeepingContext {
            accounting_enabled: memory.accounting_enabled,
            pending_series_ids: &self.catalog.pending_series_ids,
            metadata_used_bytes: &memory.metadata_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            used_bytes: &memory.used_bytes,
        }
    }

    /// Returns a [`ShardMemoryAccountingContext`] exposing the accounting
    /// switch, the per-shard byte counters, and the total byte counter.
    pub(in crate::engine::storage_engine) fn shard_memory_accounting_context(
        &self,
    ) -> ShardMemoryAccountingContext<'_> {
        let memory = &self.memory;
        let accounting_enabled = memory.accounting_enabled;
        let used_bytes_by_shard = &memory.used_bytes_by_shard;
        let used_bytes = &memory.used_bytes;
        ShardMemoryAccountingContext {
            accounting_enabled,
            used_bytes_by_shard,
            used_bytes,
        }
    }

    /// Returns a [`PersistedSealedBudgetContext`].
    ///
    /// `has_persisted_lanes` is `true` when either the numeric or the blob
    /// lane path is set on the persisted subsystem.
    pub(in crate::engine::storage_engine) fn persisted_sealed_budget_context(
        &self,
    ) -> PersistedSealedBudgetContext<'_> {
        let persisted = &self.persisted;
        let chunks = &self.chunks;

        let memory = self.shard_memory_accounting_context();
        let backpressure_lock = &self.memory.backpressure_lock;
        let has_persisted_lanes =
            persisted.numeric_lane_path.is_some() || persisted.blob_lane_path.is_some();

        PersistedSealedBudgetContext {
            memory,
            backpressure_lock,
            has_persisted_lanes,
            persisted_chunk_watermarks: &chunks.persisted_chunk_watermarks,
            persisted_index: &persisted.persisted_index,
            sealed_chunks: &chunks.sealed_chunks,
            flush_metrics: &self.observability.flush,
        }
    }

    /// Returns a [`SeriesTimestampWriteContext`] combining the clock context,
    /// retention metrics, the visibility subsystem's timestamp state, and the
    /// metadata/shared/total memory counters.
    pub(in crate::engine::storage_engine) fn series_timestamp_write_context(
        &self,
    ) -> SeriesTimestampWriteContext<'_> {
        let visibility = &self.visibility;
        let memory = &self.memory;
        let series_visible_bounded_max_timestamps =
            &visibility.series_visible_bounded_max_timestamps;

        SeriesTimestampWriteContext {
            clock: self.clock_context(),
            retention_metrics: &self.observability.retention,
            tombstones: &visibility.tombstones,
            series_visibility_summaries: &visibility.series_visibility_summaries,
            series_visible_max_timestamps: &visibility.series_visible_max_timestamps,
            series_visible_bounded_max_timestamps,
            recency_state_lock: &visibility.recency_state_lock,
            max_observed_timestamp: &visibility.max_observed_timestamp,
            max_bounded_observed_timestamp: &visibility.max_bounded_observed_timestamp,
            live_series_pruning_generation: &visibility.live_series_pruning_generation,
            accounting_enabled: memory.accounting_enabled,
            metadata_used_bytes: &memory.metadata_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            used_bytes: &memory.used_bytes,
        }
    }

    /// Returns a [`MaterializedSeriesWriteContext`] pairing the materialized
    /// series state and pruning generation with the metadata/shared/total
    /// memory counters.
    pub(in crate::engine::storage_engine) fn materialized_series_write_context(
        &self,
    ) -> MaterializedSeriesWriteContext<'_> {
        let visibility = &self.visibility;
        let memory = &self.memory;
        MaterializedSeriesWriteContext {
            accounting_enabled: memory.accounting_enabled,
            materialized_series: &visibility.materialized_series,
            live_series_pruning_generation: &visibility.live_series_pruning_generation,
            metadata_used_bytes: &memory.metadata_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            used_bytes: &memory.used_bytes,
        }
    }

    /// Returns a [`SealedChunkPublishContext`] with the sealed-chunk shards,
    /// the chunk sequence counter, and shard memory accounting.
    ///
    /// Test builds also borrow the `pre_sealed_chunk_publish_hook` test hook.
    pub(in crate::engine::storage_engine) fn sealed_chunk_publish_context(
        &self,
    ) -> SealedChunkPublishContext<'_> {
        let chunks = &self.chunks;
        let sealed_chunks = &chunks.sealed_chunks;
        let next_chunk_sequence = &chunks.next_chunk_sequence;
        let memory = self.shard_memory_accounting_context();
        SealedChunkPublishContext {
            sealed_chunks,
            next_chunk_sequence,
            memory,
            #[cfg(test)]
            pre_publish_hook: &self.persist_test_hooks.pre_sealed_chunk_publish_hook,
        }
    }

    /// Returns a [`WalMetricsContext`] borrowing the WAL observability metrics.
    pub(in crate::engine::storage_engine) fn wal_metrics_context(&self) -> WalMetricsContext<'_> {
        let metrics = &self.observability.wal;
        WalMetricsContext { metrics }
    }

    /// Test-only: returns a [`WriteApplyTestHooksContext`] borrowing the
    /// persist test hooks.
    #[cfg(test)]
    fn write_apply_test_hooks_context(&self) -> WriteApplyTestHooksContext<'_> {
        let hooks = &self.persist_test_hooks;
        WriteApplyTestHooksContext { hooks }
    }

    /// Test-only: returns a [`WriteCommitTestHooksContext`] borrowing the
    /// persist test hooks.
    #[cfg(test)]
    fn write_commit_test_hooks_context(&self) -> WriteCommitTestHooksContext<'_> {
        let hooks = &self.persist_test_hooks;
        WriteCommitTestHooksContext { hooks }
    }

    /// Returns the [`WriteResolveContext`]: catalog state, registry memory
    /// support, the runtime cardinality limit, and the used/budget byte
    /// counters.
    pub(in crate::engine::storage_engine) fn write_resolve_context(
        &self,
    ) -> WriteResolveContext<'_> {
        let memory = &self.memory;
        let catalog = self.catalog_context();
        let support = WriteResolveSupportContext {
            registry_memory: self.registry_memory_context(),
        };
        WriteResolveContext {
            catalog,
            support,
            cardinality_limit: self.runtime.cardinality_limit,
            used_bytes: &memory.used_bytes,
            budget_bytes: &memory.budget_bytes,
        }
    }

    /// Returns the [`WritePrepareContext`].
    ///
    /// Each sub-context is assembled on its own, in the same order as the
    /// fields of [`WritePrepareContext`], and then combined.
    pub(in crate::engine::storage_engine) fn write_prepare_context(
        &self,
    ) -> WritePrepareContext<'_> {
        let runtime = &self.runtime;
        let memory = &self.memory;

        let series_validation = self.write_series_validation_context();

        // Plain configuration values copied out of the chunk and runtime settings.
        let config = WritePrepareConfigContext {
            chunk_point_cap: self.chunks.chunk_point_cap,
            partition_window: runtime.partition_window,
            max_active_partition_heads_per_series: runtime.max_active_partition_heads_per_series,
        };

        let visibility = WritePrepareVisibilityContext {
            clock: self.clock_context(),
            pending_series_ids: &self.catalog.pending_series_ids,
            has_metadata_shards: self.catalog.metadata_shard_index.is_some(),
            materialized_series: self.materialized_series_read_context(),
            visibility_cache: self.visibility_cache_read_context(),
            max_bounded_observed_timestamp: &self.visibility.max_bounded_observed_timestamp,
            retention_window: runtime.retention_window,
            future_skew_window: runtime.future_skew_window,
            retention_enforced: runtime.retention_enforced,
        };

        let wal = WritePrepareWalContext {
            wal: self.persisted.wal.as_ref(),
            wal_size_limit_bytes: runtime.wal_size_limit_bytes,
        };

        let memory_budget = WritePrepareMemoryBudgetContext {
            used_bytes: &memory.used_bytes,
            budget_bytes: &memory.budget_bytes,
        };

        let admission = WriteAdmissionControlContext {
            budget: self.persisted_sealed_budget_context(),
            workers: self.worker_wake_context(),
            observability: self.observability.as_ref(),
            write_timeout: runtime.write_timeout,
            admission_poll_interval: runtime.admission_poll_interval,
            admission_backpressure_lock: &memory.admission_backpressure_lock,
        };

        WritePrepareContext {
            series_validation,
            config,
            visibility,
            wal,
            memory_budget,
            admission,
        }
    }

    /// Returns the [`WriteApplyContext`].
    ///
    /// The series validation context is built once and copied into both the
    /// top-level field and the registry sub-context. The WAL sub-context
    /// carries test hooks in test builds and only a `PhantomData` marker
    /// otherwise.
    pub(in crate::engine::storage_engine) fn write_apply_context(&self) -> WriteApplyContext<'_> {
        let runtime = &self.runtime;
        let series_validation = self.write_series_validation_context();

        let registry = WriteApplyRegistryContext {
            series_validation,
            registry_memory: self.registry_memory_context(),
        };

        let wal = WriteApplyWalContext {
            #[cfg(test)]
            test_hooks: self.write_apply_test_hooks_context(),
            #[cfg(test)]
            crash_after_samples_persisted: &self
                .persist_test_hooks
                .crash_after_samples_persisted,
            #[cfg(not(test))]
            marker: std::marker::PhantomData,
        };

        let shard_mutation = WriteApplyShardMutationContext {
            chunks: self.chunk_context(),
            timestamps: self.series_timestamp_write_context(),
            chunk_point_cap: self.chunks.chunk_point_cap,
            partition_window: runtime.partition_window,
            max_active_partition_heads_per_series: runtime.max_active_partition_heads_per_series,
        };

        let memory = WriteApplyMemoryAccountingContext {
            shards: self.shard_memory_accounting_context(),
        };

        let publication = WriteApplyPublicationContext {
            registry_bookkeeping: self.registry_bookkeeping_context(),
            materialized_series: self.materialized_series_write_context(),
            runtime_metadata_delta: self.runtime_metadata_delta_write_context(),
            metadata_shards: self.metadata_shard_publication_context(),
            sealed_chunks: self.sealed_chunk_publish_context(),
        };

        WriteApplyContext {
            series_validation,
            registry,
            wal,
            shard_mutation,
            memory,
            publication,
        }
    }

    /// Returns the [`WriteCommitContext`], made of a staging sub-context and
    /// a WAL completion sub-context.
    ///
    /// Both sub-contexts receive the WAL handle and WAL metrics; test builds
    /// additionally wire in the commit test hooks.
    pub(in crate::engine::storage_engine) fn write_commit_context(&self) -> WriteCommitContext<'_> {
        let stage = WriteCommitStageContext {
            chunks: self.chunk_context(),
            wal: self.persisted.wal.as_ref(),
            wal_metrics: self.wal_metrics_context(),
            #[cfg(test)]
            test_hooks: self.write_commit_test_hooks_context(),
        };

        let wal_completion = WriteCommitWalCompletionContext {
            observability: self.observability.as_ref(),
            wal: self.persisted.wal.as_ref(),
            wal_metrics: self.wal_metrics_context(),
            #[cfg(test)]
            crash_before_publish_persisted: &self
                .persist_test_hooks
                .crash_before_publish_persisted,
        };

        WriteCommitContext {
            stage,
            wal_completion,
        }
    }

    /// Returns the [`LifecyclePublicationContext`]: registry and persisted
    /// index locks, the memory counters, and the publication sub-contexts
    /// (registry bookkeeping, materialized series, runtime metadata delta,
    /// metadata shards).
    pub(in crate::engine::storage_engine) fn lifecycle_publication_context(
        &self,
    ) -> LifecyclePublicationContext<'_> {
        let memory = &self.memory;
        let registry = &self.catalog.registry;
        let persisted_index = &self.persisted.persisted_index;

        LifecyclePublicationContext {
            registry,
            persisted_index,
            accounting_enabled: memory.accounting_enabled,
            registry_used_bytes: &memory.registry_used_bytes,
            persisted_index_used_bytes: &memory.persisted_index_used_bytes,
            persisted_mmap_used_bytes: &memory.persisted_mmap_used_bytes,
            shared_used_bytes: &memory.shared_used_bytes,
            used_bytes: &memory.used_bytes,
            registry_bookkeeping: self.registry_bookkeeping_context(),
            materialized_series: self.materialized_series_write_context(),
            runtime_metadata_delta: self.runtime_metadata_delta_write_context(),
            metadata_shards: self.metadata_shard_publication_context(),
        }
    }
}