1use std::collections::BTreeMap;
2use std::fs::{File, OpenOptions};
3use std::io::{self, Read, Write};
4use std::path::{Path, PathBuf};
5use std::sync::atomic::{AtomicU64, Ordering};
6use std::sync::Mutex;
7
8use anyhow::{Context, Result};
9use flatbuffers::FlatBufferBuilder;
10
11use crate::generated::{
12 iam_wal_v1_generated::nexus::wal as fb_iam,
13 migration_wal_v1_generated::nexus::wal as fb_migration,
14 multipart_wal_v1_generated::nexus::wal as fb_multipart,
15 object_wal_v1_generated::nexus::wal as fb_object,
16 webhook_event_wal_v1_generated::nexus::wal as fb_webhook_event,
17 webhook_wal_v1_generated::nexus::wal as fb_webhook,
18};
19
/// Size of the fixed header that precedes every WAL payload:
/// 1 byte record kind + 4 byte little-endian payload length +
/// 4 byte little-endian CRC32C of the payload.
const FRAME_HEADER_BYTES: usize = 1 + 4 + 4;

// Record-kind tags stored in the first header byte. The writer never emits
// kind 0; replay treats an all-zero header as the end of the log.

/// Frame payload is a size-prefixed `ObjectRecordV1` FlatBuffer.
const RECORD_KIND_OBJECT: u8 = 1;
/// Frame payload is a size-prefixed `MultipartRecordV1` FlatBuffer.
const RECORD_KIND_MULTIPART: u8 = 2;
/// Frame payload is a size-prefixed `IamRecordV1` FlatBuffer.
const RECORD_KIND_IAM: u8 = 3;
/// Frame payload is a size-prefixed `WebhookRecordV1` FlatBuffer.
const RECORD_KIND_WEBHOOK: u8 = 4;
/// Frame payload is a size-prefixed `WebhookEventRecordV1` FlatBuffer.
const RECORD_KIND_WEBHOOK_EVENT: u8 = 5;
/// Frame payload is a size-prefixed `MigrationRecordV1` FlatBuffer.
const RECORD_KIND_MIGRATION: u8 = 6;
27
/// Location of a stored extent on a device, together with its CRC32C checksum.
///
/// Serialized into both object and multipart WAL records.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct DevicePointer {
    /// Identifier of the device holding the extent.
    pub device_id: u16,
    /// Start of the extent on the device (presumably a byte offset — confirm).
    pub offset: u64,
    /// Length of the extent.
    pub len: u32,
    /// CRC32C checksum recorded for the extent.
    pub checksum_crc32c: u32,
    /// Nonce identifier associated with the extent (semantics defined by the
    /// storage layer — NOTE(review): confirm, e.g. encryption nonce).
    pub nonce_id: u64,
}
36
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct WalEntryV1 {
39 pub seq: u64,
40 pub op: String,
41 pub bucket: String,
42 pub key: String,
43 pub etag: Option<String>,
44 pub pointer_or_manifest: Option<DevicePointer>,
45 pub ts_unix_ms: u64,
46}
47
48#[derive(Debug, Clone, PartialEq, Eq)]
49pub struct MultipartPart {
50 pub part_number: u32,
51 pub etag: String,
52 pub pointer: DevicePointer,
53}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
56pub struct MultipartManifest {
57 pub upload_id: String,
58 pub bucket: String,
59 pub key: String,
60 pub parts: Vec<MultipartPart>,
61 pub final_etag: String,
62}
63
64#[derive(Debug, Clone, PartialEq, Eq)]
65pub struct MultipartWalEntryV1 {
66 pub seq: u64,
67 pub op: String,
68 pub upload_id: String,
69 pub bucket: String,
70 pub key: String,
71 pub final_etag: Option<String>,
72 pub parts: Vec<MultipartPart>,
73 pub ts_unix_ms: u64,
74}
75
/// Owned form of one IAM WAL record.
///
/// Replay does not interpret these fields; it appends records to the IAM log
/// in file order.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct IamWalEntryV1 {
    /// Monotonically increasing WAL sequence number.
    pub seq: u64,
    /// Operation name.
    pub op: String,
    /// Entity the operation applies to.
    pub target: String,
    /// Operation payload as an opaque string.
    pub payload: String,
    /// Record timestamp in milliseconds since the Unix epoch.
    pub ts_unix_ms: u64,
}
84
/// Owned form of one webhook-rule WAL record.
///
/// Replay appends these to the webhook log in file order without interpreting them.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WebhookWalEntryV1 {
    /// Monotonically increasing WAL sequence number.
    pub seq: u64,
    /// Operation name.
    pub op: String,
    /// Bucket the rule is attached to.
    pub bucket: String,
    /// Identifier of the webhook rule.
    pub rule_id: String,
    /// Rule definition as a JSON string.
    pub rule_json: String,
    /// Record timestamp in milliseconds since the Unix epoch.
    pub ts_unix_ms: u64,
}
94
/// Owned form of one webhook-event WAL record.
///
/// Replay appends these to the webhook-event log in file order without
/// interpreting them.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WebhookEventWalEntryV1 {
    /// Monotonically increasing WAL sequence number.
    pub seq: u64,
    /// Operation name.
    pub op: String,
    /// Identifier of the event.
    pub event_id: String,
    /// Identifier of the rule that produced the event.
    pub rule_id: String,
    /// Bucket the event concerns.
    pub bucket: String,
    /// Object key the event concerns.
    pub key: String,
    /// Event payload as a JSON string.
    pub payload_json: String,
    /// Record timestamp in milliseconds since the Unix epoch.
    pub ts_unix_ms: u64,
}
106
/// Owned form of one migration WAL record.
///
/// Replay appends these to the migration log in file order without
/// interpreting them.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct MigrationWalEntryV1 {
    /// Monotonically increasing WAL sequence number.
    pub seq: u64,
    /// Operation name.
    pub op: String,
    /// Bucket the migration step concerns.
    pub bucket: String,
    /// Object key the migration step concerns.
    pub key: String,
    /// Step payload as an opaque string.
    pub payload: String,
    /// Record timestamp in milliseconds since the Unix epoch.
    pub ts_unix_ms: u64,
}
116
/// Input to [`replay_wal`] / [`replay_wal_state`].
#[derive(Clone, Debug)]
pub struct WalReplayConfig {
    /// WAL file to replay; a missing file replays as empty.
    pub wal_path: PathBuf,
    /// When true, only statistics are computed and the returned indexes/logs stay empty.
    pub dry_run: bool,
}
122
/// Counters gathered while replaying a WAL file.
///
/// The `*_entries` fields count frames per record kind. The remaining counters
/// break object and multipart frames down by operation. `last_seq` is the
/// highest sequence number seen (0 when nothing was replayed).
#[derive(Debug, Clone)]
pub struct WalReplayStats {
    pub wal_path: PathBuf,
    pub dry_run: bool,
    pub object_entries: u64,
    pub multipart_entries: u64,
    pub object_puts: u64,
    pub object_deletes: u64,
    pub multipart_creates: u64,
    pub multipart_parts: u64,
    pub multipart_completes: u64,
    pub multipart_aborts: u64,
    pub iam_entries: u64,
    pub webhook_entries: u64,
    pub webhook_event_entries: u64,
    pub migration_entries: u64,
    pub last_seq: u64,
}

