1use std::collections::HashMap;
2use std::io::Write;
3use std::path::Path;
4use std::sync::atomic::{AtomicBool, Ordering};
5use std::sync::Arc;
6use std::thread;
7use std::time::Duration;
8
9use chrono::Local;
10use colored::Colorize;
11use serde::Serialize;
12
13use crate::cli::{setup_decryption, wprintln};
14use crate::innodb::checksum::validate_checksum;
15use crate::innodb::page::FilHeader;
16use crate::innodb::tablespace::Tablespace;
17use crate::IdbError;
18
/// Options controlling a tablespace watch session.
#[derive(Debug, Clone)]
pub struct WatchOptions {
    /// Path of the tablespace file to watch.
    pub file: String,
    /// Delay between polls, in milliseconds.
    pub interval: u64,
    /// In text mode, show the old and new LSN of each modified page
    /// instead of only the delta.
    pub verbose: bool,
    /// Emit one JSON object per line instead of human-readable text.
    pub json: bool,
    /// Page size to open the tablespace with; `None` opens it without an
    /// explicit page size.
    pub page_size: Option<u32>,
    /// Keyring path handed to the decryption setup; `None` skips that setup.
    pub keyring: Option<String>,
}
34
/// What the watcher remembers about one page between polls.
#[derive(Clone)]
struct PageSnapshot {
    /// LSN read from the page's FIL header.
    lsn: u64,
    /// Display name of the page type read from the FIL header.
    page_type: String,
}
42
43struct WatchState {
44 snapshots: HashMap<u64, PageSnapshot>,
45 page_count: u64,
46 vendor_name: String,
47}
48
49#[derive(Serialize)]
52struct WatchEvent {
53 timestamp: String,
54 event: String,
55 #[serde(skip_serializing_if = "Option::is_none")]
56 pages: Option<u64>,
57 #[serde(skip_serializing_if = "Option::is_none")]
58 page_size: Option<u32>,
59 #[serde(skip_serializing_if = "Option::is_none")]
60 vendor: Option<String>,
61 #[serde(skip_serializing_if = "Option::is_none")]
62 modified: Option<u64>,
63 #[serde(skip_serializing_if = "Option::is_none")]
64 added: Option<u64>,
65 #[serde(skip_serializing_if = "Option::is_none")]
66 removed: Option<u64>,
67 #[serde(skip_serializing_if = "Option::is_none")]
68 changes: Option<Vec<PageChange>>,
69 #[serde(skip_serializing_if = "Option::is_none")]
70 total_changes: Option<u64>,
71 #[serde(skip_serializing_if = "Option::is_none")]
72 total_polls: Option<u64>,
73 #[serde(skip_serializing_if = "Option::is_none")]
74 error: Option<String>,
75}
76
77#[derive(Serialize)]
78struct PageChange {
79 page: u64,
80 kind: String,
81 page_type: String,
82 #[serde(skip_serializing_if = "Option::is_none")]
83 old_lsn: Option<u64>,
84 #[serde(skip_serializing_if = "Option::is_none")]
85 new_lsn: Option<u64>,
86 #[serde(skip_serializing_if = "Option::is_none")]
87 lsn_delta: Option<i64>,
88 #[serde(skip_serializing_if = "Option::is_none")]
89 checksum_valid: Option<bool>,
90}
91
92fn now_timestamp() -> String {
95 Local::now().format("%Y-%m-%dT%H:%M:%S%.3f%:z").to_string()
96}
97
98fn now_time_short() -> String {
99 Local::now().format("%H:%M:%S").to_string()
100}
101
102fn open_tablespace(opts: &WatchOptions) -> Result<Tablespace, IdbError> {
103 let mut ts = match opts.page_size {
104 Some(ps) => Tablespace::open_with_page_size(&opts.file, ps)?,
105 None => Tablespace::open(&opts.file)?,
106 };
107 if let Some(ref keyring_path) = opts.keyring {
108 setup_decryption(&mut ts, keyring_path)?;
109 }
110 Ok(ts)
111}
112
113fn take_snapshot(ts: &mut Tablespace) -> Result<HashMap<u64, PageSnapshot>, IdbError> {
114 let mut snapshots = HashMap::new();
115 ts.for_each_page(|page_num, data| {
116 if let Some(hdr) = FilHeader::parse(data) {
117 snapshots.insert(
118 page_num,
119 PageSnapshot {
120 lsn: hdr.lsn,
121 page_type: hdr.page_type.name().to_string(),
122 },
123 );
124 }
125 Ok(())
126 })?;
127 Ok(snapshots)
128}
129
130fn emit_json_line(writer: &mut dyn Write, event: &WatchEvent) -> Result<(), IdbError> {
131 let json = serde_json::to_string(event)
132 .map_err(|e| IdbError::Parse(format!("JSON serialization error: {}", e)))?;
133 wprintln!(writer, "{}", json)?;
134 Ok(())
135}
136
137pub fn execute(opts: &WatchOptions, writer: &mut dyn Write) -> Result<(), IdbError> {
141 let running = Arc::new(AtomicBool::new(true));
143 let r = running.clone();
144 ctrlc::set_handler(move || {
145 r.store(false, Ordering::SeqCst);
146 })
147 .map_err(|e| IdbError::Io(format!("Cannot set Ctrl+C handler: {}", e)))?;
148
149 let mut ts = open_tablespace(opts)?;
151 let page_size = ts.page_size();
152 let page_count = ts.page_count();
153 let vendor_name = ts.vendor_info().vendor.to_string();
154 let initial_snapshots = take_snapshot(&mut ts)?;
155
156 let mut state = WatchState {
157 snapshots: initial_snapshots,
158 page_count,
159 vendor_name: vendor_name.clone(),
160 };
161
162 if opts.json {
164 emit_json_line(
165 writer,
166 &WatchEvent {
167 timestamp: now_timestamp(),
168 event: "started".to_string(),
169 pages: Some(page_count),
170 page_size: Some(page_size),
171 vendor: Some(vendor_name),
172 modified: None,
173 added: None,
174 removed: None,
175 changes: None,
176 total_changes: None,
177 total_polls: None,
178 error: None,
179 },
180 )?;
181 } else {
182 wprintln!(
183 writer,
184 "Watching {} ({} pages, {} bytes/page, {})",
185 opts.file,
186 page_count,
187 page_size,
188 state.vendor_name,
189 )?;
190 wprintln!(
191 writer,
192 "Polling every {}ms. Press Ctrl+C to stop.",
193 opts.interval
194 )?;
195 wprintln!(writer)?;
196 }
197
198 let interval = Duration::from_millis(opts.interval);
199 let mut total_changes: u64 = 0;
200 let mut total_polls: u64 = 0;
201
202 while running.load(Ordering::SeqCst) {
204 thread::sleep(interval);
205
206 if !running.load(Ordering::SeqCst) {
207 break;
208 }
209
210 if !Path::new(&opts.file).exists() {
212 if opts.json {
213 emit_json_line(
214 writer,
215 &WatchEvent {
216 timestamp: now_timestamp(),
217 event: "error".to_string(),
218 pages: None,
219 page_size: None,
220 vendor: None,
221 modified: None,
222 added: None,
223 removed: None,
224 changes: None,
225 total_changes: None,
226 total_polls: None,
227 error: Some("File no longer exists".to_string()),
228 },
229 )?;
230 } else {
231 wprintln!(
232 writer,
233 "{} {}",
234 now_time_short(),
235 "File no longer exists — stopping.".red()
236 )?;
237 }
238 break;
239 }
240
241 let poll_result = open_tablespace(opts).and_then(|mut new_ts| {
243 let new_page_count = new_ts.page_count();
244 let new_snapshots = take_snapshot(&mut new_ts)?;
245 Ok((new_page_count, new_snapshots))
246 });
247
248 let (new_page_count, new_snapshots) = match poll_result {
249 Ok(r) => r,
250 Err(e) => {
251 if opts.json {
252 emit_json_line(
253 writer,
254 &WatchEvent {
255 timestamp: now_timestamp(),
256 event: "error".to_string(),
257 pages: None,
258 page_size: None,
259 vendor: None,
260 modified: None,
261 added: None,
262 removed: None,
263 changes: None,
264 total_changes: None,
265 total_polls: None,
266 error: Some(e.to_string()),
267 },
268 )?;
269 } else {
270 wprintln!(writer, "{} {} {}", now_time_short(), "Error:".red(), e)?;
271 }
272 continue;
273 }
274 };
275
276 total_polls += 1;
277
278 let mut changes: Vec<PageChange> = Vec::new();
280 let mut modified_count: u64 = 0;
281 let mut added_count: u64 = 0;
282 let mut removed_count: u64 = 0;
283
284 for (&page_num, new_snap) in &new_snapshots {
286 match state.snapshots.get(&page_num) {
287 Some(old_snap) => {
288 if new_snap.lsn != old_snap.lsn {
289 modified_count += 1;
290
291 let checksum_valid = open_tablespace(opts)
293 .and_then(|mut ts2| ts2.read_page(page_num))
294 .map(|data| validate_checksum(&data, page_size, None).valid)
295 .unwrap_or(false);
296
297 let lsn_delta = new_snap.lsn as i64 - old_snap.lsn as i64;
298
299 changes.push(PageChange {
300 page: page_num,
301 kind: "modified".to_string(),
302 page_type: new_snap.page_type.clone(),
303 old_lsn: Some(old_snap.lsn),
304 new_lsn: Some(new_snap.lsn),
305 lsn_delta: Some(lsn_delta),
306 checksum_valid: Some(checksum_valid),
307 });
308 }
309 }
310 None => {
311 added_count += 1;
312 changes.push(PageChange {
313 page: page_num,
314 kind: "added".to_string(),
315 page_type: new_snap.page_type.clone(),
316 old_lsn: None,
317 new_lsn: Some(new_snap.lsn),
318 lsn_delta: None,
319 checksum_valid: None,
320 });
321 }
322 }
323 }
324
325 for &page_num in state.snapshots.keys() {
327 if !new_snapshots.contains_key(&page_num) {
328 removed_count += 1;
329 let old_snap = &state.snapshots[&page_num];
330 changes.push(PageChange {
331 page: page_num,
332 kind: "removed".to_string(),
333 page_type: old_snap.page_type.clone(),
334 old_lsn: Some(old_snap.lsn),
335 new_lsn: None,
336 lsn_delta: None,
337 checksum_valid: None,
338 });
339 }
340 }
341
342 changes.sort_by_key(|c| c.page);
344
345 let cycle_changes = modified_count + added_count + removed_count;
346 total_changes += cycle_changes;
347
348 if cycle_changes > 0 {
350 if opts.json {
351 emit_json_line(
352 writer,
353 &WatchEvent {
354 timestamp: now_timestamp(),
355 event: "poll".to_string(),
356 pages: Some(new_page_count),
357 page_size: None,
358 vendor: None,
359 modified: Some(modified_count),
360 added: Some(added_count),
361 removed: Some(removed_count),
362 changes: Some(changes),
363 total_changes: None,
364 total_polls: None,
365 error: None,
366 },
367 )?;
368 } else {
369 let mut parts = Vec::new();
371 if modified_count > 0 {
372 let word = if modified_count == 1 { "page" } else { "pages" };
373 parts.push(format!("{} {} modified", modified_count, word));
374 }
375 if added_count > 0 {
376 let word = if added_count == 1 { "page" } else { "pages" };
377 parts.push(format!("{} {} added", added_count, word));
378 }
379 if removed_count > 0 {
380 let word = if removed_count == 1 { "page" } else { "pages" };
381 parts.push(format!("{} {} removed", removed_count, word));
382 }
383 wprintln!(writer, "{} {}", now_time_short(), parts.join(", "))?;
384
385 for change in &changes {
387 match change.kind.as_str() {
388 "modified" => {
389 let old_lsn = change.old_lsn.unwrap_or(0);
390 let new_lsn = change.new_lsn.unwrap_or(0);
391 let delta = change.lsn_delta.unwrap_or(0);
392 let cksum_str = if change.checksum_valid.unwrap_or(false) {
393 "checksum valid".green().to_string()
394 } else {
395 "CHECKSUM INVALID".red().to_string()
396 };
397
398 if opts.verbose {
399 wprintln!(
400 writer,
401 " Page {:<5} {:<12} LSN {} -> {} ({:+}) {}",
402 change.page,
403 change.page_type,
404 old_lsn,
405 new_lsn,
406 delta,
407 cksum_str,
408 )?;
409 } else {
410 wprintln!(
411 writer,
412 " Page {:<5} {:<12} LSN {:+} {}",
413 change.page,
414 change.page_type,
415 delta,
416 cksum_str,
417 )?;
418 }
419 }
420 "added" => {
421 wprintln!(
422 writer,
423 " Page {:<5} {:<12} {}",
424 change.page,
425 change.page_type,
426 "(new page)".cyan(),
427 )?;
428 }
429 "removed" => {
430 wprintln!(
431 writer,
432 " Page {:<5} {:<12} {}",
433 change.page,
434 change.page_type,
435 "(removed)".yellow(),
436 )?;
437 }
438 _ => {}
439 }
440 }
441
442 wprintln!(writer)?;
443 }
444 }
445
446 state.snapshots = new_snapshots;
448 state.page_count = new_page_count;
449 }
450
451 if opts.json {
453 emit_json_line(
454 writer,
455 &WatchEvent {
456 timestamp: now_timestamp(),
457 event: "stopped".to_string(),
458 pages: None,
459 page_size: None,
460 vendor: None,
461 modified: None,
462 added: None,
463 removed: None,
464 changes: None,
465 total_changes: Some(total_changes),
466 total_polls: Some(total_polls),
467 error: None,
468 },
469 )?;
470 } else {
471 wprintln!(
472 writer,
473 "Stopped after {} polls. Total page changes: {}",
474 total_polls,
475 total_changes,
476 )?;
477 }
478
479 Ok(())
480}
481
#[cfg(test)]
mod tests {
    use super::*;
    use byteorder::{BigEndian, ByteOrder};
    use std::io::Write as IoWrite;
    use tempfile::NamedTempFile;

