Skip to main content

hadb_changeset/
storage.rs

1use anyhow::Result;
2use hadb_io::ObjectStore;
3
4use crate::physical::{self, PhysicalChangeset};
5
/// Generation that holds live incremental changesets.
///
/// Incrementals are written by [`upload_physical`] and listed by [`discover_after`].
pub const GENERATION_INCREMENTAL: u64 = 0;
/// Generation that holds snapshots (the full database encoded as pages).
///
/// In this module snapshots always live in this single generation:
/// [`upload_physical_snapshot`] writes here and [`discover_latest_snapshot`]
/// only lists this generation.
pub const GENERATION_SNAPSHOT: u64 = 1;
10
/// Encoding of a stored changeset, distinguished by its file extension.
///
/// See [`ChangesetKind::extension`] for the extension each kind uses.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ChangesetKind {
    /// Physical (page-level) changeset, stored with the `.hadbp` extension.
    Physical,
    /// Journal changeset, stored with the `.hadbj` extension.
    Journal,
}
17
18impl ChangesetKind {
19    pub fn extension(self) -> &'static str {
20        match self {
21            ChangesetKind::Physical => "hadbp",
22            ChangesetKind::Journal => "hadbj",
23        }
24    }
25}
26
27/// A discovered changeset from S3 listing.
28#[derive(Debug, Clone)]
29pub struct DiscoveredChangeset {
30    pub key: String,
31    pub seq: u64,
32    pub kind: ChangesetKind,
33}
34
35/// Format an S3 key for a changeset.
36///
37/// Layout: `{prefix}{db_name}/{generation:04x}/{seq:016x}.{ext}`
38pub fn format_key(
39    prefix: &str,
40    db_name: &str,
41    generation: u64,
42    seq: u64,
43    kind: ChangesetKind,
44) -> String {
45    format!(
46        "{}{}/{:04x}/{:016x}.{}",
47        prefix,
48        db_name,
49        generation,
50        seq,
51        kind.extension()
52    )
53}
54
55/// Upload a physical changeset as an incremental.
56pub async fn upload_physical(
57    storage: &dyn ObjectStore,
58    prefix: &str,
59    db_name: &str,
60    changeset: &PhysicalChangeset,
61) -> Result<()> {
62    let key = format_key(prefix, db_name, GENERATION_INCREMENTAL, changeset.header.seq, ChangesetKind::Physical);
63    let data = physical::encode(changeset);
64    storage.upload_bytes(&key, data).await
65}
66
67/// Upload a physical changeset as a snapshot.
68pub async fn upload_physical_snapshot(
69    storage: &dyn ObjectStore,
70    prefix: &str,
71    db_name: &str,
72    changeset: &PhysicalChangeset,
73) -> Result<()> {
74    let key = format_key(prefix, db_name, GENERATION_SNAPSHOT, changeset.header.seq, ChangesetKind::Physical);
75    let data = physical::encode(changeset);
76    storage.upload_bytes(&key, data).await
77}
78
79/// Download and decode a physical changeset.
80pub async fn download_physical(storage: &dyn ObjectStore, key: &str) -> Result<PhysicalChangeset> {
81    let data = storage.download_bytes(key).await?;
82    physical::decode(&data).map_err(|e| anyhow::anyhow!("failed to decode changeset at {}: {}", key, e))
83}
84
85/// Discover incremental changesets after a given sequence number.
86///
87/// Uses `list_objects_after` to efficiently skip past already-applied changesets.
88/// Returns changesets sorted by seq (ascending).
89pub async fn discover_after(
90    storage: &dyn ObjectStore,
91    prefix: &str,
92    db_name: &str,
93    after_seq: u64,
94    kind: ChangesetKind,
95) -> Result<Vec<DiscoveredChangeset>> {
96    let ext = kind.extension();
97    let incr_prefix = format!("{}{}/{:04x}/", prefix, db_name, GENERATION_INCREMENTAL);
98    let start_after_key = format!("{}{:016x}.{}", incr_prefix, after_seq, ext);
99
100    let keys = storage.list_objects_after(&incr_prefix, &start_after_key).await?;
101
102    let mut changesets = Vec::new();
103    for key in &keys {
104        let filename = match key.strip_prefix(&incr_prefix) {
105            Some(f) => f,
106            None => continue,
107        };
108        if !filename.ends_with(&format!(".{}", ext)) {
109            continue;
110        }
111        let hex_part = &filename[..filename.len() - ext.len() - 1]; // strip ".{ext}"
112        let seq = match u64::from_str_radix(hex_part, 16) {
113            Ok(v) => v,
114            Err(_) => continue,
115        };
116        changesets.push(DiscoveredChangeset {
117            key: key.clone(),
118            seq,
119            kind,
120        });
121    }
122
123    changesets.sort_by_key(|c| c.seq);
124    Ok(changesets)
125}
126
127/// Discover the latest snapshot changeset (if any).
128pub async fn discover_latest_snapshot(
129    storage: &dyn ObjectStore,
130    prefix: &str,
131    db_name: &str,
132    kind: ChangesetKind,
133) -> Result<Option<DiscoveredChangeset>> {
134    let ext = kind.extension();
135    let snap_prefix = format!("{}{}/{:04x}/", prefix, db_name, GENERATION_SNAPSHOT);
136    let keys = storage.list_objects(&snap_prefix).await?;
137
138    let mut latest: Option<DiscoveredChangeset> = None;
139    for key in &keys {
140        let filename = match key.strip_prefix(&snap_prefix) {
141            Some(f) => f,
142            None => continue,
143        };
144        if !filename.ends_with(&format!(".{}", ext)) {
145            continue;
146        }
147        let hex_part = &filename[..filename.len() - ext.len() - 1];
148        let seq = match u64::from_str_radix(hex_part, 16) {
149            Ok(v) => v,
150            Err(_) => continue,
151        };
152        match &latest {
153            Some(prev) if prev.seq >= seq => {}
154            _ => {
155                latest = Some(DiscoveredChangeset {
156                    key: key.clone(),
157                    seq,
158                    kind,
159                });
160            }
161        }
162    }
163    Ok(latest)
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical::{PageEntry, PageId, PageIdSize, PhysicalChangeset};
    use crate::test_utils::InMemoryObjectStore;

    /// Build a page entry with id `id` whose `len` bytes are all `fill`.
    fn page(id: u64, fill: u8, len: usize) -> PageEntry {
        PageEntry {
            page_id: PageId::U64(id),
            data: vec![fill; len],
        }
    }

    /// Build a one-page changeset for `seq` with predecessor `prev`.
    ///
    /// The page id is `seq - 1` and its bytes are `seq as u8`, so `seq` must be >= 1.
    fn make_cs(seq: u64, prev: u64) -> PhysicalChangeset {
        PhysicalChangeset::new(seq, prev, PageIdSize::U64, 262144, vec![page(seq - 1, seq as u8, 32)])
    }

    // Key formatting is synchronous, so these tests need no async runtime.
    #[test]
    fn test_format_key_physical() {
        assert_eq!(
            format_key("wal/", "mydb", 0, 1, ChangesetKind::Physical),
            "wal/mydb/0000/0000000000000001.hadbp"
        );
    }

    #[test]
    fn test_format_key_journal() {
        assert_eq!(
            format_key("wal/", "mydb", 0, 255, ChangesetKind::Journal),
            "wal/mydb/0000/00000000000000ff.hadbj"
        );
    }

    #[tokio::test]
    async fn test_upload_download_roundtrip() {
        let store = InMemoryObjectStore::new();
        let cs = make_cs(1, 0);

        upload_physical(&store, "test/", "mydb", &cs).await.unwrap();

        let key = format_key("test/", "mydb", GENERATION_INCREMENTAL, 1, ChangesetKind::Physical);
        let downloaded = download_physical(&store, &key).await.unwrap();
        assert_eq!(cs, downloaded);
    }

    #[tokio::test]
    async fn test_upload_snapshot_roundtrip() {
        let store = InMemoryObjectStore::new();
        let cs = make_cs(1, 0);

        upload_physical_snapshot(&store, "test/", "mydb", &cs).await.unwrap();

        let key = format_key("test/", "mydb", GENERATION_SNAPSHOT, 1, ChangesetKind::Physical);
        let downloaded = download_physical(&store, &key).await.unwrap();
        assert_eq!(cs, downloaded);
    }

    #[tokio::test]
    async fn test_discover_empty() {
        let store = InMemoryObjectStore::new();
        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical).await.unwrap();
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_discover_after_zero_returns_all() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=5 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }

        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].seq, 1);
        assert_eq!(results[4].seq, 5);
    }

    #[tokio::test]
    async fn test_discover_after_partial() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=5 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }

        let results = discover_after(&store, "test/", "mydb", 3, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].seq, 4);
        assert_eq!(results[1].seq, 5);
    }

    #[tokio::test]
    async fn test_discover_sorted() {
        let store = InMemoryObjectStore::new();
        for seq in [5, 2, 4, 1, 3] {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }
        let seqs: Vec<u64> = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await.unwrap().iter().map(|r| r.seq).collect();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn test_discover_latest_snapshot() {
        let store = InMemoryObjectStore::new();

        assert!(discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical).await.unwrap().is_none());

        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(5, 0)).await.unwrap();

        let found = discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical).await.unwrap();
        assert_eq!(found.unwrap().seq, 5);
    }

    #[tokio::test]
    async fn test_discover_latest_snapshot_ignores_other_kind() {
        let store = InMemoryObjectStore::new();
        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        // A journal snapshot with a higher seq must not be reported for Physical.
        store.insert("test/mydb/0001/0000000000000009.hadbj", vec![0u8; 10]).await;

        let found = discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(found.seq, 1);
        assert_eq!(found.kind, ChangesetKind::Physical);
    }

    #[tokio::test]
    async fn test_download_nonexistent() {
        let store = InMemoryObjectStore::new();
        assert!(download_physical(&store, "no/such/key.hadbp").await.is_err());
    }

    #[tokio::test]
    async fn test_discover_ignores_junk_keys() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        store.insert("test/mydb/0000/readme.txt", vec![0u8; 10]).await;
        store.insert("test/mydb/0000/not-hex.hadbp", vec![0u8; 10]).await;

        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_discover_100_changesets() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=100 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }
        let results = discover_after(&store, "test/", "mydb", 50, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 50);
        assert_eq!(results[0].seq, 51);
    }

    #[tokio::test]
    async fn test_discover_isolates_databases() {
        let store = InMemoryObjectStore::new();
        let cs_a = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page(0, 0xAA, 32)]);
        let cs_b = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page(0, 0xBB, 32)]);
        upload_physical(&store, "test/", "db_a", &cs_a).await.unwrap();
        upload_physical(&store, "test/", "db_b", &cs_b).await.unwrap();

        let results = discover_after(&store, "test/", "db_a", 0, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 1);
        let downloaded = download_physical(&store, &results[0].key).await.unwrap();
        assert_eq!(downloaded.pages[0].data[0], 0xAA);
    }

    #[tokio::test]
    async fn test_discover_isolates_kinds() {
        let store = InMemoryObjectStore::new();
        // Upload a physical changeset
        upload_physical(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        // Upload a fake journal key manually
        store.insert("test/mydb/0000/0000000000000001.hadbj", vec![0u8; 10]).await;

        // Physical discover should not see the journal file
        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].kind, ChangesetKind::Physical);
    }

    #[tokio::test]
    async fn test_prefix_with_slashes() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "ha/prod/", "my.db", &make_cs(1, 0)).await.unwrap();
        let results = discover_after(&store, "ha/prod/", "my.db", 0, ChangesetKind::Physical).await.unwrap();
        assert_eq!(results.len(), 1);
    }
}
339        upload_physical(&store, "ha/prod/", "my.db", &make_cs(1, 0)).await.unwrap();
340        let results = discover_after(&store, "ha/prod/", "my.db", 0, ChangesetKind::Physical).await.unwrap();
341        assert_eq!(results.len(), 1);
342    }
343}