use anyhow::{Context, Result};
use hadb_io::ObjectStore;

use crate::physical::{self, PhysicalChangeset};
5
/// Generation holding incremental changesets; [`format_key`] renders it as the `0000`
/// directory segment.
pub const GENERATION_INCREMENTAL: u64 = 0x0;
/// Generation holding snapshot changesets; [`format_key`] renders it as the `0001`
/// directory segment.
pub const GENERATION_SNAPSHOT: u64 = 0x1;
10
/// Kind of a stored changeset; it determines the file extension of the object key.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangesetKind {
    /// Physical changeset (see `crate::physical`), stored with the `hadbp` extension.
    Physical,
    /// Journal changeset, stored with the `hadbj` extension.
    Journal,
}

impl ChangesetKind {
    /// Returns the object-key file extension for this kind, without the leading dot.
    pub fn extension(self) -> &'static str {
        match self {
            Self::Physical => "hadbp",
            Self::Journal => "hadbj",
        }
    }
}
26
27#[derive(Debug, Clone)]
29pub struct DiscoveredChangeset {
30 pub key: String,
31 pub seq: u64,
32 pub kind: ChangesetKind,
33}
34
35pub fn format_key(
39 prefix: &str,
40 db_name: &str,
41 generation: u64,
42 seq: u64,
43 kind: ChangesetKind,
44) -> String {
45 format!(
46 "{}{}/{:04x}/{:016x}.{}",
47 prefix,
48 db_name,
49 generation,
50 seq,
51 kind.extension()
52 )
53}
54
55pub async fn upload_physical(
57 storage: &dyn ObjectStore,
58 prefix: &str,
59 db_name: &str,
60 changeset: &PhysicalChangeset,
61) -> Result<()> {
62 let key = format_key(prefix, db_name, GENERATION_INCREMENTAL, changeset.header.seq, ChangesetKind::Physical);
63 let data = physical::encode(changeset);
64 storage.upload_bytes(&key, data).await
65}
66
67pub async fn upload_physical_snapshot(
69 storage: &dyn ObjectStore,
70 prefix: &str,
71 db_name: &str,
72 changeset: &PhysicalChangeset,
73) -> Result<()> {
74 let key = format_key(prefix, db_name, GENERATION_SNAPSHOT, changeset.header.seq, ChangesetKind::Physical);
75 let data = physical::encode(changeset);
76 storage.upload_bytes(&key, data).await
77}
78
79pub async fn download_physical(storage: &dyn ObjectStore, key: &str) -> Result<PhysicalChangeset> {
81 let data = storage.download_bytes(key).await?;
82 physical::decode(&data).map_err(|e| anyhow::anyhow!("failed to decode changeset at {}: {}", key, e))
83}
84
85pub async fn discover_after(
90 storage: &dyn ObjectStore,
91 prefix: &str,
92 db_name: &str,
93 after_seq: u64,
94 kind: ChangesetKind,
95) -> Result<Vec<DiscoveredChangeset>> {
96 let ext = kind.extension();
97 let incr_prefix = format!("{}{}/{:04x}/", prefix, db_name, GENERATION_INCREMENTAL);
98 let start_after_key = format!("{}{:016x}.{}", incr_prefix, after_seq, ext);
99
100 let keys = storage.list_objects_after(&incr_prefix, &start_after_key).await?;
101
102 let mut changesets = Vec::new();
103 for key in &keys {
104 let filename = match key.strip_prefix(&incr_prefix) {
105 Some(f) => f,
106 None => continue,
107 };
108 if !filename.ends_with(&format!(".{}", ext)) {
109 continue;
110 }
111 let hex_part = &filename[..filename.len() - ext.len() - 1]; let seq = match u64::from_str_radix(hex_part, 16) {
113 Ok(v) => v,
114 Err(_) => continue,
115 };
116 changesets.push(DiscoveredChangeset {
117 key: key.clone(),
118 seq,
119 kind,
120 });
121 }
122
123 changesets.sort_by_key(|c| c.seq);
124 Ok(changesets)
125}
126
127pub async fn discover_latest_snapshot(
129 storage: &dyn ObjectStore,
130 prefix: &str,
131 db_name: &str,
132 kind: ChangesetKind,
133) -> Result<Option<DiscoveredChangeset>> {
134 let ext = kind.extension();
135 let snap_prefix = format!("{}{}/{:04x}/", prefix, db_name, GENERATION_SNAPSHOT);
136 let keys = storage.list_objects(&snap_prefix).await?;
137
138 let mut latest: Option<DiscoveredChangeset> = None;
139 for key in &keys {
140 let filename = match key.strip_prefix(&snap_prefix) {
141 Some(f) => f,
142 None => continue,
143 };
144 if !filename.ends_with(&format!(".{}", ext)) {
145 continue;
146 }
147 let hex_part = &filename[..filename.len() - ext.len() - 1];
148 let seq = match u64::from_str_radix(hex_part, 16) {
149 Ok(v) => v,
150 Err(_) => continue,
151 };
152 match &latest {
153 Some(prev) if prev.seq >= seq => {}
154 _ => {
155 latest = Some(DiscoveredChangeset {
156 key: key.clone(),
157 seq,
158 kind,
159 });
160 }
161 }
162 }
163 Ok(latest)
164}
165
#[cfg(test)]
mod tests {
    use super::*;
    use crate::physical::{PageEntry, PageId, PageIdSize, PhysicalChangeset};
    use crate::test_utils::InMemoryObjectStore;

