1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{setup_decryption, wprintln};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
19pub struct WatchOptions {
21 pub file: String,
23 pub interval: u64,
25 pub verbose: bool,
27 pub json: bool,
29 pub events: bool,
31 pub page_size: Option<u32>,
33 pub keyring: Option<String>,
35 pub mmap: bool,
37}
38
39#[derive(Clone)]
42struct PageSnapshot {
43 lsn: u64,
44 page_type: String,
45}
46
47struct WatchState {
48 snapshots: HashMap<u64, PageSnapshot>,
49 page_count: u64,
50 vendor_name: String,
51}
52
53#[derive(Serialize)]
56struct WatchEvent {
57 timestamp: String,
58 event: String,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 pages: Option<u64>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 page_size: Option<u32>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 vendor: Option<String>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 modified: Option<u64>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 added: Option<u64>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 removed: Option<u64>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 changes: Option<Vec<PageChange>>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 total_changes: Option<u64>,
75 #[serde(skip_serializing_if = "Option::is_none")]
76 total_polls: Option<u64>,
77 #[serde(skip_serializing_if = "Option::is_none")]
78 error: Option<String>,
79}
80
81#[derive(Serialize)]
82struct PageChange {
83 page: u64,
84 kind: String,
85 page_type: String,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 old_lsn: Option<u64>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 new_lsn: Option<u64>,
90 #[serde(skip_serializing_if = "Option::is_none")]
91 lsn_delta: Option<i64>,
92 #[serde(skip_serializing_if = "Option::is_none")]
93 checksum_valid: Option<bool>,
94}
95
96#[derive(Serialize)]
103struct WatchChangeEvent {
104 timestamp: String,
105 event: String,
106 file: String,
107 #[serde(skip_serializing_if = "Option::is_none")]
108 pages: Option<u64>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 page_size: Option<u32>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 page: Option<u64>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 page_type: Option<String>,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 old_lsn: Option<u64>,
117 #[serde(skip_serializing_if = "Option::is_none")]
118 new_lsn: Option<u64>,
119 #[serde(skip_serializing_if = "Option::is_none")]
120 kind: Option<String>,
121 #[serde(skip_serializing_if = "Option::is_none")]
122 checksum_valid: Option<bool>,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 total_changes: Option<u64>,
125 #[serde(skip_serializing_if = "Option::is_none")]
126 total_polls: Option<u64>,
127 #[serde(skip_serializing_if = "Option::is_none")]
128 error: Option<String>,
129}
130
131fn emit_change_event(writer: &mut dyn Write, event: &WatchChangeEvent) -> Result<(), IdbError> {
132 let json = serde_json::to_string(event)
133 .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
134 wprintln!(writer, "{}", json)?;
135 Ok(())
136}
137
138fn now_timestamp() -> String {
141 Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
142}
143
144fn now_time_short() -> String {
145 Local::now().format("%H:%M:%S").to_string()
146}
147
148fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
149 let mut ts = crate::cli::open_tablespace(&opts.file, opts.page_size, opts.mmap)?;
150 if let Some(ref keyring_path) = opts.keyring {
151 setup_decryption(&mut ts, keyring_path)?;
152 }
153 Ok(ts)
154}
155
156fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
157 let mut snapshots = HashMap::new();
158 ts.for_each_page(|page_num, data| {
159 if let Some(hdr) = FilHeader::parse(data) {
160 snapshots.insert(
161 page_num,
162 PageSnapshot {
163 lsn: hdr.lsn,
164 page_type: hdr.page_type.name().to_string(),
165 },
166 );
167 }
168 Ok(())
169 })?;
170 Ok(snapshots)
171}
172
173fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
174 let json = serde_json::to_string(event)
175 .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
176 wprintln!(writer, "{}", json)?;
177 Ok(())
178}
179
180pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
184 let running = Arc::new(AtomicBool::new(true));
186 let r = running.clone();
187 ctrlc::set_handler(move || {
188 r.store(false, Ordering::SeqCst);
189 })
190 .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
191
192 let mut ts = open_tablespace(opts)?;
194 let page_size = ts.page_size();
195 let page_count = ts.page_count();
196 let vendor_name = ts.vendor_info().vendor.to_string();
197 let initial_snapshots = take_snapshot(&mut ts)?;
198
199 let mut state = WatchState {
200 snapshots: initial_snapshots,
201 page_count,
202 vendor_name: vendor_name.clone(),
203 };
204
205 let use_events = opts.events;
207 let use_json = opts.json && !use_events;
208
209 if use_events {
211 emit_change_event(
212 writer,
213 &WatchChangeEvent {
214 timestamp: now_timestamp(),
215 event: "watch_start".to_string(),
216 file: opts.file.clone(),
217 pages: Some(page_count),
218 page_size: Some(page_size),
219 page: None,
220 page_type: None,
221 old_lsn: None,
222 new_lsn: None,
223 kind: None,
224 checksum_valid: None,
225 total_changes: None,
226 total_polls: None,
227 error: None,
228 },
229 )?;
230 } else if use_json {
231 emit_json_line(
232 writer,
233 &WatchEvent {
234 timestamp: now_timestamp(),
235 event: "started".to_string(),
236 pages: Some(page_count),
237 page_size: Some(page_size),
238 vendor: Some(vendor_name),
239 modified: None,
240 added: None,
241 removed: None,
242 changes: None,
243 total_changes: None,
244 total_polls: None,
245 error: None,
246 },
247 )?;
248 } else {
249 wprintln!(
250 writer,
251 "Watching {} ({} pages, {} bytes/page, {})",
252 opts.file,
253 page_count,
254 page_size,
255 state.vendor_name,
256 )?;
257 wprintln!(
258 writer,
259 "Polling every {}ms. Press Ctrl+C to stop.",
260 opts.interval
261 )?;
262 wprintln!(writer)?;
263 }
264
265 let interval = Duration::from_millis(opts.interval);
266 let mut total_changes: u64 = 0;
267 let mut total_polls: u64 = 0;
268
269 while running.load(Ordering::SeqCst) {
271 thread::sleep(interval);
272
273 if !running.load(Ordering::SeqCst) {
274 break;
275 }
276
277 if !Path::new(&opts.file).exists() {
279 if use_json {
280 emit_json_line(
281 writer,
282 &WatchEvent {
283 timestamp: now_timestamp(),
284 event: "error".to_string(),
285 pages: None,
286 page_size: None,
287 vendor: None,
288 modified: None,
289 added: None,
290 removed: None,
291 changes: None,
292 total_changes: None,
293 total_polls: None,
294 error: Some("File no longer exists".to_string()),
295 },
296 )?;
297 } else if use_events {
298 emit_change_event(
299 writer,
300 &WatchChangeEvent {
301 timestamp: now_timestamp(),
302 event: "watch_error".to_string(),
303 file: opts.file.clone(),
304 pages: None,
305 page_size: None,
306 page: None,
307 page_type: None,
308 old_lsn: None,
309 new_lsn: None,
310 kind: None,
311 checksum_valid: None,
312 total_changes: None,
313 total_polls: None,
314 error: Some("File no longer exists".to_string()),
315 },
316 )?;
317 } else {
318 wprintln!(
319 writer,
320 "{} {}",
321 now_time_short(),
322 "File no longer exists — stopping.".red()
323 )?;
324 }
325 break;
326 }
327
328 let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
330 let new_page_count = new_ts.page_count();
331 let new_snapshots = take_snapshot(&mut new_ts)?;
332 Ok((new_page_count, new_snapshots))
333 });
334
335 let (new_page_count, new_snapshots) = match poll_result {
336 Ok(r) => r,
337 Err(e) => {
338 if use_json {
339 emit_json_line(
340 writer,
341 &WatchEvent {
342 timestamp: now_timestamp(),
343 event: "error".to_string(),
344 pages: None,
345 page_size: None,
346 vendor: None,
347 modified: None,
348 added: None,
349 removed: None,
350 changes: None,
351 total_changes: None,
352 total_polls: None,
353 error: Some(e.to_string()),
354 },
355 )?;
356 } else if use_events {
357 emit_change_event(
358 writer,
359 &WatchChangeEvent {
360 timestamp: now_timestamp(),
361 event: "watch_error".to_string(),
362 file: opts.file.clone(),
363 pages: None,
364 page_size: None,
365 page: None,
366 page_type: None,
367 old_lsn: None,
368 new_lsn: None,
369 kind: None,
370 checksum_valid: None,
371 total_changes: None,
372 total_polls: None,
373 error: Some(e.to_string()),
374 },
375 )?;
376 } else {
377 wprintln!(writer, "{} {} {}", now_time_short(), "Error:".red(), e)?;
378 }
379 continue;
380 }
381 };
382
383 total_polls += 1;
384
385 let mut changes: Vec<PageChange> = Vec::new();
387 let mut modified_count: u64 = 0;
388 let mut added_count: u64 = 0;
389 let mut removed_count: u64 = 0;
390
391 for (&page_num, new_snap) in &new_snapshots {
393 match state.snapshots.get(&page_num) {
394 Some(old_snap) => {
395 if new_snap.lsn != old_snap.lsn {
396 modified_count += 1;
397
398 let checksum_valid = open_tablespace(opts)
400 .and_then(|mut ts2| ts2.read_page(page_num))
401 .map(|data| validate_checksum(&data, page_size, None).valid)
402 .unwrap_or(false);
403
404 let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
405
406 changes.push(PageChange {
407 page: page_num,
408 kind: "modified".to_string(),
409 page_type: new_snap.page_type.clone(),
410 old_lsn: Some(old_snap.lsn),
411 new_lsn: Some(new_snap.lsn),
412 lsn_delta: Some(lsn_delta),
413 checksum_valid: Some(checksum_valid),
414 });
415 }
416 }
417 None => {
418 added_count += 1;
419 changes.push(PageChange {
420 page: page_num,
421 kind: "added".to_string(),
422 page_type: new_snap.page_type.clone(),
423 old_lsn: None,
424 new_lsn: Some(new_snap.lsn),
425 lsn_delta: None,
426 checksum_valid: None,
427 });
428 }
429 }
430 }
431
432 for &page_num in state.snapshots.keys() {
434 if !new_snapshots.contains_key(&page_num) {
435 removed_count += 1;
436 let old_snap = &state.snapshots[&page_num];
437 changes.push(PageChange {
438 page: page_num,
439 kind: "removed".to_string(),
440 page_type: old_snap.page_type.clone(),
441 old_lsn: Some(old_snap.lsn),
442 new_lsn: None,
443 lsn_delta: None,
444 checksum_valid: None,
445 });
446 }
447 }
448
449 changes.sort_by_key(|c| c.page);
451
452 let cycle_changes = modified_count + added_count + removed_count;
453 total_changes += cycle_changes;
454
455 if cycle_changes > 0 {
457 if use_events {
458 for change in &changes {
460 emit_change_event(
461 writer,
462 &WatchChangeEvent {
463 timestamp: now_timestamp(),
464 event: "page_change".to_string(),
465 file: opts.file.clone(),
466 pages: None,
467 page_size: None,
468 page: Some(change.page),
469 page_type: Some(change.page_type.clone()),
470 old_lsn: change.old_lsn,
471 new_lsn: change.new_lsn,
472 kind: Some(change.kind.clone()),
473 checksum_valid: change.checksum_valid,
474 total_changes: None,
475 total_polls: None,
476 error: None,
477 },
478 )?;
479 }
480 } else if use_json {
481 emit_json_line(
482 writer,
483 &WatchEvent {
484 timestamp: now_timestamp(),
485 event: "poll".to_string(),
486 pages: Some(new_page_count),
487 page_size: None,
488 vendor: None,
489 modified: Some(modified_count),
490 added: Some(added_count),
491 removed: Some(removed_count),
492 changes: Some(changes),
493 total_changes: None,
494 total_polls: None,
495 error: None,
496 },
497 )?;
498 } else {
499 let mut parts = Vec::new();
501 if modified_count > 0 {
502 let word = if modified_count == 1 { "page" } else { "pages" };
503 parts.push(format!("{} {} modified", modified_count, word));
504 }
505 if added_count > 0 {
506 let word = if added_count == 1 { "page" } else { "pages" };
507 parts.push(format!("{} {} added", added_count, word));
508 }
509 if removed_count > 0 {
510 let word = if removed_count == 1 { "page" } else { "pages" };
511 parts.push(format!("{} {} removed", removed_count, word));
512 }
513 wprintln!(writer, "{} {}", now_time_short(), parts.join(", "))?;
514
515 for change in &changes {
517 match change.kind.as_str() {
518 "modified" => {
519 let old_lsn = change.old_lsn.unwrap_or(0);
520 let new_lsn = change.new_lsn.unwrap_or(0);
521 let delta = change.lsn_delta.unwrap_or(0);
522 let cksum_str = if change.checksum_valid.unwrap_or(false) {
523 "checksum valid".green().to_string()
524 } else {
525 "CHECKSUM INVALID".red().to_string()
526 };
527
528 if opts.verbose {
529 wprintln!(
530 writer,
531 " Page {:<5} {:<12} LSN {} -> {} ({:+}) {}",
532 change.page,
533 change.page_type,
534 old_lsn,
535 new_lsn,
536 delta,
537 cksum_str,
538 )?;
539 } else {
540 wprintln!(
541 writer,
542 " Page {:<5} {:<12} LSN {:+} {}",
543 change.page,
544 change.page_type,
545 delta,
546 cksum_str,
547 )?;
548 }
549 }
550 "added" => {
551 wprintln!(
552 writer,
553 " Page {:<5} {:<12} {}",
554 change.page,
555 change.page_type,
556 "(new page)".cyan(),
557 )?;
558 }
559 "removed" => {
560 wprintln!(
561 writer,
562 " Page {:<5} {:<12} {}",
563 change.page,
564 change.page_type,
565 "(removed)".yellow(),
566 )?;
567 }
568 _ => {}
569 }
570 }
571
572 wprintln!(writer)?;
573 }
574 }
575
576 state.snapshots = new_snapshots;
578 state.page_count = new_page_count;
579 }
580
581 if use_events {
583 emit_change_event(
584 writer,
585 &WatchChangeEvent {
586 timestamp: now_timestamp(),
587 event: "watch_stop".to_string(),
588 file: opts.file.clone(),
589 pages: None,
590 page_size: None,
591 page: None,
592 page_type: None,
593 old_lsn: None,
594 new_lsn: None,
595 kind: None,
596 checksum_valid: None,
597 total_changes: Some(total_changes),
598 total_polls: Some(total_polls),
599 error: None,
600 },
601 )?;
602 } else if use_json {
603 emit_json_line(
604 writer,
605 &WatchEvent {
606 timestamp: now_timestamp(),
607 event: "stopped".to_string(),
608 pages: None,
609 page_size: None,
610 vendor: None,
611 modified: None,
612 added: None,
613 removed: None,
614 changes: None,
615 total_changes: Some(total_changes),
616 total_polls: Some(total_polls),
617 error: None,
618 },
619 )?;
620 } else {
621 wprintln!(
622 writer,
623 "Stopped after {} polls. Total page changes: {}",
624 total_polls,
625 total_changes,
626 )?;
627 }
628
629 Ok(())
630}
631
632#[cfg(test)]
633mod tests {
634 use super::*;
635 use byteorder::{BigEndian, ByteOrder};
636 use std::io::Write as IoWrite;
637 use tempfile::NamedTempFile;
638
639 use crate::innodb::constants::*;
640
641 const PS: usize = SIZE_PAGE_DEFAULT as usize;
642
643 fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
644 let mut page = vec![0u8; PS];
645 BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 0);
646 BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
647 BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
648 BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 1000);
649 BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 8); BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
651 let fsp = FIL_PAGE_DATA;
652 BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
653 BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
654 BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
655 BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
656 let trailer = PS - SIZE_FIL_TRAILER;
657 BigEndian::write_u32(&mut page[trailer + 4..], 1000 & 0xFFFFFFFF);
658 let end = PS - SIZE_FIL_TRAILER;
659 let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
660 let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
661 BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
662 page
663 }
664
665 fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
666 let mut page = vec![0u8; PS];
667 BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
668 BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
669 BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
670 BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
671 BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855); BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
673 let trailer = PS - SIZE_FIL_TRAILER;
674 BigEndian::write_u32(&mut page[trailer + 4..], (lsn & 0xFFFFFFFF) as u32);
675 let end = PS - SIZE_FIL_TRAILER;
676 let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
677 let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
678 BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
679 page
680 }
681
682 fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
683 let mut tmp = NamedTempFile::new().expect("create temp file");
684 for page in pages {
685 tmp.write_all(page).expect("write page");
686 }
687 tmp.flush().expect("flush");
688 tmp
689 }
690
691 #[test]
692 fn test_take_snapshot() {
693 let tmp = write_tablespace(&[
694 build_fsp_page(1, 3),
695 build_index_page(1, 1, 2000),
696 build_index_page(2, 1, 3000),
697 ]);
698 let mut ts = Tablespace::open(tmp.path()).unwrap();
699 let snaps = take_snapshot(&mut ts).unwrap();
700 assert_eq!(snaps.len(), 3);
701 assert_eq!(snaps[&0].lsn, 1000);
702 assert_eq!(snaps[&1].lsn, 2000);
703 assert_eq!(snaps[&2].lsn, 3000);
704 assert_eq!(snaps[&1].page_type, "INDEX");
705 }
706
707 #[test]
708 fn test_snapshot_detects_page_type() {
709 let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
710 let mut ts = Tablespace::open(tmp.path()).unwrap();
711 let snaps = take_snapshot(&mut ts).unwrap();
712 assert_eq!(snaps[&0].page_type, "FSP_HDR");
713 }
714
715 #[test]
716 fn test_open_tablespace_helper() {
717 let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
718 let opts = WatchOptions {
719 file: tmp.path().to_str().unwrap().to_string(),
720 interval: 100,
721 verbose: false,
722 json: false,
723 events: false,
724 page_size: None,
725 keyring: None,
726 mmap: false,
727 };
728 let ts = open_tablespace(&opts).unwrap();
729 assert_eq!(ts.page_count(), 2);
730 assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
731 }
732
733 #[test]
734 fn test_open_tablespace_with_page_size_override() {
735 let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
736 let opts = WatchOptions {
737 file: tmp.path().to_str().unwrap().to_string(),
738 interval: 100,
739 verbose: false,
740 json: false,
741 events: false,
742 page_size: Some(16384),
743 keyring: None,
744 mmap: false,
745 };
746 let ts = open_tablespace(&opts).unwrap();
747 assert_eq!(ts.page_count(), 2);
748 }
749
750 #[test]
751 fn test_open_tablespace_missing_file() {
752 let opts = WatchOptions {
753 file: "/nonexistent/path.ibd".to_string(),
754 interval: 100,
755 verbose: false,
756 json: false,
757 events: false,
758 page_size: None,
759 keyring: None,
760 mmap: false,
761 };
762 assert!(open_tablespace(&opts).is_err());
763 }
764
765 #[test]
766 fn test_watch_change_event_serialization() {
767 let event = WatchChangeEvent {
768 timestamp: "2026-02-24T10:00:00.000+00:00".to_string(),
769 event: "watch_start".to_string(),
770 file: "/tmp/test.ibd".to_string(),
771 pages: Some(10),
772 page_size: Some(16384),
773 page: None,
774 page_type: None,
775 old_lsn: None,
776 new_lsn: None,
777 kind: None,
778 checksum_valid: None,
779 total_changes: None,
780 total_polls: None,
781 error: None,
782 };
783 let json = serde_json::to_string(&event).unwrap();
784 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
785 assert_eq!(parsed["event"], "watch_start");
786 assert_eq!(parsed["file"], "/tmp/test.ibd");
787 assert_eq!(parsed["pages"], 10);
788 assert_eq!(parsed["page_size"], 16384);
789 assert!(parsed.get("page").is_none());
791 assert!(parsed.get("page_type").is_none());
792 assert!(parsed.get("kind").is_none());
793 }
794
795 #[test]
796 fn test_watch_change_event_page_change() {
797 let event = WatchChangeEvent {
798 timestamp: "2026-02-24T10:00:01.000+00:00".to_string(),
799 event: "page_change".to_string(),
800 file: "/tmp/test.ibd".to_string(),
801 pages: None,
802 page_size: None,
803 page: Some(5),
804 page_type: Some("INDEX".to_string()),
805 old_lsn: Some(1000),
806 new_lsn: Some(2000),
807 kind: Some("modified".to_string()),
808 checksum_valid: Some(true),
809 total_changes: None,
810 total_polls: None,
811 error: None,
812 };
813 let json = serde_json::to_string(&event).unwrap();
814 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
815 assert_eq!(parsed["event"], "page_change");
816 assert_eq!(parsed["page"], 5);
817 assert_eq!(parsed["page_type"], "INDEX");
818 assert_eq!(parsed["old_lsn"], 1000);
819 assert_eq!(parsed["new_lsn"], 2000);
820 assert_eq!(parsed["kind"], "modified");
821 assert_eq!(parsed["checksum_valid"], true);
822 assert!(parsed.get("pages").is_none());
824 assert!(parsed.get("total_changes").is_none());
825 }
826
827 #[test]
828 fn test_watch_change_event_stop() {
829 let event = WatchChangeEvent {
830 timestamp: "2026-02-24T10:05:00.000+00:00".to_string(),
831 event: "watch_stop".to_string(),
832 file: "/tmp/test.ibd".to_string(),
833 pages: None,
834 page_size: None,
835 page: None,
836 page_type: None,
837 old_lsn: None,
838 new_lsn: None,
839 kind: None,
840 checksum_valid: None,
841 total_changes: Some(42),
842 total_polls: Some(300),
843 error: None,
844 };
845 let json = serde_json::to_string(&event).unwrap();
846 let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
847 assert_eq!(parsed["event"], "watch_stop");
848 assert_eq!(parsed["total_changes"], 42);
849 assert_eq!(parsed["total_polls"], 300);
850 assert_eq!(parsed["file"], "/tmp/test.ibd");
851 }
852
853 #[test]
854 fn test_emit_change_event_writes_ndjson() {
855 let event = WatchChangeEvent {
856 timestamp: "2026-02-24T10:00:00.000+00:00".to_string(),
857 event: "page_change".to_string(),
858 file: "/tmp/test.ibd".to_string(),
859 pages: None,
860 page_size: None,
861 page: Some(3),
862 page_type: Some("INDEX".to_string()),
863 old_lsn: Some(500),
864 new_lsn: Some(600),
865 kind: Some("modified".to_string()),
866 checksum_valid: Some(true),
867 total_changes: None,
868 total_polls: None,
869 error: None,
870 };
871 let mut buf = Vec::new();
872 emit_change_event(&mut buf, &event).unwrap();
873 let output = String::from_utf8(buf).unwrap();
874 assert!(output.ends_with('\n'));
876 let trimmed = output.trim();
877 let parsed: serde_json::Value = serde_json::from_str(trimmed).unwrap();
879 assert_eq!(parsed["event"], "page_change");
880 assert_eq!(parsed["page"], 3);
881 }
882}