1use std::collections::HashSet;
2use std::io::{Cursor, Read};
3use std::sync::Arc;
4use std::time::Duration;
5
6use aws_sdk_s3::primitives::ByteStream;
7use serde::{Deserialize, Serialize};
8use tokio::task::JoinHandle;
9use tracing::{debug, error, info, warn};
10
11use crate::config::S3Config;
12use crate::errors::{StoreError, TieringError};
13use crate::io_budget::IoTokenBucket;
14use crate::store::{KvStore, SnapshotState, ValueEntry};
15
/// Handle to the background S3 tiering task spawned by [`TieringManager::start`].
///
/// Use [`TieringManager::join`] to wait for the task; a spawned tokio task whose handle is
/// simply dropped keeps running detached.
#[derive(Debug)]
pub struct TieringManager {
    // Join handle of the task spawned in `start`.
    handle: JoinHandle<()>,
}
20
/// Manifest describing one tiered snapshot, stored as `manifest.json` under the tier prefix.
///
/// The uploader writes it with `serde_json::to_vec_pretty`, and restore reads it back with
/// `serde_json::from_slice`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierManifest {
    /// Manifest format version. The code in this module writes `2`; restore does not check it.
    pub version: u32,
    /// Store revision of the snapshot the chunks were cut from.
    pub revision: i64,
    /// Compaction revision of that snapshot; restored together with `revision`.
    pub compact_revision: i64,
    /// Chunk objects in production order; restore downloads and decodes them in this order.
    pub chunks: Vec<TierChunkMeta>,
}
28
/// Metadata for one chunk object listed in a [`TierManifest`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TierChunkMeta {
    /// Object key relative to the tier prefix, shaped like `chunks/00001-1a2b3c4d.sst`
    /// (1-based zero-padded index, then the chunk's CRC32C in hex).
    pub key: String,
    /// Length of the chunk blob in bytes, including the `CHUNK_MAGIC` header.
    pub size: usize,
    /// CRC32C of the whole blob (header included). Verified on restore and also sent as the
    /// `crc32c` object metadata entry on upload.
    pub crc32c: u32,
    /// Number of key/value records encoded in the blob.
    pub records: usize,
}
36
/// Four-byte header written at the start of every chunk blob; `decode_chunk` rejects any blob
/// that does not begin with it.
pub const CHUNK_MAGIC: &[u8; 4] = b"SST1";
38
39impl TieringManager {
40 pub fn start(
41 node_id: u64,
42 cfg: Option<S3Config>,
43 interval: Duration,
44 sst_target_bytes: usize,
45 store: Arc<KvStore>,
46 io_limiter: IoTokenBucket,
47 sqe_limiter: IoTokenBucket,
48 ) -> Self {
49 let handle = tokio::spawn(async move {
50 let Some(cfg) = cfg else {
51 info!("tiering disabled: no S3 configuration provided");
52 return;
53 };
54
55 if node_id != 1 {
56 info!(node_id, "tiering uploader disabled on non-primary node");
57 return;
58 }
59
60 let effective_target_bytes = sst_target_bytes.max(4 * 1024);
61
62 let shared_cfg = aws_config::defaults(aws_config::BehaviorVersion::latest())
63 .endpoint_url(cfg.endpoint.clone())
64 .region(aws_config::Region::new(cfg.region.clone()))
65 .load()
66 .await;
67
68 let s3_conf = aws_sdk_s3::config::Builder::from(&shared_cfg)
69 .force_path_style(true)
70 .build();
71 let client = aws_sdk_s3::Client::from_conf(s3_conf);
72
73 loop {
74 tokio::time::sleep(interval).await;
75
76 let snapshot = store.snapshot_state();
77 if snapshot.kv.is_empty() {
78 continue;
79 }
80
81 let (metas, blobs) = build_sst_chunks(&snapshot.kv, effective_target_bytes);
82 if metas.is_empty() {
83 continue;
84 }
85
86 let base = format!("{}/cluster-1", cfg.key_prefix);
87
88 for (meta, body) in metas.iter().zip(blobs.iter()) {
89 sqe_limiter.acquire(1).await;
90 io_limiter.acquire(io_tokens_for_bytes(body.len())).await;
91 let object_key = format!("{base}/{}", meta.key);
92 let put = client
93 .put_object()
94 .bucket(&cfg.bucket)
95 .key(&object_key)
96 .metadata("crc32c", meta.crc32c.to_string())
97 .body(ByteStream::from(body.clone()))
98 .send()
99 .await;
100
101 if let Err(err) = put {
102 warn!(error = %err, key = %object_key, "chunk tier upload failed");
103 }
104 }
105
106 let manifest = TierManifest {
107 version: 2,
108 revision: snapshot.revision,
109 compact_revision: snapshot.compact_revision,
110 chunks: metas,
111 };
112
113 let manifest_key = format!("{base}/manifest.json");
114 let manifest_bytes = match serde_json::to_vec_pretty(&manifest) {
115 Ok(v) => v,
116 Err(err) => {
117 error!(error = %err, "failed to serialize manifest");
118 continue;
119 }
120 };
121
122 io_limiter
123 .acquire(io_tokens_for_bytes(manifest_bytes.len()))
124 .await;
125 sqe_limiter.acquire(1).await;
126 let out = client
127 .put_object()
128 .bucket(&cfg.bucket)
129 .key(&manifest_key)
130 .body(ByteStream::from(manifest_bytes))
131 .send()
132 .await;
133
134 match out {
135 Ok(_) => {
136 debug!(bucket = %cfg.bucket, key = %manifest_key, "uploaded manifest and chunked sstables");
137 if let Err(err) = gc_stale_chunks(
138 &client,
139 &cfg.bucket,
140 &base,
141 &manifest,
142 &io_limiter,
143 &sqe_limiter,
144 )
145 .await
146 {
147 warn!(error = %err, "failed to delete stale chunk objects");
148 }
149 }
150 Err(err) => warn!(error = %err, "manifest upload failed"),
151 }
152 }
153 });
154
155 Self { handle }
156 }
157
158 pub async fn join(self) {
159 let _ = self.handle.await;
160 }
161
162 pub async fn restore_if_empty(
163 _node_id: u64,
164 cfg: Option<&S3Config>,
165 store: Arc<KvStore>,
166 ) -> Result<bool, TieringError> {
167 let Some(cfg) = cfg else {
168 return Ok(false);
169 };
170
171 if !store.is_empty() {
172 return Ok(false);
173 }
174
175 let shared_cfg = aws_config::defaults(aws_config::BehaviorVersion::latest())
176 .endpoint_url(cfg.endpoint.clone())
177 .region(aws_config::Region::new(cfg.region.clone()))
178 .load()
179 .await;
180
181 let s3_conf = aws_sdk_s3::config::Builder::from(&shared_cfg)
182 .force_path_style(true)
183 .build();
184 let client = aws_sdk_s3::Client::from_conf(s3_conf);
185
186 let base = format!("{}/cluster-1", cfg.key_prefix);
187 let manifest_key = format!("{base}/manifest.json");
188
189 let manifest_obj = client
190 .get_object()
191 .bucket(&cfg.bucket)
192 .key(&manifest_key)
193 .send()
194 .await
195 .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
196
197 let manifest_bytes = manifest_obj
198 .body
199 .collect()
200 .await
201 .map_err(|e| TieringError::AwsSdk(e.to_string()))?
202 .into_bytes()
203 .to_vec();
204
205 let manifest: TierManifest = serde_json::from_slice(&manifest_bytes)
206 .map_err(|e| TieringError::Store(StoreError::Internal(e.to_string())))?;
207
208 let mut kv = Vec::new();
209 for chunk in &manifest.chunks {
210 let key = format!("{base}/{}", chunk.key);
211 let obj = client
212 .get_object()
213 .bucket(&cfg.bucket)
214 .key(&key)
215 .send()
216 .await
217 .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
218
219 let bytes = obj
220 .body
221 .collect()
222 .await
223 .map_err(|e| TieringError::AwsSdk(e.to_string()))?
224 .into_bytes()
225 .to_vec();
226
227 let checksum = crc32c::crc32c(&bytes);
228 if checksum != chunk.crc32c {
229 return Err(TieringError::ChecksumMismatch);
230 }
231
232 decode_chunk(&bytes, &mut kv)
233 .map_err(|e| TieringError::Store(StoreError::Internal(e.to_string())))?;
234 }
235
236 store
237 .load_snapshot_state(SnapshotState {
238 kv,
239 leases: Vec::new(),
240 next_lease_id: 1,
241 revision: manifest.revision,
242 compact_revision: manifest.compact_revision,
243 })
244 .map_err(StoreError::from)
245 .map_err(TieringError::from)?;
246
247 info!(bucket = %cfg.bucket, key = %manifest_key, "restored state from chunked object storage");
248 Ok(true)
249 }
250}
251
252async fn gc_stale_chunks(
253 client: &aws_sdk_s3::Client,
254 bucket: &str,
255 base: &str,
256 manifest: &TierManifest,
257 io_limiter: &IoTokenBucket,
258 sqe_limiter: &IoTokenBucket,
259) -> Result<(), TieringError> {
260 let keep = manifest
261 .chunks
262 .iter()
263 .map(|c| format!("{base}/{}", c.key))
264 .collect::<HashSet<_>>();
265
266 let prefix = format!("{base}/chunks/");
267 let mut continuation: Option<String> = None;
268 loop {
269 let mut req = client.list_objects_v2().bucket(bucket).prefix(&prefix);
270 if let Some(token) = continuation.clone() {
271 req = req.continuation_token(token);
272 }
273 let resp = req
274 .send()
275 .await
276 .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
277
278 for obj in resp.contents() {
279 if let Some(key) = obj.key() {
280 if !keep.contains(key) {
281 sqe_limiter.acquire(1).await;
282 io_limiter.acquire(1).await;
283 client
284 .delete_object()
285 .bucket(bucket)
286 .key(key)
287 .send()
288 .await
289 .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
290 }
291 }
292 }
293
294 if !resp.is_truncated().unwrap_or(false) {
295 break;
296 }
297 continuation = resp.next_continuation_token().map(|s| s.to_string());
298 }
299
300 Ok(())
301}
302
/// Number of I/O tokens charged for a transfer of `bytes` bytes.
///
/// One token per started 4 KiB page, with a minimum of one token, so even empty requests
/// are charged. The round-up saturates instead of overflowing for sizes near `u64::MAX`.
fn io_tokens_for_bytes(bytes: usize) -> u64 {
    const PAGE: u64 = 4096;
    let rounded_up = (bytes as u64).saturating_add(PAGE - 1);
    std::cmp::max(rounded_up / PAGE, 1)
}
306
307fn encode_record(buf: &mut Vec<u8>, key: &[u8], v: &ValueEntry) {
308 buf.extend_from_slice(&(key.len() as u32).to_le_bytes());
309 buf.extend_from_slice(key);
310 buf.extend_from_slice(&(v.value.len() as u32).to_le_bytes());
311 buf.extend_from_slice(&v.value);
312 buf.extend_from_slice(&v.create_revision.to_le_bytes());
313 buf.extend_from_slice(&v.mod_revision.to_le_bytes());
314 buf.extend_from_slice(&v.version.to_le_bytes());
315 buf.extend_from_slice(&v.lease.to_le_bytes());
316}
317
318pub fn build_sst_chunks(
319 rows: &[(Vec<u8>, ValueEntry)],
320 target_bytes: usize,
321) -> (Vec<TierChunkMeta>, Vec<Vec<u8>>) {
322 let mut metas = Vec::new();
323 let mut blobs = Vec::new();
324
325 let mut chunk_idx = 1_u64;
326 let mut current = Vec::new();
327 current.extend_from_slice(CHUNK_MAGIC);
328 let mut records = 0usize;
329
330 for (key, val) in rows {
331 let mut record = Vec::new();
332 encode_record(&mut record, key, val);
333
334 if current.len() + record.len() > target_bytes && records > 0 {
335 let crc = crc32c::crc32c(¤t);
336 let name = format!("chunks/{chunk_idx:05}-{crc:08x}.sst");
337 metas.push(TierChunkMeta {
338 key: name,
339 size: current.len(),
340 crc32c: crc,
341 records,
342 });
343 blobs.push(current);
344
345 chunk_idx += 1;
346 current = Vec::new();
347 current.extend_from_slice(CHUNK_MAGIC);
348 records = 0;
349 }
350
351 current.extend_from_slice(&record);
352 records += 1;
353 }
354
355 if records > 0 {
356 let crc = crc32c::crc32c(¤t);
357 let name = format!("chunks/{chunk_idx:05}-{crc:08x}.sst");
358 metas.push(TierChunkMeta {
359 key: name,
360 size: current.len(),
361 crc32c: crc,
362 records,
363 });
364 blobs.push(current);
365 }
366
367 (metas, blobs)
368}
369
370pub fn decode_chunk(
371 bytes: &[u8],
372 out: &mut Vec<(Vec<u8>, ValueEntry)>,
373) -> Result<(), std::io::Error> {
374 if bytes.len() < CHUNK_MAGIC.len() || &bytes[..CHUNK_MAGIC.len()] != CHUNK_MAGIC {
375 return Err(std::io::Error::new(
376 std::io::ErrorKind::InvalidData,
377 "invalid chunk magic",
378 ));
379 }
380
381 let mut cur = Cursor::new(&bytes[CHUNK_MAGIC.len()..]);
382
383 while (cur.position() as usize) < (bytes.len() - CHUNK_MAGIC.len()) {
384 let mut u32_buf = [0_u8; 4];
385 if cur.read_exact(&mut u32_buf).is_err() {
386 break;
387 }
388 let key_len = u32::from_le_bytes(u32_buf) as usize;
389
390 let mut key = vec![0_u8; key_len];
391 cur.read_exact(&mut key)?;
392
393 cur.read_exact(&mut u32_buf)?;
394 let val_len = u32::from_le_bytes(u32_buf) as usize;
395
396 let mut value = vec![0_u8; val_len];
397 cur.read_exact(&mut value)?;
398
399 let mut i64_buf = [0_u8; 8];
400 cur.read_exact(&mut i64_buf)?;
401 let create_revision = i64::from_le_bytes(i64_buf);
402 cur.read_exact(&mut i64_buf)?;
403 let mod_revision = i64::from_le_bytes(i64_buf);
404 cur.read_exact(&mut i64_buf)?;
405 let version = i64::from_le_bytes(i64_buf);
406 cur.read_exact(&mut i64_buf)?;
407 let lease = i64::from_le_bytes(i64_buf);
408
409 out.push((
410 key,
411 ValueEntry {
412 value,
413 create_revision,
414 mod_revision,
415 version,
416 lease,
417 },
418 ));
419 }
420
421 Ok(())
422}
423
424pub fn decode_chunk_to_rows(bytes: &[u8]) -> Result<Vec<(Vec<u8>, ValueEntry)>, std::io::Error> {
425 let mut out = Vec::new();
426 decode_chunk(bytes, &mut out)?;
427 Ok(out)
428}
429
430pub fn build_chunk_bundle(
431 rows: &[(Vec<u8>, ValueEntry)],
432 target_bytes: usize,
433 revision: i64,
434 compact_revision: i64,
435) -> (TierManifest, Vec<(TierChunkMeta, Vec<u8>)>) {
436 let (metas, blobs) = build_sst_chunks(rows, target_bytes);
437 let bundle = metas.iter().cloned().zip(blobs).collect::<Vec<_>>();
438 (
439 TierManifest {
440 version: 2,
441 revision,
442 compact_revision,
443 chunks: metas,
444 },
445 bundle,
446 )
447}