    /// Builds a `PageId::U64` page entry whose `len` data bytes all equal `fill`.
    fn page(id: u64, fill: u8, len: usize) -> PageEntry {
        PageEntry {
            page_id: PageId::U64(id),
            data: vec![fill; len],
        }
    }

    /// Builds a single-page changeset, passing `seq` and `prev` through to
    /// `PhysicalChangeset::new`.
    ///
    /// `seq` must be at least 1: the page id is `seq - 1` and the fill byte is `seq`
    /// truncated to `u8`.
    fn make_cs(seq: u64, prev: u64) -> PhysicalChangeset {
        PhysicalChangeset::new(
            seq,
            prev,
            PageIdSize::U64,
            262144,
            vec![page(seq - 1, seq as u8, 32)],
        )
    }

    // Key formatting is synchronous, so these tests need no async runtime.

    #[test]
    fn test_format_key_physical() {
        assert_eq!(
            format_key("wal/", "mydb", 0, 1, ChangesetKind::Physical),
            "wal/mydb/0000/0000000000000001.hadbp"
        );
    }

    #[test]
    fn test_format_key_journal() {
        assert_eq!(
            format_key("wal/", "mydb", 0, 255, ChangesetKind::Journal),
            "wal/mydb/0000/00000000000000ff.hadbj"
        );
    }

    #[test]
    fn test_format_key_snapshot_generation() {
        assert_eq!(
            format_key("wal/", "mydb", GENERATION_SNAPSHOT, 16, ChangesetKind::Physical),
            "wal/mydb/0001/0000000000000010.hadbp"
        );
    }

    #[tokio::test]
    async fn test_upload_download_roundtrip() {
        let store = InMemoryObjectStore::new();
        let cs = make_cs(1, 0);

        upload_physical(&store, "test/", "mydb", &cs).await.unwrap();

        let key = format_key("test/", "mydb", GENERATION_INCREMENTAL, 1, ChangesetKind::Physical);
        let downloaded = download_physical(&store, &key).await.unwrap();
        assert_eq!(cs, downloaded);
    }

    #[tokio::test]
    async fn test_upload_snapshot_roundtrip() {
        let store = InMemoryObjectStore::new();
        let cs = make_cs(1, 0);

        upload_physical_snapshot(&store, "test/", "mydb", &cs).await.unwrap();

        let key = format_key("test/", "mydb", GENERATION_SNAPSHOT, 1, ChangesetKind::Physical);
        let downloaded = download_physical(&store, &key).await.unwrap();
        assert_eq!(cs, downloaded);
    }

    #[tokio::test]
    async fn test_discover_empty() {
        let store = InMemoryObjectStore::new();
        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert!(results.is_empty());
    }