impl WalReplayStats {
    /// Renders the statistics as a single-line JSON object tagged
    /// `"module":"WAL_REPLAY"`.
    ///
    /// The WAL path is JSON-escaped, so paths containing quotes, backslashes
    /// (for example Windows paths) or control characters still produce
    /// valid JSON.
    pub fn to_json(&self) -> String {
        let wal_path = Self::escape_json(&self.wal_path.display().to_string());
        format!(
            "{{\"module\":\"WAL_REPLAY\",\"wal_path\":\"{}\",\"dry_run\":{},\"object_entries\":{},\"multipart_entries\":{},\"object_puts\":{},\"object_deletes\":{},\"multipart_creates\":{},\"multipart_parts\":{},\"multipart_completes\":{},\"multipart_aborts\":{},\"iam_entries\":{},\"webhook_entries\":{},\"webhook_event_entries\":{},\"migration_entries\":{},\"last_seq\":{}}}",
            wal_path,
            self.dry_run,
            self.object_entries,
            self.multipart_entries,
            self.object_puts,
            self.object_deletes,
            self.multipart_creates,
            self.multipart_parts,
            self.multipart_completes,
            self.multipart_aborts,
            self.iam_entries,
            self.webhook_entries,
            self.webhook_event_entries,
            self.migration_entries,
            self.last_seq,
        )
    }

