Skip to main content

idb/cli/
watch.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{setup_decryption, wprintln};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
19/// Options for the `inno watch` subcommand.
20pub struct WatchOptions {
21    /// Path to the InnoDB tablespace file.
22    pub file: String,
23    /// Polling interval in milliseconds.
24    pub interval: u64,
25    /// Show per-field diffs for changed pages.
26    pub verbose: bool,
27    /// Output in NDJSON streaming format.
28    pub json: bool,
29    /// Emit per-page NDJSON change events (audit-log compatible).
30    pub events: bool,
31    /// Override the auto-detected page size.
32    pub page_size: Option<u32>,
33    /// Path to MySQL keyring file for decrypting encrypted tablespaces.
34    pub keyring: Option<String>,
35    /// Use memory-mapped I/O for file access.
36    pub mmap: bool,
37}
38
39// ── Internal types ──────────────────────────────────────────────────
40
41#[derive(Clone)]
42struct PageSnapshot {
43    lsn: u64,
44    page_type: String,
45}
46
47struct WatchState {
48    snapshots: HashMap<u64, PageSnapshot>,
49    page_count: u64,
50    vendor_name: String,
51}
52
53// ── JSON output structs ─────────────────────────────────────────────
54
55#[derive(Serialize)]
56struct WatchEvent {
57    timestamp: String,
58    event: String,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    pages: Option<u64>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    page_size: Option<u32>,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    vendor: Option<String>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    modified: Option<u64>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    added: Option<u64>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    removed: Option<u64>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    changes: Option<Vec<PageChange>>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    total_changes: Option<u64>,
75    #[serde(skip_serializing_if = "Option::is_none")]
76    total_polls: Option<u64>,
77    #[serde(skip_serializing_if = "Option::is_none")]
78    error: Option<String>,
79}
80
81#[derive(Serialize)]
82struct PageChange {
83    page: u64,
84    kind: String,
85    page_type: String,
86    #[serde(skip_serializing_if = "Option::is_none")]
87    old_lsn: Option<u64>,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    new_lsn: Option<u64>,
90    #[serde(skip_serializing_if = "Option::is_none")]
91    lsn_delta: Option<i64>,
92    #[serde(skip_serializing_if = "Option::is_none")]
93    checksum_valid: Option<bool>,
94}
95
96// ── Events-mode NDJSON struct (audit-log compatible) ─────────────────
97
98/// A single structured change event emitted in `--events` mode.
99///
100/// Each event is one NDJSON line with a tagged `event` field, following
101/// the same NDJSON convention used by [`crate::util::audit::AuditEvent`].
102#[derive(Serialize)]
103struct WatchChangeEvent {
104    timestamp: String,
105    event: String,
106    file: String,
107    #[serde(skip_serializing_if = "Option::is_none")]
108    pages: Option<u64>,
109    #[serde(skip_serializing_if = "Option::is_none")]
110    page_size: Option<u32>,
111    #[serde(skip_serializing_if = "Option::is_none")]
112    page: Option<u64>,
113    #[serde(skip_serializing_if = "Option::is_none")]
114    page_type: Option<String>,
115    #[serde(skip_serializing_if = "Option::is_none")]
116    old_lsn: Option<u64>,
117    #[serde(skip_serializing_if = "Option::is_none")]
118    new_lsn: Option<u64>,
119    #[serde(skip_serializing_if = "Option::is_none")]
120    kind: Option<String>,
121    #[serde(skip_serializing_if = "Option::is_none")]
122    checksum_valid: Option<bool>,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    total_changes: Option<u64>,
125    #[serde(skip_serializing_if = "Option::is_none")]
126    total_polls: Option<u64>,
127    #[serde(skip_serializing_if = "Option::is_none")]
128    error: Option<String>,
129}
130
131fn emit_change_event(writer: &mut dyn Write, event: &WatchChangeEvent) -> Result<(), IdbError> {
132    let json = serde_json::to_string(event)
133        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
134    wprintln!(writer, "{}", json)?;
135    Ok(())
136}
137
138// ── Helpers ─────────────────────────────────────────────────────────
139
140fn now_timestamp() -> String {
141    Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
142}
143
144fn now_time_short() -> String {
145    Local::now().format("%H:%M:%S").to_string()
146}
147
148fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
149    let mut ts = crate::cli::open_tablespace(&opts.file, opts.page_size, opts.mmap)?;
150    if let Some(ref keyring_path) = opts.keyring {
151        setup_decryption(&mut ts, keyring_path)?;
152    }
153    Ok(ts)
154}
155
156fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
157    let mut snapshots = HashMap::new();
158    ts.for_each_page(|page_num, data| {
159        if let Some(hdr) = FilHeader::parse(data) {
160            snapshots.insert(
161                page_num,
162                PageSnapshot {
163                    lsn: hdr.lsn,
164                    page_type: hdr.page_type.name().to_string(),
165                },
166            );
167        }
168        Ok(())
169    })?;
170    Ok(snapshots)
171}
172
173fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
174    let json = serde_json::to_string(event)
175        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
176    wprintln!(writer, "{}", json)?;
177    Ok(())
178}
179
180// ── Main entry point ────────────────────────────────────────────────
181
182/// Monitor a tablespace for page-level changes.
183pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
184    // Set up Ctrl+C handler
185    let running = Arc::new(AtomicBool::new(true));
186    let r = running.clone();
187    ctrlc::set_handler(move || {
188        r.store(false, Ordering::SeqCst);
189    })
190    .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
191
192    // Take initial snapshot
193    let mut ts = open_tablespace(opts)?;
194    let page_size = ts.page_size();
195    let page_count = ts.page_count();
196    let vendor_name = ts.vendor_info().vendor.to_string();
197    let initial_snapshots = take_snapshot(&mut ts)?;
198
199    let mut state = WatchState {
200        snapshots: initial_snapshots,
201        page_count,
202        vendor_name: vendor_name.clone(),
203    };
204
205    // When --events is set, it takes priority over --json for output mode
206    let use_events = opts.events;
207    let use_json = opts.json && !use_events;
208
209    // Emit start message
210    if use_events {
211        emit_change_event(
212            writer,
213            &WatchChangeEvent {
214                timestamp: now_timestamp(),
215                event: "watch_start".to_string(),
216                file: opts.file.clone(),
217                pages: Some(page_count),
218                page_size: Some(page_size),
219                page: None,
220                page_type: None,
221                old_lsn: None,
222                new_lsn: None,
223                kind: None,
224                checksum_valid: None,
225                total_changes: None,
226                total_polls: None,
227                error: None,
228            },
229        )?;
230    } else if use_json {
231        emit_json_line(
232            writer,
233            &WatchEvent {
234                timestamp: now_timestamp(),
235                event: "started".to_string(),
236                pages: Some(page_count),
237                page_size: Some(page_size),
238                vendor: Some(vendor_name),
239                modified: None,
240                added: None,
241                removed: None,
242                changes: None,
243                total_changes: None,
244                total_polls: None,
245                error: None,
246            },
247        )?;
248    } else {
249        wprintln!(
250            writer,
251            "Watching {} ({} pages, {} bytes/page, {})",
252            opts.file,
253            page_count,
254            page_size,
255            state.vendor_name,
256        )?;
257        wprintln!(
258            writer,
259            "Polling every {}ms. Press Ctrl+C to stop.",
260            opts.interval
261        )?;
262        wprintln!(writer)?;
263    }
264
265    let interval = Duration::from_millis(opts.interval);
266    let mut total_changes: u64 = 0;
267    let mut total_polls: u64 = 0;
268
269    // Poll loop
270    while running.load(Ordering::SeqCst) {
271        thread::sleep(interval);
272
273        if !running.load(Ordering::SeqCst) {
274            break;
275        }
276
277        // Check if file still exists
278        if !Path::new(&opts.file).exists() {
279            if use_json {
280                emit_json_line(
281                    writer,
282                    &WatchEvent {
283                        timestamp: now_timestamp(),
284                        event: "error".to_string(),
285                        pages: None,
286                        page_size: None,
287                        vendor: None,
288                        modified: None,
289                        added: None,
290                        removed: None,
291                        changes: None,
292                        total_changes: None,
293                        total_polls: None,
294                        error: Some("File no longer exists".to_string()),
295                    },
296                )?;
297            } else if use_events {
298                emit_change_event(
299                    writer,
300                    &WatchChangeEvent {
301                        timestamp: now_timestamp(),
302                        event: "watch_error".to_string(),
303                        file: opts.file.clone(),
304                        pages: None,
305                        page_size: None,
306                        page: None,
307                        page_type: None,
308                        old_lsn: None,
309                        new_lsn: None,
310                        kind: None,
311                        checksum_valid: None,
312                        total_changes: None,
313                        total_polls: None,
314                        error: Some("File no longer exists".to_string()),
315                    },
316                )?;
317            } else {
318                wprintln!(
319                    writer,
320                    "{}  {}",
321                    now_time_short(),
322                    "File no longer exists — stopping.".red()
323                )?;
324            }
325            break;
326        }
327
328        // Re-open tablespace and take new snapshot
329        let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
330            let new_page_count = new_ts.page_count();
331            let new_snapshots = take_snapshot(&mut new_ts)?;
332            Ok((new_page_count, new_snapshots))
333        });
334
335        let (new_page_count, new_snapshots) = match poll_result {
336            Ok(r) => r,
337            Err(e) => {
338                if use_json {
339                    emit_json_line(
340                        writer,
341                        &WatchEvent {
342                            timestamp: now_timestamp(),
343                            event: "error".to_string(),
344                            pages: None,
345                            page_size: None,
346                            vendor: None,
347                            modified: None,
348                            added: None,
349                            removed: None,
350                            changes: None,
351                            total_changes: None,
352                            total_polls: None,
353                            error: Some(e.to_string()),
354                        },
355                    )?;
356                } else if use_events {
357                    emit_change_event(
358                        writer,
359                        &WatchChangeEvent {
360                            timestamp: now_timestamp(),
361                            event: "watch_error".to_string(),
362                            file: opts.file.clone(),
363                            pages: None,
364                            page_size: None,
365                            page: None,
366                            page_type: None,
367                            old_lsn: None,
368                            new_lsn: None,
369                            kind: None,
370                            checksum_valid: None,
371                            total_changes: None,
372                            total_polls: None,
373                            error: Some(e.to_string()),
374                        },
375                    )?;
376                } else {
377                    wprintln!(writer, "{}  {} {}", now_time_short(), "Error:".red(), e)?;
378                }
379                continue;
380            }
381        };
382
383        total_polls += 1;
384
385        // Compare snapshots
386        let mut changes: Vec<PageChange> = Vec::new();
387        let mut modified_count: u64 = 0;
388        let mut added_count: u64 = 0;
389        let mut removed_count: u64 = 0;
390
391        // Detect modified and added pages
392        for (&page_num, new_snap) in &new_snapshots {
393            match state.snapshots.get(&page_num) {
394                Some(old_snap) => {
395                    if new_snap.lsn != old_snap.lsn {
396                        modified_count += 1;
397
398                        // Validate checksum for changed pages
399                        let checksum_valid = open_tablespace(opts)
400                            .and_then(|mut ts2| ts2.read_page(page_num))
401                            .map(|data| validate_checksum(&data, page_size, None).valid)
402                            .unwrap_or(false);
403
404                        let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
405
406                        changes.push(PageChange {
407                            page: page_num,
408                            kind: "modified".to_string(),
409                            page_type: new_snap.page_type.clone(),
410                            old_lsn: Some(old_snap.lsn),
411                            new_lsn: Some(new_snap.lsn),
412                            lsn_delta: Some(lsn_delta),
413                            checksum_valid: Some(checksum_valid),
414                        });
415                    }
416                }
417                None => {
418                    added_count += 1;
419                    changes.push(PageChange {
420                        page: page_num,
421                        kind: "added".to_string(),
422                        page_type: new_snap.page_type.clone(),
423                        old_lsn: None,
424                        new_lsn: Some(new_snap.lsn),
425                        lsn_delta: None,
426                        checksum_valid: None,
427                    });
428                }
429            }
430        }
431
432        // Detect removed pages
433        for &page_num in state.snapshots.keys() {
434            if !new_snapshots.contains_key(&page_num) {
435                removed_count += 1;
436                let old_snap = &state.snapshots[&page_num];
437                changes.push(PageChange {
438                    page: page_num,
439                    kind: "removed".to_string(),
440                    page_type: old_snap.page_type.clone(),
441                    old_lsn: Some(old_snap.lsn),
442                    new_lsn: None,
443                    lsn_delta: None,
444                    checksum_valid: None,
445                });
446            }
447        }
448
449        // Sort changes by page number for stable output
450        changes.sort_by_key(|c| c.page);
451
452        let cycle_changes = modified_count + added_count + removed_count;
453        total_changes += cycle_changes;
454
455        // Only emit output when something changed
456        if cycle_changes > 0 {
457            if use_events {
458                // Emit individual per-page change events
459                for change in &changes {
460                    emit_change_event(
461                        writer,
462                        &WatchChangeEvent {
463                            timestamp: now_timestamp(),
464                            event: "page_change".to_string(),
465                            file: opts.file.clone(),
466                            pages: None,
467                            page_size: None,
468                            page: Some(change.page),
469                            page_type: Some(change.page_type.clone()),
470                            old_lsn: change.old_lsn,
471                            new_lsn: change.new_lsn,
472                            kind: Some(change.kind.clone()),
473                            checksum_valid: change.checksum_valid,
474                            total_changes: None,
475                            total_polls: None,
476                            error: None,
477                        },
478                    )?;
479                }
480            } else if use_json {
481                emit_json_line(
482                    writer,
483                    &WatchEvent {
484                        timestamp: now_timestamp(),
485                        event: "poll".to_string(),
486                        pages: Some(new_page_count),
487                        page_size: None,
488                        vendor: None,
489                        modified: Some(modified_count),
490                        added: Some(added_count),
491                        removed: Some(removed_count),
492                        changes: Some(changes),
493                        total_changes: None,
494                        total_polls: None,
495                        error: None,
496                    },
497                )?;
498            } else {
499                // Build summary line
500                let mut parts = Vec::new();
501                if modified_count > 0 {
502                    let word = if modified_count == 1 { "page" } else { "pages" };
503                    parts.push(format!("{} {} modified", modified_count, word));
504                }
505                if added_count > 0 {
506                    let word = if added_count == 1 { "page" } else { "pages" };
507                    parts.push(format!("{} {} added", added_count, word));
508                }
509                if removed_count > 0 {
510                    let word = if removed_count == 1 { "page" } else { "pages" };
511                    parts.push(format!("{} {} removed", removed_count, word));
512                }
513                wprintln!(writer, "{}  {}", now_time_short(), parts.join(", "))?;
514
515                // Print per-page details
516                for change in &changes {
517                    match change.kind.as_str() {
518                        "modified" => {
519                            let old_lsn = change.old_lsn.unwrap_or(0);
520                            let new_lsn = change.new_lsn.unwrap_or(0);
521                            let delta = change.lsn_delta.unwrap_or(0);
522                            let cksum_str = if change.checksum_valid.unwrap_or(false) {
523                                "checksum valid".green().to_string()
524                            } else {
525                                "CHECKSUM INVALID".red().to_string()
526                            };
527
528                            if opts.verbose {
529                                wprintln!(
530                                    writer,
531                                    "  Page {:<5} {:<12} LSN {} -> {} ({:+})  {}",
532                                    change.page,
533                                    change.page_type,
534                                    old_lsn,
535                                    new_lsn,
536                                    delta,
537                                    cksum_str,
538                                )?;
539                            } else {
540                                wprintln!(
541                                    writer,
542                                    "  Page {:<5} {:<12} LSN {:+}  {}",
543                                    change.page,
544                                    change.page_type,
545                                    delta,
546                                    cksum_str,
547                                )?;
548                            }
549                        }
550                        "added" => {
551                            wprintln!(
552                                writer,
553                                "  Page {:<5} {:<12} {}",
554                                change.page,
555                                change.page_type,
556                                "(new page)".cyan(),
557                            )?;
558                        }
559                        "removed" => {
560                            wprintln!(
561                                writer,
562                                "  Page {:<5} {:<12} {}",
563                                change.page,
564                                change.page_type,
565                                "(removed)".yellow(),
566                            )?;
567                        }
568                        _ => {}
569                    }
570                }
571
572                wprintln!(writer)?;
573            }
574        }
575
576        // Update state
577        state.snapshots = new_snapshots;
578        state.page_count = new_page_count;
579    }
580
581    // Emit stop summary
582    if use_events {
583        emit_change_event(
584            writer,
585            &WatchChangeEvent {
586                timestamp: now_timestamp(),
587                event: "watch_stop".to_string(),
588                file: opts.file.clone(),
589                pages: None,
590                page_size: None,
591                page: None,
592                page_type: None,
593                old_lsn: None,
594                new_lsn: None,
595                kind: None,
596                checksum_valid: None,
597                total_changes: Some(total_changes),
598                total_polls: Some(total_polls),
599                error: None,
600            },
601        )?;
602    } else if use_json {
603        emit_json_line(
604            writer,
605            &WatchEvent {
606                timestamp: now_timestamp(),
607                event: "stopped".to_string(),
608                pages: None,
609                page_size: None,
610                vendor: None,
611                modified: None,
612                added: None,
613                removed: None,
614                changes: None,
615                total_changes: Some(total_changes),
616                total_polls: Some(total_polls),
617                error: None,
618            },
619        )?;
620    } else {
621        wprintln!(
622            writer,
623            "Stopped after {} polls. Total page changes: {}",
624            total_polls,
625            total_changes,
626        )?;
627    }
628
629    Ok(())
630}
631
632#[cfg(test)]
633mod tests {
634    use super::*;
635    use byteorder::{BigEndian, ByteOrder};
636    use std::io::Write as IoWrite;
637    use tempfile::NamedTempFile;
638
639    use crate::innodb::constants::*;
640
641    const PS: usize = SIZE_PAGE_DEFAULT as usize;
642
643    fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
644        let mut page = vec![0u8; PS];
645        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 0);
646        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
647        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
648        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 1000);
649        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 8); // FSP_HDR
650        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
651        let fsp = FIL_PAGE_DATA;
652        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
653        BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
654        BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
655        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
656        let trailer = PS - SIZE_FIL_TRAILER;
657        BigEndian::write_u32(&mut page[trailer + 4..], 1000 & 0xFFFFFFFF);
658        let end = PS - SIZE_FIL_TRAILER;
659        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
660        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
661        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
662        page
663    }
664
665    fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
666        let mut page = vec![0u8; PS];
667        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
668        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
669        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
670        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
671        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855); // INDEX
672        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
673        let trailer = PS - SIZE_FIL_TRAILER;
674        BigEndian::write_u32(&mut page[trailer + 4..], (lsn & 0xFFFFFFFF) as u32);
675        let end = PS - SIZE_FIL_TRAILER;
676        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
677        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
678        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
679        page
680    }
681
682    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
683        let mut tmp = NamedTempFile::new().expect("create temp file");
684        for page in pages {
685            tmp.write_all(page).expect("write page");
686        }
687        tmp.flush().expect("flush");
688        tmp
689    }
690
691    #[test]
692    fn test_take_snapshot() {
693        let tmp = write_tablespace(&[
694            build_fsp_page(1, 3),
695            build_index_page(1, 1, 2000),
696            build_index_page(2, 1, 3000),
697        ]);
698        let mut ts = Tablespace::open(tmp.path()).unwrap();
699        let snaps = take_snapshot(&mut ts).unwrap();
700        assert_eq!(snaps.len(), 3);
701        assert_eq!(snaps[&0].lsn, 1000);
702        assert_eq!(snaps[&1].lsn, 2000);
703        assert_eq!(snaps[&2].lsn, 3000);
704        assert_eq!(snaps[&1].page_type, "INDEX");
705    }
706
707    #[test]
708    fn test_snapshot_detects_page_type() {
709        let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
710        let mut ts = Tablespace::open(tmp.path()).unwrap();
711        let snaps = take_snapshot(&mut ts).unwrap();
712        assert_eq!(snaps[&0].page_type, "FSP_HDR");
713    }
714
715    #[test]
716    fn test_open_tablespace_helper() {
717        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
718        let opts = WatchOptions {
719            file: tmp.path().to_str().unwrap().to_string(),
720            interval: 100,
721            verbose: false,
722            json: false,
723            events: false,
724            page_size: None,
725            keyring: None,
726            mmap: false,
727        };
728        let ts = open_tablespace(&opts).unwrap();
729        assert_eq!(ts.page_count(), 2);
730        assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
731    }
732
733    #[test]
734    fn test_open_tablespace_with_page_size_override() {
735        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
736        let opts = WatchOptions {
737            file: tmp.path().to_str().unwrap().to_string(),
738            interval: 100,
739            verbose: false,
740            json: false,
741            events: false,
742            page_size: Some(16384),
743            keyring: None,
744            mmap: false,
745        };
746        let ts = open_tablespace(&opts).unwrap();
747        assert_eq!(ts.page_count(), 2);
748    }
749
750    #[test]
751    fn test_open_tablespace_missing_file() {
752        let opts = WatchOptions {
753            file: "/nonexistent/path.ibd".to_string(),
754            interval: 100,
755            verbose: false,
756            json: false,
757            events: false,
758            page_size: None,
759            keyring: None,
760            mmap: false,
761        };
762        assert!(open_tablespace(&opts).is_err());
763    }
764
765    #[test]
766    fn test_watch_change_event_serialization() {
767        let event = WatchChangeEvent {
768            timestamp: "2026-02-24T10:00:00.000+00:00".to_string(),
769            event: "watch_start".to_string(),
770            file: "/tmp/test.ibd".to_string(),
771            pages: Some(10),
772            page_size: Some(16384),
773            page: None,
774            page_type: None,
775            old_lsn: None,
776            new_lsn: None,
777            kind: None,
778            checksum_valid: None,
779            total_changes: None,
780            total_polls: None,
781            error: None,
782        };
783        let json = serde_json::to_string(&event).unwrap();
784        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
785        assert_eq!(parsed["event"], "watch_start");
786        assert_eq!(parsed["file"], "/tmp/test.ibd");
787        assert_eq!(parsed["pages"], 10);
788        assert_eq!(parsed["page_size"], 16384);
789        // Optional fields should be absent
790        assert!(parsed.get("page").is_none());
791        assert!(parsed.get("page_type").is_none());
792        assert!(parsed.get("kind").is_none());
793    }
794
795    #[test]
796    fn test_watch_change_event_page_change() {
797        let event = WatchChangeEvent {
798            timestamp: "2026-02-24T10:00:01.000+00:00".to_string(),
799            event: "page_change".to_string(),
800            file: "/tmp/test.ibd".to_string(),
801            pages: None,
802            page_size: None,
803            page: Some(5),
804            page_type: Some("INDEX".to_string()),
805            old_lsn: Some(1000),
806            new_lsn: Some(2000),
807            kind: Some("modified".to_string()),
808            checksum_valid: Some(true),
809            total_changes: None,
810            total_polls: None,
811            error: None,
812        };
813        let json = serde_json::to_string(&event).unwrap();
814        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
815        assert_eq!(parsed["event"], "page_change");
816        assert_eq!(parsed["page"], 5);
817        assert_eq!(parsed["page_type"], "INDEX");
818        assert_eq!(parsed["old_lsn"], 1000);
819        assert_eq!(parsed["new_lsn"], 2000);
820        assert_eq!(parsed["kind"], "modified");
821        assert_eq!(parsed["checksum_valid"], true);
822        // Batch fields should be absent
823        assert!(parsed.get("pages").is_none());
824        assert!(parsed.get("total_changes").is_none());
825    }
826
827    #[test]
828    fn test_watch_change_event_stop() {
829        let event = WatchChangeEvent {
830            timestamp: "2026-02-24T10:05:00.000+00:00".to_string(),
831            event: "watch_stop".to_string(),
832            file: "/tmp/test.ibd".to_string(),
833            pages: None,
834            page_size: None,
835            page: None,
836            page_type: None,
837            old_lsn: None,
838            new_lsn: None,
839            kind: None,
840            checksum_valid: None,
841            total_changes: Some(42),
842            total_polls: Some(300),
843            error: None,
844        };
845        let json = serde_json::to_string(&event).unwrap();
846        let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
847        assert_eq!(parsed["event"], "watch_stop");
848        assert_eq!(parsed["total_changes"], 42);
849        assert_eq!(parsed["total_polls"], 300);
850        assert_eq!(parsed["file"], "/tmp/test.ibd");
851    }
852
853    #[test]
854    fn test_emit_change_event_writes_ndjson() {
855        let event = WatchChangeEvent {
856            timestamp: "2026-02-24T10:00:00.000+00:00".to_string(),
857            event: "page_change".to_string(),
858            file: "/tmp/test.ibd".to_string(),
859            pages: None,
860            page_size: None,
861            page: Some(3),
862            page_type: Some("INDEX".to_string()),
863            old_lsn: Some(500),
864            new_lsn: Some(600),
865            kind: Some("modified".to_string()),
866            checksum_valid: Some(true),
867            total_changes: None,
868            total_polls: None,
869            error: None,
870        };
871        let mut buf = Vec::new();
872        emit_change_event(&mut buf, &event).unwrap();
873        let output = String::from_utf8(buf).unwrap();
874        // Should be a single line ending with newline
875        assert!(output.ends_with('\n'));
876        let trimmed = output.trim();
877        // Should be valid JSON
878        let parsed: serde_json::Value = serde_json::from_str(trimmed).unwrap();
879        assert_eq!(parsed["event"], "page_change");
880        assert_eq!(parsed["page"], 3);
881    }
882}