cargowatch_detector/
lib.rs1use std::collections::{BTreeMap, BTreeSet};
4use std::path::{Path, PathBuf};
5
6use time::OffsetDateTime;
7use tokio::sync::mpsc;
8use tokio::time::{Duration, sleep};
9use tracing::debug;
10
11use cargowatch_core::{DetectedProcess, DetectedProcessClass, SessionEvent, detected_session_id};
12
13pub struct RustProcessDetector {
15 system: sysinfo::System,
16}
17
18impl Default for RustProcessDetector {
19 fn default() -> Self {
20 let system = sysinfo::System::new_all();
21 Self { system }
22 }
23}
24
25impl RustProcessDetector {
26 pub fn scan(&mut self) -> Vec<DetectedProcess> {
28 self.system
29 .refresh_processes(sysinfo::ProcessesToUpdate::All, true);
30 let now = OffsetDateTime::now_utc();
31 let mut processes = Vec::new();
32
33 for (pid, process) in self.system.processes() {
34 let process_name = process.name().to_string_lossy().to_string();
35 let command = process
36 .cmd()
37 .iter()
38 .map(|part| part.to_string_lossy().to_string())
39 .collect::<Vec<_>>();
40 if !is_rust_process(&process_name, &command) {
41 continue;
42 }
43
44 let started_at = unix_timestamp_to_utc(process.start_time() as i64);
45 let cwd = process.cwd().map(|path| path.to_path_buf());
46 let workspace_root = infer_workspace_root(cwd.as_deref(), &command);
47 let classification = classify_process(&process_name, &command);
48 let session_id = detected_session_id(pid.as_u32(), started_at);
49 let elapsed_ms = i64::try_from((now - started_at).whole_milliseconds())
50 .unwrap_or(i64::MAX)
51 .max(0);
52
53 processes.push(DetectedProcess {
54 session_id,
55 pid: pid.as_u32(),
56 process_name,
57 command,
58 cwd,
59 workspace_root,
60 classification,
61 started_at,
62 last_seen_at: now,
63 elapsed_ms,
64 });
65 }
66
67 processes.sort_by(|left, right| right.started_at.cmp(&left.started_at));
68 processes
69 }
70}
71
72pub struct DetectionService {
74 detector: RustProcessDetector,
75 sender: mpsc::UnboundedSender<SessionEvent>,
76 poll_interval: Duration,
77 previous: BTreeMap<String, DetectedProcess>,
78}
79
80impl DetectionService {
81 pub fn new(sender: mpsc::UnboundedSender<SessionEvent>, poll_interval_ms: u64) -> Self {
83 Self {
84 detector: RustProcessDetector::default(),
85 sender,
86 poll_interval: Duration::from_millis(poll_interval_ms),
87 previous: BTreeMap::new(),
88 }
89 }
90
91 pub async fn run(mut self, cancellation: tokio_util::sync::CancellationToken) {
93 loop {
94 self.tick();
95 tokio::select! {
96 _ = cancellation.cancelled() => return,
97 _ = sleep(self.poll_interval) => {}
98 }
99 }
100 }
101
102 fn tick(&mut self) {
103 let current = self
104 .detector
105 .scan()
106 .into_iter()
107 .map(|process| (process.session_id.clone(), process))
108 .collect::<BTreeMap<_, _>>();
109
110 for (session_id, process) in ¤t {
111 match self.previous.get(session_id) {
112 None => {
113 let _ = self
114 .sender
115 .send(SessionEvent::ProcessDetected(process.clone()));
116 }
117 Some(previous) if previous != process => {
118 let _ = self
119 .sender
120 .send(SessionEvent::ProcessUpdated(process.clone()));
121 }
122 _ => {}
123 }
124 }
125
126 let current_ids = current.keys().cloned().collect::<BTreeSet<_>>();
127 for (session_id, previous) in &self.previous {
128 if !current_ids.contains(session_id) {
129 let _ = self.sender.send(SessionEvent::ProcessGone {
130 session_id: previous.session_id.clone(),
131 pid: previous.pid,
132 observed_at: OffsetDateTime::now_utc(),
133 });
134 }
135 }
136
137 debug!(count = current.len(), "detector tick completed");
138 self.previous = current;
139 }
140}
141
142fn is_rust_process(process_name: &str, command: &[String]) -> bool {
143 let first = command
144 .first()
145 .map(|part| executable_stem(part))
146 .unwrap_or_else(|| executable_stem(process_name));
147 matches!(
148 first.as_deref(),
149 Some("cargo" | "rustc" | "rustdoc" | "clippy-driver")
150 )
151}
152
153fn classify_process(process_name: &str, command: &[String]) -> DetectedProcessClass {
154 let stem = command
155 .first()
156 .map(|part| executable_stem(part))
157 .unwrap_or_else(|| executable_stem(process_name))
158 .unwrap_or_default();
159 match stem.as_str() {
160 "cargo" => {
161 let subcommand = command.iter().skip(1).find(|part| !part.starts_with('-'));
162 match subcommand.map(String::as_str) {
163 Some("build") => DetectedProcessClass::CargoBuild,
164 Some("check") => DetectedProcessClass::CargoCheck,
165 Some("test") => DetectedProcessClass::CargoTest,
166 Some("clippy") => DetectedProcessClass::CargoClippy,
167 Some("doc") => DetectedProcessClass::CargoDoc,
168 _ => DetectedProcessClass::UnknownRustProcess,
169 }
170 }
171 "rustc" => DetectedProcessClass::RustcCompile,
172 "rustdoc" => DetectedProcessClass::Rustdoc,
173 "clippy-driver" => DetectedProcessClass::CargoClippy,
174 _ => DetectedProcessClass::UnknownRustProcess,
175 }
176}
177
178fn infer_workspace_root(cwd: Option<&Path>, command: &[String]) -> Option<PathBuf> {
179 if let Some(manifest_path) = manifest_path_from_command(cwd, command) {
180 let manifest_dir = if manifest_path.ends_with("Cargo.toml") {
181 manifest_path.parent().map(Path::to_path_buf)
182 } else {
183 Some(manifest_path)
184 };
185 if let Some(root) = manifest_dir {
186 return find_workspace_root(&root);
187 }
188 }
189 cwd.and_then(find_workspace_root)
190}
191
/// Extracts the value of the first `--manifest-path` option on a command line.
///
/// Both spellings are recognised: `--manifest-path <path>` (value in the next
/// argument) and `--manifest-path=<path>` (value in the same argument).
///
/// An absolute path is returned unchanged. A relative path is resolved against
/// `cwd`; when `cwd` is `None` it cannot be resolved and `None` is returned.
/// `None` is also returned when the option is absent or has no value.
fn manifest_path_from_command(cwd: Option<&Path>, command: &[String]) -> Option<PathBuf> {
    const FLAG: &str = "--manifest-path";

    let mut arguments = command.iter();
    let manifest = loop {
        let argument = arguments.next()?;
        if argument == FLAG {
            // Separate-value form: the path is the following argument.
            break PathBuf::from(arguments.next()?);
        }
        // Joined form: require the `=` so look-alike options are not matched.
        if let Some(value) = argument
            .strip_prefix(FLAG)
            .and_then(|rest| rest.strip_prefix('='))
        {
            break PathBuf::from(value);
        }
    };

    if manifest.is_absolute() {
        Some(manifest)
    } else {
        cwd.map(|cwd| cwd.join(manifest))
    }
}
203
/// Returns the outermost ancestor of `start` (including `start` itself) that
/// contains a `Cargo.toml`, or `None` when no ancestor has one.
///
/// Ancestors are visited from `start` upwards, so the last hit is the directory
/// closest to the filesystem root.
fn find_workspace_root(start: &Path) -> Option<PathBuf> {
    start
        .ancestors()
        .filter(|directory| directory.join("Cargo.toml").exists())
        .last()
        .map(Path::to_path_buf)
}
213
/// Returns the file stem (final path component without its last extension) of
/// `value` as an owned string.
///
/// Yields `None` when the path has no file name or the stem is not valid UTF-8.
fn executable_stem(value: &str) -> Option<String> {
    let stem = Path::new(value).file_stem()?;
    stem.to_str().map(str::to_owned)
}
220
221fn unix_timestamp_to_utc(timestamp: i64) -> OffsetDateTime {
222 OffsetDateTime::from_unix_timestamp(timestamp).unwrap_or(OffsetDateTime::UNIX_EPOCH)
223}
224
#[cfg(test)]
mod tests {
    use super::*;