    /// Escapes `raw` for embedding inside a JSON string literal (RFC 8259 §7).
    fn escape_json(raw: &str) -> String {
        let mut out = String::with_capacity(raw.len());
        for ch in raw.chars() {
            match ch {
                '"' => out.push_str("\\\""),
                '\\' => out.push_str("\\\\"),
                '\n' => out.push_str("\\n"),
                '\r' => out.push_str("\\r"),
                '\t' => out.push_str("\\t"),
                // Remaining C0 control characters must use the \uXXXX form.
                c if u32::from(c) < 0x20 => out.push_str(&format!("\\u{:04x}", u32::from(c))),
                c => out.push(c),
            }
        }
        out
    }
}
164
165#[derive(Debug, Clone)]
166pub struct WalReplayState {
167 pub stats: WalReplayStats,
168 pub object_index: BTreeMap<String, WalEntryV1>,
169 pub multipart_uploads: BTreeMap<String, MultipartWalEntryV1>,
170 pub iam_log: Vec<IamWalEntryV1>,
171 pub webhook_log: Vec<WebhookWalEntryV1>,
172 pub webhook_event_log: Vec<WebhookEventWalEntryV1>,
173 pub migration_log: Vec<MigrationWalEntryV1>,
174}
175
/// Appends framed, checksummed FlatBuffer records to a WAL file.
///
/// Appends are serialized through an internal mutex and fsynced one by one.
/// Sequence numbers come from an atomic counter seeded from the existing log
/// when the writer is opened.
#[derive(Debug)]
pub struct WalWriter {
    /// Path of the WAL file.
    wal_path: PathBuf,
    /// Append-mode handle; the lock also serializes frame writes.
    file: Mutex<File>,
    /// Next sequence number to hand out from `next_seq()`.
    next_seq: AtomicU64,
}
182
183impl WalWriter {
184 pub fn open<P: AsRef<Path>>(wal_path: P) -> Result<Self> {
185 let wal_path = wal_path.as_ref().to_path_buf();
186 if let Some(parent) = wal_path.parent() {
187 if !parent.as_os_str().is_empty() {
188 std::fs::create_dir_all(parent)
189 .with_context(|| format!("failed creating WAL parent {}", parent.display()))?;
190 }
191 }
192
193 let file = OpenOptions::new()
194 .create(true)
195 .append(true)
196 .read(true)
197 .open(&wal_path)
198 .with_context(|| format!("failed opening WAL {}", wal_path.display()))?;
199
200 let seed_seq = detect_next_seq(&wal_path).unwrap_or(1);
201
202 Ok(Self {
203 wal_path,
204 file: Mutex::new(file),
205 next_seq: AtomicU64::new(seed_seq),
206 })
207 }
208
209 pub fn wal_path(&self) -> &Path {
210 &self.wal_path
211 }
212
213 pub fn next_seq(&self) -> u64 {
214 self.next_seq.fetch_add(1, Ordering::AcqRel)
215 }
216
217 pub fn append_object_entry(&self, entry: &WalEntryV1) -> Result<()> {
218 let mut fbb = FlatBufferBuilder::with_capacity(512);
219
220 let op = fbb.create_string(entry.op.as_str());
221 let bucket = fbb.create_string(entry.bucket.as_str());
222 let key = fbb.create_string(entry.key.as_str());
223 let etag = entry.etag.as_ref().map(|v| fbb.create_string(v.as_str()));
224 let pointer = entry.pointer_or_manifest.as_ref().map(|p| {
225 fb_object::DevicePointer::create(
226 &mut fbb,
227 &fb_object::DevicePointerArgs {
228 device_id: p.device_id,
229 offset: p.offset,
230 len: p.len,
231 checksum_crc32c: p.checksum_crc32c,
232 nonce_id: p.nonce_id,
233 },
234 )
235 });
236
237 let record = fb_object::ObjectRecordV1::create(
238 &mut fbb,
239 &fb_object::ObjectRecordV1Args {
240 seq: entry.seq,
241 op: Some(op),
242 bucket: Some(bucket),
243 key: Some(key),
244 etag,
245 ts_unix_ms: entry.ts_unix_ms,
246 pointer,
247 },
248 );
249 fb_object::finish_size_prefixed_object_record_v1_buffer(&mut fbb, record);
250
251 self.append_frame(RECORD_KIND_OBJECT, fbb.finished_data())
252 }
253
254 pub fn append_multipart_entry(&self, entry: &MultipartWalEntryV1) -> Result<()> {
255 let mut fbb = FlatBufferBuilder::with_capacity(1024);
256
257 let op = fbb.create_string(entry.op.as_str());
258 let bucket = fbb.create_string(entry.bucket.as_str());
259 let key = fbb.create_string(entry.key.as_str());
260 let upload_id = fbb.create_string(entry.upload_id.as_str());
261 let final_etag = entry
262 .final_etag
263 .as_ref()
264 .map(|v| fbb.create_string(v.as_str()));
265
266 let mut part_offsets = Vec::with_capacity(entry.parts.len());
267 for part in &entry.parts {
268 let etag = fbb.create_string(part.etag.as_str());
269 let pointer = fb_multipart::DevicePointer::create(
270 &mut fbb,
271 &fb_multipart::DevicePointerArgs {
272 device_id: part.pointer.device_id,
273 offset: part.pointer.offset,
274 len: part.pointer.len,
275 checksum_crc32c: part.pointer.checksum_crc32c,
276 nonce_id: part.pointer.nonce_id,
277 },
278 );
279 let part_entry = fb_multipart::MultipartPartPointer::create(
280 &mut fbb,
281 &fb_multipart::MultipartPartPointerArgs {
282 part_number: part.part_number,
283 etag: Some(etag),
284 pointer: Some(pointer),
285 },
286 );
287 part_offsets.push(part_entry);
288 }
289 let parts = if part_offsets.is_empty() {
290 None
291 } else {
292 Some(fbb.create_vector(&part_offsets))
293 };
294
295 let record = fb_multipart::MultipartRecordV1::create(
296 &mut fbb,
297 &fb_multipart::MultipartRecordV1Args {
298 seq: entry.seq,
299 op: Some(op),
300 bucket: Some(bucket),
301 key: Some(key),
302 upload_id: Some(upload_id),
303 final_etag,
304 ts_unix_ms: entry.ts_unix_ms,
305 parts,
306 },
307 );
308 fb_multipart::finish_size_prefixed_multipart_record_v1_buffer(&mut fbb, record);
309
310 self.append_frame(RECORD_KIND_MULTIPART, fbb.finished_data())
311 }
312
313 pub fn append_iam_entry(&self, entry: &IamWalEntryV1) -> Result<()> {
314 let mut fbb = FlatBufferBuilder::with_capacity(512);
315 let op = fbb.create_string(entry.op.as_str());
316 let target = fbb.create_string(entry.target.as_str());
317 let payload = fbb.create_string(entry.payload.as_str());
318
319 let record = fb_iam::IamRecordV1::create(
320 &mut fbb,
321 &fb_iam::IamRecordV1Args {
322 seq: entry.seq,
323 op: Some(op),
324 target: Some(target),
325 payload: Some(payload),
326 ts_unix_ms: entry.ts_unix_ms,
327 },
328 );
329 fb_iam::finish_size_prefixed_iam_record_v1_buffer(&mut fbb, record);
330
331 self.append_frame(RECORD_KIND_IAM, fbb.finished_data())
332 }
333
334 pub fn append_webhook_entry(&self, entry: &WebhookWalEntryV1) -> Result<()> {
335 let mut fbb = FlatBufferBuilder::with_capacity(512);
336 let op = fbb.create_string(entry.op.as_str());
337 let bucket = fbb.create_string(entry.bucket.as_str());
338 let rule_id = fbb.create_string(entry.rule_id.as_str());
339 let rule_json = fbb.create_string(entry.rule_json.as_str());
340
341 let record = fb_webhook::WebhookRecordV1::create(
342 &mut fbb,
343 &fb_webhook::WebhookRecordV1Args {
344 seq: entry.seq,
345 op: Some(op),
346 bucket: Some(bucket),
347 rule_id: Some(rule_id),
348 rule_json: Some(rule_json),
349 ts_unix_ms: entry.ts_unix_ms,
350 },
351 );
352 fb_webhook::finish_size_prefixed_webhook_record_v1_buffer(&mut fbb, record);
353
354 self.append_frame(RECORD_KIND_WEBHOOK, fbb.finished_data())
355 }
356
357 pub fn append_webhook_event_entry(&self, entry: &WebhookEventWalEntryV1) -> Result<()> {
358 let mut fbb = FlatBufferBuilder::with_capacity(512);
359 let op = fbb.create_string(entry.op.as_str());
360 let event_id = fbb.create_string(entry.event_id.as_str());
361 let rule_id = fbb.create_string(entry.rule_id.as_str());
362 let bucket = fbb.create_string(entry.bucket.as_str());
363 let key = fbb.create_string(entry.key.as_str());
364 let payload_json = fbb.create_string(entry.payload_json.as_str());
365
366 let record = fb_webhook_event::WebhookEventRecordV1::create(
367 &mut fbb,
368 &fb_webhook_event::WebhookEventRecordV1Args {
369 seq: entry.seq,
370 op: Some(op),
371 event_id: Some(event_id),
372 rule_id: Some(rule_id),
373 bucket: Some(bucket),
374 key: Some(key),
375 payload_json: Some(payload_json),
376 ts_unix_ms: entry.ts_unix_ms,
377 },
378 );
379 fb_webhook_event::finish_size_prefixed_webhook_event_record_v1_buffer(&mut fbb, record);
380 self.append_frame(RECORD_KIND_WEBHOOK_EVENT, fbb.finished_data())
381 }
382
383 pub fn append_migration_entry(&self, entry: &MigrationWalEntryV1) -> Result<()> {
384 let mut fbb = FlatBufferBuilder::with_capacity(512);
385 let op = fbb.create_string(entry.op.as_str());
386 let bucket = fbb.create_string(entry.bucket.as_str());
387 let key = fbb.create_string(entry.key.as_str());
388 let payload = fbb.create_string(entry.payload.as_str());
389
390 let record = fb_migration::MigrationRecordV1::create(
391 &mut fbb,
392 &fb_migration::MigrationRecordV1Args {
393 seq: entry.seq,
394 op: Some(op),
395 bucket: Some(bucket),
396 key: Some(key),
397 payload: Some(payload),
398 ts_unix_ms: entry.ts_unix_ms,
399 },
400 );
401 fb_migration::finish_size_prefixed_migration_record_v1_buffer(&mut fbb, record);
402 self.append_frame(RECORD_KIND_MIGRATION, fbb.finished_data())
403 }
404
405 fn append_frame(&self, kind: u8, payload: &[u8]) -> Result<()> {
406 let payload_len: u32 = payload
407 .len()
408 .try_into()
409 .context("WAL payload too large to frame")?;
410 let payload_crc = crc32c::crc32c(payload);
411
412 let mut header = [0_u8; FRAME_HEADER_BYTES];
413 header[0] = kind;
414 header[1..5].copy_from_slice(&payload_len.to_le_bytes());
415 header[5..9].copy_from_slice(&payload_crc.to_le_bytes());
416
417 let mut file = self
418 .file
419 .lock()
420 .map_err(|_| anyhow::anyhow!("WAL file mutex poisoned"))?;
421 file.write_all(&header)
422 .context("failed writing WAL frame header")?;
423 file.write_all(payload)
424 .context("failed writing WAL frame payload")?;
425 file.sync_data().context("failed syncing WAL append")?;
426 Ok(())
427 }
428}
429
430pub fn replay_wal(config: WalReplayConfig) -> Result<WalReplayStats> {
431 let state = replay_wal_state(config)?;
432 Ok(state.stats)
433}
434
435pub fn replay_wal_state(config: WalReplayConfig) -> Result<WalReplayState> {
436 if !config.wal_path.exists() {
437 return Ok(WalReplayState {
438 stats: WalReplayStats {
439 wal_path: config.wal_path,
440 dry_run: config.dry_run,
441 object_entries: 0,
442 multipart_entries: 0,
443 object_puts: 0,
444 object_deletes: 0,
445 multipart_creates: 0,
446 multipart_parts: 0,
447 multipart_completes: 0,
448 multipart_aborts: 0,
449 iam_entries: 0,
450 webhook_entries: 0,
451 webhook_event_entries: 0,
452 migration_entries: 0,
453 last_seq: 0,
454 },
455 object_index: BTreeMap::new(),
456 multipart_uploads: BTreeMap::new(),
457 iam_log: Vec::new(),
458 webhook_log: Vec::new(),
459 webhook_event_log: Vec::new(),
460 migration_log: Vec::new(),
461 });
462 }
463
464 let mut file = File::open(&config.wal_path)
465 .with_context(|| format!("failed opening WAL {}", config.wal_path.display()))?;
466
467 let mut object_index = BTreeMap::<String, WalEntryV1>::new();
468 let mut multipart_uploads = BTreeMap::<String, MultipartWalEntryV1>::new();
469 let mut stats = WalReplayStats {
470 wal_path: config.wal_path.clone(),
471 dry_run: config.dry_run,
472 object_entries: 0,
473 multipart_entries: 0,
474 object_puts: 0,
475 object_deletes: 0,
476 multipart_creates: 0,
477 multipart_parts: 0,
478 multipart_completes: 0,
479 multipart_aborts: 0,
480 iam_entries: 0,
481 webhook_entries: 0,
482 webhook_event_entries: 0,
483 migration_entries: 0,
484 last_seq: 0,
485 };
486 let mut iam_log = Vec::<IamWalEntryV1>::new();
487 let mut webhook_log = Vec::<WebhookWalEntryV1>::new();
488 let mut webhook_event_log = Vec::<WebhookEventWalEntryV1>::new();
489 let mut migration_log = Vec::<MigrationWalEntryV1>::new();
490
491 loop {
492 let mut header = [0_u8; FRAME_HEADER_BYTES];
493 match file.read_exact(&mut header) {
494 Ok(()) => {}
495 Err(err) if err.kind() == io::ErrorKind::UnexpectedEof => break,
496 Err(err) => return Err(err).context("failed reading WAL frame header"),
497 }
498
499 let kind = header[0];
500 let payload_len = u32::from_le_bytes(header[1..5].try_into().expect("header len"));
501 let payload_crc = u32::from_le_bytes(header[5..9].try_into().expect("header crc"));
502
503 if kind == 0 && payload_len == 0 && payload_crc == 0 {
506 break;
507 }
508
509 let mut payload = vec![0_u8; payload_len as usize];
510 file.read_exact(&mut payload)
511 .context("failed reading WAL frame payload")?;
512
513 let actual_crc = crc32c::crc32c(&payload);
514 if actual_crc != payload_crc {
515 anyhow::bail!(
516 "WAL CRC mismatch at seq {}: expected {}, got {}",
517 stats.last_seq,
518 payload_crc,
519 actual_crc
520 );
521 }
522
523 match kind {
524 RECORD_KIND_OBJECT => {
525 stats.object_entries += 1;
526 let record = fb_object::size_prefixed_root_as_object_record_v1(&payload)
527 .map_err(|err| anyhow::anyhow!("invalid object WAL frame: {err}"))?;
528 let entry = decode_object_record(record)?;
529 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
530 stats.last_seq = entry.seq;
531
532 match entry.op.as_str() {
533 "put" => {
534 stats.object_puts += 1;
535 if !config.dry_run {
536 object_index.insert(format!("{}/{}", entry.bucket, entry.key), entry);
537 }
538 }
539 "delete" => {
540 stats.object_deletes += 1;
541 if !config.dry_run {
542 object_index.remove(&format!("{}/{}", entry.bucket, entry.key));
543 }
544 }
545 other => anyhow::bail!("unsupported object WAL op: {other}"),
546 }
547 }
548 RECORD_KIND_MULTIPART => {
549 stats.multipart_entries += 1;
550 let record = fb_multipart::size_prefixed_root_as_multipart_record_v1(&payload)
551 .map_err(|err| anyhow::anyhow!("invalid multipart WAL frame: {err}"))?;
552 let entry = decode_multipart_record(record)?;
553 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
554 stats.last_seq = entry.seq;
555
556 match entry.op.as_str() {
557 "create" => {
558 stats.multipart_creates += 1;
559 if !config.dry_run {
560 multipart_uploads.insert(entry.upload_id.clone(), entry);
561 }
562 }
563 "upload_part" => {
564 stats.multipart_parts += 1;
565 if !config.dry_run {
566 let state = multipart_uploads
567 .entry(entry.upload_id.clone())
568 .or_insert_with(|| MultipartWalEntryV1 {
569 seq: entry.seq,
570 op: "create".to_string(),
571 upload_id: entry.upload_id.clone(),
572 bucket: entry.bucket.clone(),
573 key: entry.key.clone(),
574 final_etag: None,
575 parts: Vec::new(),
576 ts_unix_ms: entry.ts_unix_ms,
577 });
578 for incoming in entry.parts {
579 if let Some(existing) = state
580 .parts
581 .iter_mut()
582 .find(|part| part.part_number == incoming.part_number)
583 {
584 *existing = incoming;
585 } else {
586 state.parts.push(incoming);
587 }
588 }
589 state.parts.sort_by_key(|part| part.part_number);
590 state.seq = entry.seq;
591 state.ts_unix_ms = entry.ts_unix_ms;
592 }
593 }
594 "complete" => {
595 stats.multipart_completes += 1;
596 if !config.dry_run {
597 multipart_uploads.insert(entry.upload_id.clone(), entry);
598 }
599 }
600 "abort" => {
601 stats.multipart_aborts += 1;
602 if !config.dry_run {
603 multipart_uploads.remove(&entry.upload_id);
604 }
605 }
606 other => anyhow::bail!("unsupported multipart WAL op: {other}"),
607 }
608 }
609 RECORD_KIND_IAM => {
610 stats.iam_entries += 1;
611 let record = fb_iam::size_prefixed_root_as_iam_record_v1(&payload)
612 .map_err(|err| anyhow::anyhow!("invalid IAM WAL frame: {err}"))?;
613 let entry = decode_iam_record(record)?;
614 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
615 stats.last_seq = entry.seq;
616 if !config.dry_run {
617 iam_log.push(entry);
618 }
619 }
620 RECORD_KIND_WEBHOOK => {
621 stats.webhook_entries += 1;
622 let record = fb_webhook::size_prefixed_root_as_webhook_record_v1(&payload)
623 .map_err(|err| anyhow::anyhow!("invalid webhook WAL frame: {err}"))?;
624 let entry = decode_webhook_record(record)?;
625 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
626 stats.last_seq = entry.seq;
627 if !config.dry_run {
628 webhook_log.push(entry);
629 }
630 }
631 RECORD_KIND_WEBHOOK_EVENT => {
632 stats.webhook_event_entries += 1;
633 let record =
634 fb_webhook_event::size_prefixed_root_as_webhook_event_record_v1(&payload)
635 .map_err(|err| {
636 anyhow::anyhow!("invalid webhook event WAL frame: {err}")
637 })?;
638 let entry = decode_webhook_event_record(record)?;
639 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
640 stats.last_seq = entry.seq;
641 if !config.dry_run {
642 webhook_event_log.push(entry);
643 }
644 }
645 RECORD_KIND_MIGRATION => {
646 stats.migration_entries += 1;
647 let record = fb_migration::size_prefixed_root_as_migration_record_v1(&payload)
648 .map_err(|err| anyhow::anyhow!("invalid migration WAL frame: {err}"))?;
649 let entry = decode_migration_record(record)?;
650 enforce_monotonic_seq(stats.last_seq, entry.seq)?;
651 stats.last_seq = entry.seq;
652 if !config.dry_run {
653 migration_log.push(entry);
654 }
655 }
656 other => anyhow::bail!("unknown WAL record kind: {other}"),
657 }
658 }
659
660 Ok(WalReplayState {
661 stats,
662 object_index,
663 multipart_uploads,
664 iam_log,
665 webhook_log,
666 webhook_event_log,
667 migration_log,
668 })
669}
670
671fn decode_object_record(record: fb_object::ObjectRecordV1<'_>) -> Result<WalEntryV1> {
672 let op = record
673 .op()
674 .context("object WAL record missing op")?
675 .to_string();
676 let bucket = record
677 .bucket()
678 .context("object WAL record missing bucket")?
679 .to_string();
680 let key = record
681 .key()
682 .context("object WAL record missing key")?
683 .to_string();
684
685 let pointer_or_manifest = record.pointer().map(|ptr| DevicePointer {
686 device_id: ptr.device_id(),
687 offset: ptr.offset(),
688 len: ptr.len(),
689 checksum_crc32c: ptr.checksum_crc32c(),
690 nonce_id: ptr.nonce_id(),
691 });
692
693 Ok(WalEntryV1 {
694 seq: record.seq(),
695 op,
696 bucket,
697 key,
698 etag: record.etag().map(ToString::to_string),
699 pointer_or_manifest,
700 ts_unix_ms: record.ts_unix_ms(),
701 })
702}
703
704fn decode_multipart_record(
705 record: fb_multipart::MultipartRecordV1<'_>,
706) -> Result<MultipartWalEntryV1> {
707 let op = record
708 .op()
709 .context("multipart WAL record missing op")?
710 .to_string();
711 let bucket = record
712 .bucket()
713 .context("multipart WAL record missing bucket")?
714 .to_string();
715 let key = record
716 .key()
717 .context("multipart WAL record missing key")?
718 .to_string();
719 let upload_id = record
720 .upload_id()
721 .context("multipart WAL record missing upload_id")?
722 .to_string();
723
724 let mut parts = Vec::new();
725 if let Some(list) = record.parts() {
726 for part in list {
727 let pointer = part
728 .pointer()
729 .context("multipart WAL part missing pointer")?;
730 parts.push(MultipartPart {
731 part_number: part.part_number(),
732 etag: part.etag().unwrap_or_default().to_string(),
733 pointer: DevicePointer {
734 device_id: pointer.device_id(),
735 offset: pointer.offset(),
736 len: pointer.len(),
737 checksum_crc32c: pointer.checksum_crc32c(),
738 nonce_id: pointer.nonce_id(),
739 },
740 });
741 }
742 }
743
744 Ok(MultipartWalEntryV1 {
745 seq: record.seq(),
746 op,
747 upload_id,
748 bucket,
749 key,
750 final_etag: record.final_etag().map(ToString::to_string),
751 parts,
752 ts_unix_ms: record.ts_unix_ms(),
753 })
754}
755
756fn decode_iam_record(record: fb_iam::IamRecordV1<'_>) -> Result<IamWalEntryV1> {
757 Ok(IamWalEntryV1 {
758 seq: record.seq(),
759 op: record
760 .op()
761 .context("iam WAL record missing op")?
762 .to_string(),
763 target: record
764 .target()
765 .context("iam WAL record missing target")?
766 .to_string(),
767 payload: record
768 .payload()
769 .context("iam WAL record missing payload")?
770 .to_string(),
771 ts_unix_ms: record.ts_unix_ms(),
772 })
773}
774
775fn decode_webhook_record(record: fb_webhook::WebhookRecordV1<'_>) -> Result<WebhookWalEntryV1> {
776 Ok(WebhookWalEntryV1 {
777 seq: record.seq(),
778 op: record
779 .op()
780 .context("webhook WAL record missing op")?
781 .to_string(),
782 bucket: record
783 .bucket()
784 .context("webhook WAL record missing bucket")?
785 .to_string(),
786 rule_id: record
787 .rule_id()
788 .context("webhook WAL record missing rule_id")?
789 .to_string(),
790 rule_json: record
791 .rule_json()
792 .context("webhook WAL record missing rule_json")?
793 .to_string(),
794 ts_unix_ms: record.ts_unix_ms(),
795 })
796}
797
798fn decode_webhook_event_record(
799 record: fb_webhook_event::WebhookEventRecordV1<'_>,
800) -> Result<WebhookEventWalEntryV1> {
801 Ok(WebhookEventWalEntryV1 {
802 seq: record.seq(),
803 op: record
804 .op()
805 .context("webhook event WAL record missing op")?
806 .to_string(),
807 event_id: record
808 .event_id()
809 .context("webhook event WAL record missing event_id")?
810 .to_string(),
811 rule_id: record
812 .rule_id()
813 .context("webhook event WAL record missing rule_id")?
814 .to_string(),
815 bucket: record
816 .bucket()
817 .context("webhook event WAL record missing bucket")?
818 .to_string(),
819 key: record
820 .key()
821 .context("webhook event WAL record missing key")?
822 .to_string(),
823 payload_json: record
824 .payload_json()
825 .context("webhook event WAL record missing payload_json")?
826 .to_string(),
827 ts_unix_ms: record.ts_unix_ms(),
828 })
829}
830
831fn decode_migration_record(record: fb_migration::MigrationRecordV1<'_>) -> Result<MigrationWalEntryV1> {
832 Ok(MigrationWalEntryV1 {
833 seq: record.seq(),
834 op: record
835 .op()
836 .context("migration WAL record missing op")?
837 .to_string(),
838 bucket: record
839 .bucket()
840 .context("migration WAL record missing bucket")?
841 .to_string(),
842 key: record
843 .key()
844 .context("migration WAL record missing key")?
845 .to_string(),
846 payload: record
847 .payload()
848 .context("migration WAL record missing payload")?
849 .to_string(),
850 ts_unix_ms: record.ts_unix_ms(),
851 })
852}
853
854fn detect_next_seq(path: &Path) -> Option<u64> {
855 let state = replay_wal_state(WalReplayConfig {
856 wal_path: path.to_path_buf(),
857 dry_run: true,
858 })
859 .ok()?;
860 Some(state.stats.last_seq.saturating_add(1).max(1))
861}
862
863fn enforce_monotonic_seq(previous: u64, next: u64) -> Result<()> {
864 if next <= previous {
865 anyhow::bail!(
866 "WAL sequence monotonicity violation: previous={}, next={}",
867 previous,
868 next
869 );
870 }
871 Ok(())
872}
873
#[cfg(test)]
mod tests {
    use super::*;