    #[tokio::test]
    async fn test_discover_after_zero_returns_all() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=5 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }

        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 5);
        assert_eq!(results[0].seq, 1);
        assert_eq!(results[4].seq, 5);
    }

    #[tokio::test]
    async fn test_discover_after_partial() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=5 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }

        let results = discover_after(&store, "test/", "mydb", 3, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].seq, 4);
        assert_eq!(results[1].seq, 5);
    }

    #[tokio::test]
    async fn test_discover_sorted() {
        let store = InMemoryObjectStore::new();
        for seq in [5, 2, 4, 1, 3] {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }
        let seqs: Vec<u64> = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap()
            .iter()
            .map(|r| r.seq)
            .collect();
        assert_eq!(seqs, vec![1, 2, 3, 4, 5]);
    }

    #[tokio::test]
    async fn test_discover_latest_snapshot() {
        let store = InMemoryObjectStore::new();

        assert!(discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical)
            .await
            .unwrap()
            .is_none());

        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(5, 0)).await.unwrap();

        let found = discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(found.unwrap().seq, 5);
    }

    #[tokio::test]
    async fn test_latest_snapshot_ignores_other_kinds() {
        let store = InMemoryObjectStore::new();
        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(2, 0)).await.unwrap();

        let found = discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Journal)
            .await
            .unwrap();
        assert!(found.is_none());
    }

    #[tokio::test]
    async fn test_snapshots_and_incrementals_are_separate() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "test/", "mydb", &make_cs(7, 0)).await.unwrap();
        upload_physical_snapshot(&store, "test/", "mydb", &make_cs(3, 0)).await.unwrap();

        let snap = discover_latest_snapshot(&store, "test/", "mydb", ChangesetKind::Physical)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(snap.seq, 3);
        assert_eq!(
            snap.key,
            format_key("test/", "mydb", GENERATION_SNAPSHOT, 3, ChangesetKind::Physical)
        );

        let incr: Vec<u64> = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap()
            .iter()
            .map(|c| c.seq)
            .collect();
        assert_eq!(incr, vec![7]);
    }

    #[tokio::test]
    async fn test_download_nonexistent() {
        let store = InMemoryObjectStore::new();
        assert!(download_physical(&store, "no/such/key.hadbp").await.is_err());
    }

    #[tokio::test]
    async fn test_discover_ignores_junk_keys() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        store.insert("test/mydb/0000/readme.txt", vec![0u8; 10]).await;
        store.insert("test/mydb/0000/not-hex.hadbp", vec![0u8; 10]).await;

        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }

    #[tokio::test]
    async fn test_discover_100_changesets() {
        let store = InMemoryObjectStore::new();
        for seq in 1..=100 {
            upload_physical(&store, "test/", "mydb", &make_cs(seq, 0)).await.unwrap();
        }
        let results = discover_after(&store, "test/", "mydb", 50, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 50);
        assert_eq!(results[0].seq, 51);
    }

    #[tokio::test]
    async fn test_discover_isolates_databases() {
        let store = InMemoryObjectStore::new();
        let cs_a = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page(0, 0xAA, 32)]);
        let cs_b = PhysicalChangeset::new(1, 0, PageIdSize::U64, 262144, vec![page(0, 0xBB, 32)]);
        upload_physical(&store, "test/", "db_a", &cs_a).await.unwrap();
        upload_physical(&store, "test/", "db_b", &cs_b).await.unwrap();

        let results = discover_after(&store, "test/", "db_a", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        let downloaded = download_physical(&store, &results[0].key).await.unwrap();
        assert_eq!(downloaded.pages[0].data[0], 0xAA);
    }

    #[tokio::test]
    async fn test_discover_isolates_kinds() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "test/", "mydb", &make_cs(1, 0)).await.unwrap();
        store.insert("test/mydb/0000/0000000000000001.hadbj", vec![0u8; 10]).await;

        let results = discover_after(&store, "test/", "mydb", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].kind, ChangesetKind::Physical);
    }

    #[tokio::test]
    async fn test_prefix_with_slashes() {
        let store = InMemoryObjectStore::new();
        upload_physical(&store, "ha/prod/", "my.db", &make_cs(1, 0)).await.unwrap();
        let results = discover_after(&store, "ha/prod/", "my.db", 0, ChangesetKind::Physical)
            .await
            .unwrap();
        assert_eq!(results.len(), 1);
    }
}