Skip to main content

idb/cli/
watch.rs

1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{setup_decryption, wprintln};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
19/// Options for the `inno watch` subcommand.
20pub struct WatchOptions {
21    /// Path to the InnoDB tablespace file.
22    pub file: String,
23    /// Polling interval in milliseconds.
24    pub interval: u64,
25    /// Show per-field diffs for changed pages.
26    pub verbose: bool,
27    /// Output in NDJSON streaming format.
28    pub json: bool,
29    /// Override the auto-detected page size.
30    pub page_size: Option<u32>,
31    /// Path to MySQL keyring file for decrypting encrypted tablespaces.
32    pub keyring: Option<String>,
33}
34
35// ── Internal types ──────────────────────────────────────────────────
36
37#[derive(Clone)]
38struct PageSnapshot {
39    lsn: u64,
40    page_type: String,
41}
42
43struct WatchState {
44    snapshots: HashMap<u64, PageSnapshot>,
45    page_count: u64,
46    vendor_name: String,
47}
48
49// ── JSON output structs ─────────────────────────────────────────────
50
51#[derive(Serialize)]
52struct WatchEvent {
53    timestamp: String,
54    event: String,
55    #[serde(skip_serializing_if = "Option::is_none")]
56    pages: Option<u64>,
57    #[serde(skip_serializing_if = "Option::is_none")]
58    page_size: Option<u32>,
59    #[serde(skip_serializing_if = "Option::is_none")]
60    vendor: Option<String>,
61    #[serde(skip_serializing_if = "Option::is_none")]
62    modified: Option<u64>,
63    #[serde(skip_serializing_if = "Option::is_none")]
64    added: Option<u64>,
65    #[serde(skip_serializing_if = "Option::is_none")]
66    removed: Option<u64>,
67    #[serde(skip_serializing_if = "Option::is_none")]
68    changes: Option<Vec<PageChange>>,
69    #[serde(skip_serializing_if = "Option::is_none")]
70    total_changes: Option<u64>,
71    #[serde(skip_serializing_if = "Option::is_none")]
72    total_polls: Option<u64>,
73    #[serde(skip_serializing_if = "Option::is_none")]
74    error: Option<String>,
75}
76
77#[derive(Serialize)]
78struct PageChange {
79    page: u64,
80    kind: String,
81    page_type: String,
82    #[serde(skip_serializing_if = "Option::is_none")]
83    old_lsn: Option<u64>,
84    #[serde(skip_serializing_if = "Option::is_none")]
85    new_lsn: Option<u64>,
86    #[serde(skip_serializing_if = "Option::is_none")]
87    lsn_delta: Option<i64>,
88    #[serde(skip_serializing_if = "Option::is_none")]
89    checksum_valid: Option<bool>,
90}
91
92// ── Helpers ─────────────────────────────────────────────────────────
93
94fn now_timestamp() -> String {
95    Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
96}
97
98fn now_time_short() -> String {
99    Local::now().format("%H:%M:%S").to_string()
100}
101
102fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
103    let mut ts = match opts.page_size {
104        Some(ps) => Tablespace::open_with_page_size(&opts.file, ps)?,
105        None => Tablespace::open(&opts.file)?,
106    };
107    if let Some(ref keyring_path) = opts.keyring {
108        setup_decryption(&mut ts, keyring_path)?;
109    }
110    Ok(ts)
111}
112
113fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
114    let mut snapshots = HashMap::new();
115    ts.for_each_page(|page_num, data| {
116        if let Some(hdr) = FilHeader::parse(data) {
117            snapshots.insert(
118                page_num,
119                PageSnapshot {
120                    lsn: hdr.lsn,
121                    page_type: hdr.page_type.name().to_string(),
122                },
123            );
124        }
125        Ok(())
126    })?;
127    Ok(snapshots)
128}
129
130fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
131    let json = serde_json::to_string(event)
132        .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
133    wprintln!(writer, "{}", json)?;
134    Ok(())
135}
136
137// ── Main entry point ────────────────────────────────────────────────
138
139/// Monitor a tablespace for page-level changes.
140pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
141    // Set up Ctrl+C handler
142    let running = Arc::new(AtomicBool::new(true));
143    let r = running.clone();
144    ctrlc::set_handler(move || {
145        r.store(false, Ordering::SeqCst);
146    })
147    .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
148
149    // Take initial snapshot
150    let mut ts = open_tablespace(opts)?;
151    let page_size = ts.page_size();
152    let page_count = ts.page_count();
153    let vendor_name = ts.vendor_info().vendor.to_string();
154    let initial_snapshots = take_snapshot(&mut ts)?;
155
156    let mut state = WatchState {
157        snapshots: initial_snapshots,
158        page_count,
159        vendor_name: vendor_name.clone(),
160    };
161
162    // Emit start message
163    if opts.json {
164        emit_json_line(
165            writer,
166            &WatchEvent {
167                timestamp: now_timestamp(),
168                event: "started".to_string(),
169                pages: Some(page_count),
170                page_size: Some(page_size),
171                vendor: Some(vendor_name),
172                modified: None,
173                added: None,
174                removed: None,
175                changes: None,
176                total_changes: None,
177                total_polls: None,
178                error: None,
179            },
180        )?;
181    } else {
182        wprintln!(
183            writer,
184            "Watching {} ({} pages, {} bytes/page, {})",
185            opts.file,
186            page_count,
187            page_size,
188            state.vendor_name,
189        )?;
190        wprintln!(
191            writer,
192            "Polling every {}ms. Press Ctrl+C to stop.",
193            opts.interval
194        )?;
195        wprintln!(writer)?;
196    }
197
198    let interval = Duration::from_millis(opts.interval);
199    let mut total_changes: u64 = 0;
200    let mut total_polls: u64 = 0;
201
202    // Poll loop
203    while running.load(Ordering::SeqCst) {
204        thread::sleep(interval);
205
206        if !running.load(Ordering::SeqCst) {
207            break;
208        }
209
210        // Check if file still exists
211        if !Path::new(&opts.file).exists() {
212            if opts.json {
213                emit_json_line(
214                    writer,
215                    &WatchEvent {
216                        timestamp: now_timestamp(),
217                        event: "error".to_string(),
218                        pages: None,
219                        page_size: None,
220                        vendor: None,
221                        modified: None,
222                        added: None,
223                        removed: None,
224                        changes: None,
225                        total_changes: None,
226                        total_polls: None,
227                        error: Some("File no longer exists".to_string()),
228                    },
229                )?;
230            } else {
231                wprintln!(
232                    writer,
233                    "{}  {}",
234                    now_time_short(),
235                    "File no longer exists — stopping.".red()
236                )?;
237            }
238            break;
239        }
240
241        // Re-open tablespace and take new snapshot
242        let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
243            let new_page_count = new_ts.page_count();
244            let new_snapshots = take_snapshot(&mut new_ts)?;
245            Ok((new_page_count, new_snapshots))
246        });
247
248        let (new_page_count, new_snapshots) = match poll_result {
249            Ok(r) => r,
250            Err(e) => {
251                if opts.json {
252                    emit_json_line(
253                        writer,
254                        &WatchEvent {
255                            timestamp: now_timestamp(),
256                            event: "error".to_string(),
257                            pages: None,
258                            page_size: None,
259                            vendor: None,
260                            modified: None,
261                            added: None,
262                            removed: None,
263                            changes: None,
264                            total_changes: None,
265                            total_polls: None,
266                            error: Some(e.to_string()),
267                        },
268                    )?;
269                } else {
270                    wprintln!(writer, "{}  {} {}", now_time_short(), "Error:".red(), e)?;
271                }
272                continue;
273            }
274        };
275
276        total_polls += 1;
277
278        // Compare snapshots
279        let mut changes: Vec<PageChange> = Vec::new();
280        let mut modified_count: u64 = 0;
281        let mut added_count: u64 = 0;
282        let mut removed_count: u64 = 0;
283
284        // Detect modified and added pages
285        for (&page_num, new_snap) in &new_snapshots {
286            match state.snapshots.get(&page_num) {
287                Some(old_snap) => {
288                    if new_snap.lsn != old_snap.lsn {
289                        modified_count += 1;
290
291                        // Validate checksum for changed pages
292                        let checksum_valid = open_tablespace(opts)
293                            .and_then(|mut ts2| ts2.read_page(page_num))
294                            .map(|data| validate_checksum(&data, page_size, None).valid)
295                            .unwrap_or(false);
296
297                        let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
298
299                        changes.push(PageChange {
300                            page: page_num,
301                            kind: "modified".to_string(),
302                            page_type: new_snap.page_type.clone(),
303                            old_lsn: Some(old_snap.lsn),
304                            new_lsn: Some(new_snap.lsn),
305                            lsn_delta: Some(lsn_delta),
306                            checksum_valid: Some(checksum_valid),
307                        });
308                    }
309                }
310                None => {
311                    added_count += 1;
312                    changes.push(PageChange {
313                        page: page_num,
314                        kind: "added".to_string(),
315                        page_type: new_snap.page_type.clone(),
316                        old_lsn: None,
317                        new_lsn: Some(new_snap.lsn),
318                        lsn_delta: None,
319                        checksum_valid: None,
320                    });
321                }
322            }
323        }
324
325        // Detect removed pages
326        for &page_num in state.snapshots.keys() {
327            if !new_snapshots.contains_key(&page_num) {
328                removed_count += 1;
329                let old_snap = &state.snapshots[&page_num];
330                changes.push(PageChange {
331                    page: page_num,
332                    kind: "removed".to_string(),
333                    page_type: old_snap.page_type.clone(),
334                    old_lsn: Some(old_snap.lsn),
335                    new_lsn: None,
336                    lsn_delta: None,
337                    checksum_valid: None,
338                });
339            }
340        }
341
342        // Sort changes by page number for stable output
343        changes.sort_by_key(|c| c.page);
344
345        let cycle_changes = modified_count + added_count + removed_count;
346        total_changes += cycle_changes;
347
348        // Only emit output when something changed
349        if cycle_changes > 0 {
350            if opts.json {
351                emit_json_line(
352                    writer,
353                    &WatchEvent {
354                        timestamp: now_timestamp(),
355                        event: "poll".to_string(),
356                        pages: Some(new_page_count),
357                        page_size: None,
358                        vendor: None,
359                        modified: Some(modified_count),
360                        added: Some(added_count),
361                        removed: Some(removed_count),
362                        changes: Some(changes),
363                        total_changes: None,
364                        total_polls: None,
365                        error: None,
366                    },
367                )?;
368            } else {
369                // Build summary line
370                let mut parts = Vec::new();
371                if modified_count > 0 {
372                    let word = if modified_count == 1 { "page" } else { "pages" };
373                    parts.push(format!("{} {} modified", modified_count, word));
374                }
375                if added_count > 0 {
376                    let word = if added_count == 1 { "page" } else { "pages" };
377                    parts.push(format!("{} {} added", added_count, word));
378                }
379                if removed_count > 0 {
380                    let word = if removed_count == 1 { "page" } else { "pages" };
381                    parts.push(format!("{} {} removed", removed_count, word));
382                }
383                wprintln!(writer, "{}  {}", now_time_short(), parts.join(", "))?;
384
385                // Print per-page details
386                for change in &changes {
387                    match change.kind.as_str() {
388                        "modified" => {
389                            let old_lsn = change.old_lsn.unwrap_or(0);
390                            let new_lsn = change.new_lsn.unwrap_or(0);
391                            let delta = change.lsn_delta.unwrap_or(0);
392                            let cksum_str = if change.checksum_valid.unwrap_or(false) {
393                                "checksum valid".green().to_string()
394                            } else {
395                                "CHECKSUM INVALID".red().to_string()
396                            };
397
398                            if opts.verbose {
399                                wprintln!(
400                                    writer,
401                                    "  Page {:<5} {:<12} LSN {} -> {} ({:+})  {}",
402                                    change.page,
403                                    change.page_type,
404                                    old_lsn,
405                                    new_lsn,
406                                    delta,
407                                    cksum_str,
408                                )?;
409                            } else {
410                                wprintln!(
411                                    writer,
412                                    "  Page {:<5} {:<12} LSN {:+}  {}",
413                                    change.page,
414                                    change.page_type,
415                                    delta,
416                                    cksum_str,
417                                )?;
418                            }
419                        }
420                        "added" => {
421                            wprintln!(
422                                writer,
423                                "  Page {:<5} {:<12} {}",
424                                change.page,
425                                change.page_type,
426                                "(new page)".cyan(),
427                            )?;
428                        }
429                        "removed" => {
430                            wprintln!(
431                                writer,
432                                "  Page {:<5} {:<12} {}",
433                                change.page,
434                                change.page_type,
435                                "(removed)".yellow(),
436                            )?;
437                        }
438                        _ => {}
439                    }
440                }
441
442                wprintln!(writer)?;
443            }
444        }
445
446        // Update state
447        state.snapshots = new_snapshots;
448        state.page_count = new_page_count;
449    }
450
451    // Emit stop summary
452    if opts.json {
453        emit_json_line(
454            writer,
455            &WatchEvent {
456                timestamp: now_timestamp(),
457                event: "stopped".to_string(),
458                pages: None,
459                page_size: None,
460                vendor: None,
461                modified: None,
462                added: None,
463                removed: None,
464                changes: None,
465                total_changes: Some(total_changes),
466                total_polls: Some(total_polls),
467                error: None,
468            },
469        )?;
470    } else {
471        wprintln!(
472            writer,
473            "Stopped after {} polls. Total page changes: {}",
474            total_polls,
475            total_changes,
476        )?;
477    }
478
479    Ok(())
480}
481
482#[cfg(test)]
483mod tests {
484    use super::*;
485    use byteorder::{BigEndian, ByteOrder};
486    use std::io::Write as IoWrite;
487    use tempfile::NamedTempFile;
488
489    use crate::innodb::constants::*;
490
491    const PS: usize = SIZE_PAGE_DEFAULT as usize;
492
493    fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
494        let mut page = vec![0u8; PS];
495        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], 0);
496        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
497        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
498        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], 1000);
499        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 8); // FSP_HDR
500        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
501        let fsp = FIL_PAGE_DATA;
502        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
503        BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
504        BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
505        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
506        let trailer = PS - SIZE_FIL_TRAILER;
507        BigEndian::write_u32(&mut page[trailer + 4..], 1000 & 0xFFFFFFFF);
508        let end = PS - SIZE_FIL_TRAILER;
509        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
510        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
511        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
512        page
513    }
514
515    fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
516        let mut page = vec![0u8; PS];
517        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
518        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
519        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
520        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
521        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], 17855); // INDEX
522        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
523        let trailer = PS - SIZE_FIL_TRAILER;
524        BigEndian::write_u32(&mut page[trailer + 4..], (lsn & 0xFFFFFFFF) as u32);
525        let end = PS - SIZE_FIL_TRAILER;
526        let crc1 = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
527        let crc2 = crc32c::crc32c(&page[FIL_PAGE_DATA..end]);
528        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], crc1 ^ crc2);
529        page
530    }
531
532    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
533        let mut tmp = NamedTempFile::new().expect("create temp file");
534        for page in pages {
535            tmp.write_all(page).expect("write page");
536        }
537        tmp.flush().expect("flush");
538        tmp
539    }
540
541    #[test]
542    fn test_take_snapshot() {
543        let tmp = write_tablespace(&[
544            build_fsp_page(1, 3),
545            build_index_page(1, 1, 2000),
546            build_index_page(2, 1, 3000),
547        ]);
548        let mut ts = Tablespace::open(tmp.path()).unwrap();
549        let snaps = take_snapshot(&mut ts).unwrap();
550        assert_eq!(snaps.len(), 3);
551        assert_eq!(snaps[&0].lsn, 1000);
552        assert_eq!(snaps[&1].lsn, 2000);
553        assert_eq!(snaps[&2].lsn, 3000);
554        assert_eq!(snaps[&1].page_type, "INDEX");
555    }
556
557    #[test]
558    fn test_snapshot_detects_page_type() {
559        let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
560        let mut ts = Tablespace::open(tmp.path()).unwrap();
561        let snaps = take_snapshot(&mut ts).unwrap();
562        assert_eq!(snaps[&0].page_type, "FSP_HDR");
563    }
564
565    #[test]
566    fn test_open_tablespace_helper() {
567        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
568        let opts = WatchOptions {
569            file: tmp.path().to_str().unwrap().to_string(),
570            interval: 100,
571            verbose: false,
572            json: false,
573            page_size: None,
574            keyring: None,
575        };
576        let ts = open_tablespace(&opts).unwrap();
577        assert_eq!(ts.page_count(), 2);
578        assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
579    }
580
581    #[test]
582    fn test_open_tablespace_with_page_size_override() {
583        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
584        let opts = WatchOptions {
585            file: tmp.path().to_str().unwrap().to_string(),
586            interval: 100,
587            verbose: false,
588            json: false,
589            page_size: Some(16384),
590            keyring: None,
591        };
592        let ts = open_tablespace(&opts).unwrap();
593        assert_eq!(ts.page_count(), 2);
594    }
595
596    #[test]
597    fn test_open_tablespace_missing_file() {
598        let opts = WatchOptions {
599            file: "/nonexistent/path.ibd".to_string(),
600            interval: 100,
601            verbose: false,
602            json: false,
603            page_size: None,
604            keyring: None,
605        };
606        assert!(open_tablespace(&opts).is_err());
607    }
608}