1use crate::{
2 maps,
3 offsets::{PidOffsetsEntry, ProcessManager},
4};
5use std::collections::HashMap;
6use std::path::{Path, PathBuf};
7use std::sync::{mpsc, Arc, Mutex};
8use std::thread::{self, JoinHandle};
9use std::time::{Duration, Instant};
10use tracing::{error, info, warn};
11
/// Decoded form of [`SysEvent::kind`]; see `SysEventKind::from_u32` for the
/// raw wire values.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SysEventKind {
    /// Raw kind `1`: a process exec.
    Exec,
    /// Raw kind `2`: a process fork.
    Fork,
    /// Raw kind `3`: a process exit.
    Exit,
}
19
20impl SysEventKind {
21 fn from_u32(v: u32) -> Option<Self> {
22 match v {
23 1 => Some(SysEventKind::Exec),
24 2 => Some(SysEventKind::Fork),
25 3 => Some(SysEventKind::Exit),
26 _ => None,
27 }
28 }
29}
30
/// Fixed-size event record read from the sysmon BPF ring/perf buffer.
///
/// `#[repr(C)]` because the event loop reinterprets raw buffer bytes as this
/// type with `read_unaligned` after checking the length equals
/// `size_of::<SysEvent>()`. Field order and widths must therefore match the
/// BPF-side definition (NOTE(review): presumably in the sysmon-bpf crate —
/// confirm before changing).
#[repr(C)]
#[derive(Clone, Copy)]
pub struct SysEvent {
    /// Process id (thread-group id) the event refers to; used as the pid for
    /// all `/proc` lookups and map updates in this module.
    pub tgid: u32,
    /// Raw event kind; decode with `SysEventKind::from_u32`.
    pub kind: u32,
}
43
/// Minimum time between two retry attempts for the same pid in the
/// pending-offsets queue.
const PENDING_POLL_INTERVAL: Duration = Duration::from_millis(150);
/// Number of retry attempts after which `poll_pending_offsets` gives up on
/// a pid that still has not produced offsets.
const PENDING_MAX_ATTEMPTS: u32 = 20;
46
/// Retry bookkeeping for one pid that exec'd before its target module
/// appeared in `/proc/<pid>/maps`.
#[derive(Debug, Clone)]
pub(crate) struct PendingOffsetsEntry {
    /// Module the pid is expected to map eventually.
    target_path: PathBuf,
    /// Retries handed out so far; incremented by `PendingOffsets::take_due`.
    attempts: u32,
    /// When the entry was last handed out (back-dated on registration so it
    /// becomes due right away).
    last_poll: Instant,
    /// When the entry was (re)registered.
    /// NOTE(review): not read anywhere in the visible code.
    first_seen: Instant,
}
54
/// Queue of pids waiting for their target module to be mapped, keyed by pid.
#[derive(Debug, Default)]
pub(crate) struct PendingOffsets {
    entries: HashMap<u32, PendingOffsetsEntry>,
}
59
60impl PendingOffsets {
61 fn new() -> Self {
62 Self {
63 entries: HashMap::new(),
64 }
65 }
66
67 fn register(&mut self, pid: u32, target: &Path) {
68 let now = Instant::now();
69 let last_poll = now.checked_sub(PENDING_POLL_INTERVAL).unwrap_or(now);
70 self.entries
71 .entry(pid)
72 .and_modify(|entry| {
73 entry.target_path = target.to_path_buf();
74 entry.attempts = 0;
75 entry.last_poll = last_poll;
76 entry.first_seen = now;
77 })
78 .or_insert(PendingOffsetsEntry {
79 target_path: target.to_path_buf(),
80 attempts: 0,
81 last_poll,
82 first_seen: now,
83 });
84 }
85
86 fn remove(&mut self, pid: u32) {
87 self.entries.remove(&pid);
88 }
89
90 fn take_due(&mut self) -> Vec<(u32, PathBuf, u32)> {
91 let mut due = Vec::new();
92 let now = Instant::now();
93 for (&pid, entry) in self.entries.iter_mut() {
94 if now.duration_since(entry.last_poll) >= PENDING_POLL_INTERVAL {
95 entry.last_poll = now;
96 entry.attempts = entry.attempts.saturating_add(1);
97 due.push((pid, entry.target_path.clone(), entry.attempts));
98 }
99 }
100 due
101 }
102}
103
/// Configuration for `ProcessSysmon`.
///
/// `SysmonConfig::new()` and `SysmonConfig::default()` produce the same
/// values. Previously the derived `Default` left `proc_offsets_max_entries`
/// at 0, unlike `new()`.
#[derive(Debug, Clone)]
pub struct SysmonConfig {
    /// Module or executable whose offsets are tracked. `None` means no
    /// target filtering: every cached module of a pid is inserted.
    pub target_module: Option<PathBuf>,
    /// Capacity passed to `maps::ensure_pinned_proc_offsets_exists` when the
    /// monitor starts.
    pub proc_offsets_max_entries: u32,
    /// Per-CPU page count for the perf-event fallback reader; `None` defers
    /// to aya's default.
    pub perf_page_count: Option<usize>,
}

