1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{wprintln, setup_decryption};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
/// Settings for a watch session over a single tablespace file.
///
/// Derives `Debug` and `Clone` so callers can log the effective options and
/// hand copies to other components.
#[derive(Debug, Clone)]
pub struct WatchOptions {
    /// Path of the tablespace file to open and poll.
    pub file: String,
    /// Delay between polls, in milliseconds.
    pub interval: u64,
    /// In text mode, print old and new LSN for modified pages instead of only the delta.
    pub verbose: bool,
    /// Emit one JSON object per line instead of human-readable text.
    pub json: bool,
    /// Page size to open the tablespace with; when `None` the plain open path is used.
    pub page_size: Option<u32>,
    /// Keyring path handed to decryption setup; `None` skips decryption setup.
    pub keyring: Option<String>,
}
34
/// What is remembered about one page between polls.
#[derive(Clone)]
struct PageSnapshot {
    /// LSN taken from the page's FIL header.
    lsn: u64,
    /// Display name of the page type taken from the page's FIL header.
    page_type: String,
}
42
43struct WatchState {
44 snapshots: HashMap<u64, PageSnapshot>,
45 page_count: u64,
46 vendor_name: String,
47}
48
49#[derive(Serialize)]
52struct WatchEvent {
53 timestamp: String,
54 event: String,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pages: Option<u64>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 page_size: Option<u32>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 vendor: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 modified: Option<u64>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 added: Option<u64>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 removed: Option<u64>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 changes: Option<Vec<PageChange>>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 total_changes: Option<u64>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 total_polls: Option<u64>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 error: Option<String>,
75}
76
77#[derive(Serialize)]
78struct PageChange {
79 page: u64,
80 kind: String,
81 page_type: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 old_lsn: Option<u64>,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 new_lsn: Option<u64>,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 lsn_delta: Option<i64>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 checksum_valid: Option<bool>,
90}
91
92fn now_timestamp() -> String {
95 Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
96}
97
98fn now_time_short() -> String {
99 Local::now().format("%H:%M:%S").to_string()
100}
101
102fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
103 let mut ts = match opts.page_size {
104 Some(ps) => Tablespace::open_with_page_size(&opts.file, ps)?,
105 None => Tablespace::open(&opts.file)?,
106 };
107 if let Some(ref keyring_path) = opts.keyring {
108 setup_decryption(&mut ts, keyring_path)?;
109 }
110 Ok(ts)
111}
112
113fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
114 let mut snapshots = HashMap::new();
115 ts.for_each_page(|page_num, data| {
116 if let Some(hdr) = FilHeader::parse(data) {
117 snapshots.insert(
118 page_num,
119 PageSnapshot {
120 lsn: hdr.lsn,
121 page_type: hdr.page_type.name().to_string(),
122 },
123 );
124 }
125 Ok(())
126 })?;
127 Ok(snapshots)
128}
129
130fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
131 let json = serde_json::to_string(event)
132 .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
133 wprintln!(writer, "{}", json)?;
134 Ok(())
135}
136
137pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
141 let running = Arc::new(AtomicBool::new(true));
143 let r = running.clone();
144 ctrlc::set_handler(move || {
145 r.store(false, Ordering::SeqCst);
146 })
147 .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
148
149 let mut ts = open_tablespace(opts)?;
151 let page_size = ts.page_size();
152 let page_count = ts.page_count();
153 let vendor_name = ts.vendor_info().vendor.to_string();
154 let initial_snapshots = take_snapshot(&mut ts)?;
155
156 let mut state = WatchState {
157 snapshots: initial_snapshots,
158 page_count,
159 vendor_name: vendor_name.clone(),
160 };
161
162 if opts.json {
164 emit_json_line(
165 writer,
166 &WatchEvent {
167 timestamp: now_timestamp(),
168 event: "started".to_string(),
169 pages: Some(page_count),
170 page_size: Some(page_size),
171 vendor: Some(vendor_name),
172 modified: None,
173 added: None,
174 removed: None,
175 changes: None,
176 total_changes: None,
177 total_polls: None,
178 error: None,
179 },
180 )?;
181 } else {
182 wprintln!(
183 writer,
184 "Watching {} ({} pages, {} bytes/page, {})",
185 opts.file,
186 page_count,
187 page_size,
188 state.vendor_name,
189 )?;
190 wprintln!(
191 writer,
192 "Polling every {}ms. Press Ctrl+C to stop.",
193 opts.interval
194 )?;
195 wprintln!(writer)?;
196 }
197
198 let interval = Duration::from_millis(opts.interval);
199 let mut total_changes: u64 = 0;
200 let mut total_polls: u64 = 0;
201
202 while running.load(Ordering::SeqCst) {
204 thread::sleep(interval);
205
206 if !running.load(Ordering::SeqCst) {
207 break;
208 }
209
210 if !Path::new(&opts.file).exists() {
212 if opts.json {
213 emit_json_line(
214 writer,
215 &WatchEvent {
216 timestamp: now_timestamp(),
217 event: "error".to_string(),
218 pages: None,
219 page_size: None,
220 vendor: None,
221 modified: None,
222 added: None,
223 removed: None,
224 changes: None,
225 total_changes: None,
226 total_polls: None,
227 error: Some("File no longer exists".to_string()),
228 },
229 )?;
230 } else {
231 wprintln!(
232 writer,
233 "{} {}",
234 now_time_short(),
235 "File no longer exists — stopping.".red()
236 )?;
237 }
238 break;
239 }
240
241 let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
243 let new_page_count = new_ts.page_count();
244 let new_snapshots = take_snapshot(&mut new_ts)?;
245 Ok((new_page_count, new_snapshots))
246 });
247
248 let (new_page_count, new_snapshots) = match poll_result {
249 Ok(r) => r,
250 Err(e) => {
251 if opts.json {
252 emit_json_line(
253 writer,
254 &WatchEvent {
255 timestamp: now_timestamp(),
256 event: "error".to_string(),
257 pages: None,
258 page_size: None,
259 vendor: None,
260 modified: None,
261 added: None,
262 removed: None,
263 changes: None,
264 total_changes: None,
265 total_polls: None,
266 error: Some(e.to_string()),
267 },
268 )?;
269 } else {
270 wprintln!(
271 writer,
272 "{} {} {}",
273 now_time_short(),
274 "Error:".red(),
275 e
276 )?;
277 }
278 continue;
279 }
280 };
281
282 total_polls += 1;
283
284 let mut changes: Vec<PageChange> = Vec::new();
286 let mut modified_count: u64 = 0;
287 let mut added_count: u64 = 0;
288 let mut removed_count: u64 = 0;
289
290 for (&page_num, new_snap) in &new_snapshots {
292 match state.snapshots.get(&page_num) {
293 Some(old_snap) => {
294 if new_snap.lsn != old_snap.lsn {
295 modified_count += 1;
296
297 let checksum_valid = open_tablespace(opts)
299 .and_then(|mut ts2| ts2.read_page(page_num))
300 .map(|data| {
301 validate_checksum(&data, page_size, None).valid
302 })
303 .unwrap_or(false);
304
305 let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
306
307 changes.push(PageChange {
308 page: page_num,
309 kind: "modified".to_string(),
310 page_type: new_snap.page_type.clone(),
311 old_lsn: Some(old_snap.lsn),
312 new_lsn: Some(new_snap.lsn),
313 lsn_delta: Some(lsn_delta),
314 checksum_valid: Some(checksum_valid),
315 });
316 }
317 }
318 None => {
319 added_count += 1;
320 changes.push(PageChange {
321 page: page_num,
322 kind: "added".to_string(),
323 page_type: new_snap.page_type.clone(),
324 old_lsn: None,
325 new_lsn: Some(new_snap.lsn),
326 lsn_delta: None,
327 checksum_valid: None,
328 });
329 }
330 }
331 }
332
333 for &page_num in state.snapshots.keys() {
335 if !new_snapshots.contains_key(&page_num) {
336 removed_count += 1;
337 let old_snap = &state.snapshots[&page_num];
338 changes.push(PageChange {
339 page: page_num,
340 kind: "removed".to_string(),
341 page_type: old_snap.page_type.clone(),
342 old_lsn: Some(old_snap.lsn),
343 new_lsn: None,
344 lsn_delta: None,
345 checksum_valid: None,
346 });
347 }
348 }
349
350 changes.sort_by_key(|c| c.page);
352
353 let cycle_changes = modified_count + added_count + removed_count;
354 total_changes += cycle_changes;
355
356 if cycle_changes > 0 {
358 if opts.json {
359 emit_json_line(
360 writer,
361 &WatchEvent {
362 timestamp: now_timestamp(),
363 event: "poll".to_string(),
364 pages: Some(new_page_count),
365 page_size: None,
366 vendor: None,
367 modified: Some(modified_count),
368 added: Some(added_count),
369 removed: Some(removed_count),
370 changes: Some(changes),
371 total_changes: None,
372 total_polls: None,
373 error: None,
374 },
375 )?;
376 } else {
377 let mut parts = Vec::new();
379 if modified_count > 0 {
380 let word = if modified_count == 1 { "page" } else { "pages" };
381 parts.push(format!("{} {} modified", modified_count, word));
382 }
383 if added_count > 0 {
384 let word = if added_count == 1 { "page" } else { "pages" };
385 parts.push(format!("{} {} added", added_count, word));
386 }
387 if removed_count > 0 {
388 let word = if removed_count == 1 { "page" } else { "pages" };
389 parts.push(format!("{} {} removed", removed_count, word));
390 }
391 wprintln!(writer, "{} {}", now_time_short(), parts.join(", "))?;
392
393 for change in &changes {
395 match change.kind.as_str() {
396 "modified" => {
397 let old_lsn = change.old_lsn.unwrap_or(0);
398 let new_lsn = change.new_lsn.unwrap_or(0);
399 let delta = change.lsn_delta.unwrap_or(0);
400 let cksum_str = if change.checksum_valid.unwrap_or(false) {
401 "checksum valid".green().to_string()
402 } else {
403 "CHECKSUM INVALID".red().to_string()
404 };
405
406 if opts.verbose {
407 wprintln!(
408 writer,
409 " Page {:<5} {:<12} LSN {} -> {} ({:+}) {}",
410 change.page,
411 change.page_type,
412 old_lsn,
413 new_lsn,
414 delta,
415 cksum_str,
416 )?;
417 } else {
418 wprintln!(
419 writer,
420 " Page {:<5} {:<12} LSN {:+} {}",
421 change.page,
422 change.page_type,
423 delta,
424 cksum_str,
425 )?;
426 }
427 }
428 "added" => {
429 wprintln!(
430 writer,
431 " Page {:<5} {:<12} {}",
432 change.page,
433 change.page_type,
434 "(new page)".cyan(),
435 )?;
436 }
437 "removed" => {
438 wprintln!(
439 writer,
440 " Page {:<5} {:<12} {}",
441 change.page,
442 change.page_type,
443 "(removed)".yellow(),
444 )?;
445 }
446 _ => {}
447 }
448 }
449
450 wprintln!(writer)?;
451 }
452 }
453
454 state.snapshots = new_snapshots;
456 state.page_count = new_page_count;
457 }
458
459 if opts.json {
461 emit_json_line(
462 writer,
463 &WatchEvent {
464 timestamp: now_timestamp(),
465 event: "stopped".to_string(),
466 pages: None,
467 page_size: None,
468 vendor: None,
469 modified: None,
470 added: None,
471 removed: None,
472 changes: None,
473 total_changes: Some(total_changes),
474 total_polls: Some(total_polls),
475 error: None,
476 },
477 )?;
478 } else {
479 wprintln!(
480 writer,
481 "Stopped after {} polls. Total page changes: {}",
482 total_polls,
483 total_changes,
484 )?;
485 }
486
487 Ok(())
488}
489
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{BigEndian, ByteOrder};
    use std::io::Write as IoWrite;
    use tempfile::NamedTempFile;

    use crate::innodb::constants::*;

    /// Size in bytes of every synthetic page built by these tests.
    const PS: usize = SIZE_PAGE_DEFAULT as usize;

    /// Page type code that the snapshot reports as `FSP_HDR`.
    const TYPE_CODE_FSP_HDR: u16 = 8;
    /// Page type code that the snapshot reports as `INDEX`.
    const TYPE_CODE_INDEX: u16 = 17855;

    /// Allocates a zeroed page and fills in the FIL header fields.
    fn page_with_header(page_num: u32, space_id: u32, lsn: u64, page_type: u16) -> Vec<u8> {
        let mut page = vec![0u8; PS];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], page_type);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
        page
    }

    /// Writes the low 32 bits of `lsn` into the trailer, then stores the XOR
    /// of the header and body CRC-32C values in the checksum field.
    fn seal_page(page: &mut [u8], lsn: u64) {
        let body_end = PS - SIZE_FIL_TRAILER;
        BigEndian::write_u32(&mut page[body_end + 4..], (lsn & 0xFFFF_FFFF) as u32);
        let header_crc = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
        let body_crc = crc32c::crc32c(&page[FIL_PAGE_DATA..body_end]);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], header_crc ^ body_crc);
    }

    /// Builds page 0 (type `FSP_HDR`, LSN 1000) describing a space of `total_pages` pages.
    fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
        let lsn = 1000;
        let mut page = page_with_header(0, space_id, lsn, TYPE_CODE_FSP_HDR);
        let fsp = FIL_PAGE_DATA;
        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
        BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
        BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
        seal_page(&mut page, lsn);
        page
    }

    /// Builds an otherwise empty `INDEX` page with the given number, space id and LSN.
    fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
        let mut page = page_with_header(page_num, space_id, lsn, TYPE_CODE_INDEX);
        seal_page(&mut page, lsn);
        page
    }

    /// Writes `pages` back to back into a fresh temporary file.
    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().expect("create temp file");
        for page in pages {
            tmp.write_all(page).expect("write page");
        }
        tmp.flush().expect("flush");
        tmp
    }

    /// Options with a 100 ms interval, text output and no keyring.
    fn watch_opts(file: &str, page_size: Option<u32>) -> WatchOptions {
        WatchOptions {
            file: file.to_string(),
            interval: 100,
            verbose: false,
            json: false,
            page_size,
            keyring: None,
        }
    }

    #[test]
    fn test_take_snapshot() {
        let tmp = write_tablespace(&[
            build_fsp_page(1, 3),
            build_index_page(1, 1, 2000),
            build_index_page(2, 1, 3000),
        ]);
        let mut ts = Tablespace::open(tmp.path()).unwrap();
        let snaps = take_snapshot(&mut ts).unwrap();
        assert_eq!(snaps.len(), 3);
        assert_eq!(snaps[&0].lsn, 1000);
        assert_eq!(snaps[&1].lsn, 2000);
        assert_eq!(snaps[&2].lsn, 3000);
        assert_eq!(snaps[&1].page_type, "INDEX");
    }

    #[test]
    fn test_snapshot_detects_page_type() {
        let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
        let mut ts = Tablespace::open(tmp.path()).unwrap();
        let snaps = take_snapshot(&mut ts).unwrap();
        assert_eq!(snaps[&0].page_type, "FSP_HDR");
    }

    #[test]
    fn test_open_tablespace_helper() {
        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
        let opts = watch_opts(tmp.path().to_str().unwrap(), None);
        let ts = open_tablespace(&opts).unwrap();
        assert_eq!(ts.page_count(), 2);
        assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
    }

    #[test]
    fn test_open_tablespace_with_page_size_override() {
        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
        let opts = watch_opts(tmp.path().to_str().unwrap(), Some(16384));
        let ts = open_tablespace(&opts).unwrap();
        assert_eq!(ts.page_count(), 2);
    }

    #[test]
    fn test_open_tablespace_missing_file() {
        let opts = watch_opts("/nonexistent/path.ibd", None);
        assert!(open_tablespace(&opts).is_err());
    }
}
616}