    /// Owns a unique temp WAL path and deletes the file on drop, so a failing
    /// assertion does not leave stray files behind.
    struct TempWal {
        path: PathBuf,
    }

    impl TempWal {
        fn new(prefix: &str) -> Self {
            Self {
                path: unique_temp_path(prefix),
            }
        }

        fn path(&self) -> &Path {
            &self.path
        }

        fn config(&self, dry_run: bool) -> WalReplayConfig {
            WalReplayConfig {
                wal_path: self.path.clone(),
                dry_run,
            }
        }
    }

    impl Drop for TempWal {
        fn drop(&mut self) {
            // Best-effort cleanup; the file may legitimately not exist.
            let _ = std::fs::remove_file(&self.path);
        }
    }

    /// Object entry in bucket "b" with no etag or pointer.
    fn object_entry(seq: u64, op: &str, key: &str) -> WalEntryV1 {
        WalEntryV1 {
            seq,
            op: op.to_string(),
            bucket: "b".to_string(),
            key: key.to_string(),
            etag: None,
            pointer_or_manifest: None,
            ts_unix_ms: seq,
        }
    }

    /// `upload_part` record for upload "u1" carrying `parts`.
    fn upload_part_entry(seq: u64, parts: Vec<MultipartPart>) -> MultipartWalEntryV1 {
        MultipartWalEntryV1 {
            seq,
            op: "upload_part".to_string(),
            upload_id: "u1".to_string(),
            bucket: "b".to_string(),
            key: "mp".to_string(),
            final_etag: None,
            parts,
            ts_unix_ms: seq,
        }
    }