impl SysmonConfig {
    /// Default capacity of the pinned proc-offsets map.
    const DEFAULT_PROC_OFFSETS_MAX_ENTRIES: u32 = 4096;

    /// Returns the default configuration: no target module, a 4096-entry
    /// proc-offsets map, and aya's default perf buffer size.
    pub fn new() -> Self {
        Self {
            target_module: None,
            proc_offsets_max_entries: Self::DEFAULT_PROC_OFFSETS_MAX_ENTRIES,
            perf_page_count: None,
        }
    }
}

impl Default for SysmonConfig {
    /// Same as [`SysmonConfig::new`], so `..Default::default()` never yields
    /// a zero-capacity map.
    fn default() -> Self {
        Self::new()
    }
}
124
/// Process-lifecycle monitor.
///
/// It runs a background thread that consumes exec/fork/exit events and keeps
/// the pinned per-pid offsets and allowed-pid BPF maps in sync. It also
/// forwards every decoded event to [`ProcessSysmon::recv_timeout`].
pub struct ProcessSysmon {
    /// Configuration captured at construction.
    cfg: SysmonConfig,
    /// Shared offset cache used for prefilling.
    mgr: Arc<Mutex<ProcessManager>>,
    /// Sending half, cloned into the monitor thread. Because this copy is
    /// kept here, `rx` never observes a disconnected channel.
    tx: mpsc::Sender<SysEvent>,
    /// Receives the events forwarded by the monitor thread.
    rx: mpsc::Receiver<SysEvent>,
    /// Pids whose target module was not yet mapped at exec time.
    pending_offsets: Arc<Mutex<PendingOffsets>>,
    /// Join handle of the monitor thread once `start` succeeded.
    handle: Option<JoinHandle<()>>,
}
139
140impl core::fmt::Debug for ProcessSysmon {
141 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
142 f.write_str("ProcessSysmon{..}")
143 }
144}
145
impl ProcessSysmon {
    /// Creates a monitor bound to `mgr` with configuration `cfg`.
    ///
    /// Only the internal event channel and an empty retry queue are created.
    /// No thread is spawned and no BPF map is touched until `start`.
    pub fn new(mgr: Arc<Mutex<ProcessManager>>, cfg: SysmonConfig) -> Self {
        let (tx, rx) = mpsc::channel();
        Self {
            cfg,
            mgr,
            tx,
            rx,
            pending_offsets: Arc::new(Mutex::new(PendingOffsets::new())),
            handle: None,
        }
    }

