lading/
observer.rs

1//! Manage the target observer
2//!
//! The interrogation that lading does of the target sub-process is intentionally
4//! limited to in-process concerns, for the most part. The 'inspector' does
5//! allow for a sub-process to do out-of-band inspection of the target but
6//! cannot incorporate whatever it's doing into the capture data that lading
7//! produces. This observer, on Linux, looks up the target process in procfs and
8//! writes out key details about memory and CPU consumption into the capture
9//! data. On non-Linux systems the observer, if enabled, will emit a warning.
10
11use std::{io, sync::atomic::AtomicU64};
12
13use crate::target::TargetPidReceiver;
14use serde::Deserialize;
15
16use crate::signals::Shutdown;
17
18#[cfg(target_os = "linux")]
19mod linux;
20
/// Process RSS consumption in bytes, published as a crate-wide atomic so that
/// abstractions in the Target implementation can be built on top of it.
///
/// Starts at zero. Nothing in this file stores to it; presumably the
/// platform-specific sampler updates it -- verify against the `linux` module.
pub(crate) static RSS_BYTES: AtomicU64 = AtomicU64::new(0);
24
25#[derive(thiserror::Error, Debug)]
26/// Errors produced by [`Server`]
27pub enum Error {
28    /// Wrapper for [`nix::errno::Errno`]
29    #[error("erno: {0}")]
30    Errno(#[from] nix::errno::Errno),
31    /// Wrapper for [`std::io::Error`]
32    #[error("IO error: {0}")]
33    Io(#[from] io::Error),
34    #[cfg(target_os = "linux")]
35    /// Wrapper for [`linux::Error`]
36    #[error("Linux error: {0}")]
37    Linux(#[from] linux::Error),
38}
39
40#[derive(Debug, Deserialize, Clone, Copy, Default, PartialEq, Eq)]
41#[serde(rename_all = "snake_case")]
42/// Configuration for [`Server`]
43pub struct Config {}
44
45#[derive(Debug)]
46/// The inspector sub-process server.
47///
48/// This struct manages a sub-process that can be used to do further examination
49/// of the [`crate::target::Server`] by means of operating system facilities. The
50/// sub-process is not created until [`Server::run`] is called. It is assumed
51/// that only one instance of this struct will ever exist at a time, although
52/// there are no protections for that.
53pub struct Server {
54    #[allow(dead_code)] // config is not actively used, left as a stub
55    config: Config,
56    #[allow(dead_code)] // this field is unused when target_os is not "linux"
57    shutdown: Shutdown,
58}
59
60impl Server {
61    /// Create a new [`Server`] instance
62    ///
63    /// The observer `Server` is responsible for investigating the
64    /// [`crate::target::Server`] sub-process.
65    ///
66    /// # Errors
67    ///
68    /// Function will error if the path to the sub-process is not valid or if
69    /// the path is valid but is not to file executable by this program.
70    pub fn new(config: Config, shutdown: Shutdown) -> Result<Self, Error> {
71        Ok(Self { config, shutdown })
72    }
73
74    /// Run this [`Server`] to completion
75    ///
76    /// This function runs the user supplied program to its completion, or until
77    /// a shutdown signal is received. Child exit status does not currently
78    /// propagate. This is less than ideal.
79    ///
80    /// Target server will use the `TargetPidReceiver` passed here to transmit
81    /// its PID. This PID is passed to the sub-process as the first argument.
82    ///
83    /// # Errors
84    ///
85    /// Function will return an error if the underlying program cannot be waited
86    /// on or will not shutdown when signaled to.
87    ///
88    /// # Panics
89    ///
90    /// None are known.
91    #[allow(
92        clippy::similar_names,
93        clippy::too_many_lines,
94        clippy::cast_possible_truncation,
95        clippy::cast_sign_loss
96    )]
97    #[cfg(target_os = "linux")]
98    pub async fn run(mut self, mut pid_snd: TargetPidReceiver) -> Result<(), Error> {
99        use std::time::Duration;
100
101        use crate::observer::linux::Sampler;
102
103        let target_pid = pid_snd
104            .recv()
105            .await
106            .expect("target failed to transmit PID, catastrophic failure");
107        drop(pid_snd);
108
109        let target_pid = target_pid.expect("observer cannot be used in no-target mode");
110
111        let mut sample_delay = tokio::time::interval(Duration::from_secs(1));
112        let mut sampler = Sampler::new(target_pid)?;
113
114        loop {
115            tokio::select! {
116                _ = sample_delay.tick() => {
117                    sampler.sample()?;
118                }
119                _ = self.shutdown.recv() => {
120                    tracing::info!("shutdown signal received");
121                    return Ok(());
122                }
123            }
124        }
125    }
126
127    /// "Run" this [`Server`] to completion
128    ///
129    /// On non-Linux systems, this function is a no-op that logs a warning
130    /// indicating observer capabilities are unavailable on these systems.
131    ///
132    /// # Errors
133    ///
134    /// None are known.
135    ///
136    /// # Panics
137    ///
138    /// None are known.
139    #[allow(clippy::unused_async)]
140    #[cfg(not(target_os = "linux"))]
141    pub async fn run(self, _pid_snd: TargetPidReceiver) -> Result<(), Error> {
142        tracing::warn!("observer unavailable on non-Linux system");
143        Ok(())
144    }
145}