Skip to main content

nexus_core/
wal.rs

1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{self, Read, Write};
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Mutex;
7
8use anyhow::{Context, Result};
9use flatbuffers::FlatBufferBuilder;
10
11use crate::generated::{
12    iam_wal_v1_generated::nexus::wal as fb_iam,
13    migration_wal_v1_generated::nexus::wal as fb_migration,
14    multipart_wal_v1_generated::nexus::wal as fb_multipart,
15    object_wal_v1_generated::nexus::wal as fb_object,
16    webhook_event_wal_v1_generated::nexus::wal as fb_webhook_event,
17    webhook_wal_v1_generated::nexus::wal as fb_webhook,
18};
19
// Size of the fixed frame header that precedes every payload:
// 1 byte record kind + 4 bytes little-endian payload length + 4 bytes
// little-endian CRC-32C of the payload.
const FRAME_HEADER_BYTES: usize = 9;
// Record-kind tags stored in the first header byte. Replay rejects any other
// non-zero tag; an all-zero header is treated as end-of-log, so 0 must never
// be assigned to a record kind.
const RECORD_KIND_OBJECT: u8 = 1;
const RECORD_KIND_MULTIPART: u8 = 2;
const RECORD_KIND_IAM: u8 = 3;
const RECORD_KIND_WEBHOOK: u8 = 4;
const RECORD_KIND_WEBHOOK_EVENT: u8 = 5;
const RECORD_KIND_MIGRATION: u8 = 6;
27
/// Device pointer carried by object and multipart WAL records.
///
/// The fields are copied one-for-one to and from the generated FlatBuffers
/// `DevicePointer` tables when records are written and replayed.
// NOTE(review): the per-field meanings below are inferred from the field names
// only — confirm against the FlatBuffers schema.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DevicePointer {
    /// Presumably identifies the device holding the data.
    pub device_id: u16,
    /// Presumably the byte offset of the data on that device.
    pub offset: u64,
    /// Presumably the length of the data in bytes.
    pub len: u32,
    /// Presumably a CRC-32C checksum of the referenced data.
    pub checksum_crc32c: u32,
    /// Presumably an identifier for an encryption nonce — TODO confirm.
    pub nonce_id: u64,
}
36
/// In-memory form of an object WAL record (`RECORD_KIND_OBJECT`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WalEntryV1 {
    /// Sequence number; replay requires it to be strictly greater than the
    /// sequence of the preceding record of any kind.
    pub seq: u64,
    /// Operation name. Replay accepts `"put"` and `"delete"` and rejects
    /// anything else.
    pub op: String,
    /// Bucket name; replay indexes objects under `"{bucket}/{key}"`.
    pub bucket: String,
    /// Object key within the bucket.
    pub key: String,
    /// Optional entity tag stored with the record.
    pub etag: Option<String>,
    /// Optional device pointer stored with the record.
    pub pointer_or_manifest: Option<DevicePointer>,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
47
/// One part of a multipart upload as recorded in the WAL.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultipartPart {
    /// Part number; replay replaces an existing part with the same number and
    /// keeps the part list sorted by this value.
    pub part_number: u32,
    /// Entity tag of the part. Decoding substitutes an empty string when the
    /// stored record has no etag.
    pub etag: String,
    /// Device pointer for the part's data; required when decoding.
    pub pointer: DevicePointer,
}
54
/// Description of a multipart upload: its identity, parts and final etag.
// NOTE(review): this type is not referenced by the writer or replay code in
// this part of the file; its intended consumer is not visible here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultipartManifest {
    /// Identifier of the multipart upload.
    pub upload_id: String,
    /// Bucket the upload targets.
    pub bucket: String,
    /// Object key the upload targets.
    pub key: String,
    /// Parts that make up the upload.
    pub parts: Vec<MultipartPart>,
    /// Entity tag of the assembled object.
    pub final_etag: String,
}
63
/// In-memory form of a multipart WAL record (`RECORD_KIND_MULTIPART`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MultipartWalEntryV1 {
    /// Sequence number; replay requires it to be strictly greater than the
    /// sequence of the preceding record of any kind.
    pub seq: u64,
    /// Operation name. Replay accepts `"create"`, `"upload_part"`,
    /// `"complete"` and `"abort"` and rejects anything else.
    pub op: String,
    /// Identifier of the multipart upload; replay keys upload state by it.
    pub upload_id: String,
    /// Bucket the upload targets.
    pub bucket: String,
    /// Object key the upload targets.
    pub key: String,
    /// Optional final entity tag stored with the record.
    pub final_etag: Option<String>,
    /// Parts carried by this record. An empty list is written as an absent
    /// vector and decodes back to an empty list.
    pub parts: Vec<MultipartPart>,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
75
/// In-memory form of an IAM WAL record (`RECORD_KIND_IAM`).
///
/// Replay does not interpret `op`, `target` or `payload`; it only checks the
/// sequence number and appends the entry to the IAM log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct IamWalEntryV1 {
    /// Sequence number; must be strictly greater than the preceding record's.
    pub seq: u64,
    /// Operation name (opaque to replay).
    pub op: String,
    /// Target of the operation (opaque to replay).
    pub target: String,
    /// Operation payload (opaque to replay).
    pub payload: String,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