    /// Ensures the pinned maps exist, then spawns the `gs-sysmon` thread.
    ///
    /// With the `sysmon-ebpf` feature the thread runs `run_sysmon_loop` and
    /// logs any error it returns. Without the feature it logs a warning and
    /// sleeps forever (stub mode). A spawn failure is logged and leaves
    /// `handle` as `None`.
    ///
    /// NOTE(review): `handle` is not checked before spawning, so a second call
    /// starts a second monitor thread.
    pub fn start(&mut self) {
        // Best-effort: results of the map-creation helpers are discarded.
        let _ = maps::ensure_pinned_proc_offsets_exists(self.cfg.proc_offsets_max_entries);
        let _ = maps::ensure_pinned_allowed_pids_exists(16_384);

        // Owned copies moved into the thread closure.
        let tx = self.tx.clone();
        let mgr = Arc::clone(&self.mgr);
        let pending = Arc::clone(&self.pending_offsets);
        let target = self.cfg.target_module.clone();
        let perf_pages = self.cfg.perf_page_count;

        let handle = thread::Builder::new()
            .name("gs-sysmon".to_string())
            .spawn(move || {
                info!("ProcessSysmon thread started");
                #[cfg(feature = "sysmon-ebpf")]
                {
                    if let Err(e) = run_sysmon_loop(mgr, target, pending, perf_pages, tx) {
                        error!("Sysmon loop error: {}", e);
                    }
                }
                #[cfg(not(feature = "sysmon-ebpf"))]
                {
                    let _ = pending;
                    warn!("sysmon-ebpf feature is disabled; sysmon is in stub mode");
                    // Stub mode never exits, so the "exiting" log below is
                    // unreachable in this configuration.
                    loop {
                        std::thread::sleep(std::time::Duration::from_millis(5000));
                    }
                }
                info!("ProcessSysmon thread exiting");
            });
        match handle {
            Ok(h) => self.handle = Some(h),
            Err(e) => {
                error!("Failed to spawn ProcessSysmon thread: {}", e);
                self.handle = None;
            }
        }
    }

    /// Waits up to `timeout` for the next event forwarded by the monitor
    /// thread.
    ///
    /// Returns `None` on timeout. A disconnected channel also maps to `None`,
    /// though that cannot occur while `self.tx` is alive.
    pub fn recv_timeout(&self, timeout: std::time::Duration) -> Option<SysEvent> {
        match self.rx.recv_timeout(timeout) {
            Ok(ev) => Some(ev),
            Err(mpsc::RecvTimeoutError::Timeout) => None,
            Err(mpsc::RecvTimeoutError::Disconnected) => None,
        }
    }