    fn part(number: u32, etag: &str) -> MultipartPart {
        MultipartPart {
            part_number: number,
            etag: etag.to_string(),
            pointer: DevicePointer {
                device_id: 0,
                offset: u64::from(number) * 4096,
                len: 4096,
                checksum_crc32c: 0,
                nonce_id: 0,
            },
        }
    }

    #[test]
    fn wal_round_trip_object_and_multipart() {
        let wal = TempWal::new("wal-roundtrip");
        let writer = WalWriter::open(wal.path()).expect("open WAL writer");

        let seq1 = writer.next_seq();
        writer
            .append_object_entry(&WalEntryV1 {
                seq: seq1,
                op: "put".to_string(),
                bucket: "b".to_string(),
                key: "k".to_string(),
                etag: Some("etag-1".to_string()),
                pointer_or_manifest: Some(DevicePointer {
                    device_id: 1,
                    offset: 8192,
                    len: 4096,
                    checksum_crc32c: 123,
                    nonce_id: 77,
                }),
                ts_unix_ms: 1,
            })
            .expect("append object entry");

        let seq2 = writer.next_seq();
        writer
            .append_multipart_entry(&MultipartWalEntryV1 {
                seq: seq2,
                op: "create".to_string(),
                upload_id: "u1".to_string(),
                bucket: "b".to_string(),
                key: "k2".to_string(),
                final_etag: None,
                parts: Vec::new(),
                ts_unix_ms: 2,
            })
            .expect("append multipart entry");

        let replay = replay_wal_state(wal.config(false)).expect("replay should succeed");

        assert_eq!(replay.stats.object_entries, 1);
        assert_eq!(replay.stats.multipart_entries, 1);
        assert_eq!(replay.stats.last_seq, seq2);
        assert!(replay.object_index.contains_key("b/k"));
        assert!(replay.multipart_uploads.contains_key("u1"));
    }