84
/// In-memory form of a webhook WAL record (`RECORD_KIND_WEBHOOK`).
///
/// Replay does not interpret the string fields; it only checks the sequence
/// number and appends the entry to the webhook log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebhookWalEntryV1 {
    /// Sequence number; must be strictly greater than the preceding record's.
    pub seq: u64,
    /// Operation name (opaque to replay).
    pub op: String,
    /// Bucket the rule belongs to.
    pub bucket: String,
    /// Identifier of the webhook rule.
    pub rule_id: String,
    /// Rule body; presumably JSON text, per the field name.
    pub rule_json: String,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
94
/// In-memory form of a webhook event WAL record (`RECORD_KIND_WEBHOOK_EVENT`).
///
/// Replay does not interpret the string fields; it only checks the sequence
/// number and appends the entry to the webhook event log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WebhookEventWalEntryV1 {
    /// Sequence number; must be strictly greater than the preceding record's.
    pub seq: u64,
    /// Operation name (opaque to replay).
    pub op: String,
    /// Identifier of the event.
    pub event_id: String,
    /// Identifier of the webhook rule associated with the event.
    pub rule_id: String,
    /// Bucket associated with the event.
    pub bucket: String,
    /// Object key associated with the event.
    pub key: String,
    /// Event payload; presumably JSON text, per the field name.
    pub payload_json: String,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
106
/// In-memory form of a migration WAL record (`RECORD_KIND_MIGRATION`).
///
/// Replay does not interpret the string fields; it only checks the sequence
/// number and appends the entry to the migration log.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MigrationWalEntryV1 {
    /// Sequence number; must be strictly greater than the preceding record's.
    pub seq: u64,
    /// Operation name (opaque to replay).
    pub op: String,
    /// Bucket associated with the migration step.
    pub bucket: String,
    /// Object key associated with the migration step.
    pub key: String,
    /// Operation payload (opaque to replay).
    pub payload: String,
    /// Record timestamp (milliseconds since the Unix epoch, per the name).
    pub ts_unix_ms: u64,
}
116
/// Input to `replay_wal` / `replay_wal_state`.
#[derive(Debug, Clone)]
pub struct WalReplayConfig {
    /// Path of the WAL file to replay. A path that does not exist yields an
    /// empty replay result rather than an error.
    pub wal_path: PathBuf,
    /// When `true`, records are still decoded, validated and counted, but the
    /// returned indexes and logs are left empty.
    pub dry_run: bool,
}
122
/// Counters collected while replaying a WAL file.
#[derive(Debug, Clone)]
pub struct WalReplayStats {
    /// Path of the WAL that was replayed.
    pub wal_path: PathBuf,
    /// Whether the replay ran in dry-run mode.
    pub dry_run: bool,
    /// Number of object frames seen.
    pub object_entries: u64,
    /// Number of multipart frames seen.
    pub multipart_entries: u64,
    /// Number of object `"put"` operations.
    pub object_puts: u64,
    /// Number of object `"delete"` operations.
    pub object_deletes: u64,
    /// Number of multipart `"create"` operations.
    pub multipart_creates: u64,
    /// Number of multipart `"upload_part"` operations.
    pub multipart_parts: u64,
    /// Number of multipart `"complete"` operations.
    pub multipart_completes: u64,
    /// Number of multipart `"abort"` operations.
    pub multipart_aborts: u64,
    /// Number of IAM frames seen.
    pub iam_entries: u64,
    /// Number of webhook frames seen.
    pub webhook_entries: u64,
    /// Number of webhook event frames seen.
    pub webhook_event_entries: u64,
    /// Number of migration frames seen.
    pub migration_entries: u64,
    /// Sequence number of the last record replayed (0 when none were).
    pub last_seq: u64,
}