    /// Applies one kernel event to the retry queue and the BPF maps.
    ///
    /// Exec/Fork, when a target is configured:
    /// - Shared-object target (per `crate::util::is_shared_object`):
    ///   - An Exec whose pid does not map it yet is queued for retry and
    ///     the function returns.
    ///   - Otherwise any pending entry for the pid is dropped.
    /// - Any other target, on Exec only: `/proc/<pid>/comm` is compared with
    ///   the basename truncated to 15 bytes. A mismatch is logged and
    ///   nothing is inserted. An unreadable comm skips the check.
    ///
    /// When not returned early, `prefill_offsets_for_pid` runs with the
    /// target as filter; its result is ignored.
    ///
    /// Exit drops the pending entry, purges the pid's offsets and removes it
    /// from the allowed-pid map.
    ///
    /// Unknown kinds are logged and ignored. As written, this always returns
    /// `Ok(())`.
    pub(crate) fn handle_event(
        mgr: &Arc<Mutex<ProcessManager>>,
        target: &Option<PathBuf>,
        pending: &Arc<Mutex<PendingOffsets>>,
        ev: &SysEvent,
    ) -> anyhow::Result<()> {
        let kind = match SysEventKind::from_u32(ev.kind) {
            Some(k) => k,
            None => {
                tracing::warn!(
                    "Sysmon: invalid event kind {} for pid {}; ignoring",
                    ev.kind,
                    ev.tgid
                );
                return Ok(());
            }
        };
        match kind {
            SysEventKind::Exec | SysEventKind::Fork => {
                if let Some(tpath) = target {
                    let path = tpath.as_path();
                    if crate::util::is_shared_object(path) {
                        // Library target: right after exec the loader may not have
                        // mapped it yet, so defer instead of prefilling nothing.
                        if kind == SysEventKind::Exec && !pid_maps_target_module(ev.tgid, path) {
                            tracing::debug!(
                                "Sysmon: pid {} does not map target module yet; scheduling retry",
                                ev.tgid
                            );
                            if let Ok(mut guard) = pending.lock() {
                                guard.register(ev.tgid, path);
                            }
                            return Ok(());
                        } else if let Ok(mut guard) = pending.lock() {
                            guard.remove(ev.tgid);
                        }
                    } else if kind == SysEventKind::Exec {
                        // Executable target: only accept execs whose comm matches the
                        // (kernel-truncated) basename of the target.
                        if let Some(actual) = get_comm_from_proc(ev.tgid) {
                            let expected = truncate_basename_to_comm(path);
                            if actual.as_bytes() != expected.as_slice() {
                                tracing::warn!(
                                    "Sysmon: comm mismatch for pid {} (actual='{}', expected='{}'); skip prefill/insert",
                                    ev.tgid,
                                    actual,
                                    core::str::from_utf8(&expected).unwrap_or("")
                                );
                                return Ok(());
                            }
                        }
                    }
                }
                let _ = prefill_offsets_for_pid(mgr, ev.tgid, target.as_deref());
            }
            SysEventKind::Exit => {
                if let Ok(mut guard) = pending.lock() {
                    guard.remove(ev.tgid);
                }
                match crate::maps::purge_offsets_for_pid(ev.tgid) {
                    Ok(n) => info!(
                        "Sysmon: observed exit for pid {} (purged {} entries)",
                        ev.tgid, n
                    ),
                    Err(e) => tracing::warn!("Sysmon: purge failed for pid {}: {}", ev.tgid, e),
                }
                let _ = crate::maps::remove_allowed_pid(ev.tgid);
            }
        }
        Ok(())
    }
}
283
/// Body of the `gs-sysmon` thread when the `sysmon-ebpf` feature is enabled.
///
/// Steps, in order:
/// 1. Load the embedded sysmon BPF object for the host endianness, pinning
///    maps under `crate::maps::proc_offsets_pin_dir()`. An empty object
///    logs a warning and returns `Ok(())` (stub mode).
/// 2. Write the exec comm filter into `target_exec_comm[0]`. It is the
///    target's basename, truncated to the 15 bytes the kernel keeps in
///    `comm`, or all zeros when there is no executable target.
/// 3. Load and attach the `sched_process_{exec,exit,fork}` tracepoints.
/// 4. Prefill offsets for processes that already map the target module, and
///    insert them into the pinned maps.
/// 5. Consume events forever from the `sysmon_events` ring buffer, or the
///    `sysmon_events_perf` perf array if there is no ring buffer. Each
///    correctly sized record goes through `ProcessSysmon::handle_event` and
///    is then forwarded on `tx`. Pending retries are polled on every pass.
///
/// # Errors
/// Returns an error if setup fails: object load, map conversion, tracepoint
/// load/attach, CPU enumeration, no perf buffer opened, or no events map.
/// Once an event loop is entered the function does not return.
#[cfg(feature = "sysmon-ebpf")]
fn run_sysmon_loop(
    mgr: Arc<Mutex<ProcessManager>>,
    target: Option<PathBuf>,
    pending: Arc<Mutex<PendingOffsets>>,
    perf_pages: Option<usize>,
    tx: mpsc::Sender<SysEvent>,
) -> anyhow::Result<()> {
    use aya::maps::{perf::PerfEventArray, ring_buf::RingBuf, Array, MapData};
    use aya::programs::TracePoint;
    use aya::{include_bytes_aligned, EbpfLoader, VerifierLogLevel};
    use log::{log_enabled, Level as LogLevel};

    // The kernel keeps `comm` in TASK_COMM_LEN (16) bytes including the
    // trailing NUL, so at most 15 basename bytes can ever match. Copying 16
    // non-NUL bytes made the filter unmatchable for long basenames. This
    // mirrors `truncate_basename_to_comm`.
    const COMM_MAX_BYTES: usize = 15;
    // Events drained from one per-CPU perf buffer per 10 ms pass. With a
    // single buffer, a busy CPU could deliver at most 100 events/s before
    // its ring overflowed.
    const PERF_READ_BATCH: usize = 64;

    // Both objects are embedded; only the one matching the target endianness
    // is used, hence the allow.
    #[allow(unused_variables)]
    let obj_le: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/sysmon-bpf.bpfel.o"));
    #[allow(unused_variables)]
    let obj_be: &[u8] = include_bytes_aligned!(concat!(env!("OUT_DIR"), "/sysmon-bpf.bpfeb.o"));
    let obj: &[u8] = if cfg!(target_endian = "little") {
        obj_le
    } else {
        obj_be
    };
    if obj.is_empty() {
        warn!("sysmon-bpf object missing; running in stub mode (no realtime process events)");
        return Ok(());
    }
    let mut loader = EbpfLoader::new();
    let use_verbose =
        cfg!(debug_assertions) || log_enabled!(LogLevel::Trace) || log_enabled!(LogLevel::Debug);
    if use_verbose {
        loader.verifier_log_level(VerifierLogLevel::VERBOSE | VerifierLogLevel::STATS);
        tracing::info!("Sysmon verifier logs: VERBOSE (debug build/log)");
    } else {
        loader.verifier_log_level(VerifierLogLevel::DEBUG | VerifierLogLevel::STATS);
        tracing::info!("Sysmon verifier logs: DEBUG (release/info)");
    }
    loader.map_pin_path(crate::maps::proc_offsets_pin_dir());
    let mut bpf = loader.load(obj)?;

    // Exec comm filter: basename of an executable (non-shared-object) target.
    {
        let mut filter_bytes = [0u8; 16];
        let mut filter_len = 0usize;
        if let Some(tpath) = target.as_ref() {
            if !crate::util::is_shared_object(tpath) {
                if let Some(name) = tpath.file_name().and_then(|s| s.to_str()) {
                    let bytes = name.as_bytes();
                    // Leaves filter_bytes[15] as NUL, like the kernel's comm.
                    let len = bytes.len().min(COMM_MAX_BYTES);
                    filter_bytes[..len].copy_from_slice(&bytes[..len]);
                    filter_len = len;
                } else {
                    tracing::warn!(
                        "Sysmon: target basename contains non-UTF8 bytes; exec comm filter disabled"
                    );
                }
            }
        }
        if let Some(map) = bpf.map_mut("target_exec_comm") {
            let mut array: Array<_, [u8; 16]> = map.try_into()?;
            array.set(0, filter_bytes, 0)?;
            if filter_len > 0 {
                match std::str::from_utf8(&filter_bytes[..filter_len]) {
                    Ok(name_str) => {
                        tracing::info!("Sysmon: exec comm filter configured for '{}'", name_str)
                    }
                    Err(_) => tracing::info!(
                        "Sysmon: exec comm filter configured (non-UTF8 basename, len={})",
                        filter_len
                    ),
                }
            } else {
                tracing::info!("Sysmon: exec comm filter disabled");
            }
        } else if filter_len > 0 {
            tracing::warn!("Sysmon: target_exec_comm map missing; exec filtering unavailable");
        }
    }

    for (name, cat, evt) in [
        ("sched_process_exec", "sched", "sched_process_exec"),
        ("sched_process_exit", "sched", "sched_process_exit"),
        ("sched_process_fork", "sched", "sched_process_fork"),
    ] {
        if let Some(prog) = bpf.program_mut(name) {
            let tp: &mut TracePoint = prog.try_into()?;
            tp.load()?;
            tp.attach(cat, evt)?;
            info!("Attached tracepoint: {}:{}", cat, evt);
        } else {
            warn!("Missing program '{}' in sysmon-bpf", name);
        }
    }
    tracing::info!("Sysmon: attached all tracepoints");

    // Initial prefill for processes that were already running the target.
    if let Some(tpath) = &target {
        if let Ok(mut guard) = mgr.lock() {
            if let Ok(prefilled) = guard.ensure_prefill_module(tpath.to_string_lossy().as_ref()) {
                tracing::info!(
                    "Sysmon: initial prefill cached {} pid(s) for module {}",
                    prefilled,
                    tpath.display()
                );
                let entries = guard.cached_offsets_for_module(tpath.to_string_lossy().as_ref());
                if !entries.is_empty() {
                    use crate::maps::{insert_offsets_for_pid, ProcModuleOffsetsValue};
                    use std::collections::HashMap;
                    let mut by_pid: HashMap<u32, Vec<(u64, ProcModuleOffsetsValue)>> =
                        HashMap::new();
                    for (pid, cookie, off) in entries {
                        by_pid.entry(pid).or_default().push((
                            cookie,
                            ProcModuleOffsetsValue::new(off.text, off.rodata, off.data, off.bss),
                        ));
                    }
                    let mut total = 0usize;
                    for (pid, items) in by_pid {
                        if let Ok(n) = insert_offsets_for_pid(pid, &items) {
                            total += n;
                        }
                        let _ = crate::maps::insert_allowed_pid(pid);
                    }
                    tracing::info!(
                        "Sysmon: initial inserted {} offset entries for module {}",
                        total,
                        tpath.display()
                    );
                }
            }
        }
    }
    tracing::info!("Sysmon: setup complete");

    if let Some(map) = bpf.take_map("sysmon_events") {
        let mut rb: RingBuf<MapData> = map.try_into()?;
        loop {
            let mut had_event = false;
            if let Some(item) = rb.next() {
                // Records of any other size are silently ignored.
                if item.len() == core::mem::size_of::<SysEvent>() {
                    // SAFETY: length checked above; read_unaligned tolerates the
                    // ring buffer's byte alignment, and SysEvent is plain-old-data.
                    let ev = unsafe { core::ptr::read_unaligned(item.as_ptr() as *const SysEvent) };
                    let _ = ProcessSysmon::handle_event(&mgr, &target, &pending, &ev);
                    let _ = tx.send(ev);
                    had_event = true;
                }
            }
            poll_pending_offsets(&mgr, &pending);
            if !had_event {
                std::thread::sleep(std::time::Duration::from_millis(5));
            }
        }
    } else if let Some(map) = bpf.take_map("sysmon_events_perf") {
        let mut perf: PerfEventArray<_> = map.try_into()?;
        let online = aya::util::online_cpus().map_err(|(_, e)| anyhow::anyhow!(e))?;
        let mut bufs = Vec::new();
        for cpu in online {
            match perf.open(cpu, perf_pages) {
                Ok(buf) => bufs.push(buf),
                Err(e) => warn!("Perf open failed for CPU {}: {}", cpu, e),
            }
        }
        if bufs.is_empty() {
            return Err(anyhow::anyhow!("No perf buffers opened"));
        }
        // Allocated once and reused: read_events clears each buffer before
        // filling it with a sample.
        let mut read_bufs: Vec<bytes::BytesMut> = (0..PERF_READ_BATCH)
            .map(|_| bytes::BytesMut::with_capacity(256))
            .collect();
        loop {
            std::thread::sleep(std::time::Duration::from_millis(10));
            for buf in bufs.iter_mut() {
                if !buf.readable() {
                    continue;
                }
                match buf.read_events(&mut read_bufs) {
                    Ok(res) => {
                        if res.lost > 0 {
                            warn!("Sysmon: perf buffer lost {} event(s)", res.lost);
                        }
                        for data in read_bufs.iter().take(res.read.min(read_bufs.len())) {
                            if data.len() == core::mem::size_of::<SysEvent>() {
                                // SAFETY: length checked above; read_unaligned handles
                                // arbitrary alignment of the sample bytes.
                                let ev = unsafe {
                                    core::ptr::read_unaligned(data.as_ptr() as *const SysEvent)
                                };
                                let _ = ProcessSysmon::handle_event(&mgr, &target, &pending, &ev);
                                let _ = tx.send(ev);
                            }
                        }
                    }
                    Err(e) => warn!("Perf read_events failed: {}", e),
                }
            }
            poll_pending_offsets(&mgr, &pending);
        }
    } else {
        return Err(anyhow::anyhow!("No sysmon events map found (ringbuf/perf)"));
    }
}
481
/// Returns `true` when metadata for `/proc/<pid>` can be read, i.e. the
/// kernel still exposes a proc entry for that pid.
fn pid_alive(pid: u32) -> bool {
    let proc_entry = format!("/proc/{pid}");
    std::fs::metadata(proc_entry).is_ok()
}
593
594fn filter_entries_for_target<'a>(
595 entries: &'a [PidOffsetsEntry],
596 target: Option<&Path>,
597) -> Vec<&'a PidOffsetsEntry> {
598 use std::fs;
599 use std::os::unix::fs::MetadataExt;
600
601 if let Some(tpath) = target {
602 match fs::metadata(tpath) {
603 Ok(meta) => {
604 let t_dev = meta.dev();
605 let t_ino = meta.ino();
606 entries
607 .iter()
608 .filter(|e| {
609 fs::metadata(&e.module_path)
610 .map(|m| m.dev() == t_dev && m.ino() == t_ino)
611 .unwrap_or(false)
612 })
613 .collect()
614 }
615 Err(_) => {
616 let tc = crate::cookie::from_path(&tpath.to_string_lossy());
617 let by_cookie: Vec<_> = entries.iter().filter(|e| e.cookie == tc).collect();
618 if !by_cookie.is_empty() {
619 by_cookie
620 } else {
621 let tnorm = tpath.to_string_lossy().replace("/./", "/");
622 entries.iter().filter(|e| e.module_path == tnorm).collect()
623 }
624 }
625 }
626 } else {
627 entries.iter().collect()
628 }
629}
630
631fn prefill_offsets_for_pid(
632 mgr: &Arc<Mutex<ProcessManager>>,
633 pid: u32,
634 target: Option<&Path>,
635) -> anyhow::Result<bool> {
636 use crate::maps::{insert_offsets_for_pid, ProcModuleOffsetsValue};
637
638 let mut inserted_any = false;
639 if let Ok(mut guard) = mgr.lock() {
640 let prefilled = guard.ensure_prefill_pid(pid)?;
641 if prefilled > 0 {
642 info!("Sysmon: prefilled {} entries for pid {}", prefilled, pid);
643 }
644 if let Some(entries) = guard.cached_offsets_with_paths_for_pid(pid) {
645 let filtered = filter_entries_for_target(entries, target);
646 if !filtered.is_empty() {
647 let items: Vec<(u64, ProcModuleOffsetsValue)> = filtered
648 .iter()
649 .map(|e| {
650 (
651 e.cookie,
652 ProcModuleOffsetsValue::new(
653 e.offsets.text,
654 e.offsets.rodata,
655 e.offsets.data,
656 e.offsets.bss,
657 ),
658 )
659 })
660 .collect();
661 match insert_offsets_for_pid(pid, &items) {
662 Ok(inserted) => {
663 if inserted == 0 {
664 tracing::warn!(
665 "Sysmon: no offsets inserted for pid {} (filtered count={})",
666 pid,
667 items.len()
668 );
669 } else {
670 tracing::info!(
671 "Sysmon: inserted {} offset entries for pid {}",
672 inserted,
673 pid
674 );
675 let _ = crate::maps::insert_allowed_pid(pid);
676 inserted_any = true;
677 }
678 }
679 Err(e) => {
680 tracing::warn!("Sysmon: failed to insert offsets for pid {}: {}", pid, e);
681 }
682 }
683 } else if target.is_some() {
684 tracing::debug!("Sysmon: pid {} does not map target module; skip", pid);
685 }
686 }
687 }
688 Ok(inserted_any)
689}
690
691fn poll_pending_offsets(mgr: &Arc<Mutex<ProcessManager>>, pending: &Arc<Mutex<PendingOffsets>>) {
692 let due = if let Ok(mut guard) = pending.lock() {
693 guard.take_due()
694 } else {
695 Vec::new()
696 };
697
698 if due.is_empty() {
699 return;
700 }
701
702 let mut to_remove: Vec<u32> = Vec::new();
703
704 for (pid, target_path, attempts) in due {
705 if !pid_alive(pid) {
706 tracing::debug!(
707 "Sysmon: pid {} exited while waiting for offsets; removing from retry queue",
708 pid
709 );
710 to_remove.push(pid);
711 continue;
712 }
713
714 if !pid_maps_target_module(pid, &target_path) {
715 if attempts >= PENDING_MAX_ATTEMPTS {
716 tracing::warn!(
717 "Sysmon: pid {} still missing module {} after {} retries; giving up",
718 pid,
719 target_path.display(),
720 attempts
721 );
722 to_remove.push(pid);
723 }
724 continue;
725 }
726
727 match prefill_offsets_for_pid(mgr, pid, Some(target_path.as_path())) {
728 Ok(true) => {
729 tracing::info!(
730 "Sysmon: deferred prefill succeeded for pid {} (module {})",
731 pid,
732 target_path.display()
733 );
734 to_remove.push(pid);
735 }
736 Ok(false) => {
737 if attempts >= PENDING_MAX_ATTEMPTS {
738 tracing::warn!(
739 "Sysmon: deferred prefill produced no entries for pid {} after {} retries; giving up",
740 pid,
741 attempts
742 );
743 to_remove.push(pid);
744 }
745 }
746 Err(e) => {
747 tracing::warn!(
748 "Sysmon: deferred prefill failed for pid {} (attempt {}): {}",
749 pid,
750 attempts,
751 e
752 );
753 if attempts >= PENDING_MAX_ATTEMPTS {
754 to_remove.push(pid);
755 }
756 }
757 }
758 }
759
760 if !to_remove.is_empty() {
761 if let Ok(mut guard) = pending.lock() {
762 for pid in to_remove {
763 guard.remove(pid);
764 }
765 }
766 }
767}
768
/// Reads `/proc/<pid>/comm`, dropping one trailing `\n` (and a `\r` just
/// before it).
///
/// Returns `None` if the file cannot be opened or is not valid UTF-8.
fn get_comm_from_proc(pid: u32) -> Option<String> {
    let raw = std::fs::read_to_string(format!("/proc/{pid}/comm")).ok()?;
    let trimmed = match raw.strip_suffix('\n') {
        Some(body) => body.strip_suffix('\r').unwrap_or(body),
        None => raw.as_str(),
    };
    Some(trimmed.to_owned())
}
784
/// Returns the bytes the kernel would store as `comm` for an exec of `path`.
///
/// That is the raw basename, truncated to 15 bytes (TASK_COMM_LEN minus the
/// NUL). Raw OS bytes are used so a non-UTF-8 basename still yields its real
/// prefix; previously it yielded an empty vector and forced a spurious comm
/// mismatch. A path without a final component (e.g. `/`) yields an empty
/// vector.
fn truncate_basename_to_comm(path: &Path) -> Vec<u8> {
    use std::os::unix::ffi::OsStrExt;

    const COMM_MAX_BYTES: usize = 15;

    path.file_name()
        .map(|name| {
            let bytes = name.as_bytes();
            bytes[..bytes.len().min(COMM_MAX_BYTES)].to_vec()
        })
        .unwrap_or_default()
}
795
796fn pid_maps_target_module(pid: u32, target: &Path) -> bool {
797 use std::fs;
798 use std::os::unix::fs::MetadataExt;
799
800 let maps_path = format!("/proc/{pid}/maps");
801 let Ok(content) = fs::read_to_string(&maps_path) else {
802 return false;
803 };
804
805 let (t_dev, t_ino) = fs::metadata(target)
806 .map(|m| (Some(m.dev()), Some(m.ino())))
807 .unwrap_or((None, None));
808 let t_norm = target.to_string_lossy().replace("/./", "/");
809
810 for line in content.lines() {
811 let parts: Vec<&str> = line.split_whitespace().collect();
812 if parts.len() < 6 {
813 continue;
814 }
815 let path = parts[5];
816 if path.starts_with('[') {
817 continue;
818 }
819 let path_trim = if let Some(idx) = path.find(" (deleted)") {
820 &path[..idx]
821 } else {
822 path
823 };
824
825 let matched = if let (Some(dev), Some(ino)) = (t_dev, t_ino) {
826 if let Some((maj_s, min_s)) = parts[3].split_once(':') {
827 if let (Ok(maj), Ok(min), Ok(inode)) = (
828 u64::from_str_radix(maj_s, 16),
829 u64::from_str_radix(min_s, 16),
830 parts[4].parse::<u64>(),
831 ) {
832 let d = dev as libc::dev_t;
833 let t_maj = libc::major(d) as u64;
834 let t_min = libc::minor(d) as u64;
835 maj == t_maj && min == t_min && inode == ino
836 } else {
837 false
838 }
839 } else {
840 false
841 }
842 } else {
843 path_trim == t_norm
844 };
845
846 if matched {
847 return true;
848 }
849 }
850
851 false
852}