    #[test]
    fn wal_replay_rejects_non_monotonic_sequence() {
        let wal = TempWal::new("wal-bad-seq");
        let writer = WalWriter::open(wal.path()).expect("open WAL writer");

        writer
            .append_object_entry(&object_entry(3, "put", "k1"))
            .expect("append seq3");
        writer
            .append_object_entry(&object_entry(2, "put", "k2"))
            .expect("append seq2");

        let err = replay_wal(wal.config(false)).expect_err("expected monotonicity violation");
        assert!(err.to_string().contains("monotonicity"));
    }

    #[test]
    fn wal_replay_applies_deletes_and_merges_parts() {
        let wal = TempWal::new("wal-apply");
        let writer = WalWriter::open(wal.path()).expect("open WAL writer");

        for (op, key) in [("put", "keep"), ("put", "gone"), ("delete", "gone")] {
            writer
                .append_object_entry(&object_entry(writer.next_seq(), op, key))
                .expect("append object entry");
        }
        for parts in [vec![part(2, "old")], vec![part(1, "one")], vec![part(2, "new")]] {
            writer
                .append_multipart_entry(&upload_part_entry(writer.next_seq(), parts))
                .expect("append multipart entry");
        }

        let replay = replay_wal_state(wal.config(false)).expect("replay should succeed");
        assert_eq!(replay.stats.object_puts, 2);
        assert_eq!(replay.stats.object_deletes, 1);
        assert_eq!(replay.stats.multipart_parts, 3);
        assert!(replay.object_index.contains_key("b/keep"));
        assert!(!replay.object_index.contains_key("b/gone"));

        let upload = &replay.multipart_uploads["u1"];
        let summary: Vec<(u32, &str)> = upload
            .parts
            .iter()
            .map(|p| (p.part_number, p.etag.as_str()))
            .collect();
        assert_eq!(summary, vec![(1, "one"), (2, "new")]);
        assert_eq!(upload.seq, replay.stats.last_seq);
    }

