1use std::{
3 io::{BufRead, Write},
4 time::Duration,
5};
6
7use cognos::Host;
8use tracing::debug;
9
10use crate::{
11 cache::BuildReportCache,
12 display::{Display, DisplayConfig},
13 error::{Result, RomError},
14 state::{BuildStatus, Derivation, FailType, State, StorePath},
15 types::{Config, InputMode},
16 update,
17};
18
/// Mode of the human-readable log parser.
///
/// The two `Plan*` variants mean a plan header line was just seen and the
/// following indented store-path lines are being collected.
enum HumanParserState {
  /// Not inside a plan listing; lines are matched individually.
  Idle,
  /// Collecting the paths listed after a "... will be built:" header.
  PlanBuilds,
  /// Collecting the paths listed after a "... will be fetched" header.
  PlanDownloads,
}
24
/// Line-by-line parser state for human-readable (non-JSON) output.
struct HumanParser {
  /// Which kind of listing, if any, is currently being collected.
  state: HumanParserState,
  /// Trimmed path lines gathered for the listing in progress; they are only
  /// turned into state changes once the listing ends.
  pending: Vec<String>,
}
29
30impl HumanParser {
31 fn new() -> Self {
32 Self {
33 state: HumanParserState::Idle,
34 pending: Vec::new(),
35 }
36 }
37}
38
/// Consumes build output line by line, folds it into a [`State`] and renders
/// that state through a [`Display`] writing to `W`.
pub struct Monitor<W: Write> {
  /// Build/transfer state accumulated from the processed lines.
  state: State,
  /// Renderer that draws `state` to the writer.
  display: Display<W>,
  /// Settings supplied by the caller (input mode, display options, ...).
  config: Config,
  /// Parser state used for lines handled in human-readable mode.
  human_parser: HumanParser,
}
46
47impl<W: Write> Monitor<W> {
48 pub fn new(config: Config, writer: W) -> Result<Self> {
50 let display_config = DisplayConfig {
51 show_timers: config.show_timers,
52 max_tree_depth: 10,
53 max_visible_lines: 100,
54 use_color: !config.piping,
55 format: config.format,
56 legend_style: config.legend_style,
57 summary_style: config.summary_style,
58 icons: crate::icons::detect(),
59 };
60
61 let display = Display::new(writer, display_config)?;
62 let mut state = State::new();
63
64 let cache_path = BuildReportCache::default_cache_path();
66 let cache = BuildReportCache::new(cache_path);
67 state.build_cache = cache.load();
68
69 Ok(Self {
70 state,
71 display,
72 config,
73 human_parser: HumanParser::new(),
74 })
75 }
76
77 pub fn process_stream<R: BufRead>(&mut self, reader: R) -> Result<()> {
79 let mut last_render = std::time::Instant::now();
80 let render_interval = Duration::from_millis(100);
81
82 let mut last_poll = std::time::Instant::now();
86 let poll_interval = Duration::from_millis(200);
87
88 for line in reader.lines() {
89 let line = line.map_err(RomError::Io)?;
90
91 self.process_line(&line)?;
93
94 if last_poll.elapsed() >= poll_interval {
96 let now = crate::state::current_time();
97 crate::update::detect_local_completed_builds(&mut self.state, now);
98 last_poll = std::time::Instant::now();
99 }
100
101 if last_render.elapsed() >= render_interval {
103 self.display.render(&self.state, &[])?;
104 last_render = std::time::Instant::now();
105 }
106 }
107
108 crate::update::finish_state(&mut self.state);
110
111 self.display.render_final(&self.state)?;
112
113 let cache_path = BuildReportCache::default_cache_path();
115 let cache = BuildReportCache::new(cache_path);
116 if let Err(e) = cache.save(&self.state.build_cache) {
117 debug!("Failed to save build cache: {}", e);
118 }
120
121 if self.state.has_errors() {
123 return Err(RomError::BuildFailed);
124 }
125
126 Ok(())
127 }
128
129 fn process_line(&mut self, line: &str) -> Result<bool> {
131 if line.starts_with("@nix ") {
133 self.process_json_line(line)
134 } else {
135 match self.config.input_mode {
136 InputMode::Json => self.process_json_line(line),
137 InputMode::Human => self.process_human_line(line),
138 }
139 }
140 }
141
142 fn process_json_line(&mut self, line: &str) -> Result<bool> {
144 if let Some(json_str) = line.strip_prefix("@nix ") {
146 match serde_json::from_str::<cognos::Actions>(json_str) {
147 Ok(action) => {
148 if let cognos::Actions::Message { msg, .. } = &action {
150 println!("{msg}");
151 }
152
153 let changed = update::process_message(&mut self.state, action);
154 Ok(changed)
155 },
156 Err(e) => {
157 tracing::debug!("Failed to parse JSON message: {}", e);
159 Ok(false)
160 },
161 }
162 } else {
163 println!("{line}");
165 Ok(false)
166 }
167 }
168
169 fn process_human_line(&mut self, line: &str) -> Result<bool> {
171 match self.human_parser.state {
173 HumanParserState::PlanBuilds | HumanParserState::PlanDownloads => {
174 if line.starts_with(" /nix/store/")
175 || line.starts_with("\t/nix/store/")
176 {
177 let path = line.trim().to_string();
179 self.human_parser.pending.push(path);
180 return Ok(true);
181 } else {
182 let pending = std::mem::take(&mut self.human_parser.pending);
184 let is_builds =
185 matches!(self.human_parser.state, HumanParserState::PlanBuilds);
186 self.human_parser.state = HumanParserState::Idle;
187
188 for path_str in pending {
189 if is_builds {
190 if let Some(drv) = crate::state::Derivation::parse(&path_str) {
191 let drv_id = self.state.get_or_create_derivation_id(drv);
192 self.state.update_build_status(
193 drv_id,
194 crate::state::BuildStatus::Planned,
195 );
196 if !self.state.forest_roots.contains(&drv_id) {
197 self.state.forest_roots.push(drv_id);
198 }
199 }
200 } else if let Some(sp) = crate::state::StorePath::parse(&path_str) {
201 let sp_id = self.state.get_or_create_store_path_id(sp);
202 self.state.full_summary.planned_downloads.insert(sp_id);
203 }
204 }
205 }
207 },
208 HumanParserState::Idle => {},
209 }
210
211 let trimmed = line.trim();
212
213 if trimmed.is_empty() {
214 return Ok(false);
215 }
216
217 if trimmed.ends_with("derivations will be built:")
220 || trimmed == "this derivation will be built:"
221 || trimmed.ends_with("derivation will be built:")
222 {
223 self.human_parser.state = HumanParserState::PlanBuilds;
224 self.human_parser.pending.clear();
225 return Ok(true);
226 }
227
228 if trimmed.contains("paths will be fetched")
230 || trimmed.contains("path will be fetched")
231 {
232 self.human_parser.state = HumanParserState::PlanDownloads;
233 self.human_parser.pending.clear();
234 return Ok(true);
235 }
236
237 if trimmed.starts_with("building '")
239 && trimmed.contains(".drv'")
240 && trimmed.contains(" on '")
241 && let Some(drv_path) = extract_path_from_message(trimmed)
242 && let Some(drv) = crate::state::Derivation::parse(&drv_path)
243 {
244 let host = extract_remote_host(trimmed).unwrap_or(Host::Localhost);
245 let drv_id = self.state.get_or_create_derivation_id(drv);
246 let now = crate::state::current_time();
247 self.state.update_build_status(
248 drv_id,
249 crate::state::BuildStatus::Building(crate::state::BuildInfo {
250 start: now,
251 host,
252 estimate: None,
253 activity_id: None,
254 }),
255 );
256 return Ok(true);
257 }
258
259 if (trimmed.starts_with("building") || trimmed.contains("building '"))
261 && let Some(drv_path) = extract_path_from_message(trimmed)
262 && let Some(drv) = crate::state::Derivation::parse(&drv_path)
263 {
264 let drv_id = self.state.get_or_create_derivation_id(drv);
265 let now = crate::state::current_time();
266 self.state.update_build_status(
267 drv_id,
268 crate::state::BuildStatus::Building(crate::state::BuildInfo {
269 start: now,
270 host: Host::Localhost,
271 estimate: None,
272 activity_id: None,
273 }),
274 );
275 return Ok(true);
276 }
277
278 if trimmed.starts_with("copying path '")
280 && trimmed.contains("' from '")
281 && let Some(path_str) = extract_path_from_message(trimmed)
282 && let Some(path) = crate::state::StorePath::parse(&path_str)
283 {
284 let path_id = self.state.get_or_create_store_path_id(path);
285 let now = crate::state::current_time();
286 let host =
287 extract_remote_host_after(trimmed, "from '").unwrap_or(Host::Localhost);
288 self.state.full_summary.running_downloads.insert(
289 path_id,
290 crate::state::TransferInfo {
291 start: now,
292 host,
293 activity_id: 0,
294 bytes_transferred: 0,
295 total_bytes: None,
296 },
297 );
298 return Ok(true);
299 }
300
301 if trimmed.starts_with("copying path '")
303 && trimmed.contains("' to '")
304 && let Some(path_str) = extract_path_from_message(trimmed)
305 && let Some(path) = crate::state::StorePath::parse(&path_str)
306 {
307 let path_id = self.state.get_or_create_store_path_id(path);
308 let now = crate::state::current_time();
309 let host =
310 extract_remote_host_after(trimmed, "to '").unwrap_or(Host::Localhost);
311 self.state.full_summary.running_uploads.insert(
312 path_id,
313 crate::state::TransferInfo {
314 start: now,
315 host,
316 activity_id: 0,
317 bytes_transferred: 0,
318 total_bytes: None,
319 },
320 );
321 return Ok(true);
322 }
323
324 if trimmed.starts_with("builder for '")
326 && trimmed.contains("failed with exit code")
327 && let Some(drv_path) = extract_path_from_message(trimmed)
328 && let Some(drv) = crate::state::Derivation::parse(&drv_path)
329 {
330 let exit_code = extract_exit_code(trimmed);
331 let fail_type =
332 exit_code.map_or(FailType::Unknown, FailType::BuildFailed);
333 let drv_id = self.state.get_or_create_derivation_id(drv);
334 let now = crate::state::current_time();
335 let build_info = self
336 .state
337 .get_derivation_info(drv_id)
338 .and_then(|info| {
339 if let crate::state::BuildStatus::Building(b) = &info.build_status {
340 Some(b.clone())
341 } else {
342 None
343 }
344 })
345 .unwrap_or(crate::state::BuildInfo {
346 start: now,
347 host: Host::Localhost,
348 estimate: None,
349 activity_id: None,
350 });
351 self.state.update_build_status(
352 drv_id,
353 crate::state::BuildStatus::Failed {
354 info: build_info,
355 fail: crate::state::BuildFail { at: now, fail_type },
356 },
357 );
358 return Ok(true);
359 }
360
361 if trimmed.starts_with("error:") && trimmed.contains("hash mismatch") {
363 self.state.nix_errors.push(trimmed.to_string());
364 return Ok(true);
365 }
366
367 if trimmed.starts_with("error:") || trimmed.contains("error:") {
369 self.state.nix_errors.push(trimmed.to_string());
370
371 if let Some(drv_path) = extract_path_from_message(trimmed)
372 && let Some(drv) = crate::state::Derivation::parse(&drv_path)
373 && let Some(&drv_id) = self.state.derivation_ids.get(&drv)
374 && let Some(info) = self.state.get_derivation_info(drv_id)
375 && let crate::state::BuildStatus::Building(build_info) =
376 &info.build_status
377 {
378 let now = crate::state::current_time();
379 self.state.update_build_status(
380 drv_id,
381 crate::state::BuildStatus::Failed {
382 info: build_info.clone(),
383 fail: crate::state::BuildFail {
384 at: now,
385 fail_type: FailType::Unknown,
386 },
387 },
388 );
389 }
390
391 return Ok(true);
392 }
393
394 if trimmed.contains("checking outputs of")
396 && let Some(drv_path) = extract_path_from_message(trimmed)
397 && let Some(drv) = crate::state::Derivation::parse(&drv_path)
398 {
399 let drv_id = self.state.get_or_create_derivation_id(drv);
400 self.state.touched_ids.insert(drv_id);
401 return Ok(true);
402 }
403
404 if (trimmed.starts_with("downloading") || trimmed.contains("downloading '"))
406 && let Some(path_str) = extract_path_from_message(trimmed)
407 && let Some(path) = crate::state::StorePath::parse(&path_str)
408 {
409 let path_id = self.state.get_or_create_store_path_id(path);
410 let now = crate::state::current_time();
411 let total_bytes = extract_byte_size(trimmed);
412 self.state.full_summary.running_downloads.insert(
413 path_id,
414 crate::state::TransferInfo {
415 start: now,
416 host: Host::Localhost,
417 activity_id: 0,
418 bytes_transferred: 0,
419 total_bytes,
420 },
421 );
422 return Ok(true);
423 }
424
425 if (trimmed.starts_with("downloaded") || trimmed.contains("downloaded '"))
427 && let Some(path_str) = extract_path_from_message(trimmed)
428 && let Some(path) = StorePath::parse(&path_str)
429 && let Some(&path_id) = self.state.store_path_ids.get(&path)
430 {
431 let now = crate::state::current_time();
432 let total_bytes = extract_byte_size(trimmed).unwrap_or(0);
433 let start = self
434 .state
435 .full_summary
436 .running_downloads
437 .get(&path_id)
438 .map_or(now, |t| t.start);
439 let completed = crate::state::CompletedTransferInfo {
440 start,
441 end: now,
442 host: Host::Localhost,
443 total_bytes,
444 };
445 self.state.full_summary.running_downloads.remove(&path_id);
446 self
447 .state
448 .full_summary
449 .completed_downloads
450 .insert(path_id, completed);
451 return Ok(true);
452 }
453
454 if (trimmed.starts_with("built") || trimmed.contains("built '"))
456 && let Some(drv_path) = extract_path_from_message(trimmed)
457 && let Some(drv) = Derivation::parse(&drv_path)
458 && let Some(&drv_id) = self.state.derivation_ids.get(&drv)
459 && let Some(info) = self.state.get_derivation_info(drv_id)
460 && let BuildStatus::Building(build_info) = &info.build_status
461 {
462 let now = crate::state::current_time();
463 self.state.update_build_status(drv_id, BuildStatus::Built {
464 info: build_info.clone(),
465 end: now,
466 });
467 return Ok(true);
468 }
469
470 println!("{line}");
472 Ok(false)
473 }
474
475 pub const fn state(&self) -> &State {
477 &self.state
478 }
479
480 pub const fn state_mut(&mut self) -> &mut State {
482 &mut self.state
483 }
484}
485
486fn extract_remote_host(line: &str) -> Option<Host> {
488 extract_remote_host_after(line, "on '")
489}
490
491fn extract_remote_host_after(line: &str, marker: &str) -> Option<Host> {
494 let pos = line.find(marker)?;
495 let after = &line[pos + marker.len()..];
496 let end = after.find('\'')?;
497 let raw = &after[..end];
498 let name = raw
499 .strip_prefix("ssh://")
500 .or_else(|| raw.strip_prefix("https://"))
501 .or_else(|| raw.strip_prefix("http://"))
502 .unwrap_or(raw)
503 .trim_end_matches('/');
504 if name.is_empty() || name == "localhost" {
505 Some(Host::Localhost)
506 } else {
507 Some(Host::Remote(name.to_string()))
508 }
509}
510
/// Parses the number that follows the first `exit code` in `line`.
///
/// Whitespace after the marker is skipped and the run of ASCII digits that
/// follows is parsed. Returns `None` when the marker is absent, no digit
/// follows it directly, or the digits do not fit in an `i32`.
fn extract_exit_code(line: &str) -> Option<i32> {
  const MARKER: &str = "exit code";

  let (_, tail) = line.split_once(MARKER)?;
  let tail = tail.trim_start();
  let digits_end = tail
    .find(|c: char| !c.is_ascii_digit())
    .unwrap_or(tail.len());
  tail[..digits_end].parse().ok()
}
518
/// Pulls a path-like token out of a log message.
///
/// The text between the first pair of single quotes wins, whatever it is.
/// If there is no complete pair, the first whitespace-separated word that
/// starts with `/nix/store/` is returned with surrounding characters other
/// than ASCII alphanumerics, `/`, `-` and `.` trimmed off. Returns `None`
/// when neither form is present.
pub fn extract_path_from_message(line: &str) -> Option<String> {
  let quoted = line
    .split_once('\'')
    .and_then(|(_, rest)| rest.split_once('\''))
    .map(|(inner, _)| inner);
  if let Some(inner) = quoted {
    return Some(inner.to_string());
  }

  let keeps = |c: char| c.is_ascii_alphanumeric() || matches!(c, '/' | '-' | '.');
  line
    .split_whitespace()
    .find(|word| word.starts_with("/nix/store/"))
    .map(|word| word.trim_matches(|c: char| !keeps(c)).to_string())
}
543
/// Finds the first `<number> <unit>` pair in `line` and converts it to bytes.
///
/// `<unit>` must be one of `B`, `KiB`, `MiB`, `GiB`, `TiB` or `PiB` as a
/// separate whitespace-delimited word, and `<number>` is the word directly
/// before it. Fractional values are truncated towards zero after scaling;
/// values too large for `u64` saturate at `u64::MAX`.
///
/// Candidates whose number is negative or not finite (`nan`, `inf`, ...,
/// which the float parser accepts) are skipped instead of being turned into
/// a bogus size. Returns `None` when no valid pair exists.
pub fn extract_byte_size(line: &str) -> Option<u64> {
  let mut words = line.split_whitespace().peekable();

  while let Some(word) = words.next() {
    // Only a word followed by a known unit is a candidate.
    let Some(multiplier) = words.peek().copied().and_then(byte_unit_multiplier)
    else {
      continue;
    };
    let Ok(value) = word.parse::<f64>() else {
      continue;
    };
    if value.is_finite() && value >= 0.0 {
      // Float-to-int `as` truncates and saturates, which is the intended
      // rounding for a display-only size.
      return Some((value * multiplier as f64) as u64);
    }
  }

  None
}

/// Maps a binary size unit to its size in bytes, or `None` for any other
/// word.
fn byte_unit_multiplier(unit: &str) -> Option<u64> {
  const KIB: u64 = 1024;

  match unit {
    "B" => Some(1),
    "KiB" => Some(KIB),
    "MiB" => Some(KIB.pow(2)),
    "GiB" => Some(KIB.pow(3)),
    "TiB" => Some(KIB.pow(4)),
    "PiB" => Some(KIB.pow(5)),
    _ => None,
  }
}