    /// Manifest written at the workspace root of every fixture.
    const WORKSPACE_MANIFEST: &str = "[workspace]\nmembers=[\"member\"]";
    /// Manifest written into the `member` crate of every fixture.
    const MEMBER_MANIFEST: &str =
        "[package]\nname=\"member\"\nversion=\"0.1.0\"\nedition=\"2024\"";

    /// Creates `<workspace>/member`, writes both manifests and returns the
    /// member directory.
    fn seed_workspace(workspace: &Path) -> PathBuf {
        let member = workspace.join("member");
        std::fs::create_dir_all(&member).expect("member dir");
        std::fs::write(workspace.join("Cargo.toml"), WORKSPACE_MANIFEST)
            .expect("workspace manifest");
        std::fs::write(member.join("Cargo.toml"), MEMBER_MANIFEST).expect("member manifest");
        member
    }

    /// `cargo check` is classified by its subcommand.
    #[test]
    fn classifies_cargo_subcommands() {
        let command = vec![String::from("cargo"), String::from("check")];
        assert_eq!(
            classify_process("cargo", &command),
            DetectedProcessClass::CargoCheck
        );
    }

    /// An absolute `--manifest-path` pointing at a member resolves to the
    /// enclosing workspace, even without a working directory.
    #[test]
    fn infers_workspace_root_from_manifest_argument() {
        let root = tempfile::tempdir().expect("tempdir");
        let member = seed_workspace(root.path());

        let command = vec![
            String::from("cargo"),
            String::from("check"),
            String::from("--manifest-path"),
            member.join("Cargo.toml").display().to_string(),
        ];
        let detected = infer_workspace_root(None, &command);

        assert_eq!(detected, Some(root.path().to_path_buf()));
    }

    /// A relative `--manifest-path` is interpreted relative to the process's
    /// working directory.
    #[test]
    fn resolves_relative_manifest_paths_against_process_cwd() {
        let process_root = tempfile::tempdir().expect("tempdir");
        let workspace = process_root.path().join("workspace");
        seed_workspace(&workspace);

        let command = vec![
            String::from("cargo"),
            String::from("check"),
            String::from("--manifest-path"),
            String::from("workspace/member/Cargo.toml"),
        ];
        let detected = infer_workspace_root(Some(process_root.path()), &command);

        assert_eq!(detected, Some(workspace));
    }
}