Skip to main content

idb/cli/
watch.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{wprintln, setup_decryption};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
19/// Options for the `inno watch` subcommand.
20pub struct WatchOptions {
21    /// Path to the InnoDB tablespace file.
22    pub file: String,
23    /// Polling interval in milliseconds.
24    pub interval: u64,
25    /// Show per-field diffs for changed pages.
26    pub verbose: bool,
27    /// Output in NDJSON streaming format.
28    pub json: bool,
29    /// Override the auto-detected page size.
30    pub page_size: Option<u32>,
31    /// Path to MySQL keyring file for decrypting encrypted tablespaces.
32    pub keyring: Option<String>,
33}
34
35// ── Internal types ──────────────────────────────────────────────────
36
37#[derive(Clone)]
38struct PageSnapshot {
39    lsn: u64,
40    page_type: String,
41}
42
43struct WatchState {
44    snapshots: HashMap<u64, PageSnapshot>,
45    page_count: u64,
46    vendor_name: String,
47}
48
49// ── JSON output structs ─────────────────────────────────────────────
50
51#[derive(Serialize)]
52struct WatchEvent {
53    timestamp: String,
54    event: String,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pages: Option<u64>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    page_size: Option<u32>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    vendor: Option<String>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    modified: Option<u64>,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    added: Option<u64>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    removed: Option<u64>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    changes: Option<Vec<PageChange>>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    total_changes: Option<u64>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    total_polls: Option<u64>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    error: Option<String>,
75}
76
77#[derive(Serialize)]
78struct PageChange {
79    page: u64,
80    kind: String,
81    page_type: String,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    old_lsn: Option<u64>,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    new_lsn: Option<u64>,
86    #[serde(skip_serializing_if = "Option::is_none")]
87    lsn_delta: Option<i64>,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    checksum_valid: Option<bool>,
90}
91
92// ── Helpers ─────────────────────────────────────────────────────────
93
94fn now_timestamp() -> String {
95    Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
96}
97
98fn now_time_short() -> String {
99    Local::now().format("%H:%M:%S").to_string()
100}
101
102fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
103    let mut ts = match opts.page_size {
104        Some(ps) => Tablespace::open_with_page_size(&opts.file, ps)?,
105        None => Tablespace::open(&opts.file)?,
106    };
107    if let Some(ref keyring_path) = opts.keyring {
108        setup_decryption(&mut ts, keyring_path)?;
109    }
110    Ok(ts)
111}
112
113fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
114    let mut snapshots = HashMap::new();
115    ts.for_each_page(|page_num, data| {
116        if let Some(hdr) = FilHeader::parse(data) {
117            snapshots.insert(
118                page_num,
119                PageSnapshot {
120                    lsn: hdr.lsn,
121                    page_type: hdr.page_type.name().to_string(),
122                },
123            );
124        }
125        Ok(())
126    })?;
127    Ok(snapshots)
128}
129
130fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
131    let json = serde_json::to_string(event)
132        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
133    wprintln!(writer, "{}", json)?;
134    Ok(())
135}
136
137// ── Main entry point ────────────────────────────────────────────────
138
139/// Monitor a tablespace for page-level changes.
140pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
141    // Set up Ctrl+C handler
142    let running = Arc::new(AtomicBool::new(true));
143    let r = running.clone();
144    ctrlc::set_handler(move || {
145        r.store(false, Ordering::SeqCst);
146    })
147    .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
148
149    // Take initial snapshot
150    let mut ts = open_tablespace(opts)?;
151    let page_size = ts.page_size();
152    let page_count = ts.page_count();
153    let vendor_name = ts.vendor_info().vendor.to_string();
154    let initial_snapshots = take_snapshot(&mut ts)?;
155
156    let mut state = WatchState {
157        snapshots: initial_snapshots,
158        page_count,
159        vendor_name: vendor_name.clone(),
160    };
161
162    // Emit start message
163    if opts.json {
164        emit_json_line(
165            writer,
166            &WatchEvent {
167                timestamp: now_timestamp(),
168                event: "started".to_string(),
169                pages: Some(page_count),
170                page_size: Some(page_size),
171                vendor: Some(vendor_name),
172                modified: None,
173                added: None,
174                removed: None,
175                changes: None,
176                total_changes: None,
177                total_polls: None,
178                error: None,
179            },
180        )?;
181    } else {
182        wprintln!(
183            writer,
184            "Watching {} ({} pages, {} bytes/page, {})",
185            opts.file,
186            page_count,
187            page_size,
188            state.vendor_name,
189        )?;
190        wprintln!(
191            writer,
192            "Polling every {}ms. Press Ctrl+C to stop.",
193            opts.interval
194        )?;
195        wprintln!(writer)?;
196    }
197
198    let interval = Duration::from_millis(opts.interval);
199    let mut total_changes: u64 = 0;
200    let mut total_polls: u64 = 0;
201
202    // Poll loop
203    while running.load(Ordering::SeqCst) {
204        thread::sleep(interval);
205
206        if !running.load(Ordering::SeqCst) {
207            break;
208        }
209
210        // Check if file still exists
211        if !Path::new(&opts.file).exists() {
212            if opts.json {
213                emit_json_line(
214                    writer,
215                    &WatchEvent {
216                        timestamp: now_timestamp(),
217                        event: "error".to_string(),
218                        pages: None,
219                        page_size: None,
220                        vendor: None,
221                        modified: None,
222                        added: None,
223                        removed: None,
224                        changes: None,
225                        total_changes: None,
226                        total_polls: None,
227                        error: Some("File no longer exists".to_string()),
228                    },
229                )?;
230            } else {
231                wprintln!(
232                    writer,
233                    "{}  {}",
234                    now_time_short(),
235                    "File no longer exists — stopping.".red()
236                )?;
237            }
238            break;
239        }
240
241        // Re-open tablespace and take new snapshot
242        let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
243            let new_page_count = new_ts.page_count();
244            let new_snapshots = take_snapshot(&mut new_ts)?;
245            Ok((new_page_count, new_snapshots))
246        });
247
248        let (new_page_count, new_snapshots) = match poll_result {
249            Ok(r) => r,
250            Err(e) => {
251                if opts.json {
252                    emit_json_line(
253                        writer,
254                        &WatchEvent {
255                            timestamp: now_timestamp(),
256                            event: "error".to_string(),
257                            pages: None,
258                            page_size: None,
259                            vendor: None,
260                            modified: None,
261                            added: None,
262                            removed: None,
263                            changes: None,
264                            total_changes: None,
265                            total_polls: None,
266                            error: Some(e.to_string()),
267                        },
268                    )?;
269                } else {
270                    wprintln!(
271                        writer,
272                        "{}  {} {}",
273                        now_time_short(),
274                        "Error:".red(),
275                        e
276                    )?;
277                }
278                continue;
279            }
280        };
281
282        total_polls += 1;
283
284        // Compare snapshots
285        let mut changes: Vec<PageChange> = Vec::new();
286        let mut modified_count: u64 = 0;
287        let mut added_count: u64 = 0;
288        let mut removed_count: u64 = 0;
289
290        // Detect modified and added pages
291        for (&page_num, new_snap) in &new_snapshots {
292            match state.snapshots.get(&page_num) {
293                Some(old_snap) => {
294                    if new_snap.lsn != old_snap.lsn {
295                        modified_count += 1;
296
297                        // Validate checksum for changed pages
298                        let checksum_valid = open_tablespace(opts)
299                            .and_then(|mut ts2| ts2.read_page(page_num))
300                            .map(|data| {
301                                validate_checksum(&data, page_size, None).valid
302                            })
303                            .unwrap_or(false);
304
305                        let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
306
307                        changes.push(PageChange {
308                            page: page_num,
309                            kind: "modified".to_string(),
310                            page_type: new_snap.page_type.clone(),
311                            old_lsn: Some(old_snap.lsn),
312                            new_lsn: Some(new_snap.lsn),
313                            lsn_delta: Some(lsn_delta),
314                            checksum_valid: Some(checksum_valid),
315                        });
316                    }
317                }
318                None => {
319                    added_count += 1;
320                    changes.push(PageChange {
321                        page: page_num,
322                        kind: "added".to_string(),
323                        page_type: new_snap.page_type.clone(),
324                        old_lsn: None,
325                        new_lsn: Some(new_snap.lsn),
326                        lsn_delta: None,
327                        checksum_valid: None,
328                    });
329                }
330            }
331        }
332
333        // Detect removed pages
334        for &page_num in state.snapshots.keys() {
335            if !new_snapshots.contains_key(&page_num) {
336                removed_count += 1;
337                let old_snap = &state.snapshots[&page_num];
338                changes.push(PageChange {
339                    page: page_num,
340                    kind: "removed".to_string(),
341                    page_type: old_snap.page_type.clone(),
342                    old_lsn: Some(old_snap.lsn),
343                    new_lsn: None,
344                    lsn_delta: None,
345                    checksum_valid: None,
346                });
347            }
348        }
349
350        // Sort changes by page number for stable output
351        changes.sort_by_key(|c| c.page);
352
353        let cycle_changes = modified_count + added_count + removed_count;
354        total_changes += cycle_changes;
355
356        // Only emit output when something changed
357        if cycle_changes > 0 {
358            if opts.json {
359                emit_json_line(
360                    writer,
361                    &WatchEvent {
362                        timestamp: now_timestamp(),
363                        event: "poll".to_string(),
364                        pages: Some(new_page_count),
365                        page_size: None,
366                        vendor: None,
367                        modified: Some(modified_count),
368                        added: Some(added_count),
369                        removed: Some(removed_count),
370                        changes: Some(changes),
371                        total_changes: None,
372                        total_polls: None,
373                        error: None,
374                    },
375                )?;
376            } else {
377                // Build summary line
378                let mut parts = Vec::new();
379                if modified_count > 0 {
380                    let word = if modified_count == 1 { "page" } else { "pages" };
381                    parts.push(format!("{} {} modified", modified_count, word));
382                }
383                if added_count > 0 {
384                    let word = if added_count == 1 { "page" } else { "pages" };
385                    parts.push(format!("{} {} added", added_count, word));
386                }
387                if removed_count > 0 {
388                    let word = if removed_count == 1 { "page" } else { "pages" };
389                    parts.push(format!("{} {} removed", removed_count, word));
390                }
391                wprintln!(writer, "{}  {}", now_time_short(), parts.join(", "))?;
392
393                // Print per-page details
394                for change in &changes {
395                    match change.kind.as_str() {
396                        "modified" => {
397                            let old_lsn = change.old_lsn.unwrap_or(0);
398                            let new_lsn = change.new_lsn.unwrap_or(0);
399                            let delta = change.lsn_delta.unwrap_or(0);
400                            let cksum_str = if change.checksum_valid.unwrap_or(false) {
401                                "checksum valid".green().to_string()
402                            } else {
403                                "CHECKSUM INVALID".red().to_string()
404                            };
405
406                            if opts.verbose {
407                                wprintln!(
408                                    writer,
409                                    "  Page {:<5} {:<12} LSN {} -> {} ({:+})  {}",
410                                    change.page,
411                                    change.page_type,
412                                    old_lsn,
413                                    new_lsn,
414                                    delta,
415                                    cksum_str,
416                                )?;
417                            } else {
418                                wprintln!(
419                                    writer,
420                                    "  Page {:<5} {:<12} LSN {:+}  {}",
421                                    change.page,
422                                    change.page_type,
423                                    delta,
424                                    cksum_str,
425                                )?;
426                            }
427                        }
428                        "added" => {
429                            wprintln!(
430                                writer,
431                                "  Page {:<5} {:<12} {}",
432                                change.page,
433                                change.page_type,
434                                "(new page)".cyan(),
435                            )?;
436                        }
437                        "removed" => {
438                            wprintln!(
439                                writer,
440                                "  Page {:<5} {:<12} {}",
441                                change.page,
442                                change.page_type,
443                                "(removed)".yellow(),
444                            )?;
445                        }
446                        _ => {}
447                    }
448                }
449
450                wprintln!(writer)?;
451            }
452        }
453
454        // Update state
455        state.snapshots = new_snapshots;
456        state.page_count = new_page_count;
457    }
458
459    // Emit stop summary
460    if opts.json {
461        emit_json_line(
462            writer,
463            &WatchEvent {
464                timestamp: now_timestamp(),
465                event: "stopped".to_string(),
466                pages: None,
467                page_size: None,
468                vendor: None,
469                modified: None,
470                added: None,
471                removed: None,
472                changes: None,
473                total_changes: Some(total_changes),
474                total_polls: Some(total_polls),
475                error: None,
476            },
477        )?;
478    } else {
479        wprintln!(
480            writer,
481            "Stopped after {} polls. Total page changes: {}",
482            total_polls,
483            total_changes,
484        )?;
485    }
486
487    Ok(())
488}
489
490#[cfg(test)]
491mod tests {
492    use super::*;
493    use byteorder::{BigEndian, ByteOrder};
494    use std::io::Write as IoWrite;
495    use tempfile::NamedTempFile;
496
497    use crate::innodb::constants::*;
498
499    const PS: usize = SIZE_PAGE_DEFAULT as usize;
500
501    fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
502        let mut page = vec![0u8; PS];
503        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 0);
504        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
505        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
506        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 1000);
507        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 8); // FSP_HDR
508        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
509        let fsp = FIL_PAGE_DATA;
510        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
511        BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
512        BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
513        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
514        let trailer = PS - SIZE_FIL_TRAILER;
515        BigEndian::write_u32(&mut page[trailer + 4..], 1000 & 0xFFFFFFFF);
516        let end = PS - SIZE_FIL_TRAILER;
517        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
518        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
519        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
520        page
521    }
522
523    fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
524        let mut page = vec![0u8; PS];
525        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
526        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
527        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
528        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
529        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855); // INDEX
530        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
531        let trailer = PS - SIZE_FIL_TRAILER;
532        BigEndian::write_u32(&mut page[trailer + 4..], (lsn & 0xFFFFFFFF) as u32);
533        let end = PS - SIZE_FIL_TRAILER;
534        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
535        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
536        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
537        page
538    }
539
540    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
541        let mut tmp = NamedTempFile::new().expect("create temp file");
542        for page in pages {
543            tmp.write_all(page).expect("write page");
544        }
545        tmp.flush().expect("flush");
546        tmp
547    }
548
549    #[test]
550    fn test_take_snapshot() {
551        let tmp = write_tablespace(&[
552            build_fsp_page(1, 3),
553            build_index_page(1, 1, 2000),
554            build_index_page(2, 1, 3000),
555        ]);
556        let mut ts = Tablespace::open(tmp.path()).unwrap();
557        let snaps = take_snapshot(&mut ts).unwrap();
558        assert_eq!(snaps.len(), 3);
559        assert_eq!(snaps[&0].lsn, 1000);
560        assert_eq!(snaps[&1].lsn, 2000);
561        assert_eq!(snaps[&2].lsn, 3000);
562        assert_eq!(snaps[&1].page_type, "INDEX");
563    }
564
565    #[test]
566    fn test_snapshot_detects_page_type() {
567        let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
568        let mut ts = Tablespace::open(tmp.path()).unwrap();
569        let snaps = take_snapshot(&mut ts).unwrap();
570        assert_eq!(snaps[&0].page_type, "FSP_HDR");
571    }
572
573    #[test]
574    fn test_open_tablespace_helper() {
575        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
576        let opts = WatchOptions {
577            file: tmp.path().to_str().unwrap().to_string(),
578            interval: 100,
579            verbose: false,
580            json: false,
581            page_size: None,
582            keyring: None,
583        };
584        let ts = open_tablespace(&opts).unwrap();
585        assert_eq!(ts.page_count(), 2);
586        assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
587    }
588
589    #[test]
590    fn test_open_tablespace_with_page_size_override() {
591        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
592        let opts = WatchOptions {
593            file: tmp.path().to_str().unwrap().to_string(),
594            interval: 100,
595            verbose: false,
596            json: false,
597            page_size: Some(16384),
598            keyring: None,
599        };
600        let ts = open_tablespace(&opts).unwrap();
601        assert_eq!(ts.page_count(), 2);
602    }
603
604    #[test]
605    fn test_open_tablespace_missing_file() {
606        let opts = WatchOptions {
607            file: "/nonexistent/path.ibd".to_string(),
608            interval: 100,
609            verbose: false,
610            json: false,
611            page_size: None,
612            keyring: None,
613        };
614        assert!(open_tablespace(&opts).is_err());
615    }
616}