impl WalReplayStats {
    /// Renders the stats as a single-line JSON object.
    ///
    /// The WAL path is escaped as a JSON string, so paths containing
    /// backslashes, double quotes or control characters still produce valid
    /// JSON. Paths that are not valid UTF-8 are rendered lossily (via
    /// `Path::display`).
    pub fn to_json(&self) -> String {
        /// Escapes `raw` for embedding between double quotes in JSON.
        fn escape_json_string(raw: &str) -> String {
            let mut escaped = String::with_capacity(raw.len());
            for ch in raw.chars() {
                match ch {
                    '"' => escaped.push_str("\\\""),
                    '\\' => escaped.push_str("\\\\"),
                    '\n' => escaped.push_str("\\n"),
                    '\r' => escaped.push_str("\\r"),
                    '\t' => escaped.push_str("\\t"),
                    // Remaining control characters must use the \uXXXX form.
                    c if u32::from(c) < 0x20 => {
                        escaped.push_str(&format!("\\u{:04x}", u32::from(c)));
                    }
                    c => escaped.push(c),
                }
            }
            escaped
        }

        let wal_path = escape_json_string(&self.wal_path.display().to_string());
        format!(
            "{{\"module\":\"WAL_REPLAY\",\"wal_path\":\"{}\",\"dry_run\":{},\"object_entries\":{},\"multipart_entries\":{},\"object_puts\":{},\"object_deletes\":{},\"multipart_creates\":{},\"multipart_parts\":{},\"multipart_completes\":{},\"multipart_aborts\":{},\"iam_entries\":{},\"webhook_entries\":{},\"webhook_event_entries\":{},\"migration_entries\":{},\"last_seq\":{}}}",
            wal_path,
            self.dry_run,
            self.object_entries,
            self.multipart_entries,
            self.object_puts,
            self.object_deletes,
            self.multipart_creates,
            self.multipart_parts,
            self.multipart_completes,
            self.multipart_aborts,
            self.iam_entries,
            self.webhook_entries,
            self.webhook_event_entries,
            self.migration_entries,
            self.last_seq,
        )
    }
}
164
/// Full result of replaying a WAL: counters plus the reconstructed state.
///
/// In dry-run mode only `stats` is populated; the maps and logs stay empty.
#[derive(Debug, Clone)]
pub struct WalReplayState {
    /// Counters gathered during replay.
    pub stats: WalReplayStats,
    /// Latest `"put"` entry per object, keyed by `"{bucket}/{key}"`; a
    /// `"delete"` record removes the key.
    pub object_index: BTreeMap<String, WalEntryV1>,
    /// Multipart upload state keyed by upload id; an `"abort"` record removes
    /// the upload.
    pub multipart_uploads: BTreeMap<String, MultipartWalEntryV1>,
    /// IAM records in log order.
    pub iam_log: Vec<IamWalEntryV1>,
    /// Webhook records in log order.
    pub webhook_log: Vec<WebhookWalEntryV1>,
    /// Webhook event records in log order.
    pub webhook_event_log: Vec<WebhookEventWalEntryV1>,
    /// Migration records in log order.
    pub migration_log: Vec<MigrationWalEntryV1>,
}
175
/// Appends framed records to a WAL file and hands out sequence numbers.
#[derive(Debug)]
pub struct WalWriter {
    /// Path the WAL was opened from.
    wal_path: PathBuf,
    /// Append-mode file handle; the mutex serializes frame writes.
    file: Mutex<File>,
    /// Next sequence number to hand out from `next_seq`.
    next_seq: AtomicU64,
}
182
183impl WalWriter {
184    pub fn open<P: AsRef<Path>>(wal_path: P) -> Result<Self> {
185        let wal_path = wal_path.as_ref().to_path_buf();
186        if let Some(parent) = wal_path.parent() {
187            if !parent.as_os_str().is_empty() {
188                std::fs::create_dir_all(parent)
189                    .with_context(|| format!("failed creating WAL parent {}", parent.display()))?;
190            }
191        }
192
193        let file = OpenOptions::new()
194            .create(true)
195            .append(true)
196            .read(true)
197            .open(&wal_path)
198            .with_context(|| format!("failed opening WAL {}", wal_path.display()))?;
199
200        let seed_seq = detect_next_seq(&wal_path).unwrap_or(1);
201
202        Ok(Self {
203            wal_path,
204            file: Mutex::new(file),
205            next_seq: AtomicU64::new(seed_seq),
206        })
207    }
208
209    pub fn wal_path(&self) -> &Path {
210        &self.wal_path
211    }
212
213    pub fn next_seq(&self) -> u64 {
214        self.next_seq.fetch_add(1, Ordering::AcqRel)
215    }
216
217    pub fn append_object_entry(&self, entry: &WalEntryV1) -> Result<()> {
218        let mut fbb = FlatBufferBuilder::with_capacity(512);
219
220        let op = fbb.create_string(entry.op.as_str());
221        let bucket = fbb.create_string(entry.bucket.as_str());
222        let key = fbb.create_string(entry.key.as_str());
223        let etag = entry.etag.as_ref().map(|v| fbb.create_string(v.as_str()));
224        let pointer = entry.pointer_or_manifest.as_ref().map(|p| {
225            fb_object::DevicePointer::create(
226                &mut fbb,
227                &fb_object::DevicePointerArgs {
228                    device_id: p.device_id,
229                    offset: p.offset,
230                    len: p.len,
231                    checksum_crc32c: p.checksum_crc32c,
232                    nonce_id: p.nonce_id,
233                },
234            )
235        });
236
237        let record = fb_object::ObjectRecordV1::create(
238            &mut fbb,
239            &fb_object::ObjectRecordV1Args {
240                seq: entry.seq,
241                op: Some(op),
242                bucket: Some(bucket),
243                key: Some(key),
244                etag,
245                ts_unix_ms: entry.ts_unix_ms,
246                pointer,
247            },
248        );
249        fb_object::finish_size_prefixed_object_record_v1_buffer(&mut fbb, record);
250
251        self.append_frame(RECORD_KIND_OBJECT, fbb.finished_data())
252    }
253
254    pub fn append_multipart_entry(&self, entry: &MultipartWalEntryV1) -> Result<()> {
255        let mut fbb = FlatBufferBuilder::with_capacity(1024);
256
257        let op = fbb.create_string(entry.op.as_str());
258        let bucket = fbb.create_string(entry.bucket.as_str());
259        let key = fbb.create_string(entry.key.as_str());
260        let upload_id = fbb.create_string(entry.upload_id.as_str());
261        let final_etag = entry
262            .final_etag
263            .as_ref()
264            .map(|v| fbb.create_string(v.as_str()));
265
266        let mut part_offsets = Vec::with_capacity(entry.parts.len());
267        for part in &entry.parts {
268            let etag = fbb.create_string(part.etag.as_str());
269            let pointer = fb_multipart::DevicePointer::create(
270                &mut fbb,
271                &fb_multipart::DevicePointerArgs {
272                    device_id: part.pointer.device_id,
273                    offset: part.pointer.offset,
274                    len: part.pointer.len,
275                    checksum_crc32c: part.pointer.checksum_crc32c,
276                    nonce_id: part.pointer.nonce_id,
277                },
278            );
279            let part_entry = fb_multipart::MultipartPartPointer::create(
280                &mut fbb,
281                &fb_multipart::MultipartPartPointerArgs {
282                    part_number: part.part_number,
283                    etag: Some(etag),
284                    pointer: Some(pointer),
285                },
286            );
287            part_offsets.push(part_entry);
288        }
289        let parts = if part_offsets.is_empty() {
290            None
291        } else {
292            Some(fbb.create_vector(&part_offsets))
293        };
294
295        let record = fb_multipart::MultipartRecordV1::create(
296            &mut fbb,
297            &fb_multipart::MultipartRecordV1Args {
298                seq: entry.seq,
299                op: Some(op),
300                bucket: Some(bucket),
301                key: Some(key),
302                upload_id: Some(upload_id),
303                final_etag,
304                ts_unix_ms: entry.ts_unix_ms,
305                parts,
306            },
307        );
308        fb_multipart::finish_size_prefixed_multipart_record_v1_buffer(&mut fbb, record);
309
310        self.append_frame(RECORD_KIND_MULTIPART, fbb.finished_data())
311    }
312
313    pub fn append_iam_entry(&self, entry: &IamWalEntryV1) -> Result<()> {
314        let mut fbb = FlatBufferBuilder::with_capacity(512);
315        let op = fbb.create_string(entry.op.as_str());
316        let target = fbb.create_string(entry.target.as_str());
317        let payload = fbb.create_string(entry.payload.as_str());
318
319        let record = fb_iam::IamRecordV1::create(
320            &mut fbb,
321            &fb_iam::IamRecordV1Args {
322                seq: entry.seq,
323                op: Some(op),
324                target: Some(target),
325                payload: Some(payload),
326                ts_unix_ms: entry.ts_unix_ms,
327            },
328        );
329        fb_iam::finish_size_prefixed_iam_record_v1_buffer(&mut fbb, record);
330
331        self.append_frame(RECORD_KIND_IAM, fbb.finished_data())
332    }
333
334    pub fn append_webhook_entry(&self, entry: &WebhookWalEntryV1) -> Result<()> {
335        let mut fbb = FlatBufferBuilder::with_capacity(512);
336        let op = fbb.create_string(entry.op.as_str());
337        let bucket = fbb.create_string(entry.bucket.as_str());
338        let rule_id = fbb.create_string(entry.rule_id.as_str());
339        let rule_json = fbb.create_string(entry.rule_json.as_str());
340
341        let record = fb_webhook::WebhookRecordV1::create(
342            &mut fbb,
343            &fb_webhook::WebhookRecordV1Args {
344                seq: entry.seq,
345                op: Some(op),
346                bucket: Some(bucket),
347                rule_id: Some(rule_id),
348                rule_json: Some(rule_json),
349                ts_unix_ms: entry.ts_unix_ms,
350            },
351        );
352        fb_webhook::finish_size_prefixed_webhook_record_v1_buffer(&mut fbb, record);
353
354        self.append_frame(RECORD_KIND_WEBHOOK, fbb.finished_data())
355    }
356
357    pub fn append_webhook_event_entry(&self, entry: &WebhookEventWalEntryV1) -> Result<()> {
358        let mut fbb = FlatBufferBuilder::with_capacity(512);
359        let op = fbb.create_string(entry.op.as_str());
360        let event_id = fbb.create_string(entry.event_id.as_str());
361        let rule_id = fbb.create_string(entry.rule_id.as_str());
362        let bucket = fbb.create_string(entry.bucket.as_str());
363        let key = fbb.create_string(entry.key.as_str());
364        let payload_json = fbb.create_string(entry.payload_json.as_str());
365
366        let record = fb_webhook_event::WebhookEventRecordV1::create(
367            &mut fbb,
368            &fb_webhook_event::WebhookEventRecordV1Args {
369                seq: entry.seq,
370                op: Some(op),
371                event_id: Some(event_id),
372                rule_id: Some(rule_id),
373                bucket: Some(bucket),
374                key: Some(key),
375                payload_json: Some(payload_json),
376                ts_unix_ms: entry.ts_unix_ms,
377            },
378        );
379        fb_webhook_event::finish_size_prefixed_webhook_event_record_v1_buffer(&mut fbb, record);
380        self.append_frame(RECORD_KIND_WEBHOOK_EVENT, fbb.finished_data())
381    }
382
383    pub fn append_migration_entry(&self, entry: &MigrationWalEntryV1) -> Result<()> {
384        let mut fbb = FlatBufferBuilder::with_capacity(512);
385        let op = fbb.create_string(entry.op.as_str());
386        let bucket = fbb.create_string(entry.bucket.as_str());
387        let key = fbb.create_string(entry.key.as_str());
388        let payload = fbb.create_string(entry.payload.as_str());
389
390        let record = fb_migration::MigrationRecordV1::create(
391            &mut fbb,
392            &fb_migration::MigrationRecordV1Args {
393                seq: entry.seq,
394                op: Some(op),
395                bucket: Some(bucket),
396                key: Some(key),
397                payload: Some(payload),
398                ts_unix_ms: entry.ts_unix_ms,
399            },
400        );
401        fb_migration::finish_size_prefixed_migration_record_v1_buffer(&mut fbb, record);
402        self.append_frame(RECORD_KIND_MIGRATION, fbb.finished_data())
403    }
404
405    fn append_frame(&self, kind: u8, payload: &[u8]) -> Result<()> {
406        let payload_len: u32 = payload
407            .len()
408            .try_into()
409            .context("WAL payload too large to frame")?;
410        let payload_crc = crc32c::crc32c(payload);
411
412        let mut header = [0_u8; FRAME_HEADER_BYTES];
413        header[0] = kind;
414        header[1..5].copy_from_slice(&payload_len.to_le_bytes());
415        header[5..9].copy_from_slice(&payload_crc.to_le_bytes());
416
417        let mut file = self
418            .file
419            .lock()
420            .map_err(|_| anyhow::anyhow!("WAL file mutex poisoned"))?;
421        file.write_all(&header)
422            .context("failed writing WAL frame header")?;
423        file.write_all(payload)
424            .context("failed writing WAL frame payload")?;
425        file.sync_data().context("failed syncing WAL append")?;
426        Ok(())
427    }
428}
429
430pub fn replay_wal(config: WalReplayConfig) -> Result<WalReplayStats> {
431    let state = replay_wal_state(config)?;
432    Ok(state.stats)
433}
434
435pub fn replay_wal_state(config: WalReplayConfig) -> Result<WalReplayState> {
436    if !config.wal_path.exists() {
437        return Ok(WalReplayState {
438            stats: WalReplayStats {
439                wal_path: config.wal_path,
440                dry_run: config.dry_run,
441                object_entries: 0,
442                multipart_entries: 0,
443                object_puts: 0,
444                object_deletes: 0,
445                multipart_creates: 0,
446                multipart_parts: 0,
447                multipart_completes: 0,
448                multipart_aborts: 0,
449                iam_entries: 0,
450                webhook_entries: 0,
451                webhook_event_entries: 0,
452                migration_entries: 0,
453                last_seq: 0,
454            },
455            object_index: BTreeMap::new(),
456            multipart_uploads: BTreeMap::new(),
457            iam_log: Vec::new(),
458            webhook_log: Vec::new(),
459            webhook_event_log: Vec::new(),
460            migration_log: Vec::new(),
461        });
462    }
463
464    let mut file = File::open(&config.wal_path)
465        .with_context(|| format!("failed opening WAL {}", config.wal_path.display()))?;
466
467    let mut object_index = BTreeMap::<String, WalEntryV1>::new();
468    let mut multipart_uploads = BTreeMap::<String, MultipartWalEntryV1>::new();
469    let mut stats = WalReplayStats {
470        wal_path: config.wal_path.clone(),
471        dry_run: config.dry_run,
472        object_entries: 0,
473        multipart_entries: 0,
474        object_puts: 0,
475        object_deletes: 0,
476        multipart_creates: 0,
477        multipart_parts: 0,
478        multipart_completes: 0,
479        multipart_aborts: 0,
480        iam_entries: 0,
481        webhook_entries: 0,
482        webhook_event_entries: 0,
483        migration_entries: 0,
484        last_seq: 0,
485    };
486    let mut iam_log = Vec::<IamWalEntryV1>::new();
487    let mut webhook_log = Vec::<WebhookWalEntryV1>::new();
488    let mut webhook_event_log = Vec::<WebhookEventWalEntryV1>::new();
489    let mut migration_log = Vec::<MigrationWalEntryV1>::new();
490
491    loop {
492        let mut header = [0_u8; FRAME_HEADER_BYTES];
493        match file.read_exact(&mut header) {
494            Ok(()) => {}
495            Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
496            Err(err) => return Err(err).context("failed reading WAL frame header"),
497        }
498
499        let kind = header[0];
500        let payload_len = u32::from_le_bytes(header[1..5].try_into().expect("header len"));
501        let payload_crc = u32::from_le_bytes(header[5..9].try_into().expect("header crc"));
502
503        // Raw block devices may expose a zero-initialized tail. Treat a zero header
504        // as end-of-log rather than corruption.
505        if kind == 0 && payload_len == 0 && payload_crc == 0 {
506            break;
507        }
508
509        let mut payload = vec![0_u8; payload_len as usize];
510        file.read_exact(&mut payload)
511            .context("failed reading WAL frame payload")?;
512
513        let actual_crc = crc32c::crc32c(&payload);
514        if actual_crc != payload_crc {
515            anyhow::bail!(
516                "WAL CRC mismatch at seq {}: expected {}, got {}",
517                stats.last_seq,
518                payload_crc,
519                actual_crc
520            );
521        }
522
523        match kind {
524            RECORD_KIND_OBJECT => {
525                stats.object_entries += 1;
526                let record = fb_object::size_prefixed_root_as_object_record_v1(&payload)
527                    .map_err(|err| anyhow::anyhow!("invalid object WAL frame: {err}"))?;
528                let entry = decode_object_record(record)?;
529                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
530                stats.last_seq = entry.seq;
531
532                match entry.op.as_str() {
533                    "put" => {
534                        stats.object_puts += 1;
535                        if !config.dry_run {
536                            object_index.insert(format!("{}/{}", entry.bucket, entry.key), entry);
537                        }
538                    }
539                    "delete" => {
540                        stats.object_deletes += 1;
541                        if !config.dry_run {
542                            object_index.remove(&format!("{}/{}", entry.bucket, entry.key));
543                        }
544                    }
545                    other => anyhow::bail!("unsupported object WAL op: {other}"),
546                }
547            }
548            RECORD_KIND_MULTIPART => {
549                stats.multipart_entries += 1;
550                let record = fb_multipart::size_prefixed_root_as_multipart_record_v1(&payload)
551                    .map_err(|err| anyhow::anyhow!("invalid multipart WAL frame: {err}"))?;
552                let entry = decode_multipart_record(record)?;
553                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
554                stats.last_seq = entry.seq;
555
556                match entry.op.as_str() {
557                    "create" => {
558                        stats.multipart_creates += 1;
559                        if !config.dry_run {
560                            multipart_uploads.insert(entry.upload_id.clone(), entry);
561                        }
562                    }
563                    "upload_part" => {
564                        stats.multipart_parts += 1;
565                        if !config.dry_run {
566                            let state = multipart_uploads
567                                .entry(entry.upload_id.clone())
568                                .or_insert_with(|| MultipartWalEntryV1 {
569                                    seq: entry.seq,
570                                    op: "create".to_string(),
571                                    upload_id: entry.upload_id.clone(),
572                                    bucket: entry.bucket.clone(),
573                                    key: entry.key.clone(),
574                                    final_etag: None,
575                                    parts: Vec::new(),
576                                    ts_unix_ms: entry.ts_unix_ms,
577                                });
578                            for incoming in entry.parts {
579                                if let Some(existing) = state
580                                    .parts
581                                    .iter_mut()
582                                    .find(|part| part.part_number == incoming.part_number)
583                                {
584                                    *existing = incoming;
585                                } else {
586                                    state.parts.push(incoming);
587                                }
588                            }
589                            state.parts.sort_by_key(|part| part.part_number);
590                            state.seq = entry.seq;
591                            state.ts_unix_ms = entry.ts_unix_ms;
592                        }
593                    }
594                    "complete" => {
595                        stats.multipart_completes += 1;
596                        if !config.dry_run {
597                            multipart_uploads.insert(entry.upload_id.clone(), entry);
598                        }
599                    }
600                    "abort" => {
601                        stats.multipart_aborts += 1;
602                        if !config.dry_run {
603                            multipart_uploads.remove(&entry.upload_id);
604                        }
605                    }
606                    other => anyhow::bail!("unsupported multipart WAL op: {other}"),
607                }
608            }
609            RECORD_KIND_IAM => {
610                stats.iam_entries += 1;
611                let record = fb_iam::size_prefixed_root_as_iam_record_v1(&payload)
612                    .map_err(|err| anyhow::anyhow!("invalid IAM WAL frame: {err}"))?;
613                let entry = decode_iam_record(record)?;
614                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
615                stats.last_seq = entry.seq;
616                if !config.dry_run {
617                    iam_log.push(entry);
618                }
619            }
620            RECORD_KIND_WEBHOOK => {
621                stats.webhook_entries += 1;
622                let record = fb_webhook::size_prefixed_root_as_webhook_record_v1(&payload)
623                    .map_err(|err| anyhow::anyhow!("invalid webhook WAL frame: {err}"))?;
624                let entry = decode_webhook_record(record)?;
625                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
626                stats.last_seq = entry.seq;
627                if !config.dry_run {
628                    webhook_log.push(entry);
629                }
630            }
631            RECORD_KIND_WEBHOOK_EVENT => {
632                stats.webhook_event_entries += 1;
633                let record =
634                    fb_webhook_event::size_prefixed_root_as_webhook_event_record_v1(&payload)
635                        .map_err(|err| {
636                            anyhow::anyhow!("invalid webhook event WAL frame: {err}")
637                        })?;
638                let entry = decode_webhook_event_record(record)?;
639                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
640                stats.last_seq = entry.seq;
641                if !config.dry_run {
642                    webhook_event_log.push(entry);
643                }
644            }
645            RECORD_KIND_MIGRATION => {
646                stats.migration_entries += 1;
647                let record = fb_migration::size_prefixed_root_as_migration_record_v1(&payload)
648                    .map_err(|err| anyhow::anyhow!("invalid migration WAL frame: {err}"))?;
649                let entry = decode_migration_record(record)?;
650                enforce_monotonic_seq(stats.last_seq, entry.seq)?;
651                stats.last_seq = entry.seq;
652                if !config.dry_run {
653                    migration_log.push(entry);
654                }
655            }
656            other => anyhow::bail!("unknown WAL record kind: {other}"),
657        }
658    }
659
660    Ok(WalReplayState {
661        stats,
662        object_index,
663        multipart_uploads,
664        iam_log,
665        webhook_log,
666        webhook_event_log,
667        migration_log,
668    })
669}
670
671fn decode_object_record(record: fb_object::ObjectRecordV1<'_>) -> Result<WalEntryV1> {
672    let op = record
673        .op()
674        .context("object WAL record missing op")?
675        .to_string();
676    let bucket = record
677        .bucket()
678        .context("object WAL record missing bucket")?
679        .to_string();
680    let key = record
681        .key()
682        .context("object WAL record missing key")?
683        .to_string();
684
685    let pointer_or_manifest = record.pointer().map(|ptr| DevicePointer {
686        device_id: ptr.device_id(),
687        offset: ptr.offset(),
688        len: ptr.len(),
689        checksum_crc32c: ptr.checksum_crc32c(),
690        nonce_id: ptr.nonce_id(),
691    });
692
693    Ok(WalEntryV1 {
694        seq: record.seq(),
695        op,
696        bucket,
697        key,
698        etag: record.etag().map(ToString::to_string),
699        pointer_or_manifest,
700        ts_unix_ms: record.ts_unix_ms(),
701    })
702}
703
704fn decode_multipart_record(
705    record: fb_multipart::MultipartRecordV1<'_>,
706) -> Result<MultipartWalEntryV1> {
707    let op = record
708        .op()
709        .context("multipart WAL record missing op")?
710        .to_string();
711    let bucket = record
712        .bucket()
713        .context("multipart WAL record missing bucket")?
714        .to_string();
715    let key = record
716        .key()
717        .context("multipart WAL record missing key")?
718        .to_string();
719    let upload_id = record
720        .upload_id()
721        .context("multipart WAL record missing upload_id")?
722        .to_string();
723
724    let mut parts = Vec::new();
725    if let Some(list) = record.parts() {
726        for part in list {
727            let pointer = part
728                .pointer()
729                .context("multipart WAL part missing pointer")?;
730            parts.push(MultipartPart {
731                part_number: part.part_number(),
732                etag: part.etag().unwrap_or_default().to_string(),
733                pointer: DevicePointer {
734                    device_id: pointer.device_id(),
735                    offset: pointer.offset(),
736                    len: pointer.len(),
737                    checksum_crc32c: pointer.checksum_crc32c(),
738                    nonce_id: pointer.nonce_id(),
739                },
740            });
741        }
742    }
743
744    Ok(MultipartWalEntryV1 {
745        seq: record.seq(),
746        op,
747        upload_id,
748        bucket,
749        key,
750        final_etag: record.final_etag().map(ToString::to_string),
751        parts,
752        ts_unix_ms: record.ts_unix_ms(),
753    })
754}
755
756fn decode_iam_record(record: fb_iam::IamRecordV1<'_>) -> Result<IamWalEntryV1> {
757    Ok(IamWalEntryV1 {
758        seq: record.seq(),
759        op: record
760            .op()
761            .context("iam WAL record missing op")?
762            .to_string(),
763        target: record
764            .target()
765            .context("iam WAL record missing target")?
766            .to_string(),
767        payload: record
768            .payload()
769            .context("iam WAL record missing payload")?
770            .to_string(),
771        ts_unix_ms: record.ts_unix_ms(),
772    })
773}
774
775fn decode_webhook_record(record: fb_webhook::WebhookRecordV1<'_>) -> Result<WebhookWalEntryV1> {
776    Ok(WebhookWalEntryV1 {
777        seq: record.seq(),
778        op: record
779            .op()
780            .context("webhook WAL record missing op")?
781            .to_string(),
782        bucket: record
783            .bucket()
784            .context("webhook WAL record missing bucket")?
785            .to_string(),
786        rule_id: record
787            .rule_id()
788            .context("webhook WAL record missing rule_id")?
789            .to_string(),
790        rule_json: record
791            .rule_json()
792            .context("webhook WAL record missing rule_json")?
793            .to_string(),
794        ts_unix_ms: record.ts_unix_ms(),
795    })
796}
797
798fn decode_webhook_event_record(
799    record: fb_webhook_event::WebhookEventRecordV1<'_>,
800) -> Result<WebhookEventWalEntryV1> {
801    Ok(WebhookEventWalEntryV1 {
802        seq: record.seq(),
803        op: record
804            .op()
805            .context("webhook event WAL record missing op")?
806            .to_string(),
807        event_id: record
808            .event_id()
809            .context("webhook event WAL record missing event_id")?
810            .to_string(),
811        rule_id: record
812            .rule_id()
813            .context("webhook event WAL record missing rule_id")?
814            .to_string(),
815        bucket: record
816            .bucket()
817            .context("webhook event WAL record missing bucket")?
818            .to_string(),
819        key: record
820            .key()
821            .context("webhook event WAL record missing key")?
822            .to_string(),
823        payload_json: record
824            .payload_json()
825            .context("webhook event WAL record missing payload_json")?
826            .to_string(),
827        ts_unix_ms: record.ts_unix_ms(),
828    })
829}
830
831fn decode_migration_record(record: fb_migration::MigrationRecordV1<'_>) -> Result<MigrationWalEntryV1> {
832    Ok(MigrationWalEntryV1 {
833        seq: record.seq(),
834        op: record
835            .op()
836            .context("migration WAL record missing op")?
837            .to_string(),
838        bucket: record
839            .bucket()
840            .context("migration WAL record missing bucket")?
841            .to_string(),
842        key: record
843            .key()
844            .context("migration WAL record missing key")?
845            .to_string(),
846        payload: record
847            .payload()
848            .context("migration WAL record missing payload")?
849            .to_string(),
850        ts_unix_ms: record.ts_unix_ms(),
851    })
852}
853
854fn detect_next_seq(path: &Path) -> Option<u64> {
855    let state = replay_wal_state(WalReplayConfig {
856        wal_path: path.to_path_buf(),
857        dry_run: true,
858    })
859    .ok()?;
860    Some(state.stats.last_seq.saturating_add(1).max(1))
861}
862
863fn enforce_monotonic_seq(previous: u64, next: u64) -> Result<()> {
864    if next <= previous {
865        anyhow::bail!(
866            "WAL sequence monotonicity violation: previous={}, next={}",
867            previous,
868            next
869        );
870    }
871    Ok(())
872}
873
#[cfg(test)]
mod tests {
    use super::*;