    #[test]
    fn wal_dry_run_counts_without_materializing_state() {
        let wal = TempWal::new("wal-dry-run");
        let writer = WalWriter::open(wal.path()).expect("open WAL writer");
        writer
            .append_object_entry(&object_entry(writer.next_seq(), "put", "k"))
            .expect("append object entry");

        let replay = replay_wal_state(wal.config(true)).expect("replay should succeed");
        assert_eq!(replay.stats.object_puts, 1);
        assert!(replay.stats.dry_run);
        assert!(replay.object_index.is_empty());
    }

    #[test]
    fn wal_writer_reopen_continues_sequence() {
        let wal = TempWal::new("wal-reopen");
        {
            let writer = WalWriter::open(wal.path()).expect("open WAL writer");
            for _ in 0..3 {
                let seq = writer.next_seq();
                writer
                    .append_iam_entry(&IamWalEntryV1 {
                        seq,
                        op: "put_user".to_string(),
                        target: "alice".to_string(),
                        payload: "{}".to_string(),
                        ts_unix_ms: seq,
                    })
                    .expect("append IAM entry");
            }
        }

        let reopened = WalWriter::open(wal.path()).expect("reopen WAL writer");
        assert_eq!(reopened.next_seq(), 4);
    }

    #[test]
    fn replay_of_missing_wal_is_empty() {
        let wal = TempWal::new("wal-missing");
        let replay = replay_wal_state(wal.config(false)).expect("missing WAL replays as empty");
        assert_eq!(replay.stats.last_seq, 0);
        assert_eq!(replay.stats.wal_path.as_path(), wal.path());
        assert!(replay.object_index.is_empty());
        assert!(replay.multipart_uploads.is_empty());
    }

    fn unique_temp_path(prefix: &str) -> PathBuf {
        let now = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("valid system time")
            .as_nanos();
        std::env::temp_dir().join(format!("{prefix}-{}-{now}.wal", std::process::id()))
    }
}