    use crate::innodb::constants::*;

    /// Size in bytes of every synthetic page built by these tests.
    const PS: usize = SIZE_PAGE_DEFAULT as usize;

    /// Raw page-type value that the snapshot reports as `"FSP_HDR"`.
    const RAW_FSP_HDR_TYPE: u16 = 8;
    /// Raw page-type value that the snapshot reports as `"INDEX"`.
    const RAW_INDEX_TYPE: u16 = 17855;

    /// Allocates a zeroed page and fills in its FIL header fields.
    fn page_with_header(page_num: u32, page_type: u16, space_id: u32, lsn: u64) -> Vec<u8> {
        let mut page = vec![0u8; PS];
        BigEndian::write_u32(&mut page[FIL_PAGE_OFFSET..], page_num);
        BigEndian::write_u32(&mut page[FIL_PAGE_PREV..], FIL_NULL);
        BigEndian::write_u32(&mut page[FIL_PAGE_NEXT..], FIL_NULL);
        BigEndian::write_u64(&mut page[FIL_PAGE_LSN..], lsn);
        BigEndian::write_u16(&mut page[FIL_PAGE_TYPE..], page_type);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_ID..], space_id);
        page
    }

    /// Writes the low 32 bits of `lsn` into the trailer, then stores the XOR
    /// of the CRC-32C of the header range and of the body range as the page
    /// checksum. Must run after all other page content has been written.
    fn seal_page(page: &mut [u8], lsn: u64) {
        let trailer = PS - SIZE_FIL_TRAILER;
        BigEndian::write_u32(&mut page[trailer + 4..], (lsn & 0xFFFFFFFF) as u32);
        let header_crc = crc32c::crc32c(&page[FIL_PAGE_OFFSET..FIL_PAGE_FILE_FLUSH_LSN]);
        let body_crc = crc32c::crc32c(&page[FIL_PAGE_DATA..trailer]);
        BigEndian::write_u32(&mut page[FIL_PAGE_SPACE_OR_CHKSUM..], header_crc ^ body_crc);
    }

    /// Builds page 0 (LSN 1000) carrying an FSP header for `space_id` that
    /// declares `total_pages` pages.
    fn build_fsp_page(space_id: u32, total_pages: u32) -> Vec<u8> {
        let lsn = 1000;
        let mut page = page_with_header(0, RAW_FSP_HDR_TYPE, space_id, lsn);
        let fsp = FIL_PAGE_DATA;
        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_ID..], space_id);
        BigEndian::write_u32(&mut page[fsp + FSP_SIZE..], total_pages);
        BigEndian::write_u32(&mut page[fsp + FSP_FREE_LIMIT..], total_pages);
        BigEndian::write_u32(&mut page[fsp + FSP_SPACE_FLAGS..], 0);
        seal_page(&mut page, lsn);
        page
    }

    /// Builds an otherwise empty index page with the given number and LSN.
    fn build_index_page(page_num: u32, space_id: u32, lsn: u64) -> Vec<u8> {
        let mut page = page_with_header(page_num, RAW_INDEX_TYPE, space_id, lsn);
        seal_page(&mut page, lsn);
        page
    }

    /// Writes `pages` back to back into a fresh temporary file.
    fn write_tablespace(pages: &[Vec<u8>]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().expect("create temp file");
        for page in pages {
            tmp.write_all(page).expect("write page");
        }
        tmp.flush().expect("flush");
        tmp
    }

    /// Watch options for `file` with the defaults shared by the tests below.
    fn opts_for(file: &str, page_size: Option<u32>) -> WatchOptions {
        WatchOptions {
            file: file.to_string(),
            interval: 100,
            verbose: false,
            json: false,
            page_size,
            keyring: None,
        }
    }

    #[test]
    fn test_take_snapshot() {
        let tmp = write_tablespace(&[
            build_fsp_page(1, 3),
            build_index_page(1, 1, 2000),
            build_index_page(2, 1, 3000),
        ]);
        let mut ts = Tablespace::open(tmp.path()).unwrap();
        let snaps = take_snapshot(&mut ts).unwrap();
        assert_eq!(snaps.len(), 3);
        assert_eq!(snaps[&0].lsn, 1000);
        assert_eq!(snaps[&1].lsn, 2000);
        assert_eq!(snaps[&2].lsn, 3000);
        assert_eq!(snaps[&1].page_type, "INDEX");
    }

    #[test]
    fn test_snapshot_detects_page_type() {
        let tmp = write_tablespace(&[build_fsp_page(1, 1)]);
        let mut ts = Tablespace::open(tmp.path()).unwrap();
        let snaps = take_snapshot(&mut ts).unwrap();
        assert_eq!(snaps[&0].page_type, "FSP_HDR");
    }

    #[test]
    fn test_open_tablespace_helper() {
        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
        let opts = opts_for(tmp.path().to_str().unwrap(), None);
        let ts = open_tablespace(&opts).unwrap();
        assert_eq!(ts.page_count(), 2);
        assert_eq!(ts.page_size(), SIZE_PAGE_DEFAULT);
    }

    #[test]
    fn test_open_tablespace_with_page_size_override() {
        let tmp = write_tablespace(&[build_fsp_page(1, 2), build_index_page(1, 1, 2000)]);
        let opts = opts_for(tmp.path().to_str().unwrap(), Some(16384));
        let ts = open_tablespace(&opts).unwrap();
        assert_eq!(ts.page_count(), 2);
    }

    #[test]
    fn test_open_tablespace_missing_file() {
        let opts = opts_for("/nonexistent/path.ibd", None);
        assert!(open_tablespace(&opts).is_err());
    }
}