    /// Removes the wrapped file when dropped.
    ///
    /// Tests create their guard *before* opening the writer, so the guard is
    /// dropped last: the temporary WAL is deleted after the writer has gone
    /// away, and it is deleted even when an assertion panics mid-test.
    struct RemoveOnDrop(PathBuf);

    impl Drop for RemoveOnDrop {
        fn drop(&mut self) {
            // Best-effort cleanup: the file may never have been created.
            let _ = std::fs::remove_file(&self.0);
        }
    }

    /// Appends one object entry and one multipart entry, then replays the WAL
    /// and checks the per-kind counters, the last sequence number, and that
    /// both entries are present in the replayed state.
    #[test]
    fn wal_round_trip_object_and_multipart() {
        let wal_path = unique_temp_path("wal-roundtrip");
        let _cleanup = RemoveOnDrop(wal_path.clone());
        let writer = WalWriter::open(&wal_path).expect("open WAL writer");

        let seq1 = writer.next_seq();
        writer
            .append_object_entry(&WalEntryV1 {
                seq: seq1,
                op: "put".to_string(),
                bucket: "b".to_string(),
                key: "k".to_string(),
                etag: Some("etag-1".to_string()),
                pointer_or_manifest: Some(DevicePointer {
                    device_id: 1,
                    offset: 8192,
                    len: 4096,
                    checksum_crc32c: 123,
                    nonce_id: 77,
                }),
                ts_unix_ms: 1,
            })
            .expect("append object entry");

        let seq2 = writer.next_seq();
        writer
            .append_multipart_entry(&MultipartWalEntryV1 {
                seq: seq2,
                op: "create".to_string(),
                upload_id: "u1".to_string(),
                bucket: "b".to_string(),
                key: "k2".to_string(),
                final_etag: None,
                parts: Vec::new(),
                ts_unix_ms: 2,
            })
            .expect("append multipart entry");

        let replay = replay_wal_state(WalReplayConfig {
            wal_path: wal_path.clone(),
            dry_run: false,
        })
        .expect("replay should succeed");

        assert_eq!(replay.stats.object_entries, 1);
        assert_eq!(replay.stats.multipart_entries, 1);
        assert_eq!(replay.stats.last_seq, seq2);
        assert!(replay.object_index.contains_key("b/k"));
        assert!(replay.multipart_uploads.contains_key("u1"));
    }

