1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
//! WAL replay for timeseries records.
//!
//! On startup, replays `TimeseriesBatch` records into the per-core
//! columnar memtable. Only replays records with LSN > `last_flushed_wal_lsn`
//! per partition (not max_ts — safe with out-of-order data).
use crate::data::executor::core_loop::CoreLoop;
use crate::engine::timeseries::columnar_memtable::{
ColumnarMemtable, ColumnarMemtableConfig, ColumnarSchema,
};
use nodedb_types::timeseries::MetricSample;
/// Default timeseries memtable configuration for replay and auto-creation.
fn default_ts_config() -> ColumnarMemtableConfig {
ColumnarMemtableConfig {
max_memory_bytes: 64 * 1024 * 1024,
hard_memory_limit: 80 * 1024 * 1024,
max_tag_cardinality: 100_000,
}
}
impl CoreLoop {
/// Ensure a timeseries memtable exists for the given collection, creating if needed.
fn ensure_ts_memtable(&mut self, collection: &str, schema: ColumnarSchema) {
if !self.ts_memtables.contains_key(collection) {
self.ts_memtables.insert(
collection.to_string(),
ColumnarMemtable::new(schema, default_ts_config()),
);
}
}
/// Replay WAL timeseries records to rebuild in-memory memtable state after crash.
///
/// Called once during startup, after `open()` but before the event loop.
/// Processes `TimeseriesBatch` records, ignoring records for other vShards.
/// Uses LSN-based skip: only replays records with LSN > last flushed LSN.
pub fn replay_timeseries_wal(&mut self, records: &[nodedb_wal::WalRecord], num_cores: usize) {
use nodedb_wal::record::RecordType;
let mut replayed = 0usize;
let mut skipped = 0usize;
for record in records {
let logical_type = record.logical_record_type();
let record_type = RecordType::from_raw(logical_type);
let is_ts_batch = record_type == Some(RecordType::TimeseriesBatch);
if !is_ts_batch {
continue;
}
// Route by vShard to the correct core.
let vshard_id = record.header.vshard_id as usize;
let target_core = if num_cores > 0 {
vshard_id % num_cores
} else {
0
};
if target_core != self.core_id {
skipped += 1;
continue;
}
// Deserialize: (collection, raw_payload).
let Ok((collection, payload)): Result<(String, Vec<u8>), _> =
rmp_serde::from_slice(&record.payload)
else {
tracing::warn!(
core = self.core_id,
lsn = record.header.lsn,
"skipping malformed TimeseriesBatch WAL record"
);
continue;
};
let record_lsn = record.header.lsn;
// Check if this record was already flushed (LSN-based skip).
if let Some(registry) = self.ts_registries.get(&collection) {
// Find the max flushed LSN across all partitions.
let max_flushed_lsn = registry
.iter()
.map(|(_, e)| e.meta.last_flushed_wal_lsn)
.max()
.unwrap_or(0);
if record_lsn <= max_flushed_lsn {
skipped += 1;
continue;
}
}
// Re-ingest the ILP payload into the memtable.
if let Ok(input) = std::str::from_utf8(&payload) {
let lines: Vec<_> = crate::engine::timeseries::ilp::parse_batch(input)
.into_iter()
.filter_map(|r| r.ok())
.collect();
if lines.is_empty() {
continue;
}
// Ensure memtable exists.
let schema = crate::engine::timeseries::ilp_ingest::infer_schema(&lines);
self.ensure_ts_memtable(&collection, schema);
let mt = self.ts_memtables.get_mut(&collection).unwrap();
let mut series_keys = std::collections::HashMap::new();
let now_ms = 0; // Default timestamp not needed for replay (records have timestamps).
let (accepted, _) = crate::engine::timeseries::ilp_ingest::ingest_batch(
mt,
&lines,
&mut series_keys,
now_ms,
);
replayed += accepted;
} else {
// Binary payload — try msgpack-encoded samples.
if let Ok(batch) =
rmp_serde::from_slice::<nodedb_types::timeseries::TimeseriesWalBatch>(&payload)
{
self.ensure_ts_memtable(&collection, ColumnarSchema::metric_default());
let mt = self.ts_memtables.get_mut(&collection).unwrap();
for (series_id, timestamp_ms, value) in &batch.samples {
mt.ingest_metric(
*series_id,
MetricSample {
timestamp_ms: *timestamp_ms,
value: *value,
},
);
}
replayed += batch.samples.len();
}
}
}
if replayed > 0 {
tracing::info!(
core = self.core_id,
replayed,
skipped,
collections = self.ts_memtables.len(),
"WAL timeseries replay complete"
);
}
}
}