Skip to main content

astra_core/
tiering.rs

1use std::collections::HashSet;
2use std::io::{Cursor, Read};
3use std::sync::Arc;
4use std::time::Duration;
5
6use aws_sdk_s3::primitives::ByteStream;
7use serde::{Deserialize, Serialize};
8use tokio::task::JoinHandle;
9use tracing::{debug, error, info, warn};
10
11use crate::config::S3Config;
12use crate::errors::{StoreError, TieringError};
13use crate::io_budget::IoTokenBucket;
14use crate::store::{KvStore, SnapshotState, ValueEntry};
15
16#[derive(Debug)]
17pub struct TieringManager {
18    handle: JoinHandle<()>,
19}
20
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct TierManifest {
23    pub version: u32,
24    pub revision: i64,
25    pub compact_revision: i64,
26    pub chunks: Vec<TierChunkMeta>,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
30pub struct TierChunkMeta {
31    pub key: String,
32    pub size: usize,
33    pub crc32c: u32,
34    pub records: usize,
35}
36
/// Four-byte header (`"SST1"`) that starts every encoded chunk blob.
pub const CHUNK_MAGIC: &[u8; 4] = &[b'S', b'S', b'T', b'1'];
38
39impl TieringManager {
40    pub fn start(
41        node_id: u64,
42        cfg: Option<S3Config>,
43        interval: Duration,
44        sst_target_bytes: usize,
45        store: Arc<KvStore>,
46        io_limiter: IoTokenBucket,
47        sqe_limiter: IoTokenBucket,
48    ) -> Self {
49        let handle = tokio::spawn(async move {
50            let Some(cfg) = cfg else {
51                info!("tiering disabled: no S3 configuration provided");
52                return;
53            };
54
55            if node_id != 1 {
56                info!(node_id, "tiering uploader disabled on non-primary node");
57                return;
58            }
59
60            let effective_target_bytes = sst_target_bytes.max(4 * 1024);
61
62            let shared_cfg = aws_config::defaults(aws_config::BehaviorVersion::latest())
63                .endpoint_url(cfg.endpoint.clone())
64                .region(aws_config::Region::new(cfg.region.clone()))
65                .load()
66                .await;
67
68            let s3_conf = aws_sdk_s3::config::Builder::from(&shared_cfg)
69                .force_path_style(true)
70                .build();
71            let client = aws_sdk_s3::Client::from_conf(s3_conf);
72
73            loop {
74                tokio::time::sleep(interval).await;
75
76                let snapshot = store.snapshot_state();
77                if snapshot.kv.is_empty() {
78                    continue;
79                }
80
81                let (metas, blobs) = build_sst_chunks(&snapshot.kv, effective_target_bytes);
82                if metas.is_empty() {
83                    continue;
84                }
85
86                let base = format!("{}/cluster-1", cfg.key_prefix);
87
88                for (meta, body) in metas.iter().zip(blobs.iter()) {
89                    sqe_limiter.acquire(1).await;
90                    io_limiter.acquire(io_tokens_for_bytes(body.len())).await;
91                    let object_key = format!("{base}/{}", meta.key);
92                    let put = client
93                        .put_object()
94                        .bucket(&cfg.bucket)
95                        .key(&object_key)
96                        .metadata("crc32c", meta.crc32c.to_string())
97                        .body(ByteStream::from(body.clone()))
98                        .send()
99                        .await;
100
101                    if let Err(err) = put {
102                        warn!(error = %err, key = %object_key, "chunk tier upload failed");
103                    }
104                }
105
106                let manifest = TierManifest {
107                    version: 2,
108                    revision: snapshot.revision,
109                    compact_revision: snapshot.compact_revision,
110                    chunks: metas,
111                };
112
113                let manifest_key = format!("{base}/manifest.json");
114                let manifest_bytes = match serde_json::to_vec_pretty(&manifest) {
115                    Ok(v) => v,
116                    Err(err) => {
117                        error!(error = %err, "failed to serialize manifest");
118                        continue;
119                    }
120                };
121
122                io_limiter
123                    .acquire(io_tokens_for_bytes(manifest_bytes.len()))
124                    .await;
125                sqe_limiter.acquire(1).await;
126                let out = client
127                    .put_object()
128                    .bucket(&cfg.bucket)
129                    .key(&manifest_key)
130                    .body(ByteStream::from(manifest_bytes))
131                    .send()
132                    .await;
133
134                match out {
135                    Ok(_) => {
136                        debug!(bucket = %cfg.bucket, key = %manifest_key, "uploaded manifest and chunked sstables");
137                        if let Err(err) = gc_stale_chunks(
138                            &client,
139                            &cfg.bucket,
140                            &base,
141                            &manifest,
142                            &io_limiter,
143                            &sqe_limiter,
144                        )
145                        .await
146                        {
147                            warn!(error = %err, "failed to delete stale chunk objects");
148                        }
149                    }
150                    Err(err) => warn!(error = %err, "manifest upload failed"),
151                }
152            }
153        });
154
155        Self { handle }
156    }
157
158    pub async fn join(self) {
159        let _ = self.handle.await;
160    }
161
162    pub async fn restore_if_empty(
163        _node_id: u64,
164        cfg: Option<&S3Config>,
165        store: Arc<KvStore>,
166    ) -> Result<bool, TieringError> {
167        let Some(cfg) = cfg else {
168            return Ok(false);
169        };
170
171        if !store.is_empty() {
172            return Ok(false);
173        }
174
175        let shared_cfg = aws_config::defaults(aws_config::BehaviorVersion::latest())
176            .endpoint_url(cfg.endpoint.clone())
177            .region(aws_config::Region::new(cfg.region.clone()))
178            .load()
179            .await;
180
181        let s3_conf = aws_sdk_s3::config::Builder::from(&shared_cfg)
182            .force_path_style(true)
183            .build();
184        let client = aws_sdk_s3::Client::from_conf(s3_conf);
185
186        let base = format!("{}/cluster-1", cfg.key_prefix);
187        let manifest_key = format!("{base}/manifest.json");
188
189        let manifest_obj = client
190            .get_object()
191            .bucket(&cfg.bucket)
192            .key(&manifest_key)
193            .send()
194            .await
195            .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
196
197        let manifest_bytes = manifest_obj
198            .body
199            .collect()
200            .await
201            .map_err(|e| TieringError::AwsSdk(e.to_string()))?
202            .into_bytes()
203            .to_vec();
204
205        let manifest: TierManifest = serde_json::from_slice(&manifest_bytes)
206            .map_err(|e| TieringError::Store(StoreError::Internal(e.to_string())))?;
207
208        let mut kv = Vec::new();
209        for chunk in &manifest.chunks {
210            let key = format!("{base}/{}", chunk.key);
211            let obj = client
212                .get_object()
213                .bucket(&cfg.bucket)
214                .key(&key)
215                .send()
216                .await
217                .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
218
219            let bytes = obj
220                .body
221                .collect()
222                .await
223                .map_err(|e| TieringError::AwsSdk(e.to_string()))?
224                .into_bytes()
225                .to_vec();
226
227            let checksum = crc32c::crc32c(&bytes);
228            if checksum != chunk.crc32c {
229                return Err(TieringError::ChecksumMismatch);
230            }
231
232            decode_chunk(&bytes, &mut kv)
233                .map_err(|e| TieringError::Store(StoreError::Internal(e.to_string())))?;
234        }
235
236        store
237            .load_snapshot_state(SnapshotState {
238                kv,
239                leases: Vec::new(),
240                next_lease_id: 1,
241                revision: manifest.revision,
242                compact_revision: manifest.compact_revision,
243            })
244            .map_err(StoreError::from)
245            .map_err(TieringError::from)?;
246
247        info!(bucket = %cfg.bucket, key = %manifest_key, "restored state from chunked object storage");
248        Ok(true)
249    }
250}
251
252async fn gc_stale_chunks(
253    client: &aws_sdk_s3::Client,
254    bucket: &str,
255    base: &str,
256    manifest: &TierManifest,
257    io_limiter: &IoTokenBucket,
258    sqe_limiter: &IoTokenBucket,
259) -> Result<(), TieringError> {
260    let keep = manifest
261        .chunks
262        .iter()
263        .map(|c| format!("{base}/{}", c.key))
264        .collect::<HashSet<_>>();
265
266    let prefix = format!("{base}/chunks/");
267    let mut continuation: Option<String> = None;
268    loop {
269        let mut req = client.list_objects_v2().bucket(bucket).prefix(&prefix);
270        if let Some(token) = continuation.clone() {
271            req = req.continuation_token(token);
272        }
273        let resp = req
274            .send()
275            .await
276            .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
277
278        for obj in resp.contents() {
279            if let Some(key) = obj.key() {
280                if !keep.contains(key) {
281                    sqe_limiter.acquire(1).await;
282                    io_limiter.acquire(1).await;
283                    client
284                        .delete_object()
285                        .bucket(bucket)
286                        .key(key)
287                        .send()
288                        .await
289                        .map_err(|e| TieringError::AwsSdk(e.to_string()))?;
290                }
291            }
292        }
293
294        if !resp.is_truncated().unwrap_or(false) {
295            break;
296        }
297        continuation = resp.next_continuation_token().map(|s| s.to_string());
298    }
299
300    Ok(())
301}
302
/// Converts a payload size into I/O-budget tokens.
///
/// Charges one token per started 4 KiB page, with a minimum of one token, so
/// even an empty request is charged.
fn io_tokens_for_bytes(bytes: usize) -> u64 {
    const BYTES_PER_TOKEN: u64 = 4096;
    let pages = (bytes as u64).saturating_add(BYTES_PER_TOKEN - 1) / BYTES_PER_TOKEN;
    if pages == 0 {
        1
    } else {
        pages
    }
}
306
307fn encode_record(buf: &mut Vec<u8>, key: &[u8], v: &ValueEntry) {
308    buf.extend_from_slice(&(key.len() as u32).to_le_bytes());
309    buf.extend_from_slice(key);
310    buf.extend_from_slice(&(v.value.len() as u32).to_le_bytes());
311    buf.extend_from_slice(&v.value);
312    buf.extend_from_slice(&v.create_revision.to_le_bytes());
313    buf.extend_from_slice(&v.mod_revision.to_le_bytes());
314    buf.extend_from_slice(&v.version.to_le_bytes());
315    buf.extend_from_slice(&v.lease.to_le_bytes());
316}
317
318pub fn build_sst_chunks(
319    rows: &[(Vec<u8>, ValueEntry)],
320    target_bytes: usize,
321) -> (Vec<TierChunkMeta>, Vec<Vec<u8>>) {
322    let mut metas = Vec::new();
323    let mut blobs = Vec::new();
324
325    let mut chunk_idx = 1_u64;
326    let mut current = Vec::new();
327    current.extend_from_slice(CHUNK_MAGIC);
328    let mut records = 0usize;
329
330    for (key, val) in rows {
331        let mut record = Vec::new();
332        encode_record(&mut record, key, val);
333
334        if current.len() + record.len() > target_bytes && records > 0 {
335            let crc = crc32c::crc32c(&current);
336            let name = format!("chunks/{chunk_idx:05}-{crc:08x}.sst");
337            metas.push(TierChunkMeta {
338                key: name,
339                size: current.len(),
340                crc32c: crc,
341                records,
342            });
343            blobs.push(current);
344
345            chunk_idx += 1;
346            current = Vec::new();
347            current.extend_from_slice(CHUNK_MAGIC);
348            records = 0;
349        }
350
351        current.extend_from_slice(&record);
352        records += 1;
353    }
354
355    if records > 0 {
356        let crc = crc32c::crc32c(&current);
357        let name = format!("chunks/{chunk_idx:05}-{crc:08x}.sst");
358        metas.push(TierChunkMeta {
359            key: name,
360            size: current.len(),
361            crc32c: crc,
362            records,
363        });
364        blobs.push(current);
365    }
366
367    (metas, blobs)
368}
369
370pub fn decode_chunk(
371    bytes: &[u8],
372    out: &mut Vec<(Vec<u8>, ValueEntry)>,
373) -> Result<(), std::io::Error> {
374    if bytes.len() < CHUNK_MAGIC.len() || &bytes[..CHUNK_MAGIC.len()] != CHUNK_MAGIC {
375        return Err(std::io::Error::new(
376            std::io::ErrorKind::InvalidData,
377            "invalid chunk magic",
378        ));
379    }
380
381    let mut cur = Cursor::new(&bytes[CHUNK_MAGIC.len()..]);
382
383    while (cur.position() as usize) < (bytes.len() - CHUNK_MAGIC.len()) {
384        let mut u32_buf = [0_u8; 4];
385        if cur.read_exact(&mut u32_buf).is_err() {
386            break;
387        }
388        let key_len = u32::from_le_bytes(u32_buf) as usize;
389
390        let mut key = vec![0_u8; key_len];
391        cur.read_exact(&mut key)?;
392
393        cur.read_exact(&mut u32_buf)?;
394        let val_len = u32::from_le_bytes(u32_buf) as usize;
395
396        let mut value = vec![0_u8; val_len];
397        cur.read_exact(&mut value)?;
398
399        let mut i64_buf = [0_u8; 8];
400        cur.read_exact(&mut i64_buf)?;
401        let create_revision = i64::from_le_bytes(i64_buf);
402        cur.read_exact(&mut i64_buf)?;
403        let mod_revision = i64::from_le_bytes(i64_buf);
404        cur.read_exact(&mut i64_buf)?;
405        let version = i64::from_le_bytes(i64_buf);
406        cur.read_exact(&mut i64_buf)?;
407        let lease = i64::from_le_bytes(i64_buf);
408
409        out.push((
410            key,
411            ValueEntry {
412                value,
413                create_revision,
414                mod_revision,
415                version,
416                lease,
417            },
418        ));
419    }
420
421    Ok(())
422}
423
424pub fn decode_chunk_to_rows(bytes: &[u8]) -> Result<Vec<(Vec<u8>, ValueEntry)>, std::io::Error> {
425    let mut out = Vec::new();
426    decode_chunk(bytes, &mut out)?;
427    Ok(out)
428}
429
430pub fn build_chunk_bundle(
431    rows: &[(Vec<u8>, ValueEntry)],
432    target_bytes: usize,
433    revision: i64,
434    compact_revision: i64,
435) -> (TierManifest, Vec<(TierChunkMeta, Vec<u8>)>) {
436    let (metas, blobs) = build_sst_chunks(rows, target_bytes);
437    let bundle = metas.iter().cloned().zip(blobs).collect::<Vec<_>>();
438    (
439        TierManifest {
440            version: 2,
441            revision,
442            compact_revision,
443            chunks: metas,
444        },
445        bundle,
446    )
447}