    /// Appends an entry with seq 3 followed by one with seq 2 and expects
    /// replay to fail with an error that mentions "monotonicity".
    #[test]
    fn wal_replay_rejects_non_monotonic_sequence() {
        let wal_path = unique_temp_path("wal-bad-seq");
        let _cleanup = RemoveOnDrop(wal_path.clone());
        let writer = WalWriter::open(&wal_path).expect("open WAL writer");

        writer
            .append_object_entry(&WalEntryV1 {
                seq: 3,
                op: "put".to_string(),
                bucket: "b".to_string(),
                key: "k1".to_string(),
                etag: None,
                pointer_or_manifest: None,
                ts_unix_ms: 1,
            })
            .expect("append seq3");
        writer
            .append_object_entry(&WalEntryV1 {
                seq: 2,
                op: "put".to_string(),
                bucket: "b".to_string(),
                key: "k2".to_string(),
                etag: None,
                pointer_or_manifest: None,
                ts_unix_ms: 2,
            })
            .expect("append seq2");

        let err = replay_wal(WalReplayConfig {
            wal_path: wal_path.clone(),
            dry_run: false,
        })
        .expect_err("expected monotonicity violation");
        assert!(err.to_string().contains("monotonicity"));
    }

    /// Returns a path in the system temp directory named from `prefix`, the
    /// current process id, and the nanoseconds elapsed since the Unix epoch.
    fn unique_temp_path(prefix: &str) -> PathBuf {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("valid system time")
            .as_nanos();
        std::env::temp_dir().join(format!("{prefix}-{}-{now}.wal", std::process::id()))
    }
}